"Artificial Intelligence without Big Data Analytics is lame, and Big Data Analytics without Artificial Intelligence is blind." Dr. O. Aly, Computer Science.
The purpose of this discussion is to analyze the impact of XML on MapReduce. The discussion addresses the various techniques and approaches proposed by research studies for processing large XML documents using MapReduce. The XML fragmentation process, both in the absence and in the presence of MapReduce, is also discussed to provide a better understanding of the complex task of processing large XML documents in a distributed, scalable MapReduce environment.
XML Query Processing Using MapReduce
The XML format has been used to store data for multiple applications (Aravind & Agrawal, 2014). To obtain value from XML data, it needs to be ingested into Hadoop and analyzed (Aravind & Agrawal, 2014). The Hadoop ecosystem needs to understand and interpret XML as it is ingested (Aravind & Agrawal, 2014). MapReduce is a building block of the Hadoop ecosystem. In the age of Big Data, XML documents are expected to be very large and to require scalable, distributed processing. Processing XML queries with MapReduce requires decomposing a big XML document and distributing its portions to different nodes. The relational approach is not appropriate because it is expensive: transforming a big XML document into relational database tables can be extremely time consuming, and θ-joins among relational tables are costly (Wu, 2014). Various research studies have proposed approaches to implement native XML query processing algorithms using MapReduce.
(Dede, Fadika, Gupta, & Govindaraju, 2011) have discussed and analyzed the scalable and distributed processing of scientific XML data, and how the MapReduce model can be used for XML metadata indexing. The study presented performance results using two MapReduce implementations, the Apache Hadoop framework and the proposed LEMO-MR framework, and provided an indexing framework capable of indexing and efficiently searching large-scale scientific XML datasets. The framework has been tailored for integration with any framework that uses the MapReduce model to meet scalability and variety requirements.
(Fegaras, Li, Gupta, & Philip, 2011) have also discussed and analyzed query optimization in a MapReduce environment. The study presented a novel query language for large-scale analysis of XML data in a MapReduce environment, called MRQL (MapReduce Query Language), designed to capture the most common data analysis tasks that can be optimized. XML data fragmentation is also discussed in this study. A parallel data computation expects the input data to be fragmented into small, manageable pieces that determine the granularity of the computation. In a MapReduce environment, each map worker is assigned a data split that consists of data fragments. A map worker processes these data one fragment at a time. For structured relational data, a fragment is a relational tuple, while for a text file a fragment can be a single line. However, for hierarchical data and nested collections such as XML, the fragment size and structure depend on the actual application that processes the data. For instance, XML data may consist of a few XML documents, each containing a single XML element whose size may exceed the memory capacity of a map worker. Thus, when processing XML data, it is recommended to allow custom fragmentation to meet a wide range of application requirements. (Fegaras et al., 2011) have noted that Hadoop provides a simple input format for XML fragmentation based on a single tag name. The XML document is divided into data splits, which may start and end at arbitrary points in the document, even in the middle of tag names. (Fegaras et al., 2011; Sakr & Gaber, 2014) have indicated that this input format allows reading the document as a stream of string fragments, so that each string contains a single complete element with the requested tag name. An XML parser can then be used to parse these strings and convert them to objects. The fragmentation process is complex because the requested elements may cross data split boundaries, and these data splits may reside on different data nodes in the distributed file system (DFS). Hadoop's DFS provides the implicit solution to this problem by allowing a scan to continue beyond a data split into the next one, subject to some overhead for transferring data between nodes. (Fegaras et al., 2011) have proposed an XML fragmentation technique built on top of the existing Hadoop XML input format, providing a higher level of abstraction and better customization. It is a higher level of abstraction because it constructs XML data in the MRQL data model, ready to be processed by MRQL queries, instead of deriving a string for each XML element (Fegaras et al., 2011).
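The single-tag input format described above can be approximated with a short sketch (an illustration only, not Hadoop's actual implementation; the tag name and sample split are made up): it scans raw text for complete elements with the requested tag name, so a split that begins mid-document still yields only whole fragments. Note that the regular-expression approach shown here does not handle same-named nested elements; real input formats scan byte sequences instead.

```python
import re

def xml_fragments(stream_text, tag):
    """Yield complete <tag>...</tag> fragments from raw XML text.

    Mimics the idea of Hadoop's single-tag XML input format: the scan
    simply looks for the next opening tag, so it recovers even if the
    text begins at an arbitrary point (e.g. mid-document at the start
    of a data split).  Does not support nested same-named tags.
    """
    # Match <tag> or <tag attr="...">, non-greedily up to the closing tag.
    pattern = re.compile(r"<%s(?:\s[^>]*)?>.*?</%s>" % (tag, tag), re.DOTALL)
    for match in pattern.finditer(stream_text):
        yield match.group(0)

# A split that starts in the middle of a document: a truncated element
# precedes the first complete one, and another is cut off at the end.
# Only whole fragments are returned.
split = 'e</item><item id="1">alpha</item><item id="2">beta</item><ite'
fragments = list(xml_fragments(split, "item"))
```

Each returned string can then be handed to an XML parser and converted to an object, as (Fegaras et al., 2011; Sakr & Gaber, 2014) describe.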
(Sakr & Gaber, 2014) have also briefly discussed another language that has been proposed to support distributed XML processing using the MapReduce framework, called ChuQL. It presents a MapReduce-based extension of the syntax, grammar, and semantics of XQuery, the standard W3C language for querying XML documents. The ChuQL implementation takes care of distributing the computation to multiple XQuery engines running on Hadoop nodes, as described by one or more ChuQL MapReduce expressions. The "word count" example program can be represented in ChuQL using its extended expressions, where the MapReduce expression describes a MapReduce job. The input and output clauses are used to read from and write to HDFS, respectively. The rr and rw clauses describe the record reader and writer, respectively. The map and reduce clauses represent the standard map and reduce phases of the framework, where they process XML values, or key/value pairs of XML values, to match the MapReduce model; these are specified using XQuery expressions. Figure 1 shows the word count example in ChuQL using XML in a distributed environment.
Figure 1. The Word Count Example Program in ChuQL Using XML in a Distributed Environment (Sakr & Gaber, 2014).
(Vasilenko & Kurapati, 2014) have discussed and analyzed the efficient processing of XML documents in the Hadoop MapReduce environment. They argued that the most common approach to processing XML data is to introduce a custom solution based on user-defined functions or scripts. Common choices vary from introducing an ETL process for extracting the data of interest to transforming the XML into other formats that are natively supported by Hive. They have addressed a generic approach to handling XML based on the Apache Hive architecture. The researchers described an approach that complements the existing family of Hive serializers and de-serializers (SerDes) for other popular data formats, such as JSON, and makes it much easier for users to deal with large XML datasets. The implementation included logical splits of the input files, each of which is assigned to an individual mapper. The mapper relies on the implemented Apache Hive XML SerDe to break the split into XML fragments using specified start/end byte sequences. Each fragment corresponds to a single Hive record. The fragments are handled by the XML processor to extract the values for the record columns using specified XPath queries. The reduce phase was not required in this implementation (Vasilenko & Kurapati, 2014).
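The per-record extraction step can be sketched as follows (illustrative only; the column names and XPath expressions are hypothetical, and Python's ElementTree stands in for the Hive XML SerDe's XPath processor):

```python
import xml.etree.ElementTree as ET

def fragment_to_record(fragment, column_xpaths):
    """Map one XML fragment to one flat record, one XPath per column.

    Sketches what an XML SerDe does per record: the mapper hands over
    a complete fragment, and each column value is pulled out with an
    XPath expression.  Missing nodes yield None (a NULL column).
    """
    root = ET.fromstring(fragment)
    record = {}
    for column, xpath in column_xpaths.items():
        node = root.find(xpath)  # ElementTree supports a limited XPath subset
        record[column] = node.text if node is not None else None
    return record

# A hypothetical fragment and column mapping.
fragment = "<order><id>42</id><customer><name>Ada</name></customer></order>"
record = fragment_to_record(fragment, {"order_id": "id",
                                       "customer_name": "customer/name"})
```

In the described implementation, each such record becomes a single Hive row, which is why no reduce phase is needed.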
(Wu, 2014) has discussed and analyzed the partitioning of XML documents and the distribution of XML fragments to different compute nodes, which can introduce high overhead for transferring XML fragments from one node to another during MapReduce execution. The researcher has proposed a technique that uses MapReduce to distribute the labels in inverted lists across a computing cluster so that structural joins can be performed in parallel to process queries, together with an optimization technique that reduces the computing space in the proposed framework to improve query processing performance. The approach differs from the current practice of shredding an XML document and distributing it across the nodes of a cluster. The process includes reading and distributing the inverted lists that are required for the input queries during query processing; their size is much smaller than the size of the whole document. The process also partitions the total computing space for structural joins so that each sub-space can be handled by one reducer performing structural joins. The researcher has also proposed a pruning-based optimization algorithm to further improve the performance of the approach.
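A structural join over inverted lists of labels can be sketched with a simple region encoding, where each element is labeled with a (start, end) interval and an ancestor's interval encloses those of its descendants (an illustrative sketch: (Wu, 2014) distributes such joins across reducers, and practical algorithms use stack-based merges rather than the nested loop shown here; the sample labels are made up):

```python
def structural_join(ancestors, descendants):
    """Join two inverted lists of region labels (start, end).

    Each label encodes an element's position in the document;
    (a_start, a_end) contains (d_start, d_end) iff
    a_start < d_start and d_end < a_end.
    """
    results = []
    for a_start, a_end in ancestors:
        for d_start, d_end in descendants:
            if a_start < d_start and d_end < a_end:
                results.append(((a_start, a_end), (d_start, d_end)))
    return results

# Hypothetical labels for a query like //book//title: each book spans a
# region that may or may not enclose a title's region.
books = [(1, 10), (11, 20)]
titles = [(2, 4), (12, 14), (21, 23)]
pairs = structural_join(books, titles)
```

Only the two inverted lists, not the document itself, need to be shipped to the reducer that handles this sub-space, which is the source of the approach's reduced transfer overhead.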
Conclusion
This discussion has addressed XML query processing in a MapReduce environment. It has covered the various techniques and approaches proposed by research studies for processing large XML documents using MapReduce. The XML fragmentation process, both in the absence and in the presence of MapReduce, has also been discussed to provide a better understanding of the complex task of processing large XML documents in a distributed, scalable MapReduce environment.
Dede, E., Fadika, Z., Gupta, C., & Govindaraju, M. (2011). Scalable and distributed processing of scientific XML data. Paper presented at the 12th IEEE/ACM International Conference on Grid Computing (GRID).
Fegaras, L., Li, C., Gupta, U., & Philip, J. (2011). XML query optimization in Map-Reduce.
Sakr, S., & Gaber, M. (2014). Large scale and big data: Processing and management. CRC Press.
Vasilenko, D., & Kurapati, M. (2014). Efficient processing of XML documents in Hadoop MapReduce.
Wu, H. (2014). Parallelizing structural joins to process queries over big XML data using MapReduce. Paper presented at the International Conference on Database and Expert Systems Applications.
The purpose of this project is to discuss and analyze advanced processing techniques for Big Data. There are various processing systems, such as Iterative Processing, Graph Processing, Stream Processing (also known as Event Processing or Real-Time Processing), and Batch Processing. A MapReduce-based framework such as Hadoop supports Batch-Oriented Processing. MapReduce lacks built-in support for Iterative Processing, which requires parsing datasets iteratively, for large Graph Processing, and for Stream Processing. Thus, models such as Twister and iMapReduce have been introduced to improve the Iterative Processing of MapReduce, and Surfer, Apache Hama, Pregel, and GraphLab for large Graph Processing. Other models, such as Aurora, Borealis, and IBM InfoSphere Streams, have been introduced to support Stream Processing.
This project focuses on the discussion and analysis of the Stream Processing models Aurora and Borealis. The discussion and analysis of the Aurora model include an overview of Aurora as a Stream Processing Engine (SPE), followed by the Aurora framework and the fundamental components of the Aurora topology. The Query Model of Aurora, known as the Stream Query Algebra (SQuAl), supports seven operators for constructing the Aurora network and the queries expressing its stream processing requirements. The discussion and analysis also include SQuAl and the Query Model, the Run-Time Framework, and the optimization systems to overcome bottlenecks in the network. Aurora* and Medusa, as Distributed Stream Processing systems, are also discussed and analyzed. The second SPE is Borealis, which is a Distributed SPE. The discussion and analysis of Borealis involve the framework, the query model, and the optimization techniques to overcome bottlenecks in the network. A comparison between Aurora and Borealis is also discussed and analyzed.
When dealing with Big Data, its different characteristics and attributes, such as volume, velocity, variety, veracity, and value, must be taken into consideration (Chandarana & Vijayalakshmi, 2014). Thus, different types of frameworks are required to run different types of analytics (Chandarana & Vijayalakshmi, 2014). Large-scale data processing involves different types of workloads, and organizations deploy a combination of them to achieve their business goals (Chandarana & Vijayalakshmi, 2014). These workload types include Batch-Oriented Processing, Online Transaction Processing (OLTP), Stream Processing, Interactive ad-hoc Query and Analysis (Chandarana & Vijayalakshmi, 2014), and Online Analytical Processing (OLAP) (Erl, Khattak, & Buhler, 2016).
For Batch-Oriented Processing, a MapReduce-based framework such as Hadoop can be deployed for recurring tasks such as large-scale Data Mining or Aggregation (Chandarana & Vijayalakshmi, 2014; Erl et al., 2016; Sakr & Gaber, 2014). For OLTP, such as user-facing e-commerce transactions, Apache HBase can be deployed (Chandarana & Vijayalakshmi, 2014). An OLTP system processes transaction-oriented data (Erl et al., 2016). For Stream Processing, the Storm framework can be deployed to handle stream sources such as social media feeds or sensor data (Chandarana & Vijayalakshmi, 2014). For Interactive ad-hoc Query and Analysis, the Apache Drill framework can be deployed (Chandarana & Vijayalakshmi, 2014). OLAP systems, which form an integral part of Business Intelligence, Data Mining, and Machine Learning, are used for processing data analysis queries (Erl et al., 2016).
The Apache Hadoop framework allows distributed processing of large data sets across clusters of computers using simple programming models (Chandarana & Vijayalakshmi, 2014). The framework involves four major modules: Hadoop Core, the Hadoop Distributed File System (HDFS), Hadoop YARN, and Hadoop MapReduce. Hadoop Core provides the common utilities that support the other modules. The HDFS module provides high-throughput access to application data. The Hadoop YARN module handles job scheduling and resource management. Hadoop MapReduce provides parallel processing of large-scale datasets (Chandarana & Vijayalakshmi, 2014).
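The MapReduce model itself can be illustrated with the classic word count, written here in plain Python rather than against the Hadoop API: the map phase emits (word, 1) pairs, the shuffle groups them by key, and the reduce phase sums each group.

```python
from collections import defaultdict

def map_phase(documents):
    """Map: emit a (word, 1) pair for every word in every input split."""
    for doc in documents:
        for word in doc.split():
            yield (word, 1)

def shuffle(pairs):
    """Shuffle: group intermediate values by key, as the framework does."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups):
    """Reduce: sum the counts for each word."""
    return {word: sum(counts) for word, counts in groups.items()}

# Two hypothetical input splits, each handled by a mapper in Hadoop.
splits = ["big data big", "data stream"]
counts = reduce_phase(shuffle(map_phase(splits)))
```

In Hadoop, the map and reduce functions run on different cluster nodes and the shuffle is performed by the framework over the network; the data flow, however, is exactly the one sketched above.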
Moreover, there are various processing systems such as Iterative Processing (Schwarzkopf, Murray, & Hand, 2012), Graph Processing, and Stream Processing (Sakr & Gaber, 2014; Schwarzkopf et al., 2012). Iterative Processing systems utilize in-memory caching (Schwarzkopf et al., 2012). Many data analysis applications require iterative processing of data, including algorithms for text-based search and machine learning. However, because MapReduce lacks built-in support for iterative processing, which requires parsing datasets iteratively (Zhang, Chen, Wang, & Yu, 2015; Zhang, Gao, Gao, & Wang, 2012), models such as Twister, HaLoop, and iMapReduce have been introduced to improve the iterative processing of MapReduce (Zhang et al., 2015). With regard to Graph Processing, MapReduce is suitable for processing flat data structures, such as vertex-oriented tasks, while propagation is optimized for edge-oriented tasks on partitioned graphs. However, to improve the programming models for large graph processing, various models have been introduced, such as Surfer (Chen, Weng, He, & Yang, 2010; Chen et al., 2012), GraphX (Gonzalez et al., 2014), Apache Hama, GoldenOrb, Giraph, Phoebus, GPS (Cui, Mei, & Ooi, 2014), Pregel (Cui et al., 2014; Hu, Wen, Chua, & Li, 2014; Sakr & Gaber, 2014), and GraphLab (Cui et al., 2014; Hu et al., 2014; Sakr & Gaber, 2014). With regard to Stream Processing, because MapReduce is designed for Batch-Oriented Computation such as log analysis and text processing (Chandarana & Vijayalakshmi, 2014; Cui et al., 2014; Erl et al., 2016; Sakr & Gaber, 2014; Zhang et al., 2015; Zhang et al., 2012) and is not adequate for supporting real-time stream processing tasks (Sakr & Gaber, 2014), various Stream Processing models have been introduced, such as DEDUCE, Aurora, Borealis, IBM Spade, StreamCloud, Stormy (Sakr & Gaber, 2014), Twitter Storm (Grolinger et al., 2014; Sakr & Gaber, 2014), Spark Streaming, Apache Storm (Fernández et al., 2014; Gupta, Gupta, & Mohania, 2012; Hu et al., 2014; Scott, 2015), StreamMapReduce (Grolinger et al., 2014), the Simple Scalable Streaming System (S4) (Fernández et al., 2014; Grolinger et al., 2014; Gupta et al., 2012; Hu et al., 2014; Neumeyer, Robbins, Nair, & Kesari, 2010), and IBM InfoSphere Streams (Gupta et al., 2012).
The project focuses on two Stream Processing models: the Aurora and Borealis stream processing systems. The discussion and analysis will address their characteristics, architectures, performance optimization capabilities, and scalability. The project will also discuss and analyze performance bottlenecks, their causes, and the strategies to remove them. The project begins with a general discussion of Stream Processing.
Stream Processing Engines
Stream Processing is defined by (Manyika et al., 2011) as the set of technologies designed to process large real-time streams of event data. Stream Processing enables applications such as algorithmic trading in financial services, RFID event processing applications, fraud detection (Manyika et al., 2011; Scott, 2015), process monitoring, and location-based services in telecommunications (Manyika et al., 2011). Stream Processing reflects Real-Time Streaming and is also known as “Event Stream Processing” (Manyika et al., 2011). “Event Stream Processing” is in turn also known as “Streaming Analytics,” which processes customer-centric data “on the fly” without the need for long-term storage (Spiess, T’Joens, Dragnea, Spencer, & Philippart, 2014).
In Real-Time mode, data is processed in-memory because it is captured before it is persisted to disk (Erl et al., 2016). The response time ranges from a sub-second to under a minute (Erl et al., 2016). The Real-Time mode reflects the velocity characteristic of Big Data datasets (Erl et al., 2016). When Big Data is processed using Real-Time or Event Stream Processing, the data arrives continuously in a stream, or at intervals in events (Erl et al., 2016). The individual data items in a stream are small; however, their continuous nature results in very large datasets (Erl et al., 2016; Gradvohl, Senger, Arantes, & Sens, 2014). The Real-Time mode also involves an “Interactive Mode” (Erl et al., 2016), which refers to query processing in real time (Erl et al., 2016).
Event Stream Processing (ESP) systems are designed to provide high-performance analysis of streams with low latency (Gradvohl et al., 2014). The first ESP systems, developed in the early 2000s, include Aurora, Borealis, STREAM, TelegraphCQ, NiagaraCQ, and Cougar (Gradvohl et al., 2014). At that time, these were centralized systems, running on a single server and aiming to overcome the issues of stream processing in traditional databases (Gradvohl et al., 2014). Tremendous efforts have since been made to advance data stream processing from centralized systems to stream processing engines that can distribute queries among a cluster of nodes (Sakr & Gaber, 2014). The next discussion focuses on two scalable engines for processing streaming data: Aurora and Borealis.
1. Aurora Stream Processing Engine
Aurora was introduced through a joint project of Brandeis University, Brown University, and MIT (Abadi et al., 2003; Sakr & Gaber, 2014). The prototype Aurora implementation was introduced in 2003 (Abadi et al., 2003; Sakr & Gaber, 2014). Aurora's Java-based GUI allows the construction and execution of arbitrary Aurora networks and queries (Abadi et al., 2003; Sakr & Gaber, 2014). The Aurora system is described as a processing model for managing data streams for Monitoring Applications, which differ substantially from traditional business data processing (Abadi et al., 2003; Sakr & Gaber, 2014). The main aim of Monitoring Applications is to monitor continuous streams of data (Abadi et al., 2003; Sakr & Gaber, 2014). One example is military applications that monitor readings from sensors worn by soldiers, such as blood pressure, heart rate, and position. Another example is financial analysis applications that monitor the stock data streams reported from various stock exchanges (Abadi et al., 2003; Sakr & Gaber, 2014). Tracking Applications, which monitor the location of large numbers of objects, are another type of Monitoring Application (Abadi et al., 2003; Sakr & Gaber, 2014).
Due to their nature, Monitoring Applications could benefit from a Database Management System (DBMS) because of the high volume of monitored data and their query requirements (Abadi et al., 2003; Sakr & Gaber, 2014). However, existing DBMS systems are unable to fully support such applications because they target Business Applications, not Monitoring Applications (Abadi et al., 2003; Sakr & Gaber, 2014). A DBMS gets its data from humans issuing transactions, while Monitoring Applications get their data from external sources such as sensors (Abadi et al., 2003; Sakr & Gaber, 2014). The role of a DBMS when supporting Monitoring Applications is to detect abnormal activities and alert humans (Abadi et al., 2003; Sakr & Gaber, 2014). This model is described as the DBMS-Active, Human-Passive (DAHP) model (Abadi et al., 2003; Sakr & Gaber, 2014). It differs from the traditional DBMS model, described as the Human-Active, DBMS-Passive (HADP) model, where humans initiate queries and transactions on the passive DBMS repository (Abadi et al., 2003; Sakr & Gaber, 2014).
Besides the latest value of an object, Monitoring Applications also require its historical values (Abadi et al., 2003; Sakr & Gaber, 2014). Monitoring Applications are trigger-oriented, sending alert messages when abnormal activities are detected (Abadi et al., 2003; Sakr & Gaber, 2014). Monitoring Applications also require approximate answers, due to the nature of data stream processing, where data can be lost or omitted for processing reasons. The last characteristic of Monitoring Applications involves the real-time requirement and Quality of Service (QoS). Table 1 summarizes these five major characteristics of the Monitoring Applications for which Aurora systems are designed to manage data streams.
Table 1: Monitoring Applications Characteristics.
1.1 Aurora Framework
The traditional DBMS could not be used to implement Monitoring Applications with these challenging characteristics (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014). The prevalent requirements of these Monitoring Applications are data and information streams, triggers, imprecise data, and real-time processing (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014). Thus, Aurora systems are designed to support Monitoring Applications with these challenging characteristics and requirements (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014). The underlying concept of the Aurora System Model is that the application administrator specifies how incoming data streams are processed using a boxes-and-arrows paradigm as a data-flow system, where tuples flow through a loop-free, directed graph of processing operations (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014). The output streams are presented to applications, which are programmed to deal with the asynchronous tuples in an output stream (Abadi et al., 2003; Sakr & Gaber, 2014). The Aurora System Model also maintains historical storage to support ad-hoc queries (Abadi et al., 2003; Sakr & Gaber, 2014). Aurora systems handle data from a variety of sources, such as computer programs that generate values at regular or irregular intervals, or hardware sensors (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014). Figure 1 illustrates the Aurora System Model, reflecting the input data streams, the operator boxes, the continuous and ad-hoc queries, and the output to applications.
Figure 1: Overview of Aurora System Model and Architecture. Adapted from (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).
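The boxes-and-arrows data-flow paradigm can be sketched in plain Python (an illustrative sketch only, not Aurora's actual implementation; the box names and the heart-rate monitoring example are hypothetical):

```python
def filter_box(predicate):
    """A filter box: pass through only tuples satisfying the predicate."""
    def box(stream):
        for tup in stream:
            if predicate(tup):
                yield tup
    return box

def map_box(fn):
    """A map box: transform every tuple that flows through."""
    def box(stream):
        for tup in stream:
            yield fn(tup)
    return box

def run_network(source, boxes):
    """Push a stream of tuples through a loop-free chain of boxes."""
    stream = iter(source)
    for box in boxes:
        stream = box(stream)  # each arrow connects one box to the next
    return list(stream)

# A two-box network over (sensor_id, heart_rate) tuples: keep abnormal
# readings, then tag them for the alerting application.
readings = [("s1", 72), ("s2", 140), ("s3", 155)]
alerts = run_network(readings, [
    filter_box(lambda t: t[1] > 120),
    map_box(lambda t: (t[0], t[1], "ALERT")),
])
```

In Aurora proper, the graph can branch and merge rather than being a simple chain, and the output stream would feed a monitoring application in the DAHP style described above.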
1.2 Aurora Query Model: SQuAl Using Seven Primitive Operations
The Aurora Stream Query Algebra (SQuAl) supports seven operators, which are used to construct Aurora networks and the queries expressing its stream processing requirements (Abadi et al., 2003; Sakr & Gaber, 2014). Many of these operators have analogs among the relational query operators. For instance, the “filter” operator, which applies any number of predicates to each incoming tuple and routes tuples based on the satisfied predicates, is like the relational operator “select” (Abadi et al., 2003; Sakr & Gaber, 2014). The “aggregate” operator computes stream aggregations to address the fundamental push-based nature of data streams, applying a function such as a moving average across a window of values in a stream (Abadi et al., 2003; Sakr & Gaber, 2014). Windowed operations are required when the data is stale or time-imprecise (Abadi et al., 2003; Sakr & Gaber, 2014). The application administrator in the Aurora System Model can connect the output of one box to the inputs of several others, which implements an “implicit split” operation rather than the “explicit split” of the relational operations (Abadi et al., 2003; Sakr & Gaber, 2014). Besides, the Aurora System Model contains an “explicit union” operation, where two streams can be put together (Abadi et al., 2003; Sakr & Gaber, 2014). The Aurora System Model also represents a collection of streams with a common schema, called “arcs” (Abadi et al., 2003; Sakr & Gaber, 2014). An arc does not have any specific number of streams, which makes it easy for streams to come and go without any modifications to the Aurora network (Abadi et al., 2003; Sakr & Gaber, 2014).
In the Aurora Query Model, a stream is an append-only sequence of tuples with a uniform schema, where each tuple has a timestamp for QoS calculations (Abadi et al., 2003; Sakr & Gaber, 2014). The model assumes no arrival order, which gives latitude for producing outputs out of order so that high-priority tuples can be served first (Abadi et al., 2003; Sakr & Gaber, 2014). Moreover, this no-arrival-order assumption also helps in redefining windows over attributes and in merging multiple streams (Abadi et al., 2003; Sakr & Gaber, 2014). Some operators are described as “order-agnostic,” such as Filter, Map, and Union. Other operators are described as “order-sensitive,” such as BSort, Aggregate, Join, and Resample: they can only be guaranteed to execute with finite buffer space and in finite time if they can assume some ordering over their input streams (Abadi et al., 2003; Sakr & Gaber, 2014). Thus, the order-sensitive operators require order specification arguments, which indicate the expected arrival order of tuples (Abadi et al., 2003; Sakr & Gaber, 2014).
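As an illustration of a windowed aggregate over timestamped tuples, the following sketch computes a moving average in the style of SQuAl's “aggregate” operator (a simplified, hypothetical rendering, not the actual SQuAl implementation; it assumes tuples already arrive in timestamp order, i.e. the order-sensitivity requirement discussed above is satisfied):

```python
from collections import deque

def windowed_average(stream, window_size):
    """Moving average over a sliding window of the last `window_size`
    values in a timestamped stream.

    Tuples are (timestamp, value); an output is emitted once the window
    is full, tagged with the latest tuple's timestamp.
    """
    window = deque(maxlen=window_size)  # old values fall off automatically
    for timestamp, value in stream:
        window.append(value)
        if len(window) == window_size:
            yield (timestamp, sum(window) / window_size)

# A hypothetical stream of timestamped stock prices.
ticks = [(1, 10.0), (2, 20.0), (3, 30.0), (4, 40.0)]
averages = list(windowed_average(ticks, 3))
```

Because the operator buffers only the current window, it runs in finite space, which is exactly why ordered input must be guaranteed for order-sensitive operators like Aggregate.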
The Aurora Query Model supports three main modes of operation: (1) the continuous queries of real-time processing, (2) the views, and (3) the ad-hoc queries (Abadi et al., 2003; Sakr & Gaber, 2014). These three modes utilize the same conceptual building blocks, processing flows based on QoS specifications (Abadi et al., 2003; Sakr & Gaber, 2014). In the Aurora Query Model, each output is associated with two-dimensional QoS graphs, which specify the utility of the output with regard to several performance-related and quality-related attributes (Abadi et al., 2003; Sakr & Gaber, 2014). The stream-oriented operators that constitute the Aurora networks and queries are designed to operate in a data-flow mode, where data elements are processed as they appear on the input (Abadi et al., 2003; Sakr & Gaber, 2014).
1.3 Aurora Run-Time Framework and Optimization
The main purpose of the Aurora run-time operations is to process data flows through a potentially large workflow diagram (Abadi et al., 2003; Sakr & Gaber, 2014). The Aurora Run-Time Architecture involves five main techniques: (1) the QoS data structure, (2) the Aurora Storage Management (ASM), (3) the Run-Time Scheduling (RTS), (4) the Introspection, and (5) the Load Shedding.
The QoS is a multi-dimensional function involving response times, tuple drops, and values produced (Abadi et al., 2003; Sakr & Gaber, 2014). The ASM is designed to store all tuples required by the Aurora network. The ASM requires two main operations: one to manage storage for the tuples being passed through an Aurora network, and a second to maintain the extra tuple storage that may be required at connection points. Thus, the ASM involves two main management operations: (1) the Queue Management, and (2) the Connection Point Management (Abadi et al., 2003; Sakr & Gaber, 2014).
The RTS in Aurora is challenging because of the need to simultaneously address several issues, such as large system scale, real-time performance requirements, and dependencies between box executions (Abadi et al., 2003; Sakr & Gaber, 2014). Besides, the processing of a tuple in Aurora spans many scheduling and execution steps, where the input tuple goes through many boxes before potentially contributing to an output stream, which may require secondary storage (Abadi et al., 2003; Sakr & Gaber, 2014). Aurora systems reduce the overall processing costs by exploiting two main non-linearities when processing tuples: the “Interbox Non-Linearity” and the “Intrabox Non-Linearity.” Aurora systems take advantage of both through “Train Scheduling” (Abadi et al., 2003; Sakr & Gaber, 2014). “Train Scheduling” is a set of scheduling heuristics that attempt (1) to have boxes queue as many tuples as possible without processing, thus generating long tuple trains, (2) to process complete trains at once, thus exploiting the “Intrabox Non-Linearity,” and (3) to pass them to subsequent boxes without having to go to disk, thus exploiting the “Interbox Non-Linearity” (Abadi et al., 2003; Sakr & Gaber, 2014). The primary goal of “Train Scheduling” is to minimize the number of I/O operations performed per tuple; its secondary goal is to minimize the number of box calls made per tuple. With regard to the Introspection technique, Aurora systems employ static and dynamic (run-time) introspection techniques to predict and detect overload situations (Abadi et al., 2003; Sakr & Gaber, 2014). The purpose of the static introspection technique is to determine whether the hardware running the Aurora network is sized correctly.
The dynamic analysis, which is based on the run-time introspection technique, uses the timestamps carried by all tuples (Abadi et al., 2003; Sakr & Gaber, 2014). With regard to “Load Shedding,” Aurora systems reduce the volume of tuple processing via load shedding when an overload is detected by the static or dynamic analysis, by either dropping or filtering tuples (Abadi et al., 2003; Sakr & Gaber, 2014). Figure 2 illustrates the Aurora Run-Time Architecture, adapted from (Abadi et al., 2003; Sakr & Gaber, 2014).
Figure 2:
Aurora Run-Time Framework. Adapted from (Abadi et al., 2003).
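The train-scheduling heuristic can be illustrated with a small sketch (hypothetical and simplified, not Aurora's scheduler): tuples are queued into trains of a fixed size, and each complete train is handed to a box in a single call, reducing the number of box calls made per tuple.

```python
def train_schedule(stream, train_size, process):
    """Queue tuples into 'trains' and hand each complete train to the
    box in one call, instead of invoking the box once per tuple.

    Returns the outputs plus the number of box calls made, so the
    amortization effect can be observed directly.
    """
    outputs, train, calls = [], [], 0
    for tup in stream:
        train.append(tup)
        if len(train) == train_size:
            outputs.extend(process(train))  # one box call per full train
            calls += 1
            train = []
    if train:                               # flush the partial last train
        outputs.extend(process(train))
        calls += 1
    return outputs, calls

# A hypothetical box that doubles every value in the train it receives.
double_all = lambda train: [2 * t for t in train]
outputs, box_calls = train_schedule(range(10), train_size=4,
                                    process=double_all)
```

Here ten tuples are processed with only three box calls; per-call overheads (scheduling, cache misses, potential disk I/O between boxes) are paid per train rather than per tuple, which is the non-linearity the heuristic exploits.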
The Aurora optimization techniques involve two main optimization systems: (1) the dynamic continuous query optimization, and (2) the ad-hoc query optimization. The dynamic continuous query optimization involves the inserting-projections, combining-boxes, and reordering-boxes techniques (Abadi et al., 2003). The ad-hoc query optimization involves historical information, because Aurora semantics require the historical sub-network to be run first. This historical information is organized in a B-tree data model (Abadi et al., 2003). The initial boxes in an ad-hoc query can pull information from the B-tree associated with the corresponding connection point (Abadi et al., 2003). When the historical operation is finished, the Aurora optimization technique switches the implementation to the standard push-based data structures and continues processing in the conventional mode (Abadi et al., 2003).
1.4 Aurora* and Medusa for Distributed Stream Processing
The Aurora System is a centralized stream processor. However, in (Cherniack et al., 2003), Aurora* and Medusa are proposed for distributed processing. Several architectural issues must be addressed for building a large-scale distributed version of a stream processing system such as Aurora. In (Cherniack et al., 2003), the problem is divided into two categories: intra-participant distribution and inter-participant distribution. The intra-participant distribution involves small-scale distribution within one administrative domain, which is handled by the proposed model of Aurora* (Cherniack et al., 2003). The inter-participant distribution involves large-scale distribution across administrative boundaries, which is handled by the proposed model of Medusa (Cherniack et al., 2003).
2. Borealis Stream Processing Engine
Borealis is described as the second generation of the Distributed SPE, which was also developed at Brandeis University, Brown University, and MIT (Abadi et al., 2005; Sakr & Gaber, 2014). The Borealis streaming model inherits the core stream processing functionality from the Aurora model, and the core distribution functionality from the Medusa model (Abadi et al., 2005; Sakr & Gaber, 2014). The Borealis model is an expansion and extension of both models to provide more advanced capabilities and functionalities which are commonly required by newly-emerging stream processing applications (Abadi et al., 2005; Sakr & Gaber, 2014). Borealis is regarded as the successor to Aurora (Abadi et al., 2005; Sakr & Gaber, 2014).
The second generation of the SPE has three main requirements which are critical and at the same time challenging. The first requirement involves the “Dynamic Revision of Query Results” (Abadi et al., 2005; Sakr & Gaber, 2014). Applications are forced to live with imperfect results because corrections or updates to previously processed data are only available after the fact, unless the system has techniques to revise its processing and results to take into account newly available data or updates (Abadi et al., 2005; Sakr & Gaber, 2014). The second requirement for the second generation of the SPE involves “Dynamic Query Modification,” which allows fast, automatic modification of queries at run-time with low overhead (Abadi et al., 2005; Sakr & Gaber, 2014). The third requirement for the second generation of the SPE involves “Flexible and Highly-Scalable Optimization,” where the optimization problem is more balanced between sensor-heavy and server-heavy optimization. A more flexible optimization structure is needed to deal with a large number of devices and perform cross-network, sensor-heavy/server-heavy resource management and optimization (Abadi et al., 2005; Sakr & Gaber, 2014). However, this requirement for such an optimization framework has two additional challenges. The first challenge is the ability to simultaneously optimize different QoS metrics such as processing latency, throughput, or sensor lifetime (Abadi et al., 2005; Sakr & Gaber, 2014). The second challenge of such a flexible optimization structure and framework is the ability to perform optimizations at different levels of granularity: at the node level, the sensor network level, the level of a cluster of sensors and servers, and so forth (Abadi et al., 2005; Sakr & Gaber, 2014). These advanced challenges, capabilities, and requirements for the second generation of the SPE are added to the classical architecture of the SPE to form and introduce the Borealis framework (Abadi et al., 2005; Sakr & Gaber, 2014).
2.1 The Borealis Framework
The Borealis
framework is a distributed stream processing engine where the collection of
continuous queries submitted to Borealis can be seen as a giant network of
operators whose processing is distributed to multiple sites (Abadi et al., 2005; Sakr & Gaber, 2014). There is a
sensor proxy interface which acts as another Borealis site (Abadi et al., 2005; Sakr & Gaber, 2014). The sensor networks can participate in query
processing behind that sensor proxy interface (Abadi et al., 2005; Sakr & Gaber, 2014).
A Borealis server runs on each node with the Global Catalog (GC), High Availability (HA) module, Neighborhood Optimizer (NHO), Local Monitor (LM), Admin, and Query Processor (QP) at the top, and meta-data, control, and data at the bottom of the framework. The GC can be centralized or distributed across a subset of processing nodes, holding information about the complete query network and the location of all query fragments (Abadi et al., 2005; Sakr & Gaber, 2014). The HA modules monitor each node to handle any failure (Abadi et al., 2005; Sakr & Gaber, 2014). The NHO utilizes local information and information from other NHOs to improve the load balance between the nodes (Abadi et al., 2005; Sakr & Gaber, 2014). The LM collects performance-related statistics as the local system runs and reports to the local optimizer as well as the NHOs (Abadi et al., 2005; Sakr & Gaber, 2014). The QP is the core component of the Borealis framework. The actual execution of the query is implemented in the QP (Abadi et al., 2005; Sakr & Gaber, 2014). The QP, which is a single-site processor, receives the input data streams, and the result is pulled through the I/O Queue, routing the tuples to and from remote Borealis nodes and clients (Abadi et al., 2005; Sakr & Gaber, 2014). The Admin module controls the QP and issues system control messages (Abadi et al., 2005; Sakr & Gaber, 2014). These messages are pushed to the Local Optimizer (LO), which communicates with the major run-time components of the QP to enhance performance. These run-time components of Borealis include (1) the Priority Scheduler, (2) the Box Processors, and (3) the Load Shedder. The Priority Scheduler determines the order of box execution based on the priority of the tuples. The Box Processors can change their behavior during run-time based on the messages received from the LO. The Load Shedder discards low-priority tuples when the node is overloaded (Abadi et al., 2005; Sakr & Gaber, 2014).
The Storage Manager is part of the QP and is responsible for storing and retrieving data which flows through the arcs of the local query diagram. The Local Catalog is another component of the QP, storing the query diagram description and metadata, and is accessible by all components. Figure 3 illustrates the Borealis framework, adapted from (Abadi et al., 2005; Sakr & Gaber, 2014).
Figure 3. Borealis’ Framework, adapted from (Abadi et al., 2005; Sakr & Gaber, 2014).
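The interplay between the Priority Scheduler and the Load Shedder described above can be sketched as a bounded priority queue: tuples are dispatched highest-priority first, and when the queue exceeds its capacity the lowest-priority entry is shed. This is a deliberately simplified model under assumed semantics; the class and method names are hypothetical, not Borealis APIs.

```python
import heapq
import itertools

class RuntimeQueue:
    """Toy priority scheduler with a capacity-triggered load shedder."""
    def __init__(self, capacity):
        self.heap = []                # entries: (-priority, seq, tuple)
        self.seq = itertools.count()  # tie-breaker so entries stay comparable
        self.capacity = capacity
        self.shed_count = 0

    def enqueue(self, priority, tup):
        heapq.heappush(self.heap, (-priority, next(self.seq), tup))
        if len(self.heap) > self.capacity:
            # Load shedder: node is overloaded, discard the lowest-priority
            # queued tuple (largest -priority value).
            worst = max(self.heap)
            self.heap.remove(worst)
            heapq.heapify(self.heap)
            self.shed_count += 1

    def next_tuple(self):
        # Priority scheduler: the highest-priority tuple is executed first.
        return heapq.heappop(self.heap)[2]

q = RuntimeQueue(capacity=3)
for prio, t in [(1, "a"), (5, "b"), (3, "c"), (4, "d")]:
    q.enqueue(prio, t)
# with capacity 3, the priority-1 tuple "a" was shed;
# dispatch order is "b" (5), "d" (4), "c" (3)
```

The real Load Shedder drops tuples according to QoS functions rather than a fixed capacity, but the sketch captures the core idea of trading low-priority work for responsiveness under overload.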
2.2 Borealis’ Query Model and Comparison with Aurora’s Query Model
Borealis inherits the Aurora model of boxes-and-arrows to specify the continuous queries, where the boxes reflect the query operators and the arrows reflect the data flow between the boxes (Abadi et al., 2005; Sakr & Gaber, 2014). Borealis extends the data model of Aurora by supporting three types of messages: the insertion, the deletion, and the replacement (Abadi et al., 2005; Sakr & Gaber, 2014). The Borealis queries are an extended version of Aurora's operators to support revision messages (Abadi et al., 2005; Sakr & Gaber, 2014). The query model of Borealis supports the modification of box semantics during run-time (Abadi et al., 2005; Sakr & Gaber, 2014). The QoS in Borealis, as in Aurora, forms the basis of resource management decisions. However, while each query output is provided with a QoS function in Aurora's model, Borealis allows QoS to be predicted at any point in the data flow (Abadi et al., 2005; Sakr & Gaber, 2014). Thus, Borealis supports a Vector of Metrics for supplied messages to allow such prediction of QoS.
In the context of query result revision, Borealis supports a “replayable” query diagram and the revision of the processing scheme. While Aurora has an append-only model, where a message cannot be modified once it is placed on a stream and therefore provides an approximate or imperfect result, the Borealis model supports the modification of messages to process the query intelligently and provide correct query results (Abadi et al., 2005; Sakr & Gaber, 2014). The query diagram must be replayable when messages are revised and modified because the processing of the modified message must replay a portion of the past with the modified value (Abadi et al., 2005; Sakr & Gaber, 2014). This replaying process is also useful for recovery and high availability (Abadi et al., 2005; Sakr & Gaber, 2014). This dynamic revision with the replaying process can add more overhead. Thus, the “closed” model is used to generate deltas showing the effects of the revisions instead of the entire result.
In the context of query modification, Borealis provides online modification of continuous queries by supporting the control lines and the time travel features. The control lines extend the basic query model of Aurora to change operator parameters and the operators themselves during run-time (Abadi et al., 2005; Sakr & Gaber, 2014). The Borealis boxes contain the standard data input lines and special control lines which carry messages with revised box parameters and new box functions (Abadi et al., 2005; Sakr & Gaber, 2014). Borealis provides a new function called “bind” to bind new parameters to free variables within a function definition, which leads to a new function being created (Abadi et al., 2005; Sakr & Gaber, 2014). Aurora's connection points are leveraged to enable time travel in Borealis. The original purpose of the connection points was to support ad-hoc queries, which can query historical and run-time data. This concept is extended in the Borealis model to include connection point views, enabling time travel applications, ad-hoc queries, and the query diagram to access the connection points independently and in parallel (Abadi et al., 2005; Sakr & Gaber, 2014). The connection point views include two operations to enable time travel: the replay operation and the undo operation.
2.3 The Optimization Model of Borealis
Borealis has an optimizer framework to optimize processing across a combined sensor and server network, to deal with the QoS in stream-based applications, and to support scalability, both size-wise and geographical, in stream-based applications. The optimization model contains multiple collaborating monitoring and optimization components. The monitoring components include the local monitor at every site and the end-point monitor at output sites. The optimization components include the global optimizer, the neighborhood optimizer, and the local optimizer (Abadi et al., 2005). While Aurora evaluated the QoS only at outputs and had a difficult job inferring QoS at upstream nodes, Borealis can evaluate the predicted-QoS score function on each message by utilizing the values of the Metrics Vector (Abadi et al., 2005). Borealis utilizes Aurora's concept of train scheduling of boxes and tuples to reduce the scheduling overhead. While Aurora processes messages in order of arrival, Borealis has box scheduling flexibility which allows processing messages out of order, because the revision technique can be used to process them later as insertions (Abadi et al., 2005). Borealis offers a superior load shedding technique to Aurora's (Abadi et al., 2005). The Load Shedder in Borealis detects and handles overload situations by adding “drop” operators to the processing network (Ahmad et al., 2005). The “drop” operator aims to filter out messages, either based on the value of the tuple or in a randomized fashion, to overcome the overload (Ahmad et al., 2005). Higher-quality outputs can be achieved by allowing nodes in a chain to coordinate in choosing where and how much load to shed. Distributed load shedding algorithms are used to collect local statistics from nodes and precompute potential drop plans at compilation time (Ahmad et al., 2005). Figure 4 illustrates the Optimization Components of Borealis, adapted from (Abadi et al., 2005).
Figure 4. The Optimization Components of Borealis, adapted from (Abadi et al., 2005).
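The two flavors of “drop” operator, value-based filtering and randomized shedding, can be sketched as follows. This is a minimal illustration; the function names `value_drop` and `random_drop` are hypothetical, and a real drop operator would work over unbounded streams rather than lists.

```python
import random

def value_drop(stream, predicate):
    """Drop operator that sheds messages based on the value of the tuple."""
    return [t for t in stream if predicate(t)]

def random_drop(stream, keep_fraction, rng=None):
    """Drop operator that sheds messages in a randomized fashion."""
    rng = rng or random.Random(0)  # seeded only to make this sketch repeatable
    return [t for t in stream if rng.random() < keep_fraction]

stream = list(range(100))
kept = value_drop(stream, lambda t: t >= 50)  # shed low-value tuples
```

Value-based drops preserve the tuples the application cares about most, while random drops are cheaper and make no assumptions about tuple content; distributed load shedding chooses where in the chain to insert such operators and with what drop rate.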
Borealis provides a fault-tolerance technique in a distributed SPE through replication, running multiple copies of the same query network on distinct processing nodes (Abadi et al., 2005; Balazinska, Balakrishnan, Madden, & Stonebraker, 2005). When a node experiences a failure on one of its input streams, the node tries to find an alternate upstream replica. All replicas must be consistent. To ensure such consistency, the data-serializing operator “SUnion” is used to take multiple streams as input and produce one output stream with deterministically ordered tuples, ensuring all operators of the replicas process the same input in the same order (Abadi et al., 2005; Balazinska et al., 2005). To provide high availability, each SPE ensures that input data is processed and results are forwarded within a user-specified time threshold of its arrival (Abadi et al., 2005; Balazinska et al., 2005). When the failure is corrected, each SPE which experienced tentative data reconciles its state and stabilizes its output by replacing the previously tentative output with stable data tuples forwarded to downstream clients, reconciling the state of the SPE based on checkpoint/redo, undo/redo, and the new concept of revision tuples (Abadi et al., 2005; Balazinska et al., 2005).
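The deterministic-ordering idea behind SUnion can be sketched very compactly. The real operator buckets tuples into time intervals and handles tentative data; the simplified version below just merges timestamped streams and sorts on (timestamp, payload), so every replica that applies it to the same inputs, in any arrival order, produces an identical output order.

```python
def s_union(*streams):
    """Merge replica input streams into one deterministically ordered stream.

    Each tuple is (timestamp, payload). Sorting on (timestamp, payload) gives
    a total order that is independent of arrival order, so all replicas
    process identical input in identical order.
    """
    merged = [t for s in streams for t in s]
    merged.sort(key=lambda t: (t[0], t[1]))
    return merged

a = [(1, "x"), (3, "z")]
b = [(2, "y"), (1, "w")]
out = s_union(a, b)
# [(1, 'w'), (1, 'x'), (2, 'y'), (3, 'z')] regardless of which stream is given first
```

Using the payload as a tie-breaker for equal timestamps is what makes the order total; without it, two replicas could legally order simultaneous tuples differently and diverge.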
Conclusion
This project discussed and analyzed advanced processing of Big Data. There are various processing systems such as Iterative Processing, Graph Processing, Stream Processing (also known as Event Processing or Real-Time Processing), and Batch Processing. A MapReduce-based framework such as Hadoop supports Batch-Oriented Processing. MapReduce lacks built-in support for Iterative Processing, which requires parsing datasets iteratively, as well as for large Graph Processing and Stream Processing. Thus, various models such as Twister, HaLoop, and iMapReduce were introduced to improve the Iterative Processing of MapReduce. With regard to Graph Processing, MapReduce is suitable for vertex-oriented tasks on flat data structures, while propagation is optimized for edge-oriented tasks on partitioned graphs. Various models were introduced to improve the programming models for large graph processing, such as Surfer, Apache Hama, GoldenOrb, Giraph, Pregel, and GraphLab. With regard to Stream Processing, various models were also introduced to overcome the limitation of the MapReduce framework, which deals only with batch-oriented processing. These Stream Processing models include Aurora, Borealis, IBM Space, StreamCloud, Stormy, Twitter Storm, Spark Streaming, Apache Storm, StreamMapReduce, Simple Scalable Streaming System (S4), and IBM InfoSphere Streams. This project focused its discussion and analysis on the Stream Processing models of Aurora and Borealis. The discussion and analysis of the Aurora model included an overview of Aurora as a Stream Processing Engine (SPE), followed by the Aurora framework and the fundamental components of the Aurora topology. The query model of Aurora, known as the Stream Query Algebra “SQuAl,” supports seven operators constructing the Aurora network and queries for expressing its stream processing requirements. The discussion and analysis also included “SQuAl” and the Query Model, the Run-Time Framework, and the Optimization systems. Aurora* and Medusa for Distributed Stream Processing were also discussed and analyzed. The second SPE is Borealis, which is a Distributed SPE. The discussion and analysis of Borealis involved the framework, the query model, and the optimization technique. Borealis is an expansion of Aurora's SPE to include and support features which are required for Distributed Real-Time Streaming. The comparison between Aurora and Borealis was also discussed and analyzed at all levels: the network, the query model, and the optimization techniques.
References
Abadi, D. J., Ahmad, Y., Balazinska, M., Cetintemel, U., Cherniack, M., Hwang, J.-H., . . . Ryvkina, E. (2005). The design of the Borealis stream processing engine.
Abadi, D. J., Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., . . . Zdonik, S. (2003). Aurora: A new model and architecture for data stream management. The VLDB Journal, 12(2), 120-139.
Ahmad, Y., Berg, B., Cetintemel, U., Humphrey, M., Hwang, J.-H., Jhingran, A., . . . Tatbul, N. (2005). Distributed operation in the Borealis stream processing engine. Paper presented at the Proceedings of the 2005 ACM SIGMOD International Conference on Management of Data.
Balazinska, M., Balakrishnan, H., Madden, S., & Stonebraker, M. (2005). Fault-tolerance in the Borealis distributed stream processing system. Paper presented at the Proceedings of the 2005 ACM SIGMOD International Conference on Management of Data.
Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., Seidman, G., . . . Stonebraker, M. (2002). Monitoring streams—a new class of data management applications. Paper presented at VLDB '02: Proceedings of the 28th International Conference on Very Large Databases.
Chandarana, P., & Vijayalakshmi, M. (2014, April 4-5). Big Data analytics frameworks. Paper presented at the 2014 International Conference on Circuits, Systems, Communication and Information Technology Applications (CSCITA).
Chen, R., Weng, X., He, B., & Yang, M. (2010). Large graph processing in the cloud. Paper presented at the Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data.
Chen, R., Yang, M., Weng, X., Choi, B., He, B., & Li, X. (2012). Improving large graph processing on partitioned graphs in the cloud. Paper presented at the Proceedings of the Third ACM Symposium on Cloud Computing.
Cherniack, M., Balakrishnan, H., Balazinska, M., Carney, D., Cetintemel, U., Xing, Y., & Zdonik, S. B. (2003). Scalable distributed stream processing.
Cui, B., Mei, H., & Ooi, B. C. (2014). Big data: The driver for innovation in databases. National Science Review, 1(1), 27-30.
Erl, T., Khattak, W., & Buhler, P. (2016). Big Data Fundamentals: Concepts, Drivers & Techniques: Prentice Hall Press.
Fernández, A., del Río, S., López, V., Bawakid, A., del Jesus, M. J., Benítez, J. M., & Herrera, F. (2014). Big Data with Cloud Computing: An insight on the computing environment, MapReduce, and programming frameworks. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 4(5), 380-409.
Gonzalez, J. E., Xin, R. S., Dave, A., Crankshaw, D., Franklin, M. J., & Stoica, I. (2014). GraphX: Graph processing in a distributed dataflow framework.
Gradvohl, A. L. S., Senger, H., Arantes, L., & Sens, P. (2014). Comparing distributed online stream processing systems considering fault tolerance issues. Journal of Emerging Technologies in Web Intelligence, 6(2), 174-179.
Grolinger, K., Hayes, M., Higashino, W. A., L'Heureux, A., Allison, D. S., & Capretz, M. A. (2014). Challenges for MapReduce in big data. Paper presented at the Services (SERVICES), 2014 IEEE World Congress on.
Gupta, R., Gupta, H., & Mohania, M. (2012). Cloud computing and big data analytics: What is new from databases perspective? Paper presented at the International Conference on Big Data Analytics.
Hu, H., Wen, Y., Chua, T.-S., & Li, X. (2014). Toward scalable systems for big data analytics: A technology tutorial. IEEE Access, 2, 652-687.
Manyika, J., Chui, M., Brown, B., Bughin, J., Dobbs, R., Roxburgh, C., & Byers, A. H. (2011). Big data: The next frontier for innovation, competition, and productivity.
Neumeyer, L., Robbins, B., Nair, A., & Kesari, A. (2010). S4: Distributed stream computing platform. Paper presented at the 2010 IEEE International Conference on Data Mining Workshops.
Sakr, S., & Gaber, M. (2014). Large Scale and Big Data: Processing and Management: CRC Press.
Schwarzkopf, M., Murray, D. G., & Hand, S. (2012). The seven deadly sins of cloud computing research. Paper presented at the 4th USENIX Workshop on Hot Topics in Cloud Computing (HotCloud '12).
Scott, J. A. (2015). Getting Started with Spark: MapR Technologies, Inc.
Spiess, J., T'Joens, Y., Dragnea, R., Spencer, P., & Philippart, L. (2014). Using Big Data to improve customer experience and business performance. Bell Labs Tech. J., 18, 13-17. doi:10.1002/bltj.21642
Zhang, Y., Chen, S., Wang, Q., & Yu, G. (2015). i^2MapReduce: Incremental MapReduce for mining evolving big data. IEEE Transactions on Knowledge and Data Engineering, 27(7), 1906-1919.
Zhang, Y., Gao, Q., Gao, L., & Wang, C. (2012). iMapReduce: A distributed computing framework for iterative computation. Journal of Grid Computing, 10(1), 47-68.
MapReduce and RDF Data Query Processing: Optimized Performance
This project provides a survey of the state-of-the-art techniques for applying MapReduce to improve Resource Description Framework (RDF) data query processing performance. There has been a tremendous effort from industry and researchers to develop efficient and scalable RDF processing systems. Most complex data-processing tasks require multiple cycles of MapReduce which are chained together sequentially. The decomposition of a task into cycles or subtasks is often implemented. Thus, a low overall workflow cost is a key element in the decomposition. Each cycle of MapReduce results in significant overhead. When using RDF, the decomposition problem reflects the distribution of operations such as SELECT and JOIN into subtasks, each supported by a MapReduce cycle. The issue of the decomposition is related to the operation order, because neighboring operations in a query plan can be effectively grouped into the same subtasks. When using MapReduce, the operation order is based on the requirement of key partitioning, so that neighboring operations do not cause any conflict. Various techniques are proposed to enhance the performance of semantic web queries using RDF and MapReduce.
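To make the decomposition concrete, a JOIN of two triple patterns sharing a variable can be implemented in a single MapReduce cycle by keying the map output on the shared variable (a reduce-side join). The sketch below models one such cycle in plain Python; the predicates, tags, and data are made up for illustration and are not from any particular system.

```python
from collections import defaultdict

def mapreduce_join(triples):
    """One MapReduce cycle joining patterns (?x worksFor ?c) and (?x name ?n)."""
    # Map phase: key each matching triple by the join variable (the subject),
    # tagging which pattern it matched.
    mapped = []
    for s, p, o in triples:
        if p == "worksFor":
            mapped.append((s, ("company", o)))
        elif p == "name":
            mapped.append((s, ("name", o)))

    # Shuffle: group map output by key, as the framework does between phases.
    groups = defaultdict(list)
    for key, val in mapped:
        groups[key].append(val)

    # Reduce phase: emit one joined row per key that has both sides.
    results = []
    for s, vals in groups.items():
        names = [v for tag, v in vals if tag == "name"]
        companies = [v for tag, v in vals if tag == "company"]
        for n in names:
            for c in companies:
                results.append((n, c))
    return sorted(results)

triples = [("alice", "worksFor", "acme"),
           ("alice", "name", "Alice"),
           ("bob", "name", "Bob")]
# only "alice" matches both patterns, so one joined row is produced
```

Each additional non-neighboring join variable forces another cycle with its own key partitioning, which is exactly why grouping neighboring operations into one subtask lowers the overall workflow cost.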
This project begins with an overview of RDF, followed by the RDF Store Architecture, the MapReduce Parallel Processing Framework, and Hadoop. RDF and semantic querying using SPARQL are discussed, covering the syntax of SPARQL and the missing features that are required to enhance the performance of RDF using MapReduce. Various techniques for applying MapReduce to improve RDF query processing performance are discussed and analyzed. Some of these techniques include HadoopRDF, RDFPath, and PigSPARQL.
Resource Description Framework (RDF)
Resource Description Framework (RDF) is described as an emerging standard for processing metadata (Punnoose, Crainiceanu, & Rapp, 2012; Tiwana & Balasubramaniam, 2001). RDF provides interoperability between applications that exchange machine-understandable information on the Web (Sakr & Gaber, 2014; Tiwana & Balasubramaniam, 2001). The primary goal of RDF is to define a mechanism and provide standards for metadata and for describing resources on the Web (Boussaid, Tanasescu, Bentayeb, & Darmont, 2007; Firat & Kuzu, 2011; Tiwana & Balasubramaniam, 2001), which makes no a priori assumptions about a particular application domain or the associated semantics (Tiwana & Balasubramaniam, 2001). These standards or mechanisms provided by RDF can prevent users from accessing irrelevant subjects because RDF provides metadata that is relevant to the desired information (Firat & Kuzu, 2011).
RDF is also described as a Data Model (Choi, Son, Cho, Sung, & Chung, 2009; Myung, Yeon, & Lee, 2010) for representing labeled directed graphs (Choi et al., 2009; Nicolaidis & Iniewski, 2017), and is useful for a Data Warehousing solution together with the MapReduce framework (Myung et al., 2010).
RDF is used as an important building block of the Semantic Web in Web 3.0 (see Figure 1) (Choi et al., 2009; Firat & Kuzu, 2011). The technologies of the Semantic Web are useful for maintaining data in the Cloud (M. F. Husain, Khan, Kantarcioglu, & Thuraisingham, 2010). These technologies of the Semantic Web provide the ability to specify and query heterogeneous data in a standard manner (M. F. Husain et al., 2010).
The RDF Data Model can be extended to ontologies to include RDF Schema (RDFS) and the Web Ontology Language (OWL), providing techniques to define and identify vocabularies specific to a certain domain, schemas, and relations between the elements of the vocabulary (Choi et al., 2009).
RDF can be exported in various file formats (Sun & Jara, 2014). The most common of these formats is RDF/XML with XSD (Sun & Jara, 2014). OWL is used to add semantics to the schema (Sun & Jara, 2014). For instance, “A isAssociatedWith B” implies that “B isAssociatedWith A” (Sun & Jara, 2014). OWL allows these two statements to be expressed the same way (Sun & Jara, 2014). This similarity feature allowed by OWL is very useful for “joining” data expressed in different schemas (Sun & Jara, 2014). This feature allows building relationships and joining up data from multiple sites, described as “Linked Data,” facilitating heterogeneous data stream integration (Sun & Jara, 2014). OWL enables new facts to be derived from known facts using inference rules (Nicolaidis & Iniewski, 2017). As another example of the inference technique using OWL, when one triple states that a car is a subtype of a vehicle and another triple states that a Cabrio is a subtype of a car, the new fact that a Cabrio is a vehicle can be inferred from the previous facts (Nicolaidis & Iniewski, 2017).
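The Cabrio/car/vehicle inference above is a transitive closure over subtype triples, and can be sketched in a few lines. This is a toy fixpoint reasoner, not an OWL engine; the plain `"subClassOf"` predicate string stands in for the namespaced RDFS/OWL property.

```python
def infer_subclass_closure(triples):
    """Derive new facts from subtype triples by transitive closure."""
    closure = {(s, o) for s, p, o in triples if p == "subClassOf"}
    changed = True
    while changed:  # fixpoint iteration: repeat until no new facts appear
        changed = False
        for a, b in list(closure):
            for c, d in list(closure):
                if b == c and (a, d) not in closure:
                    closure.add((a, d))  # a ⊑ b and b ⊑ d entail a ⊑ d
                    changed = True
    return closure

facts = [("Cabrio", "subClassOf", "Car"), ("Car", "subClassOf", "Vehicle")]
closure = infer_subclass_closure(facts)
# ("Cabrio", "Vehicle") is a newly inferred fact
```

A production reasoner applies many such rules (subproperties, domains, ranges, symmetry) with far better algorithms, but the derive-until-fixpoint pattern is the same.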
The RDF Data Model is described as a simple and flexible framework (Myung et al., 2010). The underlying form of expression in RDF is a collection of “triples,” each consisting of a subject (s), a predicate (p), and an object (o) (Brickley, 2014; Connolly & Begg, 2015; Nicolaidis & Iniewski, 2017; Przyjaciel-Zablocki, Schätzle, Skaley, Hornung, & Lausen, 2013; Punnoose et al., 2012). The subjects and predicates are Resources, each encoded as a Uniform Resource Identifier (URI) to ensure uniqueness, while the object can be a Resource or a Literal such as a string, date, or number (Nicolaidis & Iniewski, 2017). In (Firat & Kuzu, 2011), the basic structure of the RDF Data Model is based on a triplet of object (O), attribute (A), and value (V) (Firat & Kuzu, 2011). The basic role of RDF is to provide a Data Model of object, attribute, and value (OAV) (Firat & Kuzu, 2011). The RDF Data Model is similar to the XML Data Model, in that both do not include form-related information or names (Firat & Kuzu, 2011).
Figure 1: The Major Building Blocks of the Semantic Web Architectures. Adapted from (Firat & Kuzu, 2011).
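The triple model described above can be queried with simple pattern matching, which is the building block SPARQL engines evaluate. The sketch below uses `None` as a stand-in for a query variable; the URIs and the `match` helper are made up for illustration.

```python
def match(triples, pattern):
    """Match an (s, p, o) pattern against a triple set; None acts as a variable."""
    s, p, o = pattern
    return [t for t in triples
            if (s is None or t[0] == s)
            and (p is None or t[1] == p)
            and (o is None or t[2] == o)]

# Subjects and predicates are URIs; objects may be URIs or literals.
triples = [("http://ex.org/alice", "http://ex.org/knows", "http://ex.org/bob"),
           ("http://ex.org/alice", "http://ex.org/age", "30")]

# "Whom does alice know?" — subject and predicate bound, object is a variable.
hits = match(triples, ("http://ex.org/alice", "http://ex.org/knows", None))
```

A SPARQL basic graph pattern is essentially a conjunction of such patterns whose matches are then joined on shared variables.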
RDF has been commonly used in applications such as the Semantic Web, Bioinformatics, and Social Networks because of its great flexibility and applicability (Choi et al., 2009). These applications require a huge amount of computation over a large set of data (Choi et al., 2009). Thus, large-scale graph datasets are very common among these applications of the Semantic Web, Bioinformatics, and Social Networks (Choi et al., 2009). However, the traditional techniques for processing such large-scale datasets are found to be inadequate (Choi et al., 2009). Moreover, the RDF Data Model enables existing heterogeneous database systems to be integrated into a Data Warehouse because of its flexibility (Myung et al., 2010). The flexibility of the RDF Data Model also provides users the inference capability to discover unknown knowledge, which is useful for large-scale data analysis (Myung et al., 2010). RDF triples require terabytes of disk space for storage and analysis (M. Husain, McGlothlin, Masud, Khan, & Thuraisingham, 2011; M. F. Husain et al., 2010). Researchers are encouraged to develop efficient repositories because there are only a few existing frameworks, such as RDF-3X, Jena, Sesame, and BigOWLIM, for Semantic Web technologies (M. Husain et al., 2011; M. F. Husain et al., 2010). These frameworks are single-machine RDF systems and are widely used because they are user-friendly and perform well for small and medium-sized RDF datasets (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014). RDF-3X is regarded as the fastest single-machine RDF system in terms of query performance, vastly outperforming previous single-machine systems (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014). However, the performance of RDF-3X diminishes for queries with unbound objects and a low selectivity factor (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014). These frameworks are challenged by large RDF graphs (M. Husain et al., 2011; M. F. Husain et al., 2010). Therefore, the storage of a large volume of RDF triples and the efficient querying of those triples are challenging and are regarded as critical problems in the Semantic Web (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014). These challenges also limit the scaling capabilities (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014).
RDF Store Architecture
The main purpose of the RDF store is to build a database for storing and retrieving any data expressed in RDF (Modoni, Sacco, & Terkaj, 2014). The term RDF store is used as an abstraction for any system that can handle RDF data, allowing the ingestion of serialized RDF data and the retrieval of these data, and providing a set of APIs to facilitate integration with third-party client applications (Modoni et al., 2014). The term triple store often refers to these types of systems (Modoni et al., 2014). An RDF store includes two major components: the Repository and the Middleware. The Repository represents a set of files or a database (Modoni et al., 2014). The Middleware sits on top of the Repository and is in constant communication with it (Modoni et al., 2014). Figure 2 illustrates the RDF store architecture. The Middleware has its own components: the Storage Provider, Query Engine, Parser/Serializer, and Client Connector (Modoni et al., 2014). Current RDF stores are categorized into three groups: database-based stores, native stores, and hybrid stores (Modoni et al., 2014). Examples of the database-based stores include MySQL and Oracle 12c, which are built on top of existing commercial database engines (Modoni et al., 2014). Examples of native stores are AllegroGraph and OWLIM, which are built as database engines from scratch (Modoni et al., 2014). Examples of the hybrid stores are Virtuoso and Sesame, which support both architectural styles: native and DBMS-backed (Modoni et al., 2014).
Figure 2: RDF Store Architecture (Modoni et al., 2014)
MapReduce Parallel Processing Framework and Hadoop
In 2004, Google introduced the MapReduce framework as a Parallel Processing framework which deals with large sets of data (Bakshi, 2012; Fadzil, Khalid, & Manaf, 2012; White, 2012). The MapReduce framework has gained much popularity because it has features for hiding the sophisticated operations of parallel processing (Fadzil et al., 2012). Various MapReduce frameworks such as Hadoop were introduced because of the enthusiasm towards MapReduce (Fadzil et al., 2012). The capability of the MapReduce framework was realized by different research areas such as data warehousing, data mining, and bioinformatics (Fadzil et al., 2012). The MapReduce framework consists of two main layers: the Distributed File System (DFS) layer to store data, and the MapReduce layer for data processing (Lee, Lee, Choi, Chung, & Moon, 2012; Mishra, Dehuri, & Kim, 2016; Sakr & Gaber, 2014). The DFS is a major feature of the MapReduce framework (Fadzil et al., 2012). The MapReduce framework uses large clusters of low-cost commodity hardware to lower the cost (Bakshi, 2012; H. Hu, Wen, Chua, & Li, 2014; Inukollu, Arsi, & Ravuri, 2014; Khan et al., 2014; Krishnan, 2013; Mishra et al., 2016; Sakr & Gaber, 2014; White, 2012). The MapReduce framework uses “Redundant Arrays of Independent (and inexpensive) Nodes (RAIN),” whose components are loosely coupled, so that when any node goes down there is no negative impact on the MapReduce job (Sakr & Gaber, 2014; Yang, Dasdan, Hsiao, & Parker, 2007). The MapReduce framework provides “Fault-Tolerance” by applying the replication technique and allows replacing any crashed node with another node without affecting the currently running job (P. Hu & Dai, 2014; Sakr & Gaber, 2014). The MapReduce framework involves automatic support for the parallelization of execution, which makes MapReduce highly parallel and yet abstracted (P. Hu & Dai, 2014; Sakr & Gaber, 2014).
MapReduce was introduced to solve the problem of parallel processing of large data sets in a distributed environment, which previously required manual management of hardware resources (Fadzil et al., 2012; Sakr & Gaber, 2014). The complexity of parallelization is addressed using two techniques: the Map/Reduce technique and the Distributed File System (DFS) technique (Fadzil et al., 2012; Sakr & Gaber, 2014). A parallel framework must be reliable to ensure good resource management in a distributed environment built from off-the-shelf hardware, and it must solve the scalability issue to support any future processing requirements (Fadzil et al., 2012). Earlier frameworks such as the Message Passing Interface (MPI) had reliability and fault-tolerance issues when processing large data sets (Fadzil et al., 2012). The MapReduce framework covers the two categories of scalability: structural scalability and load scalability (Fadzil et al., 2012). It addresses structural scalability by using the DFS, which forms large virtual storage for the framework by adding off-the-shelf hardware. It addresses load scalability by increasing the number of nodes to improve performance (Fadzil et al., 2012).
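The map/shuffle/reduce phases described above can be illustrated with a minimal, single-process sketch (the helper names are illustrative, not part of any real framework; a real MapReduce system distributes these phases across cluster nodes):

```python
from collections import defaultdict

def map_phase(records, map_fn):
    """Apply the user-defined map function to every input record."""
    for record in records:
        yield from map_fn(record)

def shuffle(pairs):
    """Group intermediate (key, value) pairs by key, as the framework does."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return groups

def reduce_phase(groups, reduce_fn):
    """Apply the user-defined reduce function to each key group."""
    return {key: reduce_fn(key, values) for key, values in groups.items()}

# Classic word count: map emits (word, 1), reduce sums the counts.
def word_map(line):
    for word in line.split():
        yield (word, 1)

def word_reduce(word, counts):
    return sum(counts)

lines = ["big data", "big xml data"]
result = reduce_phase(shuffle(map_phase(lines, word_map)), word_reduce)
# result == {"big": 2, "data": 2, "xml": 1}
```

The programmer supplies only `word_map` and `word_reduce`; everything else (partitioning, grouping, scheduling, fault recovery) is the "hidden" machinery the framework provides, which is precisely why MapReduce is described as highly parallel yet abstracted.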
However, the earlier version of the MapReduce framework faced challenges. Among these challenges are the join operation and the lack of support for aggregate functions to join multiple datasets in one task (Sakr & Gaber, 2014). Another limitation of the standard MapReduce framework is found in iterative processing, which is required for analysis techniques such as the PageRank algorithm, recursive relational queries, and social network analysis (Sakr & Gaber, 2014). The standard MapReduce does not share the execution of work to reduce the overall amount of work (Sakr & Gaber, 2014). Another limitation is the lack of support for data indexes and column storage; only sequential scanning of the input data is supported. This lack of data indexes affects query performance (Sakr & Gaber, 2014). Moreover, many argue that MapReduce is not the optimal solution for structured data. It is known as a shared-nothing architecture, which supports scalability (Bakshi, 2012; Jinquan, Jie, Shengsheng, Yan, & Yuanhao, 2012; Sakr & Gaber, 2014; White, 2012) and the processing of large unstructured data sets (Bakshi, 2012). MapReduce also has limitations in performance and efficiency (Lee et al., 2012).
Hadoop is a software framework derived from Google's BigTable and MapReduce and managed by Apache. It was created by Doug Cutting and was named after his son's toy elephant (Mishra et al., 2016). Hadoop allows applications to run on huge clusters of commodity hardware based on MapReduce (Mishra et al., 2016). The underlying concept of Hadoop is to allow the parallel processing of data across different computing nodes to speed up computations and hide latency (Mishra et al., 2016). The Hadoop Distributed File System (HDFS) is one of the major components of the Hadoop framework for storing large files (Bao, Ren, Zhang, Zhang, & Luo, 2012; Cloud Security Alliance, 2013; De Mauro, Greco, & Grimaldi, 2015), allowing access to data scattered over multiple nodes without any exposure to the complexity of the environment (Bao et al., 2012; De Mauro et al., 2015). The MapReduce programming model is another major component of the Hadoop framework (Bao et al., 2012; Cloud Security Alliance, 2013; De Mauro et al., 2015), designed to implement distributed and parallel algorithms efficiently (De Mauro et al., 2015). HBase is the third component of the Hadoop framework (Bao et al., 2012). HBase is developed on top of HDFS and is a NoSQL (Not only SQL) database (Bao et al., 2012).
The key features of Hadoop include scalability, flexibility, cost efficiency, and fault tolerance (H. Hu et al., 2014; Khan et al., 2014; Mishra et al., 2016; Polato, Ré, Goldman, & Kon, 2014; Sakr & Gaber, 2014). Hadoop allows the nodes in the cluster to scale up and down based on computation requirements, with no change in data formats (H. Hu et al., 2014; Polato et al., 2014). Hadoop also brings massively parallel computation to commodity hardware, decreasing the cost per terabyte of storage and making massively parallel computation affordable as the volume of data increases (H. Hu et al., 2014). Hadoop offers flexibility because it is not tied to a schema, which allows it to utilize structured, unstructured, and semi-structured data and to aggregate data from multiple sources (H. Hu et al., 2014; Polato et al., 2014). Hadoop also allows nodes to crash without affecting data processing. It provides a fault-tolerant environment where data and computation can be recovered without any negative impact on the processing of the data (H. Hu et al., 2014; Polato et al., 2014; White, 2012).
Semantic Queries Using RDF and SPARQL
Researchers have exerted effort in developing Semantic Web technologies, which have been standardized to address the inadequacy of current traditional analytical techniques (M. Husain et al., 2011; M. F. Husain et al., 2010). RDF and SPARQL (SPARQL Protocol and RDF Query Language) are the most prominent standardized Semantic Web technologies (M. Husain et al., 2011; M. F. Husain et al., 2010). In 2007, the Data Access Working Group (DAWG) of the World Wide Web Consortium (W3C) recommended SPARQL and provided standards for it to be the query language for RDF, together with a protocol definition for sending SPARQL queries from a client to a query processor and an XML-based serialization format for results returned by a SPARQL query (Konstantinou, Spanos, Stavrou, & Mitrou, 2010; Sakr & Gaber, 2014). RDF is regarded as the standard for storing and representing data (M. Husain et al., 2011; M. F. Husain et al., 2010). SPARQL is the query language for retrieving data from RDF triple stores (M. Husain et al., 2011; M. F. Husain et al., 2010; Nicolaidis & Iniewski, 2017; Sakr & Gaber, 2014; Zeng, Yang, Wang, Shao, & Wang, 2013).
Like RDF, SPARQL is built on the "triple pattern," which also contains a "subject," "predicate," and "object" and is terminated with a full stop (Connolly & Begg, 2015). An RDF triple is itself a valid SPARQL triple pattern (Connolly & Begg, 2015). URIs are written inside angle brackets to identify resources; literal strings are denoted with either double or single quotes; properties, like Name, can be identified by their URI or, more usually, using a QName-style syntax to improve readability (Connolly & Begg, 2015). Unlike a triple, a triple pattern can include variables (Connolly & Begg, 2015). Any or all of the subject, predicate, and object values in a triple pattern may be replaced by a variable, which indicates the data items of interest that will be returned by a query (Connolly & Begg, 2015).
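The triple-pattern semantics described above can be sketched in a few lines of Python (a toy in-memory matcher for illustration only, not a real SPARQL engine; here terms beginning with "?" are treated as variables, and the example triples are invented):

```python
# A tiny RDF graph as a list of (subject, predicate, object) tuples.
triples = [
    ("ex:Alice", "ex:knows", "ex:Bob"),
    ("ex:Alice", "ex:name", '"Alice"'),
    ("ex:Bob",   "ex:name", '"Bob"'),
]

def is_var(term):
    return term.startswith("?")

def match_pattern(pattern, triples):
    """Return one variable-binding dict per triple that matches the pattern.
    Any of subject/predicate/object may be a variable."""
    results = []
    for triple in triples:
        binding = {}
        for p_term, t_term in zip(pattern, triple):
            if is_var(p_term):
                # A variable already bound to a different value means no match.
                if binding.get(p_term, t_term) != t_term:
                    break
                binding[p_term] = t_term
            elif p_term != t_term:
                break
        else:
            results.append(binding)
    return results

# "Return the names of all resources": the pattern ?s ex:name ?o .
print(match_pattern(("?s", "ex:name", "?o"), triples))
# [{'?s': 'ex:Alice', '?o': '"Alice"'}, {'?s': 'ex:Bob', '?o': '"Bob"'}]
```

This is exactly the behavior the text describes: the constants in the pattern filter the triples, and each variable position yields a binding that a SELECT query would return.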
The semantic query plays a significant role in the Semantic Web, and the standardization of SPARQL plays a significant role in achieving such semantic queries (Konstantinou et al., 2010). Unlike traditional query languages, SPARQL does not operate at the graph level; rather, it models the graph as a set of triples (Konstantinou et al., 2010). Thus, a SPARQL query identifies a graph pattern, and the nodes which match this pattern are returned (Konstantinou et al., 2010; Zeng et al., 2013). SPARQL syntax is similar to SQL, the most striking similarity being the SELECT ... FROM ... WHERE form (Konstantinou et al., 2010). The core syntax of SPARQL is a conjunctive set of triple patterns called a "basic graph pattern" (Zeng et al., 2013). Table 1 shows the syntax of SPARQL to retrieve data from RDF using a SELECT statement.
Table 1: Example
of SPARQL syntax (Sakr & Gaber, 2014)
Although SPARQL syntax is similar to SQL in the context of SELECT to retrieve data, SPARQL is not as mature as SQL (Konstantinou et al., 2010). The current form of SPARQL allows access to the raw data using URIs from an RDF or OWL graph, leaving result processing to the user (Konstantinou et al., 2010). However, SPARQL is expected to become the gateway to query information and knowledge, supporting as many features as SQL does (Konstantinou et al., 2010). SPARQL does not support aggregate functions such as MAX, MIN, SUM, AVG, and COUNT, or the GROUP BY operation (Konstantinou et al., 2010). Moreover, SPARQL supports ORDER BY only on a global level and not solely on the OPTIONAL part of the query (Konstantinou et al., 2010). For mathematical operations, SPARQL does not extend its support beyond the basic ones (Konstantinou et al., 2010). SPARQL does not support nested queries, meaning it does not allow a CONSTRUCT query in the FROM part of the query. Moreover, SPARQL is missing the functionality offered by the SELECT ... WHERE ... LIKE statement in SQL, which allows keyword-based queries (Konstantinou et al., 2010). While SPARQL offers the regex() function for string pattern matching, it cannot emulate the functionality of the LIKE operator (Konstantinou et al., 2010). SPARQL allows only unbound variables in the SELECT part and rejects the use of functions or other operators there (Konstantinou et al., 2010). This limitation places SPARQL as an elementary query language where only URIs or literals are returned, while users look for some result processing in practical use cases (Konstantinou et al., 2010). SPARQL could be enhanced to include these missing features and functionality, such as stored procedures, triggers, and operations for data manipulation such as update, insert, and delete (Konstantinou et al., 2010). It should be noted that these limitations refer to SPARQL 1.0; many of them, including aggregates, GROUP BY, and subqueries, were later addressed in SPARQL 1.1.
A group called the SPARQL Working Group is working on integrating these missing features into SPARQL. SPARQL/Update is an extension to SPARQL included in the leading Semantic Web development framework Jena, allowing update operations and the creation and removal of RDF graphs (Konstantinou et al., 2010). ARQ is a query engine for Jena which supports the SPARQL RDF query language (Apache, 2017a). Some of the key features of ARQ include update, GROUP BY, access to and extension of the SPARQL algebra, and support for federated queries (Apache, 2017a). LARQ integrates SPARQL with Apache's full-text search framework Lucene (Konstantinou et al., 2010), adding free-text searches to SPARQL (Apache, 2017b). The SPARQL+ extension of the ARC RDF store offers most of the common aggregates and extends SPARUL's INSERT with a CONSTRUCT clause (Konstantinou et al., 2010). OpenLink's Virtuoso extends SPARQL with aggregate functions, nesting, and subqueries, allowing the user to embed SPARQL queries inside SQL (Konstantinou et al., 2010). SPASQL offers similar functionality, embedding SPARQL into SQL (Konstantinou et al., 2010).
Although SPARQL is missing many SQL features, it offers other features which are not part of SQL (Konstantinou et al., 2010). One of these is the OPTIONAL operator, which does not modify the results in case of non-existence and can be found in almost all query languages for RDF (Konstantinou et al., 2010). This feature is equivalent to the LEFT OUTER JOIN in SQL (Konstantinou et al., 2010). Moreover, SPARQL syntax is more user-friendly and intuitive than SQL (Konstantinou et al., 2010).
Techniques Applied on MapReduce
To Improve RDF Query Processing
Performance
With the explosive growth of data size, the traditional approach of analyzing data on a centralized server cannot scale up (Punnoose et al., 2012; Sakr & Gaber, 2014) and cannot keep pace with ever-increasing RDF datasets (Sakr & Gaber, 2014). Although SPARQL is used to query RDF data, querying RDF datasets at web scale is challenging because the computation of SPARQL queries requires several joins between subsets of the dataset (Sakr & Gaber, 2014). New methods have been introduced to improve parallel computing and allow storage and retrieval of RDF across large compute clusters, enabling the processing of data of unprecedented magnitude (Punnoose et al., 2012). Various solutions have been introduced to address these challenges and achieve scalable RDF processing using the MapReduce framework, such as PigSPARQL and RDFPath.
1. RDFPath
In (Przyjaciel-Zablocki, Schätzle, Hornung, & Lausen, 2011), RDFPath is proposed as a declarative path query language for RDF which, by design, provides a natural mapping to the MapReduce programming model while remaining extensible (Przyjaciel-Zablocki et al., 2011). It supports the exploration of graph properties such as the shortest connections between two nodes in an RDF graph (Przyjaciel-Zablocki et al., 2011). RDFPath is regarded as a valuable tool for the analysis of social graphs (Przyjaciel-Zablocki et al., 2011). RDFPath combines an intuitive syntax for path queries with an effective execution strategy based on MapReduce (Przyjaciel-Zablocki et al., 2011). RDFPath benefits from the horizontal scaling properties of MapReduce: adding more nodes improves the overall execution time significantly (Przyjaciel-Zablocki et al., 2011). Using RDFPath, large RDF graphs can be handled while scaling linearly with the size of the graph, so RDFPath can be used to investigate graph properties such as a variant of the famous six degrees of separation paradigm typically encountered in social graphs (Przyjaciel-Zablocki et al., 2011). It focuses on path queries and studies their implementation on MapReduce. There are various RDF query languages such as RQL, SeRQL, RDQL, Triple, N3, Versa, RxPath, RPL, and SPARQL (Przyjaciel-Zablocki et al., 2011). RDFPath has an expressiveness competitive with these other RDF query languages (Przyjaciel-Zablocki et al., 2011). A comparison of RDFPath's capabilities with these other RDF query languages shows that RDFPath has the same capabilities as SPARQL 1.1 for adjacent nodes, adjacent edges, the degree of a node, and fixed-length paths. However, RDFPath shows more capabilities than SPARQL 1.1 in areas like the distance between two nodes and shortest paths, for which it has partial support.
However, SPARQL 1.1 shows full support for aggregate functions while RDFPath shows only partial support (Przyjaciel-Zablocki et al., 2011). Table 2 shows the comparison of RDFPath with other RDF query languages including SPARQL.
Table 2: Comparison of RDF Query Language, adapted from (Przyjaciel-Zablocki et al., 2011).
2. PigSPARQL
PigSPARQL is regarded as a competitive yet easy-to-use SPARQL query processing system on MapReduce that allows ad-hoc SPARQL query processing on large RDF graphs out of the box (Schätzle, Przyjaciel-Zablocki, Hornung, & Lausen, 2013). PigSPARQL is described as a system for processing SPARQL queries on the MapReduce framework by translating them into Pig Latin programs, where each Pig Latin program is executed by a series of MapReduce jobs on a Hadoop cluster (Sakr & Gaber, 2014; Schätzle et al., 2013). PigSPARQL utilizes the query language of Pig, a data analysis platform on top of Hadoop MapReduce, as an intermediate layer between SPARQL and MapReduce (Schätzle et al., 2013). That intermediate layer provides an abstraction level which makes PigSPARQL independent of the Hadoop version and accordingly ensures compatibility with future changes of the Hadoop framework, as they will be covered by the underlying Pig layer (Schätzle et al., 2013). This intermediate Pig Latin layer provides the sustainability of PigSPARQL and makes it an attractive long-term baseline for comparing various MapReduce-based SPARQL implementations, which is also underpinned by its competitiveness with existing systems such as HadoopRDF (Schätzle et al., 2013). As illustrated in Figure 3, the PigSPARQL workflow begins with the SPARQL query, which is mapped to Pig Latin by parsing the query to generate an abstract syntax tree that is then translated into a SPARQL algebra tree (Schätzle et al., 2013). Several optimizations are applied at the algebra level, such as the early execution of filters and a re-arrangement of triple patterns by selectivity (Schätzle et al., 2013). The optimized algebra tree is traversed bottom-up, and an equivalent sequence of Pig Latin expressions is generated for every SPARQL algebra operator (Schätzle et al., 2013). At runtime, Pig automatically maps the resulting Pig Latin script into a sequence of MapReduce iterations (Schätzle et al., 2013).
PigSPARQL is described as an easy-to-use and competitive baseline for the comparison of MapReduce-based SPARQL processing. PigSPARQL exceeds the functionality of most existing research prototypes by supporting full SPARQL 1.0 (Schätzle et al., 2013).
Figure 3: PigSPARQL Workflow From SPARQL to MapReduce, adapted from (Schätzle et al., 2013).
3. Interactive SPARQL Query Processing on Hadoop: Sempala
In (Schätzle, Przyjaciel-Zablocki, Neu, & Lausen, 2014), an interactive SPARQL query processing technique on Hadoop, "Sempala," is proposed. Sempala is a SPARQL-over-SQL-on-Hadoop approach designed for selective queries (Schätzle et al., 2014). It shows significant performance improvements compared to existing approaches (Schätzle et al., 2014). The approach of Sempala is inspired by the trend in the SQL-on-Hadoop field, where several new systems have been developed for interactive SQL query processing, such as Hive, Shark, Presto, Phoenix, and Impala (Schätzle et al., 2014). Thus, Sempala, as a SPARQL-over-SQL approach, follows this trend and provides interactive-time SPARQL query processing on Hadoop (Schätzle et al., 2014). With Sempala, the RDF data is stored in a columnar layout on HDFS, and Impala, an open-source massively parallel processing (MPP) SQL query engine for Hadoop, serves as the execution layer on top (Schätzle et al., 2014). The architecture of Sempala is illustrated in Figure 4.
Figure 4: Sempala Architecture
adapted from (Schätzle et al., 2014).
The architecture of the proposed Sempala has two main components: the RDF Loader and the Query Compiler (Schätzle et al., 2014). The RDF Loader converts an RDF dataset into the data layout used by Sempala. The Query Compiler rewrites a given SPARQL query into the SQL dialect of Impala based on the layout of the data (Schätzle et al., 2014). The Query Compiler of Sempala is based on the algebraic representation of SPARQL expressions defined by the W3C recommendation (Schätzle et al., 2014). Jena ARQ is used to parse a SPARQL query into the corresponding algebra tree (Schätzle et al., 2014). Some basic algebraic optimizations, such as filter pushing, are applied (Schätzle et al., 2014). The final step is to traverse the tree bottom-up to generate the equivalent Impala SQL expressions based on the unified property table layout (Schätzle et al., 2014). Sempala has been compared with other Hadoop-based systems such as Hive, PigSPARQL, MapMerge, and MAPSIN (Schätzle et al., 2014). Hive is the standard SQL warehouse for Hadoop based on MapReduce (Schätzle et al., 2014). The same query can run on the same data with minor syntactical modifications because Impala was developed to be highly compatible with Hive (Schätzle et al., 2014). Sempala follows an approach similar to PigSPARQL; however, in PigSPARQL, Pig is used as the underlying system and intermediate level between MapReduce and SPARQL (Schätzle et al., 2014). MapMerge is an efficient map-side merge join implementation for scalable SPARQL BGPs ("Basic Graph Patterns" (W3C, 2016)) which reduces the shuffling of data between the map and reduce phases in MapReduce (Schätzle et al., 2014). MAPSIN is an approach that uses HBase, the standard NoSQL database for Hadoop, to store RDF data and applies a map-side index nested loop join which avoids the reduce phase of MapReduce (Schätzle et al., 2014).
The findings of (Schätzle et al., 2014) show that Sempala outperforms Hive and PigSPARQL, while MapMerge and MAPSIN could not be compared because they only support SPARQL BGPs (Schätzle et al., 2014).
4. Map-Side Index Nested Loop Join (MAPSIN JOIN)
MapReduce faces a challenge in processing joins because the datasets are very large (Sakr & Gaber, 2014). Two datasets can be joined directly only if they are located on the same machine, which is not practical (Sakr & Gaber, 2014). Thus, solutions such as the reduce-side approach are used and are regarded as the most prominent and flexible join technique in MapReduce (Sakr & Gaber, 2014). The reduce-side approach is also known as the "repartition join" because the datasets are read in the map phase and repartitioned according to the join key in the shuffle phase, while the actual join computation is done in the reduce phase (Sakr & Gaber, 2014). The problem with this approach is that the datasets are transferred through the network with no regard to the join output, which can consume a lot of network bandwidth and cause a bottleneck (Sakr & Gaber, 2014; Schätzle et al., 2013). Another solution, the map-side join, was introduced, where the actual join processing is done in the map phase to avoid the shuffle and reduce phases and avoid transferring both datasets over the network (Sakr & Gaber, 2014). The most common variant is the map-side merge join, although it is hard to cascade: for subsequent joins, the advantage of avoiding the shuffle and reduce phases is lost (Sakr & Gaber, 2014). Thus, the MAPSIN approach is proposed, a map-side index nested loop join based on HBase (Sakr & Gaber, 2014; Schätzle et al., 2013). The MAPSIN join leverages the indexing capabilities of HBase, which improve the query performance of selective queries (Sakr & Gaber, 2014; Schätzle et al., 2013). These capabilities retain the flexibility of reduce-side joins while utilizing the effectiveness of a map-side join without any modification to the underlying framework (Sakr & Gaber, 2014; Schätzle et al., 2013).
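The reduce-side ("repartition") join described above can be sketched as follows: both datasets are tagged with their origin in the map phase, grouped by the join key in the shuffle, and combined in the reduce phase. This is a single-process toy with invented example data; in a real MapReduce job these groups move across the network, which is exactly the bandwidth cost the map-side variants try to avoid:

```python
from collections import defaultdict
from itertools import product

def repartition_join(left, right, left_key, right_key):
    """Reduce-side (repartition) join of two datasets on a join key."""
    # Map phase: tag each record with the dataset it came from.
    tagged = [(left_key(r), ("L", r)) for r in left] + \
             [(right_key(r), ("R", r)) for r in right]
    # Shuffle: repartition and group all tagged records by join key.
    groups = defaultdict(list)
    for key, tagged_record in tagged:
        groups[key].append(tagged_record)
    # Reduce phase: within each group, pair every left record with every right one.
    joined = []
    for key, records in groups.items():
        lefts = [r for tag, r in records if tag == "L"]
        rights = [r for tag, r in records if tag == "R"]
        joined.extend(product(lefts, rights))
    return joined

people = [("p1", "Alice"), ("p2", "Bob")]
orders = [("o1", "p1"), ("o2", "p1")]
result = repartition_join(people, orders,
                          left_key=lambda p: p[0],
                          right_key=lambda o: o[1])
# result pairs ("p1", "Alice") with both of her orders; Bob has no orders.
```

Note that the shuffle moves every record of both inputs regardless of whether it will contribute to the join output, which is the bandwidth problem the text attributes to this approach.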
Comparing MAPSIN with PigSPARQL, MAPSIN performs faster than PigSPARQL when using a sophisticated storage schema based on HBase, which works well for selective queries but diminishes significantly in performance for less selective queries (Schätzle et al., 2013). However, MAPSIN does not support the queries of LUBM (the Lehigh University Benchmark) (W3C, 2016). The query runtime of MAPSIN is close to the runtime of the merge join approach (Schätzle et al., 2013).
5. HadoopRDF
HadoopRDF is proposed by (Tian, Du, Wang, Ni, & Yu, 2012) to combine the advantages of the high fault tolerance and high throughput of the MapReduce distributed framework with sophisticated indexing and query answering mechanisms (Tian et al., 2012). HadoopRDF is deployed on a Hadoop cluster with many computers, and each node in the cluster runs a Sesame server to supply the service for storing and retrieving RDF data (Tian et al., 2012). HadoopRDF is a MapReduce-based RDF system which stores data directly in HDFS and does not require any modification to the Hadoop framework (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014; Tian et al., 2012). The basic idea is to substitute the rudimentary HDFS, which has no indexes or query execution engine, with more elaborate RDF stores (Tian et al., 2012). The architecture of HadoopRDF is illustrated in Figure 5.
Figure 5: HadoopRDF Architecture,
adapted from (Tian et al., 2012).
The architecture of HadoopRDF is similar to the architecture of Hadoop, which scales up to thousands of nodes (Tian et al., 2012). The Hadoop framework is the core of HadoopRDF (Tian et al., 2012). Hadoop is built on top of HDFS, which is a replicated key-value store under the control of a central NameNode (Tian et al., 2012). Files in HDFS are broken into fixed-size chunks, and replicas of these chunks are distributed across a group of DataNodes (Tian et al., 2012). The NameNode tracks the size and location of each replica (Tian et al., 2012). Hadoop's MapReduce framework is used for computation in data-intensive applications (Tian et al., 2012). In the architecture of HadoopRDF, the RDF stores are incorporated into the MapReduce framework. HadoopRDF is an advanced SPARQL engine which splits the original RDF graph according to predicates and objects and utilizes a cost-based query execution plan for reduce-side joins (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014; Schätzle et al., 2013). HadoopRDF can re-balance automatically when the cluster size changes, but join processing is still done in the reduce phase (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014). The findings of (M. Husain et al., 2011) indicated that HadoopRDF is more scalable and handles low-selectivity queries more efficiently than RDF-3X. Moreover, the results showed that HadoopRDF is much more scalable than BigOWLIM and provides more efficient queries for large data sets (M. Husain et al., 2011). Like most systems, HadoopRDF requires a pre-processing phase (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014).
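The HDFS storage model described above, with fixed-size chunks whose replicas are spread over DataNodes while the NameNode tracks their locations, can be sketched as a toy model (hypothetical names and a simple round-robin placement; real HDFS placement is rack-aware and uses a much larger default block size):

```python
def split_into_chunks(data: bytes, chunk_size: int):
    """Break a file into fixed-size chunks, as HDFS does (last chunk may be short)."""
    return [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

def place_replicas(num_chunks, datanodes, replication=3):
    """Toy NameNode bookkeeping: assign each chunk's replicas to distinct
    DataNodes round-robin, so losing one node never loses a chunk."""
    locations = {}
    for chunk_id in range(num_chunks):
        locations[chunk_id] = [
            datanodes[(chunk_id + r) % len(datanodes)]
            for r in range(replication)
        ]
    return locations

data = b"x" * 300
chunks = split_into_chunks(data, chunk_size=128)   # 3 chunks: 128, 128, 44 bytes
table = place_replicas(len(chunks), ["dn1", "dn2", "dn3", "dn4"])
# e.g. chunk 0 replicated on dn1, dn2, dn3; chunk 1 on dn2, dn3, dn4; ...
```

The `table` dict plays the role of the NameNode's metadata: a reader asks it where a chunk lives and then fetches the bytes from any of the listed DataNodes, which is how MapReduce tasks can be scheduled close to their data.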
6. RDF-3X: RDF Triple eXpress
RDF-3X is proposed by (Neumann & Weikum, 2008). The RDF-3X engine is an implementation of SPARQL which achieves excellent performance by pursuing a streamlined, RISC-style architecture (Neumann & Weikum, 2008). RISC (Reduced Instruction Set Computer) is a type of microprocessor architecture that utilizes a small, highly optimized set of instructions rather than the more specialized instruction sets often found in other types of architectures (Neumann & Weikum, 2008). Thus, RDF-3X follows the RISC-style concept of a "reduced instruction set" designed to support RDF. RDF-3X is described as a generic solution for storing and indexing RDF triples that eliminates the need for physical-design tuning (Neumann & Weikum, 2008). RDF-3X provides a query optimizer for choosing optimal join orders using a cost model based on statistical synopses for entire join paths (Neumann & Weikum, 2008). It also provides a powerful and simple query processor which leverages fast merge joins on large-scale data (Neumann & Weikum, 2008). RDF-3X has three major components: the physical design, the query processor, and the query optimizer. The physical design component is workload-independent, creating appropriate indexes over a single "giant triples table" (Neumann & Weikum, 2008). The query processor is RISC-style, relying mostly on merge joins over sorted index lists. The query optimizer focuses on join order in its generation of the execution plan (Neumann & Weikum, 2008).
The findings of (Neumann & Weikum, 2008) showed that RDF-3X addresses the challenge of schema-free data and copes very well with data that exhibits a large diversity of property names (Neumann & Weikum, 2008). The optimizer of RDF-3X is known to produce efficient query execution plans (Galarraga, Hose, & Schenkel, 2014). RDF-3X maintains local indexes for all possible orders and combinations of the triple components, and for aggregations, which enables efficient local data access (Galarraga et al., 2014). RDF-3X does not support LUBM. RDF-3X is a single-node RDF store which builds indexes over all possible permutations of subject, predicate, and object (Huang, Abadi, & Ren, 2011; M. Husain et al., 2011; Schätzle et al., 2014; Zeng et al., 2013). RDF-3X is regarded as the fastest existing Semantic Web repository and the state-of-the-art benchmark engine for single-node machines (M. Husain et al., 2011; Przyjaciel-Zablocki et al., 2013). Thus, it outperforms any other solution for queries with bound objects and for aggregate queries (M. Husain et al., 2011). However, the performance of RDF-3X diminishes exponentially for unbound queries and for queries with even simple joins if the selectivity factor is low (M. Husain et al., 2011; Przyjaciel-Zablocki et al., 2013). The experiments of (M. Husain et al., 2011) showed that RDF-3X is not only slower for such queries, it often aborts and cannot complete the query (M. Husain et al., 2011).
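The idea of indexing all orderings of the triple components can be sketched as follows: each triple is stored six times, once per permutation (SPO, SOP, PSO, POS, OSP, OPS), each sorted, so any triple pattern with bound components becomes a prefix range scan over the matching index. This is a toy in-memory sketch with invented data, not the actual RDF-3X storage layer (which compresses these indexes heavily):

```python
from itertools import permutations
from bisect import bisect_left

# The six component orderings: spo, sop, pso, pos, osp, ops.
ORDERS = ["".join(p) for p in permutations("spo")]

def build_indexes(triples):
    """Store each triple once per component ordering, each index kept sorted."""
    pos = {"s": 0, "p": 1, "o": 2}
    indexes = {}
    for order in ORDERS:
        indexes[order] = sorted(tuple(t[pos[c]] for c in order) for t in triples)
    return indexes

def prefix_scan(index, prefix):
    """Range scan: all sorted entries that start with the bound components."""
    start = bisect_left(index, prefix)
    out = []
    for entry in index[start:]:
        if entry[:len(prefix)] != prefix:
            break
        out.append(entry)
    return out

triples = [("alice", "knows", "bob"), ("bob", "knows", "carol"),
           ("alice", "name", "Alice")]
idx = build_indexes(triples)
# Pattern (?s, knows, ?o): predicate bound, so scan 'pso' with prefix ('knows',).
print(prefix_scan(idx["pso"], ("knows",)))
# [('knows', 'alice', 'bob'), ('knows', 'bob', 'carol')]
```

Because every index is sorted, the results of such scans arrive in order, which is what lets a query processor in this style rely mostly on cheap merge joins over sorted lists.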
7. Rya: A Scalable RDF Triple Store for the Clouds
In (Punnoose et al., 2012), Rya is proposed as a new scalable system for storing and retrieving RDF data across cluster nodes. In Rya, the OWL model is treated as a set of triples, which are stored in the triple store (Punnoose et al., 2012). Storing all the data in the triple store provides the benefit of using Hadoop MapReduce to run large batch processing jobs against the data set (Punnoose et al., 2012). The first phase of the process is performed only once, when the OWL model is loaded into Rya (Punnoose et al., 2012). In phase 1, a MapReduce job iterates through the entire graph of relationships and outputs the implicit relationships it finds as explicit RDF triples stored in the RDF store (Punnoose et al., 2012). The second phase of the process is performed every time a query is run; once all explicit and implicit relationships are stored in Rya, the Rya query planner can expand the query at runtime to utilize all of these relationships (Punnoose et al., 2012). Three table indexes for RDF triples are used to enhance performance (Punnoose et al., 2012). The results of (Punnoose, Crainiceanu, & Rapp, 2015) showed that Rya outperformed SHARD. Moreover, in comparison with the graph-partitioning algorithm introduced by (Huang et al., 2011), as indicated in (Punnoose et al., 2015), the performance of Rya showed superiority over Graph Partitioning in many cases (Punnoose et al., 2015).
Conclusion
This project provided a survey of state-of-the-art techniques for applying MapReduce to improve RDF data query processing performance. Tremendous effort from industry and researchers has been exerted to develop efficient and scalable RDF processing systems. The project discussed the RDF framework and the major building blocks of the Semantic Web architecture. The RDF store architecture, the MapReduce parallel processing framework, and Hadoop were also discussed in this project.
Researchers have exerted effort in developing Semantic Web technologies, which have been standardized to address the inadequacy of current traditional analytical techniques. This paper also discussed the most prominent standardized Semantic Web technologies, RDF and SPARQL. The project also discussed and analyzed in detail various techniques applied on MapReduce to improve RDF query processing performance. These techniques include RDFPath, PigSPARQL, Interactive SPARQL Query Processing on Hadoop (Sempala), the Map-Side Index Nested Loop Join (MAPSIN join), HadoopRDF, RDF-3X (RDF Triple eXpress), and Rya (a scalable RDF triple store for the clouds).
References
Apache, J. (2017a). ARQ – A SPARQL Processor for Jena
Apache, J.
(2017b). LARQ – Adding Free Text Searches
to SPARQL
Bakshi, K.
(2012). Considerations for big data:
Architecture and approach. Paper presented at the Aerospace Conference,
2012 IEEE.
Bao, Y., Ren, L.,
Zhang, L., Zhang, X., & Luo, Y. (2012). Massive
sensor data management framework in cloud manufacturing based on Hadoop.
Paper presented at the Industrial Informatics (INDIN), 2012 10th IEEE
International Conference on.
Boussaid, O.,
Tanasescu, A., Bentayeb, F., & Darmont, J. (2007). Integration and
dimensional modeling approaches for complex data warehousing. Journal of Global Optimization, 37(4),
571. doi:10.1007/s10898-006-9064-6
Choi, H., Son,
J., Cho, Y., Sung, M. K., & Chung, Y. D. (2009). SPIDER: a system for scalable, parallel/distributed evaluation of
large-scale RDF data. Paper presented at the Proceedings of the 18th ACM
conference on Information and knowledge management.
Cloud Security
Alliance. (2013). Big Data Analytics for Security Intelligence. Big Data Working Group.
Connolly, T.,
& Begg, C. (2015). Database Systems:
A Practical Approach to Design, Implementation, and Management (6th Edition
ed.): Pearson.
De Mauro, A.,
Greco, M., & Grimaldi, M. (2015). What
is big data? A consensual definition and a review of key research topics.
Paper presented at the AIP Conference Proceedings.
Fadzil, A. F. A.,
Khalid, N. E. A., & Manaf, M. (2012). Performance
of scalable off-the-shelf hardware for data-intensive parallel processing using
MapReduce. Paper presented at the Computing and Convergence Technology
(ICCCT), 2012 7th International Conference on.
Firat, M., &
Kuzu, A. (2011). Semantic web for e-learning bottlenecks: disorientation and
cognitive overload. International Journal
of Web & Semantic Technology, 2(4), 55.
Galarraga, L.,
Hose, K., & Schenkel, R. (2014). Partout:
a distributed engine for efficient RDF processing. Paper presented at the
Proceedings of the 23rd International Conference on World Wide Web.
Hu, H., Wen, Y., Chua, T., & Li, X. (2014). Toward Scalable Systems for Big Data Analytics: A Technology Tutorial. IEEE Access, 2, 652-687. doi:10.1109/ACCESS.2014.2332453
Hu, P., &
Dai, W. (2014). Enhancing fault tolerance based on Hadoop cluster. International Journal of Database Theory and
Application, 7(1), 37-48.
Huang, J., Abadi,
D. J., & Ren, K. (2011). Scalable SPARQL querying of large RDF graphs. Proceedings of the VLDB Endowment, 4(11),
1123-1134.
Husain, M.,
McGlothlin, J., Masud, M. M., Khan, L., & Thuraisingham, B. M. (2011).
Heuristics-based query processing for large RDF graphs using cloud computing. IEEE transactions on knowledge and data
engineering, 23(9), 1312-1327.
Husain, M. F.,
Khan, L., Kantarcioglu, M., & Thuraisingham, B. (2010). Data intensive query processing for large
RDF graphs using cloud computing tools. Paper presented at the Cloud
Computing (CLOUD), 2010 IEEE 3rd International Conference on.
Inukollu, V. N.,
Arsi, S., & Ravuri, S. R. (2014). Security Issues Associated with Big Data
in Cloud Computing. International Journal
of Network Security & Its Applications, 6(3), 45.
doi:10.5121/ijnsa.2014.6304
Jinquan, D., Jie,
H., Shengsheng, H., Yan, L., & Yuanhao, S. (2012). The Hadoop Stack: New
Paradigm for Big Data Storage and Processing. Intel Technology Journal, 16(4), 92-110.
Khan, N., Yaqoob,
I., Hashem, I. A. T., Inayat, Z., Ali, M. W. K., Alam, M., . . . Gani, A.
(2014). Big Data: Survey, Technologies, Opportunities, and Challenges. The Scientific World Journal, 1-18.
doi:10.1155/2014/712826
Konstantinou, N.,
Spanos, D.-E., Stavrou, P., & Mitrou, N. (2010). Technically approaching
the semantic web bottleneck. International
Journal of Web Engineering and Technology, 6(1), 83-111.
Krishnan, K.
(2013). Data warehousing in the age of
big data: Newnes.
Lee, K.-H., Lee,
Y.-J., Choi, H., Chung, Y. D., & Moon, B. (2012). Parallel data processing
with MapReduce: a survey. ACM SIGMOD
Record, 40(4), 11-20.
Mishra, B. S. P.,
Dehuri, S., & Kim, E. (2016). Techniques
and Environments for Big Data Analysis: Parallel, Cloud, and Grid Computing
(Vol. 17): Springer.
Modoni, G. E.,
Sacco, M., & Terkaj, W. (2014). A
survey of RDF store solutions. Paper presented at the Engineering,
Technology and Innovation (ICE), 2014 International ICE Conference on.
Myung, J., Yeon,
J., & Lee, S.-g. (2010). SPARQL basic
graph pattern processing with iterative MapReduce. Paper presented at the Proceedings
of the 2010 Workshop on Massive Data Analytics on the Cloud.
Neumann, T.,
& Weikum, G. (2008). RDF-3X: a RISC-style engine for RDF. Proceedings of the VLDB Endowment, 1(1),
647-659.
Nicolaidis, I.,
& Iniewski, K. (2017). Building
Sensor Networks: CRC Press.
Polato, I., Ré,
R., Goldman, A., & Kon, F. (2014). A comprehensive view of Hadoop
research—A systematic literature review. Journal
of Network and Computer Applications, 46, 1-25.
Przyjaciel-Zablocki,
M., Schätzle, A., Hornung, T., & Lausen, G. (2011). Rdfpath: Path query processing on large rdf graphs with mapreduce.
Paper presented at the Extended Semantic Web Conference.
Przyjaciel-Zablocki,
M., Schätzle, A., Skaley, E., Hornung, T., & Lausen, G. (2013). Map-side merge joins for scalable SPARQL BGP
processing. Paper presented at the Cloud Computing Technology and Science
(CloudCom), 2013 IEEE 5th International Conference on.
Punnoose, R.,
Crainiceanu, A., & Rapp, D. (2012). Rya:
a scalable RDF triple store for the clouds. Paper presented at the
Proceedings of the 1st International Workshop on Cloud Intelligence.
Punnoose, R.,
Crainiceanu, A., & Rapp, D. (2015). SPARQL in the cloud using Rya. Information Systems, 48, 181-195.
Sakr, S., &
Gaber, M. (2014). Large Scale and big
data: Processing and Management: CRC Press.
Schätzle, A.,
Przyjaciel-Zablocki, M., Hornung, T., & Lausen, G. (2013). PigSPARQL: a SPARQL query processing
baseline for big data. Paper presented at the Proceedings of the 12th
International Semantic Web Conference (Posters & Demonstrations
Track)-Volume 1035.
Schätzle, A.,
Przyjaciel-Zablocki, M., Neu, A., & Lausen, G. (2014). Sempala: interactive SPARQL query processing on hadoop. Paper
presented at the International Semantic Web Conference.
Sun, Y., &
Jara, A. J. (2014). An extensible and active semantic model of information
organizing for the Internet of Things. Personal
and Ubiquitous Computing, 18(8), 1821-1833. doi:10.1007/s00779-014-0786-z
Tian, Y., Du, J.,
Wang, H., Ni, Y., & Yu, Y. (2012). Hadooprdf: A scalable rdf data analysis
system. 8th ICIC, 633-641.
Tiwana, A., &
Balasubramaniam, R. (2001). Integrating knowledge on the web. IEEE Internet Computing, 5(3), 32-39.
White, T. (2012). Hadoop: The definitive guide. O'Reilly Media, Inc.
Yang, H.-c.,
Dasdan, A., Hsiao, R.-L., & Parker, D. S. (2007). Map-reduce-merge: simplified relational data processing on large
clusters. Paper presented at the Proceedings of the 2007 ACM SIGMOD
international conference on Management of data.
Zeng,
K., Yang, J., Wang, H., Shao, B., & Wang, Z. (2013). A distributed graph engine for web scale RDF data. Paper presented
at the Proceedings of the VLDB Endowment.
Hadoop was developed by Yahoo and Apache to run jobs over hundreds of terabytes of data (Yan, Yang, Yu, Li, & Li, 2012). Various large corporations, such as Facebook and Amazon, have used Hadoop because it offers high efficiency, high scalability, and high reliability (Yan et al., 2012). Hadoop has faced various limitations, such as its low-level programming paradigm and schema, strictly batch processing, time skew, and incremental computation (Alam & Ahmed, 2014). Incremental computation is regarded as one of the major shortcomings of Hadoop technology (Alam & Ahmed, 2014). Efficiency in handling incremental data comes at the expense of losing compatibility with the programming models offered by non-incremental systems such as MapReduce, because it requires implementing incremental algorithms and increases the complexity of the algorithm and the code (Alam & Ahmed, 2014). A caching technique is proposed by (Alam & Ahmed, 2014) as a solution; this caching operates at three levels: the job, the task, and the hardware (Alam & Ahmed, 2014).
Incoop is another solution, proposed by (Bhatotia, Wieder, Rodrigues, Acar, & Pasquin, 2011). Incoop extends Hadoop's open-source implementation of the MapReduce programming paradigm to run unmodified MapReduce programs incrementally (Bhatotia et al., 2011; Sakr & Gaber, 2014). It allows programmers to run MapReduce programs incrementally without any modification to the code (Bhatotia et al., 2011; Sakr & Gaber, 2014). Moreover, information about previously executed MapReduce tasks is recorded by Incoop and reused in subsequent MapReduce computations when possible (Bhatotia et al., 2011; Sakr & Gaber, 2014).
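The core idea of reusing previously executed tasks can be illustrated with a small sketch. This is a hypothetical, simplified illustration of task-level memoization in the spirit of Incoop, not the actual Incoop implementation: map outputs are cached by a fingerprint of their input split, so an unchanged split is never reprocessed on a subsequent run. The class and function names are invented for the example.

```python
import hashlib

class MemoizedMapRunner:
    """Sketch of task-level memoization: cache map output per input split."""
    def __init__(self, map_fn):
        self.map_fn = map_fn
        self.cache = {}          # split fingerprint -> list of (key, value) pairs
        self.recomputed = 0      # counts splits actually reprocessed

    def _fingerprint(self, split):
        # Content-based fingerprint of the split's records.
        return hashlib.sha256(repr(split).encode()).hexdigest()

    def run(self, splits):
        output = []
        for split in splits:
            fp = self._fingerprint(split)
            if fp not in self.cache:          # only changed splits are mapped
                self.cache[fp] = [kv for record in split
                                  for kv in self.map_fn(record)]
                self.recomputed += 1
            output.extend(self.cache[fp])
        return output

# Word count as the example job.
def word_count_map(line):
    return [(w, 1) for w in line.split()]

runner = MemoizedMapRunner(word_count_map)
first = runner.run([["a b", "b c"], ["c d"]])      # both splits computed
second = runner.run([["a b", "b c"], ["c d e"]])   # only the changed split is remapped
```

On the second run only one of the three split computations is new, mirroring how Incoop avoids rerunning map tasks whose inputs did not change.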
Incoop is not a perfect solution, and it has shortcomings that are addressed by (Sakr & Gaber, 2014; Zhang, Chen, Wang, & Yu, 2015). Several enhancements have been implemented for Incoop, including an incremental HDFS called Inc-HDFS, a Contraction phase, and a memoization-aware scheduler (Sakr & Gaber, 2014). Inc-HDFS computes the delta between the inputs of two consecutive job runs and splits the input based on its content, while maintaining compatibility with HDFS. The Contraction phase is a new phase in the MapReduce framework that breaks Reduce tasks into smaller sub-computations forming an inverted tree, so that when a small portion of the input changes, only the path from the corresponding leaf to the root needs to be recomputed (Sakr & Gaber, 2014). The memoization-aware scheduler is a modified version of the Hadoop scheduler that takes advantage of the locality of memoized results (Sakr & Gaber, 2014).
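The inverted-tree idea behind the Contraction phase can be sketched as a binary aggregation tree. This is a hypothetical illustration, not Incoop's actual code: leaf values stand for per-partition partial results, an associative combiner merges them level by level, and updating one leaf recombines only the nodes on that leaf's path to the root while every sibling subtree is reused.

```python
class ContractionTree:
    """Binary aggregation tree; update_leaf recomputes only one root path."""
    def __init__(self, combine, leaves):
        self.combine = combine
        self.levels = [list(leaves)]              # levels[0] holds leaf values
        while len(self.levels[-1]) > 1:
            prev = self.levels[-1]
            self.levels.append([combine(prev[i], prev[i + 1])
                                if i + 1 < len(prev) else prev[i]
                                for i in range(0, len(prev), 2)])
        self.recombined = 0                       # nodes recomputed by updates

    def root(self):
        return self.levels[-1][0]

    def update_leaf(self, i, value):
        # Recompute only the ancestors of leaf i, bottom-up.
        self.levels[0][i] = value
        for lvl in range(1, len(self.levels)):
            i //= 2
            prev = self.levels[lvl - 1]
            j = 2 * i
            self.levels[lvl][i] = (self.combine(prev[j], prev[j + 1])
                                   if j + 1 < len(prev) else prev[j])
            self.recombined += 1

# Eight per-partition partial counts, combined by addition.
tree = ContractionTree(lambda a, b: a + b, [3, 1, 4, 1, 5, 9, 2, 6])
total_before = tree.root()
tree.update_leaf(5, 10)          # one partition's input changed
total_after = tree.root()
```

With eight leaves, a single changed leaf touches only three internal nodes (log2 of the leaf count) instead of recombining all eight partial results, which is the saving the Contraction phase aims for.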
Another solution, i2MapReduce, was proposed by (Zhang et al., 2015) and compared against Incoop. i2MapReduce does not perform task-level recomputation but rather key-value pair level incremental processing. It also supports more complex iterative computations, which are used in data mining, and reduces I/O overhead by applying various techniques (Zhang et al., 2015). IncMR is an enhanced framework for large-scale incremental data processing (Yan et al., 2012). It inherits the simplicity of standard MapReduce, does not modify HDFS, and uses the same MapReduce APIs (Yan et al., 2012). With IncMR, all programs can perform incremental data processing without any modification (Yan et al., 2012).
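The contrast with task-level approaches can be made concrete with a minimal sketch of key-value pair level incremental processing, in the spirit of i2MapReduce but not its actual implementation. It assumes the reduce function is an incrementally maintainable aggregate (here a sum, whose inverse lets deletions subtract), so only the keys touched by a delta of inserted or deleted records are updated; all class and attribute names are invented for the example.

```python
from collections import defaultdict

class IncrementalWordCount:
    """Sketch: maintain per-key reduce state and apply record-level deltas."""
    def __init__(self):
        self.counts = defaultdict(int)   # reduce state per key
        self.touched = set()             # keys updated by the last delta

    def apply_delta(self, inserted=(), deleted=()):
        self.touched = set()
        # Re-map only the delta records and adjust only the affected keys.
        for line in inserted:
            for word in line.split():
                self.counts[word] += 1
                self.touched.add(word)
        for line in deleted:
            for word in line.split():
                self.counts[word] -= 1   # sum has an inverse, so deletions subtract
                self.touched.add(word)

job = IncrementalWordCount()
job.apply_delta(inserted=["a b a", "b c"])           # initial load
job.apply_delta(inserted=["c d"], deleted=["b c"])   # incremental update
```

The second call touches only the keys appearing in the delta, leaving the state for key "a" untouched; a task-level scheme would instead rerun every task whose input split changed.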
In conclusion, researchers have exerted various efforts to overcome the incremental computation limitation of Hadoop, such as Incoop, Inc-HDFS, i2MapReduce, and IncMR. Each proposed solution attempts to enhance and extend standard Hadoop to avoid overheads such as I/O and to increase efficiency, without increasing the complexity of the computation and without requiring any modification to the code.
References
Alam, A., & Ahmed, J. (2014). Hadoop architecture and its issues.
Paper presented at the Computational Science and Computational Intelligence
(CSCI), 2014 International Conference on.
Bhatotia, P.,
Wieder, A., Rodrigues, R., Acar, U. A., & Pasquin, R. (2011). Incoop: MapReduce for incremental
computations. Paper presented at the Proceedings of the 2nd ACM Symposium
on Cloud Computing.
Sakr, S., &
Gaber, M. (2014). Large Scale and big
data: Processing and Management: CRC Press.
Yan, C., Yang, X., Yu, Z., Li, M., & Li, X. (2012). IncMR: Incremental data processing based on MapReduce. Paper presented at the Cloud Computing (CLOUD), 2012 IEEE 5th International Conference on.
Zhang,
Y., Chen, S., Wang, Q., & Yu, G. (2015). i^2MapReduce: Incremental
MapReduce for Mining Evolving Big Data. IEEE
transactions on knowledge and data engineering, 27(7), 1906-1919.