The Impact of XML on MapReduce

Dr. Aly, O.
Computer Science

Introduction

The purpose of this discussion is to analyze the impact of XML on MapReduce. The discussion addresses the techniques and approaches proposed by various research studies for processing large XML documents using MapReduce. The XML fragmentation process, both with and without MapReduce, is also discussed to provide a better understanding of the complexities of processing large XML documents in a distributed, scalable MapReduce environment.

XML Query Processing Using MapReduce

The XML format has been used to store data for many applications (Aravind & Agrawal, 2014).  To obtain value from XML data, it needs to be ingested into Hadoop and analyzed (Aravind & Agrawal, 2014).  The Hadoop ecosystem must be able to understand and interpret XML once it has been ingested (Aravind & Agrawal, 2014).  MapReduce is a building block of the Hadoop ecosystem.  In the age of Big Data, XML documents are expected to be very large and to require scalable, distributed processing.  Processing XML queries with MapReduce requires decomposing a big XML document and distributing its portions to different nodes.  The relational approach is not appropriate because it is expensive: transforming a big XML document into relational database tables can be extremely time consuming, and the resulting θ-joins among relational tables are costly (Wu, 2014).  Various research studies have proposed approaches for implementing native XML query processing algorithms using MapReduce.

(Dede, Fadika, Gupta, & Govindaraju, 2011) have discussed and analyzed the scalable and distributed processing of scientific XML data, and how the MapReduce model should be used in XML metadata indexing.   The study presented performance results using two MapReduce implementations, the Apache Hadoop framework and the proposed LEMO-MR framework.  The study provided an indexing framework that is capable of indexing and efficiently searching large-scale scientific XML datasets.  The framework has been tailored for integration with any framework that uses the MapReduce model, in order to meet scalability and variety requirements.

(Fegaras, Li, Gupta, & Philip, 2011) have also discussed and analyzed query optimization in a MapReduce environment. The study presented a novel query language for large-scale analysis of XML data on a MapReduce environment, called MRQL (MapReduce Query Language), which is designed to capture the most common data analysis tasks in a form that can be optimized.   XML data fragmentation is also discussed in this study.  A parallel data computation expects its input data to be fragmented into small, manageable pieces that determine the granularity of the computation.  In a MapReduce environment, each map worker is assigned a data split that consists of data fragments, and it processes these data one fragment at a time.  For structured relational data, a fragment is a relational tuple, while for a text file, a fragment can be a single line.  For hierarchical data and nested collections such as XML, however, the fragment size and structure depend on the application that processes the data.  For instance, XML data may consist of a few XML documents, each containing a single XML element whose size exceeds the memory capacity of a map worker.  Thus, when processing XML data, it is recommended to allow custom fragmentation to meet a wide range of application requirements.  (Fegaras et al., 2011) have noted that Hadoop provides a simple input format for XML fragmentation based on a single tag name.  Data splits of an XML document may start and end at arbitrary points, even in the middle of tag names.  (Fegaras et al., 2011; Sakr & Gaber, 2014) have indicated that this input format allows reading the document as a stream of string fragments, so that each string contains a single complete element with the requested tag name.   An XML parser can then be used to parse these strings and convert them to objects.  The fragmentation process is complex because the requested elements may cross data split boundaries, and these data splits may reside on different data nodes in the distributed file system (DFS).  The Hadoop DFS provides the implicit solution to this problem by allowing a reader to scan beyond a data split into the next one, subject to some overhead for transferring data between nodes.  (Fegaras et al., 2011) have proposed an XML fragmentation technique built on top of the existing Hadoop XML input format, providing a higher level of abstraction and better customization.  It is a higher level of abstraction because it constructs XML data in the MRQL data model, ready to be processed by MRQL queries, instead of delivering a string for each XML element (Fegaras et al., 2011).
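To make the single-tag fragmentation idea concrete, the following is a minimal Python sketch of how a reader might emit complete elements of a requested tag name from one data split. The tag name, the toy document, and the regular-expression approach are illustrative assumptions, not Hadoop's or MRQL's actual implementation; a real reader would also continue scanning past the split boundary via the DFS when an element is cut off.

```python
import re
from typing import Iterator

def xml_fragments(split_text: str, tag: str) -> Iterator[str]:
    """Yield every complete <tag ...>...</tag> element in this data split
    as one string fragment, mimicking single-tag-name fragmentation.
    Nested occurrences of the same tag are not handled, which matches the
    simple input format's assumption."""
    pattern = re.compile(rf"<{tag}\b[^>]*>.*?</{tag}>", re.DOTALL)
    for match in pattern.finditer(split_text):
        yield match.group(0)

# Illustrative usage on a small in-memory "split".
doc = "<items><item id='1'>alpha</item><item id='2'>beta</item></items>"
for frag in xml_fragments(doc, "item"):
    print(frag)
# <item id='1'>alpha</item>
# <item id='2'>beta</item>
```

Each emitted string can then be handed to an XML parser and converted to an object, as described above.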

(Sakr & Gaber, 2014) have also briefly discussed another language proposed to support distributed XML processing using the MapReduce framework, called ChuQL.  ChuQL is a MapReduce-based extension of the syntax, grammar, and semantics of XQuery, the standard W3C language for querying XML documents.  The ChuQL implementation takes care of distributing the computation to multiple XQuery engines running on Hadoop nodes, as described by one or more ChuQL MapReduce expressions.  The "word count" example program can be represented in ChuQL using its extended expressions, where the MapReduce expression describes a MapReduce job.  The input and output clauses are used to read from and write to HDFS, respectively.  The rr and rw clauses describe the record reader and record writer, respectively.  The map and reduce clauses represent the standard map and reduce phases of the framework; they process XML values, or key/value pairs of XML values to match the MapReduce model, and are specified using XQuery expressions. Figure 1 shows the word count example in ChuQL using XML in a distributed environment.

Figure 1.  The Word Count Example Program in ChuQL Using XML in a Distributed Environment (Sakr & Gaber, 2014).
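The word count example can be mirrored in plain Python to show the map/reduce structure that the ChuQL expressions describe. The sketch below is illustrative only (the toy fragments and function names are assumptions, and this is not ChuQL or XQuery syntax): the map phase emits (word, 1) pairs from the text of each XML element, and the reduce phase sums the counts per word.

```python
import xml.etree.ElementTree as ET
from collections import defaultdict

def map_phase(fragment: str):
    """Map: parse one XML fragment and emit (word, 1) for each word in its text."""
    element = ET.fromstring(fragment)
    for word in (element.text or "").split():
        yield word, 1

def reduce_phase(pairs):
    """Reduce: group the pairs by word and sum the counts."""
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)

fragments = ["<doc>to be or not</doc>", "<doc>to be</doc>"]
pairs = [pair for frag in fragments for pair in map_phase(frag)]
print(reduce_phase(pairs))  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```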

(Vasilenko & Kurapati, 2014) have discussed and analyzed the efficient processing of XML documents in the Hadoop MapReduce environment.  They argued that the most common approach to processing XML data is to introduce a custom solution based on user-defined functions or scripts.  The common choices vary from introducing an ETL process for extracting the data of interest to transforming XML into other formats that are natively supported by Hive.   They have presented a generic approach to handling XML based on the Apache Hive architecture.  The researchers described an approach that complements the existing family of Hive serializers and de-serializers for other popular data formats, such as JSON, and makes it much easier for users to deal with large XML datasets.  The implementation creates logical splits of the input files, each of which is assigned to an individual mapper. The mapper relies on the implemented Apache Hive XML SerDe to break the split into XML fragments using specified start and end byte sequences. Each fragment corresponds to a single Hive record.  The fragments are handled by the XML processor to extract values for the record columns using specified XPath queries.  The reduce phase was not required in this implementation (Vasilenko & Kurapati, 2014).
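A rough Python sketch of what the per-fragment extraction step does is shown below: each XML fragment becomes one record by evaluating per-column path expressions, loosely analogous to how the Hive XML SerDe maps columns to XPath queries. The element and column names are invented for illustration, and ElementTree's limited path support stands in for full XPath.

```python
import xml.etree.ElementTree as ET

# Illustrative per-column paths (assumed names), loosely analogous to the
# SerDe's column-to-XPath mapping; full XPath would require a richer parser.
COLUMNS = {"name": "name", "price": "price"}

def fragment_to_record(fragment: str) -> dict:
    """Turn one XML fragment (one Hive record) into a column -> value dict,
    the way each mapper turns a split's fragments into rows."""
    root = ET.fromstring(fragment)
    record = {"id": root.get("id")}           # attribute-backed column
    for column, path in COLUMNS.items():
        node = root.find(path)                # path lookup per column
        record[column] = node.text if node is not None else None
    return record

print(fragment_to_record("<book id='7'><name>Big Data</name><price>30</price></book>"))
# {'id': '7', 'name': 'Big Data', 'price': '30'}
```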

(Wu, 2014) has discussed and analyzed the partitioning of XML documents and the distribution of XML fragments to different compute nodes, which can introduce high overhead for transferring XML fragments from one node to another during MapReduce execution.  The author has proposed a technique that uses MapReduce to distribute the labels in inverted lists across a computing cluster so that structural joins can be performed in parallel to process queries.  An optimization technique is also proposed to reduce the computing space of the framework and improve query processing performance.  The author has argued that this approach differs from the current approach of shredding an XML document and distributing it to different nodes in a cluster.  The process reads and distributes only the inverted lists that are required by the input queries during query processing, and their size is much smaller than the size of the whole document.  The process also partitions the total computing space for structural joins so that each sub-space can be handled by one reducer performing structural joins.  A pruning-based optimization algorithm is also proposed to further improve the performance of this approach.
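A common way to realize such structural joins is to give every element a region-based label (start, end, level), under which the ancestor/descendant test reduces to interval containment. The sketch below shows that test and a naive join over two inverted lists of labels; the labels and element names are invented for illustration, and the distributed framework partitions this computing space across reducers rather than running a single nested loop.

```python
from typing import List, NamedTuple

class Label(NamedTuple):
    start: int   # position of the element's start tag in document order
    end: int     # position of its end tag
    level: int   # depth in the tree

def is_ancestor(a: Label, d: Label) -> bool:
    """Region encoding: a is an ancestor of d iff a's interval contains d's."""
    return a.start < d.start and d.end < a.end

def structural_join(ancestors: List[Label], descendants: List[Label]):
    """Naive ancestor/descendant join over two inverted lists of labels.
    In the distributed version, each reducer evaluates one sub-space of
    this cross product, so only the lists (not the document) move."""
    return [(a, d) for a in ancestors for d in descendants if is_ancestor(a, d)]

# Invented labels: two book elements and three title elements.
books  = [Label(1, 10, 1), Label(11, 20, 1)]
titles = [Label(2, 4, 2), Label(12, 14, 2), Label(21, 23, 1)]
print(structural_join(books, titles))   # two matching (book, title) pairs
```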

Conclusion

This discussion has addressed XML query processing in a MapReduce environment.  It has covered the techniques and approaches proposed by various research studies for processing large XML documents using MapReduce.  The XML fragmentation process, both with and without MapReduce, has also been discussed to provide a better understanding of the complexities of processing large XML documents in a distributed, scalable MapReduce environment.

References

Aravind, P. S., & Agrawal, V. (2014). Processing XML data in BigInsights 3.0. Retrieved from https://developer.ibm.com/hadoop/2014/10/31/processing-xml-data-biginsights-3-0/.

Dede, E., Fadika, Z., Gupta, C., & Govindaraju, M. (2011). Scalable and distributed processing of scientific XML data. Paper presented at the Grid Computing (GRID), 2011 12th IEEE/ACM International Conference on.

Fegaras, L., Li, C., Gupta, U., & Philip, J. (2011). XML Query Optimization in Map-Reduce.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Vasilenko, D., & Kurapati, M. (2014). Efficient processing of XML documents in Hadoop MapReduce.

Wu, H. (2014). Parallelizing structural joins to process queries over big XML data using MapReduce. Paper presented at the International Conference on Database and Expert Systems Applications.

Advanced Processing Techniques for Big Data

Dr. Aly, O.
Computer Science

Abstract

The purpose of this project is to discuss and analyze advanced processing techniques for Big Data.  There are various processing systems, such as Iterative Processing, Graph Processing, Stream Processing (also known as Event Processing or Real-Time Processing), and Batch Processing.  A MapReduce-based framework such as Hadoop supports Batch-Oriented Processing.  MapReduce lacks built-in support for Iterative Processing, which requires parsing datasets iteratively, as well as for large Graph Processing and Stream Processing.  Thus, models such as Twister and iMapReduce have been introduced to improve the Iterative Processing of MapReduce, and Surfer, Apache Hama, Pregel, and GraphLab for large Graph Processing.  Other models have been introduced to support Stream Processing, such as Aurora, Borealis, and IBM InfoSphere Streams.  This project focuses its discussion and analysis on the Stream Processing models Aurora and Borealis.  The discussion and analysis of the Aurora model includes an overview of Aurora as a Stream Processing Engine (SPE), followed by the Aurora framework and the fundamental components of the Aurora topology.  The Query Model of Aurora, known as the Stream Query Algebra (SQuAl), supports seven operators that construct the Aurora network and queries expressing its stream processing requirements.   The discussion and analysis also cover SQuAl and the Query Model, the Run-Time Framework, and the optimization systems that overcome bottlenecks in the network.  Aurora* and Medusa, as Distributed Stream Processing systems, are also discussed and analyzed.  The second SPE is Borealis, which is a Distributed SPE.  The discussion and analysis of Borealis involve the framework, the query model, and the optimization techniques that overcome bottlenecks in the network.  A comparison between Aurora and Borealis is also discussed and analyzed.

Keywords: Stream Processing, Event Processing, Aurora, Borealis.

Introduction

When dealing with Big Data, its different characteristics and attributes, such as volume, velocity, variety, veracity, and value, must be taken into consideration (Chandarana & Vijayalakshmi, 2014).   Thus, different types of frameworks are required to run different types of analytics (Chandarana & Vijayalakshmi, 2014).  Large-scale data processing involves different types of workloads (Chandarana & Vijayalakshmi, 2014).   Organizations deploy a combination of workload types to achieve their business goals (Chandarana & Vijayalakshmi, 2014).  These workload types include Batch-Oriented Processing, Online Transaction Processing (OLTP), Stream Processing, Interactive Ad-Hoc Query and Analysis (Chandarana & Vijayalakshmi, 2014), and Online Analytical Processing (OLAP) (Erl, Khattak, & Buhler, 2016).

For Batch-Oriented Processing, a MapReduce-based framework such as Hadoop can be deployed for recurring tasks such as large-scale Data Mining or aggregation (Chandarana & Vijayalakshmi, 2014; Erl et al., 2016; Sakr & Gaber, 2014).  For OLTP, such as user-facing e-commerce transactions, Apache HBase can be deployed (Chandarana & Vijayalakshmi, 2014).  An OLTP system processes transaction-oriented data (Erl et al., 2016).  For Stream Processing, the Storm framework can be deployed to handle stream sources such as social media feeds or sensor data (Chandarana & Vijayalakshmi, 2014).  For Interactive Ad-Hoc Query and Analysis, the Apache Drill framework can be deployed (Chandarana & Vijayalakshmi, 2014).  OLAP systems, which form an integral part of Business Intelligence, Data Mining, and Machine Learning, are used for processing data analysis queries (Erl et al., 2016).

The Apache Hadoop framework allows distributed processing of large data sets across clusters of computers using simple programming models (Chandarana & Vijayalakshmi, 2014).   The framework involves four major modules: Hadoop Core, the Hadoop Distributed File System (HDFS), Hadoop YARN, and Hadoop MapReduce.  Hadoop Core provides the common utilities that support the other modules.  The HDFS module provides high-throughput access to application data.  The Hadoop YARN module handles job scheduling and resource management.  Hadoop MapReduce provides parallel processing of large-scale datasets (Chandarana & Vijayalakshmi, 2014).

Moreover, there are various processing systems such as Iterative Processing (Schwarzkopf, Murray, & Hand, 2012), Graph Processing, and Stream Processing (Sakr & Gaber, 2014; Schwarzkopf et al., 2012).  Iterative Processing systems utilize in-memory caching (Schwarzkopf et al., 2012).   Many data analysis applications require iterative processing of the data, including algorithms for text-based search and machine learning.  However, because MapReduce lacks built-in support for iterative processing, which requires parsing datasets iteratively (Zhang, Chen, Wang, & Yu, 2015; Zhang, Gao, Gao, & Wang, 2012), models such as Twister, HaLoop, and iMapReduce have been introduced to improve the iterative processing of MapReduce (Zhang et al., 2015).   With regard to Graph Processing, MapReduce is suitable for processing flat data structures, such as vertex-oriented tasks, while propagation is optimized for edge-oriented tasks on partitioned graphs. To improve the programming models for large graph processing, various models have been introduced, such as Surfer (Chen, Weng, He, & Yang, 2010; Chen et al., 2012), GraphX (Gonzalez et al., 2014), Apache Hama, GoldenOrb, Giraph, Phoebus, GPS (Cui, Mei, & Ooi, 2014), Pregel (Cui et al., 2014; Hu, Wen, Chua, & Li, 2014; Sakr & Gaber, 2014), and GraphLab (Cui et al., 2014; Hu et al., 2014; Sakr & Gaber, 2014).  With regard to Stream Processing, because MapReduce is designed for batch-oriented computation such as log analysis and text processing (Chandarana & Vijayalakshmi, 2014; Cui et al., 2014; Erl et al., 2016; Sakr & Gaber, 2014; Zhang et al., 2015; Zhang et al., 2012) and is not adequate for supporting real-time stream processing tasks (Sakr & Gaber, 2014), various Stream Processing models have been introduced, such as DEDUCE, Aurora, Borealis, IBM Spade, StreamCloud, Stormy (Sakr & Gaber, 2014), Twitter Storm (Grolinger et al., 2014; Sakr & Gaber, 2014), Spark Streaming, Apache Storm (Fernández et al., 2014; Gupta, Gupta, & Mohania, 2012; Hu et al., 2014; Scott, 2015), StreamMapReduce (Grolinger et al., 2014), the Simple Scalable Streaming System (S4) (Fernández et al., 2014; Grolinger et al., 2014; Gupta et al., 2012; Hu et al., 2014; Neumeyer, Robbins, Nair, & Kesari, 2010), and IBM InfoSphere Streams (Gupta et al., 2012).

The project focuses on two Stream Processing models.  The discussion and analysis cover the Aurora and Borealis stream processing systems, addressing their characteristics, architectures, performance optimization capabilities, and scalability.  The project also discusses and analyzes performance bottlenecks, their causes, and the strategies for removing them.  The project begins with a general discussion of Stream Processing.

Stream Processing Engines

Stream Processing is defined by (Manyika et al., 2011) as technologies designed to process large real-time streams of event data.  Stream Processing enables applications such as algorithmic trading in financial services, RFID event processing applications, fraud detection (Manyika et al., 2011; Scott, 2015), process monitoring, and location-based services in telecommunications (Manyika et al., 2011).  Stream Processing reflects real-time streaming and is also known as "Event Stream Processing" (Manyika et al., 2011).   "Event Stream Processing" is also known as "Streaming Analytics," which processes customer-centric data "on the fly" without the need for long-term storage (Spiess, T’Joens, Dragnea, Spencer, & Philippart, 2014).

In the Real-Time mode, data is processed in-memory because it is captured before it gets persisted to disk (Erl et al., 2016).  The response time ranges from a sub-second to under a minute (Erl et al., 2016).  The Real-Time mode reflects the velocity characteristic of Big Data datasets (Erl et al., 2016).  When Big Data is processed using Real-Time or Event Stream Processing, the data arrives continuously in a stream, or at intervals as events (Erl et al., 2016).  Each individual streamed data item is small; however, the continuous nature of streaming results in very large datasets (Erl et al., 2016; Gradvohl, Senger, Arantes, & Sens, 2014).   The Real-Time mode also involves an "Interactive Mode" (Erl et al., 2016), which refers to query processing in real time (Erl et al., 2016).

Event Stream Processing (ESP) systems are designed to provide high-performance analysis of streams with low latency (Gradvohl et al., 2014).  The first ESP systems, developed in the early 2000s, include Aurora, Borealis, STREAM, TelegraphCQ, NiagaraCQ, and Cougar (Gradvohl et al., 2014).  At that time, these were centralized systems, running on a single server and aiming to overcome the issues of stream processing in traditional databases (Gradvohl et al., 2014).   Tremendous effort has been exerted to evolve data stream processing from centralized stream processing systems to stream processing engines with the ability to distribute queries among a cluster of nodes (Sakr & Gaber, 2014).  The next discussion focuses on two engines for scalable processing of streaming data: Aurora and Borealis.

1. Aurora Stream Processing Engine

Aurora was introduced through a joint project of Brandeis University, Brown University, and MIT (Abadi et al., 2003; Sakr & Gaber, 2014).   The Aurora prototype implementation was introduced in 2003 (Abadi et al., 2003; Sakr & Gaber, 2014).  Aurora's Java-based GUI allows the construction and execution of arbitrary Aurora networks and queries (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora system is described as a processing model for managing data streams for monitoring applications, which differ substantially from traditional business data processing (Abadi et al., 2003; Sakr & Gaber, 2014).   The main aim of monitoring applications is to monitor continuous streams of data (Abadi et al., 2003; Sakr & Gaber, 2014).  One example of these Monitoring Applications is military applications that monitor readings from sensors worn by soldiers, such as blood pressure, heart rate, and position.  Another example is financial analysis applications that monitor the stock data streams reported from various stock exchanges (Abadi et al., 2003; Sakr & Gaber, 2014).  Tracking Applications, which monitor the locations of large numbers of objects, are another type of Monitoring Application (Abadi et al., 2003; Sakr & Gaber, 2014).

Given the nature of the Monitoring Applications, they could benefit from a Database Management System (DBMS) because of the high volume of monitored data and their query requirements (Abadi et al., 2003; Sakr & Gaber, 2014).  However, existing DBMS systems are unable to fully support such applications because they target Business Applications rather than Monitoring Applications (Abadi et al., 2003; Sakr & Gaber, 2014).  A DBMS gets its data from humans issuing transactions, while the Monitoring Applications get their data from external sources such as sensors (Abadi et al., 2003; Sakr & Gaber, 2014).  The role of a DBMS supporting Monitoring Applications is to detect abnormal activities and alert humans (Abadi et al., 2003; Sakr & Gaber, 2014).  This model is described as the DBMS-Active, Human-Passive (DAHP) Model (Abadi et al., 2003; Sakr & Gaber, 2014).  It differs from the traditional DBMS model, described as the Human-Active, DBMS-Passive (HADP) Model, where humans initiate queries and transactions on the passive DBMS repository (Abadi et al., 2003; Sakr & Gaber, 2014).

In addition, the Monitoring Applications require not only the latest value of an object but also its historical values (Abadi et al., 2003; Sakr & Gaber, 2014).   The Monitoring Applications are trigger-oriented applications that send alert messages when abnormal activities are detected (Abadi et al., 2003; Sakr & Gaber, 2014).  Monitoring Applications also require handling of approximate or imprecise data due to the nature of data stream processing, where data can be lost or omitted for processing reasons.  The last characteristic of the Monitoring Applications involves the Real-Time requirement and Quality-of-Service (QoS).  Table 1 summarizes these five major characteristics of the Monitoring Applications, for which Aurora systems are designed to manage data streams.

Table 1: Monitoring Applications Characteristics.

1.1 Aurora Framework

The traditional DBMS could not be used to implement these Monitoring Applications with such challenging characteristics (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  The prevalent requirements of these Monitoring Applications are data and information streams, triggers, imprecise data, and real-time processing (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  Thus, Aurora systems are designed to support Monitoring Applications with these challenging characteristics and requirements (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  The underlying concept of the Aurora System Model is to process incoming data streams as specified by an application administrator, using a boxes-and-arrows paradigm as a data-flow system, where tuples flow through a loop-free, directed graph of processing operations (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  The output streams are presented to applications, which are programmed to deal with the asynchronous tuples in an output stream (Abadi et al., 2003; Sakr & Gaber, 2014).   The Aurora System Model also maintains historical storage to support ad-hoc queries (Abadi et al., 2003; Sakr & Gaber, 2014).  Aurora systems handle data from a variety of sources, such as computer programs that generate values at regular or irregular intervals, or hardware sensors (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  Figure 1 illustrates the Aurora System Model, reflecting the input data streams, the operator boxes, the continuous and ad-hoc queries, and the output to applications.

Figure 1:  Overview of Aurora System Model and Architecture.  Adapted from (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).

1.2  Aurora Query Model: SQuAl Using Seven Primitive Operations

The Aurora Stream Query Algebra (SQuAl) supports seven operators that are used to construct Aurora networks and queries expressing its stream processing requirements (Abadi et al., 2003; Sakr & Gaber, 2014).  Many of these operators have analogs in relational query operations.  For instance, the "filter" operator in the Aurora query algebra, which applies any number of predicates to each incoming tuple and routes tuples based on the predicates they satisfy, is like the relational operator "select" (Abadi et al., 2003; Sakr & Gaber, 2014).   The "aggregate" operator in the Aurora query algebra computes stream aggregations to address the fundamentally push-based nature of data streams, applying a function such as a moving average across a window of values in a stream (Abadi et al., 2003; Sakr & Gaber, 2014).  Windowed operations are required when the data is stale or time-imprecise (Abadi et al., 2003; Sakr & Gaber, 2014).  The application administrator in the Aurora System Model can connect the output of one box to the inputs of several others, which implements an "implicit split" operation rather than the "explicit split" of relational operations (Abadi et al., 2003; Sakr & Gaber, 2014).  In addition, the Aurora System Model contains an "explicit union" operation where two streams can be put together (Abadi et al., 2003; Sakr & Gaber, 2014).   The Aurora System Model also represents a collection of streams with a common schema on an "arc" (Abadi et al., 2003; Sakr & Gaber, 2014).   An arc does not have a fixed number of streams, which makes it easy for streams to come and go without any modification to the Aurora network (Abadi et al., 2003; Sakr & Gaber, 2014).
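As an illustration of the windowed aggregate described above, the following minimal Python sketch computes a moving average over a count-based sliding window of stream tuples. The window size and the (timestamp, value) tuple layout are assumptions made for the example, not Aurora's actual implementation.

```python
from collections import deque
from typing import Iterable, Iterator, Tuple

def windowed_average(stream: Iterable[Tuple[float, float]], size: int) -> Iterator[Tuple[float, float]]:
    """Aggregate-operator sketch: emit (timestamp, moving average) once the
    count-based window is full, sliding by one tuple at a time."""
    window = deque(maxlen=size)
    for timestamp, value in stream:
        window.append(value)
        if len(window) == size:
            yield timestamp, sum(window) / size

# Illustrative (timestamp, heart_rate) readings from a sensor stream.
readings = [(1, 70.0), (2, 72.0), (3, 71.0), (4, 75.0)]
print(list(windowed_average(readings, size=3)))
# [(3, 71.0), (4, 72.66...)]
```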

In the Aurora Query Model, a stream is an append-only sequence of tuples with a uniform schema, where each tuple in a stream carries a timestamp used for QoS calculations (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora Query Model assumes no arrival order, which gives the system latitude to produce outputs out of order so that high-priority tuples can be served first (Abadi et al., 2003; Sakr & Gaber, 2014).   This lack of an assumed arrival order also helps in redefining windows over attributes and in merging multiple streams (Abadi et al., 2003; Sakr & Gaber, 2014).   Some operators are described as "order-agnostic," such as Filter, Map, and Union.  Other operators are described as "order-sensitive," such as BSort, Aggregate, Join, and Resample; they can only be guaranteed to execute with finite buffer space and in finite time if they can assume some ordering over their input streams (Abadi et al., 2003; Sakr & Gaber, 2014).  Thus, the order-sensitive operators require order specification arguments that indicate the expected arrival order of tuples (Abadi et al., 2003; Sakr & Gaber, 2014).

The Aurora Query Model supports three main operation modes: (1) continuous queries for real-time processing, (2) views, and (3) ad-hoc queries (Abadi et al., 2003; Sakr & Gaber, 2014).  These three operation modes use the same conceptual building blocks and processing flows based on QoS specifications (Abadi et al., 2003; Sakr & Gaber, 2014).  In the Aurora Query Model, each output is associated with two-dimensional QoS graphs that specify the utility of the output with regard to several performance-related and quality-related attributes (Abadi et al., 2003; Sakr & Gaber, 2014).   The stream-oriented operators that constitute the Aurora network and queries are designed to operate in a data-flow mode where data elements are processed as they appear on the input (Abadi et al., 2003; Sakr & Gaber, 2014).
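A two-dimensional QoS graph can be pictured as a piecewise-linear curve mapping a performance attribute such as latency to a utility value. The sketch below evaluates one such curve; the breakpoints are invented for illustration and are not taken from the Aurora papers.

```python
def qos_utility(latency_ms: float,
                graph=((0, 1.0), (100, 1.0), (500, 0.2), (1000, 0.0))) -> float:
    """Evaluate a piecewise-linear latency-vs-utility QoS graph: full utility
    up to 100 ms, degrading to zero at 1 s (illustrative breakpoints only)."""
    points = list(graph)
    if latency_ms <= points[0][0]:
        return points[0][1]
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        if latency_ms <= x1:
            # Linear interpolation within the current segment.
            return y0 + (y1 - y0) * (latency_ms - x0) / (x1 - x0)
    return points[-1][1]

print(qos_utility(50))    # 1.0
print(qos_utility(300))   # 0.6
print(qos_utility(2000))  # 0.0
```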

1.3 Aurora Run-Time Framework and Optimization

The main purpose of the Aurora run-time operations is to process data flows through a potentially large workflow diagram (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora Run-Time Architecture involves five main techniques: (1) the QoS data structure, (2) the Aurora Storage Management (ASM), (3) the Run-Time Scheduling (RTS), (4) the Introspection, and (5) the Load Shedding.   

The QoS is a multi-dimensional function that involves response times, tuple drops, and values produced (Abadi et al., 2003; Sakr & Gaber, 2014).  The ASM is designed to store all tuples required by the Aurora network.  The ASM requires two main operations: the first manages storage for the tuples being passed through an Aurora network, and the second maintains the extra tuple storage that may be required at connection points.  Thus, the ASM involves two main management operations: (1) Queue Management and (2) Connection Point Management (Abadi et al., 2003; Sakr & Gaber, 2014).

RTS in Aurora is challenging because of the need to simultaneously address several issues such as large system scale, real-time performance requirements, and dependencies between box executions (Abadi et al., 2003; Sakr & Gaber, 2014).  In addition, tuple processing in Aurora spans many scheduling and execution steps, where an input tuple goes through many boxes before potentially contributing to an output stream, which may require secondary storage (Abadi et al., 2003; Sakr & Gaber, 2014). The Aurora system reduces overall processing costs by exploiting two main non-linearities when processing tuples: "interbox non-linearity" and "intrabox non-linearity."  Aurora takes advantage of the non-linearity in both interbox and intrabox tuple processing through "train scheduling" (Abadi et al., 2003; Sakr & Gaber, 2014).  Train scheduling is a set of scheduling heuristics that attempt (1) to have boxes queue as many tuples as possible without processing them, thus generating long tuple trains; (2) to process complete trains at once, thus exploiting intrabox non-linearity; and (3) to pass them to subsequent boxes without having to go to disk, thus exploiting interbox non-linearity (Abadi et al., 2003; Sakr & Gaber, 2014).  The primary goal of train scheduling is to minimize the number of I/O operations performed per tuple; the secondary goal is to minimize the number of box calls made per tuple. With regard to introspection, Aurora employs static and dynamic (run-time) introspection techniques to predict and detect overload situations (Abadi et al., 2003; Sakr & Gaber, 2014).  The purpose of static introspection is to determine whether the hardware running the Aurora network is sized correctly.  Dynamic analysis, based on run-time introspection, uses the timestamps carried by all tuples (Abadi et al., 2003; Sakr & Gaber, 2014).   With regard to load shedding, Aurora reduces the volume of tuple processing if an overload is detected as a result of static or dynamic analysis, by either dropping tuples or filtering them (Abadi et al., 2003; Sakr & Gaber, 2014).   Figure 2 illustrates the Aurora Run-Time Architecture, adapted from (Abadi et al., 2003; Sakr & Gaber, 2014).

Figure 2:  Aurora Run-Time Framework. Adapted from (Abadi et al., 2003).
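The train scheduling heuristic can be illustrated with a toy sketch: tuples accumulate into a train, and the whole train is pushed through a box with a single call, so the per-call overhead is amortized across the train (the intrabox non-linearity). The box function and train size below are illustrative assumptions, not Aurora code.

```python
from typing import Callable, Iterable, List

def run_box_with_trains(box: Callable[[List[int]], List[int]],
                        input_queue: Iterable[int],
                        train_size: int) -> List[int]:
    """Let tuples accumulate into a 'train', then process the whole train
    with one box call, paying the per-call overhead once per train
    instead of once per tuple."""
    output, train = [], []
    for tup in input_queue:
        train.append(tup)
        if len(train) == train_size:
            output.extend(box(train))   # one call for the whole train
            train = []
    if train:                           # flush the final partial train
        output.extend(box(train))
    return output

double_box = lambda train: [2 * t for t in train]
print(run_box_with_trains(double_box, range(7), train_size=3))
# [0, 2, 4, 6, 8, 10, 12]
```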

The Aurora optimization techniques involve two main systems: (1) dynamic continuous query optimization and (2) ad-hoc query optimization. Dynamic continuous query optimization involves inserting projections, combining boxes, and reordering boxes (Abadi et al., 2003). Ad-hoc query optimization involves historical information, because Aurora semantics require the historical sub-network to be run first.  This historical information is organized in a B-tree (Abadi et al., 2003).  The initial boxes in an ad-hoc query can pull information from the B-tree associated with the corresponding connection point (Abadi et al., 2003).  When the historical operation is finished, the Aurora optimizer switches the implementation to the standard push-based data structures and continues processing in the conventional mode (Abadi et al., 2003).

1.4 Aurora* and Medusa for Distributed Stream Processing

The Aurora system is a centralized stream processor.   However, in (Cherniack et al., 2003), Aurora* and Medusa are proposed for distributed processing.  Several architectural issues must be addressed to build a large-scale distributed version of a stream processing system such as Aurora.  In (Cherniack et al., 2003), the problem is divided into two categories: intra-participant distribution and inter-participant distribution.  Intra-participant distribution involves small-scale distribution within one administrative domain and is handled by the proposed Aurora* model (Cherniack et al., 2003).  Inter-participant distribution involves large-scale distribution across administrative boundaries and is handled by the proposed Medusa model (Cherniack et al., 2003).

2. Borealis Stream Processing Engine

Borealis is described as a second-generation Distributed SPE, also developed at Brandeis University, Brown University, and MIT (Abadi et al., 2005; Sakr & Gaber, 2014).  The Borealis streaming model inherits its core stream processing functionality from the Aurora model and its core distribution functionality from the Medusa model (Abadi et al., 2005; Sakr & Gaber, 2014).   The Borealis model expands and extends both models to provide more advanced capabilities and functionalities that are commonly required by newly emerging stream processing applications (Abadi et al., 2005; Sakr & Gaber, 2014).   Borealis is regarded as the successor to Aurora (Abadi et al., 2005; Sakr & Gaber, 2014).

The second generation of SPEs has three main requirements that are critical and, at the same time, challenging.  The first requirement involves the "Dynamic Revision of Query Results" (Abadi et al., 2005; Sakr & Gaber, 2014).  Applications are forced to live with imperfect results because corrections or updates to previously processed data only become available after the fact, unless the system has techniques to revise its processing and results to take newly available data or updates into account (Abadi et al., 2005; Sakr & Gaber, 2014).  The second requirement involves "Dynamic Query Modification," which allows fast, automatic modification at runtime with low overhead (Abadi et al., 2005; Sakr & Gaber, 2014).  The third requirement involves "Flexible and Highly Scalable Optimization," where the optimization problem is more balanced between sensor-heavy and server-heavy optimization.  A more flexible optimization structure is needed to deal with a large number of devices and to perform cross-network, sensor-heavy/server-heavy resource management and optimization (Abadi et al., 2005; Sakr & Gaber, 2014). However, such an optimization framework poses two additional challenges.  The first challenge is the ability to simultaneously optimize different QoS metrics such as processing latency, throughput, or sensor lifetime (Abadi et al., 2005; Sakr & Gaber, 2014).  The second challenge is the ability to perform optimizations at different levels of granularity: at the node level, the sensor network level, the level of a cluster of sensors and servers, and so forth (Abadi et al., 2005; Sakr & Gaber, 2014).  These advanced challenges, capabilities, and requirements for the second generation of SPEs are added to the classical SPE architecture to form the Borealis framework (Abadi et al., 2005; Sakr & Gaber, 2014).

2.1 The Borealis Framework

            The Borealis framework is a distributed stream processing engine where the collection of continuous queries submitted to Borealis can be seen as a giant network of operators whose processing is distributed to multiple sites (Abadi et al., 2005; Sakr & Gaber, 2014). There is a sensor proxy interface which acts as another Borealis site (Abadi et al., 2005; Sakr & Gaber, 2014).  The sensor networks can participate in query processing behind that sensor proxy interface (Abadi et al., 2005; Sakr & Gaber, 2014).

A Borealis server runs on each node, with the Global Catalog (GC), High Availability (HA) module, Neighborhood Optimizer (NHO), Local Monitor (LM), Admin, and Query Processor (QP) at the top of the framework, and meta-data, control, and data at the bottom. The GC can be centralized or distributed across a subset of processing nodes; it holds information about the complete query network and the location of all query fragments (Abadi et al., 2005; Sakr & Gaber, 2014).  The HA modules monitor each node to handle failures (Abadi et al., 2005; Sakr & Gaber, 2014).  The NHO uses local information and information from other NHOs to improve the load balance between nodes (Abadi et al., 2005; Sakr & Gaber, 2014).  The LM collects performance-related statistics as the local system runs and reports them to the local optimizer as well as to the NHOs (Abadi et al., 2005; Sakr & Gaber, 2014).  The QP is the core component of the Borealis framework; the actual execution of the query is implemented in the QP (Abadi et al., 2005; Sakr & Gaber, 2014).  The QP, which is a single-site processor, receives the input data streams, and its results are pulled through the I/O Queues, which route tuples to and from remote Borealis nodes and clients (Abadi et al., 2005; Sakr & Gaber, 2014).  The Admin module controls the QP and issues system control messages (Abadi et al., 2005; Sakr & Gaber, 2014).  These messages are pushed to the Local Optimizer (LO), which communicates with the major run-time components of the QP to enhance performance.  These run-time components of Borealis include (1) the Priority Scheduler, (2) the Box Processors, and (3) the Load Shedder.  The Priority Scheduler determines the order of box execution based on the priority of the tuples.   The Box Processors can change their behavior during run-time based on the messages received from the LO.  The Load Shedder discards low-priority tuples when the node is overloaded (Abadi et al., 2005; Sakr & Gaber, 2014).  The Storage Manager is part of the QP and is responsible for storing and retrieving the data that flows through the arcs of the local query diagram.  The Local Catalog is another component of the QP; it stores the query diagram description and metadata and is accessible by all components.  Figure 3 illustrates the Borealis framework, adapted from (Abadi et al., 2005; Sakr & Gaber, 2014).

Figure 3.  Borealis’ Framework, adapted from (Abadi et al., 2005; Sakr & Gaber, 2014).
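The role of the Priority Scheduler can be sketched with a small priority queue that orders pending box executions by the priority of their waiting tuples. The box names and priority values below are invented for illustration and do not reflect Borealis internals.

```python
import heapq

class PriorityScheduler:
    """Toy priority scheduler: boxes whose pending tuples carry higher
    priority are executed first; ties break by arrival order."""
    def __init__(self):
        self._heap, self._seq = [], 0

    def enqueue(self, priority: int, box_name: str, tuple_batch: list):
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-priority, self._seq, box_name, tuple_batch))
        self._seq += 1

    def next_execution(self):
        _, _, box_name, tuple_batch = heapq.heappop(self._heap)
        return box_name, tuple_batch

sched = PriorityScheduler()
sched.enqueue(1, "aggregate", [("t1", 40)])
sched.enqueue(5, "filter", [("t2", 120)])
print(sched.next_execution())  # ('filter', [('t2', 120)]) -- higher priority runs first
```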

2.2 Borealis’ Query Model and Comparison with Aurora’s Query Model

Borealis inherits the Aurora boxes-and-arrows model for specifying continuous queries, where the boxes reflect the query operators and the arrows reflect the data flow between the boxes (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis extends the Aurora data model by supporting three types of messages: insertion, deletion, and replacement (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis queries are an extended version of Aurora's operators that support revision messages (Abadi et al., 2005; Sakr & Gaber, 2014).   The Borealis query model also supports the modification of box semantics during runtime (Abadi et al., 2005; Sakr & Gaber, 2014).  QoS in Borealis, as in Aurora, forms the basis of resource management decisions.  However, while each query output is provided with a QoS function in Aurora's model, Borealis allows QoS to be predicted at any point in the data flow (Abadi et al., 2005; Sakr & Gaber, 2014).  Thus, Borealis attaches a Vector of Metrics to supplied messages to enable such QoS prediction.

In the context of query result revision, Borealis supports a "replayable" query diagram and a revised processing scheme.  While Aurora has an append-only model, in which a message cannot be modified once it is placed on a stream and results can therefore only be approximate or imperfect, the Borealis model supports the modification of messages so that queries can be processed intelligently and correct query results provided (Abadi et al., 2005; Sakr & Gaber, 2014).   The query diagram must be replayable when messages are revised, because processing a modified message requires replaying a portion of the past with the modified value (Abadi et al., 2005; Sakr & Gaber, 2014).   This replaying process is also useful for recovery and high availability (Abadi et al., 2005; Sakr & Gaber, 2014). Dynamic revision with replaying can add overhead; thus, a "closed" model is used to generate deltas that show the effects of the revisions instead of the entire result.
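The revision idea can be illustrated with a toy aggregate that keeps its input history so that a replacement message can be replayed and only the resulting delta emitted. The key names and values below are assumptions made for the example, not Borealis code.

```python
from collections import defaultdict

class RevisableSum:
    """Toy 'replayable' aggregate: keeps the input history per key so a
    replacement revision can be replayed, and emits only the delta of the
    result rather than the entire recomputed output."""
    def __init__(self):
        self.history = defaultdict(dict)   # key -> {tuple_id: value}
        self.result = defaultdict(float)   # key -> current sum

    def insert(self, key, tuple_id, value):
        self.history[key][tuple_id] = value
        self.result[key] += value
        return ("insertion", key, self.result[key])

    def replace(self, key, tuple_id, new_value):
        old_value = self.history[key][tuple_id]
        self.history[key][tuple_id] = new_value
        delta = new_value - old_value        # replay only the affected part
        self.result[key] += delta
        return ("revision", key, delta)

agg = RevisableSum()
print(agg.insert("sensor-1", "t1", 10.0))    # ('insertion', 'sensor-1', 10.0)
print(agg.insert("sensor-1", "t2", 4.0))     # ('insertion', 'sensor-1', 14.0)
print(agg.replace("sensor-1", "t1", 12.0))   # ('revision', 'sensor-1', 2.0)
```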

In the context of query modification, Borealis provides online modification of continuous queries by supporting control lines and time travel features.  Control lines extend the basic Aurora query model to change operator parameters, and even the operators themselves, during run-time (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis boxes contain the standard data input lines as well as special control lines that carry messages with revised box parameters and new box functions (Abadi et al., 2005; Sakr & Gaber, 2014).   Borealis provides a new function called "bind" that binds new parameters to free variables within a function definition, leading to the creation of a new function (Abadi et al., 2005; Sakr & Gaber, 2014).   Aurora's connection points are leveraged to enable time travel in Borealis.  The original purpose of connection points was to support ad-hoc queries, which can query historical and run-time data.  This concept is extended in the Borealis model to include connection point views, enabling time travel applications, ad-hoc queries, and the query diagram to access the connection points independently and in parallel (Abadi et al., 2005; Sakr & Gaber, 2014).   Connection point views include two operations to enable time travel: the replay operation and the undo operation.

2.3  The Optimization Model of Borealis

Borealis has an optimizer framework to optimize processing across a combined sensor and server network, to deal with QoS in stream-based applications, and to support scalability, both size-wise and geographically, for stream-based applications.   The optimization model contains multiple collaborating monitoring and optimization components.  The monitoring components include a local monitor at every site and an end-point monitor at the output sites.  The optimization components include the global optimizer, the neighborhood optimizer, and the local optimizer (Abadi et al., 2005).   While Aurora evaluated QoS only at outputs and had a difficult job inferring QoS at upstream nodes, Borealis can evaluate the predicted-QoS score function on each message by utilizing the values of the Metrics Vector (Abadi et al., 2005).   Borealis utilizes Aurora's concept of train scheduling of boxes and tuples to reduce scheduling overhead.  While Aurora processes messages in order of arrival, Borealis has box scheduling flexibility that allows processing messages out of order, because the revision technique can be used to process them later as insertions (Abadi et al., 2005).  Borealis offers a superior load shedding technique to Aurora's (Abadi et al., 2005).  The Load Shedder in Borealis detects and handles overload situations by adding "drop" operators to the processing network (Ahmad et al., 2005).  A "drop" operator filters out messages, either based on the value of the tuple or in a randomized fashion, to overcome the overload (Ahmad et al., 2005).  Higher quality outputs can be achieved by allowing nodes in a chain to coordinate in choosing where and how much load to shed.  Distributed load shedding algorithms collect local statistics from nodes and pre-compute potential drop plans at compile time (Ahmad et al., 2005).  Figure 4 illustrates the optimization components of Borealis, adapted from (Abadi et al., 2005).

Figure 4.  The Optimization Components of Borealis, adapted from (Abadi et al., 2005).
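The two flavors of the "drop" operator, randomized and value-based (semantic), can be sketched as simple stream filters. The tuple layout and threshold below are illustrative assumptions, not the Borealis implementation.

```python
import random

def random_drop(stream, drop_probability: float):
    """Randomized drop operator: shed load by discarding a fraction of tuples."""
    for tup in stream:
        if random.random() >= drop_probability:
            yield tup

def semantic_drop(stream, min_value: float):
    """Value-based (semantic) drop operator: discard tuples whose value field
    falls below a utility threshold."""
    for tup in stream:
        if tup[1] >= min_value:
            yield tup

tuples = [("t1", 3.0), ("t2", 9.5), ("t3", 7.1), ("t4", 1.2)]
print(list(semantic_drop(tuples, min_value=5.0)))        # [('t2', 9.5), ('t3', 7.1)]
print(list(random_drop(tuples, drop_probability=0.5)))   # a random subset of the tuples
```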

Borealis provides fault-tolerance techniques for a distributed SPE, such as replication: running multiple copies of the same query network on distinct processing nodes (Abadi et al., 2005; Balazinska, Balakrishnan, Madden, & Stonebraker, 2005).  When a node experiences a failure on one of its input streams, it tries to find an alternate upstream replica.  All replicas must be consistent.  To ensure such consistency, a data-serializing operator, "SUnion," takes multiple streams as input and produces one output stream with deterministically ordered tuples, ensuring that all operators of the replicas process the same input in the same order (Abadi et al., 2005; Balazinska et al., 2005).   To provide high availability, each SPE ensures that input data is processed, and the results forwarded, within a user-specified time threshold of its arrival (Abadi et al., 2005; Balazinska et al., 2005).  When the failure is corrected, each SPE that has processed tentative data reconciles its state and stabilizes its output by replacing the previously tentative output with stable data tuples forwarded to downstream clients; the SPE's state is reconciled using checkpoint/redo, undo/redo, and the new concept of revision tuples (Abadi et al., 2005; Balazinska et al., 2005).
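The effect of a data-serializing union can be sketched as a deterministic, timestamp-ordered merge of several input streams, so that every replica running the same operator sees the same input order. The sketch below is a simplification with invented streams; the real SUnion also buckets tuples, marks boundaries, and handles tentative tuples.

```python
import heapq
from typing import Iterable, List, Tuple

def serializing_union(*input_streams: Iterable[Tuple[int, str]]) -> List[Tuple[int, str]]:
    """Merge several (timestamp, value) streams into one output stream with a
    deterministic order (by timestamp, then value), so replicas stay consistent."""
    return list(heapq.merge(*[sorted(stream) for stream in input_streams]))

stream_a = [(1, "a"), (4, "d")]
stream_b = [(2, "b"), (3, "c")]
print(serializing_union(stream_a, stream_b))
# [(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')]
```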

Conclusion

This project discussed and analyzed advanced processing techniques for Big Data.  There are various processing systems, such as Iterative Processing, Graph Processing, Stream Processing (also known as Event Processing or Real-Time Processing), and Batch Processing.  A MapReduce-based framework such as Hadoop supports Batch-Oriented Processing.  MapReduce lacks built-in support for Iterative Processing, which requires parsing datasets iteratively, as well as for large Graph Processing and Stream Processing.  Thus, models such as Twister, HaLoop, and iMapReduce have been introduced to improve the Iterative Processing of MapReduce. With regard to Graph Processing, MapReduce is suitable for processing flat data structures, such as vertex-oriented tasks, while propagation is optimized for edge-oriented tasks on partitioned graphs.  Various models have been introduced to improve the programming models for large graph processing, such as Surfer, Apache Hama, GoldenOrb, Giraph, Pregel, and GraphLab.  With regard to Stream Processing, various models have also been introduced to overcome the limitation of the MapReduce framework, which deals only with batch-oriented processing.  These Stream Processing models include Aurora, Borealis, IBM Spade, StreamCloud, Stormy, Twitter Storm, Spark Streaming, Apache Storm, StreamMapReduce, the Simple Scalable Streaming System (S4), and IBM InfoSphere Streams.  This project focused its discussion and analysis on the Stream Processing models Aurora and Borealis.  The discussion and analysis of the Aurora model included an overview of Aurora as a Stream Processing Engine (SPE), followed by the Aurora framework and the fundamental components of the Aurora topology.  The Query Model of Aurora, known as the Stream Query Algebra (SQuAl), supports seven operators that construct the Aurora network and queries expressing its stream processing requirements.   The discussion and analysis also included SQuAl and the Query Model, the Run-Time Framework, and the optimization systems.  Aurora* and Medusa, as Distributed Stream Processing systems, were also discussed and analyzed.  The second SPE is Borealis, which is a Distributed SPE.  The discussion and analysis of Borealis involved the framework, the query model, and the optimization techniques.  Borealis is an expansion of Aurora's SPE to include and support features required for Distributed Real-Time Streaming.  The comparison between Aurora and Borealis was also discussed and analyzed at all levels: the network, the query model, and the optimization techniques.

References

Abadi, D. J., Ahmad, Y., Balazinska, M., Cetintemel, U., Cherniack, M., Hwang, J.-H., . . . Ryvkina, E. (2005). The Design of the Borealis Stream Processing Engine.

Abadi, D. J., Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., . . . Zdonik, S. (2003). Aurora: a new model and architecture for data stream management. The VLDB Journal, 12(2), 120-139.

Ahmad, Y., Berg, B., Cetintemel, U., Humphrey, M., Hwang, J.-H., Jhingran, A., . . . Tatbul, N. (2005). Distributed operation in the Borealis stream processing engine. Paper presented at the Proceedings of the 2005 ACM SIGMOD international conference on Management of data.

Balazinska, M., Balakrishnan, H., Madden, S., & Stonebraker, M. (2005). Fault-tolerance in the Borealis distributed stream processing system. Paper presented at the Proceedings of the 2005 ACM SIGMOD international conference on Management of data.

Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., Seidman, G., . . . Stonebraker, M. (2002). Monitoring streams—a new class of data management applications. Paper presented at the VLDB’02: Proceedings of the 28th International Conference on Very Large Databases.

Chandarana, P., & Vijayalakshmi, M. (2014, 4-5 April 2014). Big Data analytics frameworks. Paper presented at the Circuits, Systems, Communication and Information Technology Applications (CSCITA), 2014 International Conference on.

Chen, R., Weng, X., He, B., & Yang, M. (2010). Large graph processing in the cloud. Paper presented at the Proceedings of the 2010 ACM SIGMOD International Conference on Management of data.

Chen, R., Yang, M., Weng, X., Choi, B., He, B., & Li, X. (2012). Improving large graph processing on partitioned graphs in the cloud. Paper presented at the Proceedings of the Third ACM Symposium on Cloud Computing.

Cherniack, M., Balakrishnan, H., Balazinska, M., Carney, D., Cetintemel, U., Xing, Y., & Zdonik, S. B. (2003). Scalable Distributed Stream Processing.

Cui, B., Mei, H., & Ooi, B. C. (2014). Big data: the driver for innovation in databases. National Science Review, 1(1), 27-30.

Erl, T., Khattak, W., & Buhler, P. (2016). Big Data Fundamentals: Concepts, Drivers & Techniques: Prentice Hall Press.

Fernández, A., del Río, S., López, V., Bawakid, A., del Jesus, M. J., Benítez, J. M., & Herrera, F. (2014). Big Data with Cloud Computing: an insight on the computing environment, MapReduce, and programming frameworks. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 4(5), 380-409.

Gonzalez, J. E., Xin, R. S., Dave, A., Crankshaw, D., Franklin, M. J., & Stoica, I. (2014). GraphX: Graph Processing in a Distributed Dataflow Framework.

Gradvohl, A. L. S., Senger, H., Arantes, L., & Sens, P. (2014). Comparing distributed online stream processing systems considering fault tolerance issues. Journal of Emerging Technologies in Web Intelligence, 6(2), 174-179.

Grolinger, K., Hayes, M., Higashino, W. A., L’Heureux, A., Allison, D. S., & Capretz, M. A. (2014). Challenges for mapreduce in big data. Paper presented at the Services (SERVICES), 2014 IEEE World Congress on.

Gupta, R., Gupta, H., & Mohania, M. (2012). Cloud computing and big data analytics: what is new from databases perspective? Paper presented at the International Conference on Big Data Analytics.

Hu, H., Wen, Y., Chua, T.-S., & Li, X. (2014). Toward scalable systems for big data analytics: A technology tutorial. IEEE Access, 2, 652-687.

Manyika, J., Chui, M., Brown, B., Bughin, J., Dobbs, R., Roxburgh, C., & Byers, A. H. (2011). Big data: The next frontier for innovation, competition, and productivity.

Neumeyer, L., Robbins, B., Nair, A., & Kesari, A. (2010). S4: Distributed stream computing platform. Paper presented at the 2010 IEEE International Conference on Data Mining Workshops.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Schwarzkopf, M., Murray, D. G., & Hand, S. (2012). The seven deadly sins of cloud computing research. Paper presented at the 4th USENIX Workshop on Hot Topics in Cloud Computing (HotCloud '12).

Scott, J. A. (2015). Getting Started with Spark: MapR Technologies, Inc.

Spiess, J., T’Joens, Y., Dragnea, R., Spencer, P., & Philippart, L. (2014). Using Big Data to Improve Customer Experience and Business Performance. Bell Labs Tech. J., 18: 13–17. doi:10.1002/bltj.21642

Zhang, Y., Chen, S., Wang, Q., & Yu, G. (2015). i^2MapReduce: Incremental MapReduce for Mining Evolving Big Data. IEEE transactions on knowledge and data engineering, 27(7), 1906-1919.

Zhang, Y., Gao, Q., Gao, L., & Wang, C. (2012). imapreduce: A distributed computing framework for iterative computation. Journal of Grid Computing, 10(1), 47-68.

RDF Data Query Processing Performance

Dr. Aly, O.
Computer Science

Abstract

The purpose of this paper is to provide a survey of the state-of-the-art techniques for applying MapReduce to improve RDF data query processing performance.  Tremendous effort from industry and researchers has been exerted to develop efficient and scalable RDF processing systems.   The project discusses and analyzes the RDF framework and the major building blocks of the semantic web architecture.   The RDF store architecture, the MapReduce Parallel Processing Framework, and Hadoop are also discussed in this project.  Researchers have worked to develop and standardize Semantic Web technologies to address the inadequacy of current traditional analytical techniques.   This paper also discusses and analyzes the most prominent standardized semantic web technologies, RDF and SPARQL.  The discussion and analysis of the various techniques applied to MapReduce to improve RDF query processing performance include RDFPath, PigSPARQL, interactive SPARQL query processing on Hadoop (Sempala), the Map-Side Index Nested Loop Join (MAPSIN join), HadoopRDF, RDF-3X (RDF Triple eXpress), and Rya (a scalable RDF triple store for the clouds).

Keywords: RDF, SPARQL, MapReduce, Performance

MapReduce and RDF Data Query Processing Optimized Performance

This project provides a survey of the state-of-the-art techniques for applying MapReduce to improve Resource Description Framework (RDF) data query processing performance.  There has been a tremendous effort from industry and researchers to develop efficient and scalable RDF processing systems.  Most complex data-processing tasks require multiple MapReduce cycles chained together sequentially, so tasks are often decomposed into cycles or subtasks.  A low overall workflow cost is a key goal of this decomposition, because each MapReduce cycle incurs significant overhead.  When using RDF, the decomposition problem concerns distributing operations such as SELECT and JOIN into subtasks, each supported by a MapReduce cycle. The decomposition issue is related to the order of operations, because neighboring operations in a query plan can be effectively grouped into the same subtask.  When using MapReduce, the operation order is constrained by the key partitioning requirement, so that neighboring operations do not conflict.  Various techniques have been proposed to enhance the performance of semantic web queries using RDF and MapReduce.
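As a rough illustration of packing one join into one MapReduce cycle, the sketch below performs a reduce-side join of two RDF triple patterns on their shared variable: the map phase keys each triple by the join variable, and the reduce phase combines the matching values per key. The triples, property names, and pattern are invented for illustration and do not come from any particular system surveyed here.

```python
from collections import defaultdict

# Illustrative triples for the pattern { ?p worksAt ?org . ?p name ?n }
works_at = [("alice", "worksAt", "navy"), ("bob", "worksAt", "mit")]
names    = [("alice", "name", "Alice"), ("bob", "name", "Bob")]

def map_phase():
    """Map: emit each triple keyed by the shared join variable ?p,
    tagged with the pattern it came from."""
    for s, p, o in works_at:
        yield s, ("worksAt", o)
    for s, p, o in names:
        yield s, ("name", o)

def reduce_phase(grouped):
    """Reduce: for each join key, combine the values of the two patterns."""
    for person, values in grouped.items():
        orgs   = [v for tag, v in values if tag == "worksAt"]
        labels = [v for tag, v in values if tag == "name"]
        for org in orgs:
            for label in labels:
                yield person, label, org

grouped = defaultdict(list)
for key, value in map_phase():      # the framework's shuffle groups by key
    grouped[key].append(value)
print(list(reduce_phase(grouped)))
# [('alice', 'Alice', 'navy'), ('bob', 'Bob', 'mit')]
```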

This project begins with an overview of RDF, followed by the RDF store architecture, the MapReduce Parallel Processing Framework, and Hadoop.  RDF and SPARQL semantic querying are then discussed, covering the syntax of SPARQL and the missing features that are required to enhance the performance of RDF using MapReduce.  Various techniques for applying MapReduce to improve RDF query processing performance are discussed and analyzed, including HadoopRDF, RDFPath, and PigSPARQL.

Resource Description Framework (RDF)

Resource Description Framework (RDF) is described as an emerging standard for processing metadata (Punnoose, Crainiceanu, & Rapp, 2012; Tiwana & Balasubramaniam, 2001).  RDF provides interoperability between applications that exchange machine-understandable information on the Web (Sakr & Gaber, 2014; Tiwana & Balasubramaniam, 2001).  The primary goal of RDF is to define a mechanism and provide standards for metadata and for describing resources on the Web (Boussaid, Tanasescu, Bentayeb, & Darmont, 2007; Firat & Kuzu, 2011; Tiwana & Balasubramaniam, 2001), making no a priori assumptions about a particular application domain or the associated semantics (Tiwana & Balasubramaniam, 2001).  These standards and mechanisms provided by RDF can prevent users from retrieving irrelevant subjects because RDF provides metadata that is relevant to the desired information (Firat & Kuzu, 2011).

RDF is also described as a Data Model (Choi, Son, Cho, Sung, & Chung, 2009; Myung, Yeon, & Lee, 2010) for representing labeled directed graphs (Choi et al., 2009; Nicolaidis & Iniewski, 2017), and it is useful for Data Warehousing solutions built on frameworks such as MapReduce (Myung et al., 2010).  RDF is an important building block of the Semantic Web of Web 3.0 (see Figure 1) (Choi et al., 2009; Firat & Kuzu, 2011).  The technologies of the Semantic Web are useful for maintaining data in the Cloud (M. F. Husain, Khan, Kantarcioglu, & Thuraisingham, 2010).  These Semantic Web technologies provide the ability to specify and query heterogeneous data in a standard manner (M. F. Husain et al., 2010).

The RDF Data Model can be extended with ontologies, using RDF Schema (RDFS) and the Web Ontology Language (OWL), to provide techniques for defining and identifying vocabularies specific to a certain domain, the schema, and the relations between the elements of the vocabulary (Choi et al., 2009).  RDF can be exported in various file formats (Sun & Jara, 2014).  The most common of these formats are RDF/XML and XSD (Sun & Jara, 2014).  OWL is used to add semantics to the schema (Sun & Jara, 2014).  For instance, the statement "A isAssociatedWith B" implies that "B isAssociatedWith A" (Sun & Jara, 2014).  OWL makes it possible to express these two statements in the same way (Sun & Jara, 2014).  This feature of OWL is very useful for "joining" data expressed in different schemas (Sun & Jara, 2014).  It allows building relationships and joining up data from multiple sites, described as "Linked Data," facilitating the integration of heterogeneous data streams (Sun & Jara, 2014).  OWL also enables new facts to be derived from known facts using inference rules (Nicolaidis & Iniewski, 2017).  For example, if one triple states that a car is a subtype of a vehicle and another triple states that a Cabrio is a subtype of a car, the new fact that a Cabrio is a vehicle can be inferred from the previous facts (Nicolaidis & Iniewski, 2017).
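To make the inference example above concrete, the following minimal sketch, using the Apache Jena API with hypothetical URIs chosen only for illustration, encodes the two subtype statements as rdfs:subClassOf triples and lets Jena's RDFS reasoner derive the new fact that a Cabrio is a subtype of a vehicle.

```java
import org.apache.jena.rdf.model.InfModel;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.vocabulary.RDFS;

public class SubClassInferenceSketch {
    public static void main(String[] args) {
        // Hypothetical namespace, chosen only for illustration.
        String ns = "http://example.org/vehicles#";

        Model base = ModelFactory.createDefaultModel();
        Resource vehicle = base.createResource(ns + "Vehicle");
        Resource car     = base.createResource(ns + "Car");
        Resource cabrio  = base.createResource(ns + "Cabrio");

        // Known facts: Car is a subtype of Vehicle, Cabrio is a subtype of Car.
        base.add(car, RDFS.subClassOf, vehicle);
        base.add(cabrio, RDFS.subClassOf, car);

        // Apply an RDFS reasoner; subClassOf is transitive, so the new fact
        // "Cabrio is a subtype of Vehicle" is derived automatically.
        InfModel inf = ModelFactory.createRDFSModel(base);
        System.out.println(inf.contains(cabrio, RDFS.subClassOf, vehicle)); // prints true
    }
}
```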

The RDF Data Model is described as a simple and flexible framework (Myung et al., 2010).  The underlying form of expression in RDF is a collection of "triples," each consisting of a subject (s), a predicate (p), and an object (o) (Brickley, 2014; Connolly & Begg, 2015; Nicolaidis & Iniewski, 2017; Przyjaciel-Zablocki, Schätzle, Skaley, Hornung, & Lausen, 2013; Punnoose et al., 2012).  The subjects and predicates are Resources, each encoded as a Uniform Resource Identifier (URI) to ensure uniqueness, while the object can be a Resource or a Literal such as a string, date, or number (Nicolaidis & Iniewski, 2017).  In (Firat & Kuzu, 2011), the basic structure of the RDF Data Model is described as a triplet of object (O), attribute (A), and value (V), and the basic role of RDF is to provide a Data Model of object, attribute, and value (OAV).  The RDF Data Model is similar to the XML Data Model in that neither includes form-related information or names (Firat & Kuzu, 2011).
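As a concrete illustration of the triple structure described above, the short sketch below (again using Apache Jena, with a hypothetical namespace and names) builds a single statement whose subject and predicate are URIs and whose object is a literal, and then serializes it in the Turtle format.

```java
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.Resource;

public class TripleSketch {
    public static void main(String[] args) {
        String ns = "http://example.org/people#";   // hypothetical namespace

        Model model = ModelFactory.createDefaultModel();
        Resource subject   = model.createResource(ns + "alice"); // subject: a URI resource
        Property predicate = model.createProperty(ns + "name");  // predicate: a URI resource
        // Object: a literal (it could equally be another resource).
        subject.addProperty(predicate, "Alice");

        // Serialize the single triple; Turtle is one of the common RDF formats.
        model.write(System.out, "TURTLE");
    }
}
```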

Figure 1:  The Major Building Blocks of the Semantic Web Architectures. Adapted from (Firat & Kuzu, 2011).

            RDF has been commonly used in applications such as the Semantic Web, Bioinformatics, and Social Networks because of its great flexibility and applicability (Choi et al., 2009).  These applications require huge computation over large sets of data (Choi et al., 2009).  Thus, large-scale graph datasets are very common among these applications of the Semantic Web, Bioinformatics, and Social Networks (Choi et al., 2009).  However, the traditional techniques for processing such large-scale datasets have been found to be inadequate (Choi et al., 2009).  Moreover, the flexibility of the RDF Data Model enables existing heterogeneous database systems to be integrated into a Data Warehouse (Myung et al., 2010).  This flexibility also provides users with the inference capability to discover unknown knowledge, which is useful for large-scale data analysis (Myung et al., 2010).  RDF triples can require terabytes of disk space for storage and analysis (M. Husain, McGlothlin, Masud, Khan, & Thuraisingham, 2011; M. F. Husain et al., 2010).  Researchers are encouraged to develop efficient repositories because there are only a few existing frameworks, such as RDF-3X, Jena, Sesame, and BigOWLIM, for Semantic Web technologies (M. Husain et al., 2011; M. F. Husain et al., 2010).  These frameworks are single-machine RDF systems and are widely used because they are user-friendly and perform well for small and medium-sized RDF datasets (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014).  RDF-3X is regarded as the fastest single-machine RDF system in terms of query performance, vastly outperforming previous single-machine systems (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014).  However, the performance of RDF-3X diminishes for queries with unbound objects and a low selectivity factor (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014).  These frameworks are challenged by large RDF graphs (M. Husain et al., 2011; M. F. Husain et al., 2010).  Therefore, the storage of a large volume of RDF triples and the efficient querying of those triples are challenging and are regarded as critical problems in the Semantic Web (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014).  These challenges also limit scaling capabilities (M. Husain et al., 2011; M. F. Husain et al., 2010; Sakr & Gaber, 2014).

RDF Store Architecture

            The main purpose of an RDF store is to provide a database for storing and retrieving any data expressed in RDF (Modoni, Sacco, & Terkaj, 2014).  The term RDF store is used as an abstraction for any system that can handle RDF data, allowing the ingestion of serialized RDF data and the retrieval of these data, and providing a set of APIs to facilitate integration with third-party client applications (Modoni et al., 2014).  The term triple store often refers to these types of systems (Modoni et al., 2014).  An RDF store includes two major components: the Repository and the Middleware.  The Repository represents a set of files or a database (Modoni et al., 2014).  The Middleware sits on top of the Repository and is in constant communication with it (Modoni et al., 2014).  Figure 2 illustrates the RDF store architecture.  The Middleware has its own components: the Storage Provider, Query Engine, Parser/Serializer, and Client Connector (Modoni et al., 2014).  Current RDF stores are categorized into three groups: database-based stores, native stores, and hybrid stores (Modoni et al., 2014).  Examples of database-based stores, which are built on top of existing commercial database engines, include MySQL and Oracle 12c (Modoni et al., 2014).  Examples of native stores, which are built as database engines from scratch, are AllegroGraph and OWLIM (Modoni et al., 2014).  Examples of hybrid stores are Virtuoso and Sesame, which support both architectural styles, native and DBMS-backed (Modoni et al., 2014).

Figure 2:  RDF Store Architecture (Modoni et al., 2014)

MapReduce Parallel Processing Framework and Hadoop

In 2004, Google introduced the MapReduce framework as a parallel processing framework for dealing with large sets of data (Bakshi, 2012; Fadzil, Khalid, & Manaf, 2012; White, 2012).  The MapReduce framework has gained much popularity because it hides the sophisticated operations of parallel processing (Fadzil et al., 2012).  Various MapReduce frameworks such as Hadoop were introduced because of the enthusiasm towards MapReduce (Fadzil et al., 2012).  The capability of the MapReduce framework was recognized by different research areas such as data warehousing, data mining, and bioinformatics (Fadzil et al., 2012).  The MapReduce framework consists of two main layers: the Distributed File System (DFS) layer to store data and the MapReduce layer for data processing (Lee, Lee, Choi, Chung, & Moon, 2012; Mishra, Dehuri, & Kim, 2016; Sakr & Gaber, 2014).  The DFS is a major feature of the MapReduce framework (Fadzil et al., 2012).

The MapReduce framework uses large clusters of low-cost commodity hardware to lower the cost (Bakshi, 2012; H. Hu, Wen, Chua, & Li, 2014; Inukollu, Arsi, & Ravuri, 2014; Khan et al., 2014; Krishnan, 2013; Mishra et al., 2016; Sakr & Gaber, 2014; White, 2012).  The MapReduce framework uses "Redundant Arrays of Independent (and inexpensive) Nodes (RAIN)," whose components are loosely coupled, so that when any node goes down there is no negative impact on the MapReduce job (Sakr & Gaber, 2014; Yang, Dasdan, Hsiao, & Parker, 2007).  The MapReduce framework provides fault tolerance by applying replication, allowing any crashed node to be replaced with another node without affecting the currently running job (P. Hu & Dai, 2014; Sakr & Gaber, 2014).  The MapReduce framework also provides automatic support for the parallelization of execution, which makes MapReduce highly parallel and yet abstracted (P. Hu & Dai, 2014; Sakr & Gaber, 2014).

MapReduce was introduced to solve the problem of parallel processing of large sets of data in a distributed environment, which previously required manual management of the hardware resources (Fadzil et al., 2012; Sakr & Gaber, 2014).  The complexity of parallelization is addressed by two techniques: the Map/Reduce technique and the Distributed File System (DFS) technique (Fadzil et al., 2012; Sakr & Gaber, 2014).  A parallel framework must be reliable, ensuring good resource management in a distributed environment built from off-the-shelf hardware, and must solve the scalability issue to support any future processing requirement (Fadzil et al., 2012).  Earlier frameworks such as the Message Passing Interface (MPI) framework had reliability and fault-tolerance issues when processing large sets of data (Fadzil et al., 2012).  The MapReduce framework covers the two categories of scalability: structural scalability and load scalability (Fadzil et al., 2012).  It addresses structural scalability by using the DFS, which allows forming large virtual storage for the framework by adding off-the-shelf hardware, and it addresses load scalability by increasing the number of nodes to improve performance (Fadzil et al., 2012).
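To illustrate the Map/Reduce programming technique described above, the following minimal Hadoop sketch shows the classic word-count pattern: map tasks emit key-value pairs for their input splits, reduce tasks aggregate the values that share a key, and the framework handles distribution, shuffling, and fault tolerance. The input and output paths are hypothetical.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountSketch {

    // Map phase: each mapper processes one input split and emits (word, 1) pairs.
    public static class TokenMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split("\\s+")) {
                if (!token.isEmpty()) {
                    word.set(token);
                    context.write(word, ONE);
                }
            }
        }
    }

    // Reduce phase: all values for the same key arrive at one reducer and are aggregated.
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "word count sketch");
        job.setJarByClass(WordCountSketch.class);
        job.setMapperClass(TokenMapper.class);
        job.setReducerClass(SumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Hypothetical HDFS paths, for illustration only.
        FileInputFormat.addInputPath(job, new Path("/input/text"));
        FileOutputFormat.setOutputPath(job, new Path("/output/wordcount"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```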

However, the earlier version of the MapReduce framework faced challenges.  Among these challenges are the join operation and the lack of support for aggregate functions for joining multiple datasets in one task (Sakr & Gaber, 2014).  Another limitation of the standard MapReduce framework is found in iterative processing, which is required for analysis techniques such as the PageRank algorithm, recursive relational queries, and social network analysis (Sakr & Gaber, 2014).  Standard MapReduce does not share the execution of work to reduce the overall amount of work (Sakr & Gaber, 2014).  Another limitation is the lack of support for data indexes and column storage; only sequential scanning of the input data is supported, and this lack of indexing affects query performance (Sakr & Gaber, 2014).  Moreover, many have argued that MapReduce is not the optimal solution for structured data.  It is known as a shared-nothing architecture, which supports scalability (Bakshi, 2012; Jinquan, Jie, Shengsheng, Yan, & Yuanhao, 2012; Sakr & Gaber, 2014; White, 2012) and the processing of large unstructured data sets (Bakshi, 2012).  MapReduce also has performance and efficiency limitations (Lee et al., 2012).

Hadoop is a software framework derived from Google's BigTable and MapReduce designs and managed by Apache.  It was created by Doug Cutting and was named after his son's toy elephant (Mishra et al., 2016).  Hadoop allows applications to run on huge clusters of commodity hardware based on MapReduce (Mishra et al., 2016).  The underlying concept of Hadoop is to allow the parallel processing of data across different computing nodes to speed up computations and hide the latency (Mishra et al., 2016).  The Hadoop Distributed File System (HDFS) is one of the major components of the Hadoop framework; it stores large files (Bao, Ren, Zhang, Zhang, & Luo, 2012; Cloud Security Alliance, 2013; De Mauro, Greco, & Grimaldi, 2015) and allows access to data scattered over multiple nodes without any exposure to the complexity of the environment (Bao et al., 2012; De Mauro et al., 2015).  The MapReduce programming model is another major component of the Hadoop framework (Bao et al., 2012; Cloud Security Alliance, 2013; De Mauro et al., 2015), designed to implement distributed and parallel algorithms efficiently (De Mauro et al., 2015).  HBase is a third component of the Hadoop framework (Bao et al., 2012).  HBase is developed on top of HDFS and is a NoSQL (Not only SQL) type of database (Bao et al., 2012).

The key features of Hadoop include scalability and flexibility, cost efficiency, and fault tolerance (H. Hu et al., 2014; Khan et al., 2014; Mishra et al., 2016; Polato, Ré, Goldman, & Kon, 2014; Sakr & Gaber, 2014).  Hadoop allows the nodes in the cluster to scale up and down based on the computation requirements and with no change in the data formats (H. Hu et al., 2014; Polato et al., 2014).  Hadoop also brings massively parallel computation to commodity hardware, decreasing the cost per terabyte of storage, which makes massively parallel computation affordable as the volume of data increases (H. Hu et al., 2014).  Hadoop offers flexibility because it is not tied to a schema, which allows the utilization of any data, whether structured, unstructured, or semi-structured, and the aggregation of data from multiple sources (H. Hu et al., 2014; Polato et al., 2014).  Hadoop also allows nodes to crash without affecting the data processing.  It provides a fault-tolerant environment where data and computation can be recovered without any negative impact on the processing of the data (H. Hu et al., 2014; Polato et al., 2014; White, 2012).

RDF and SPARQL Using Semantic Query

Researchers have exerted effort in developing Semantic Web technologies, which have been standardized to address the inadequacy of current traditional analytical techniques (M. Husain et al., 2011; M. F. Husain et al., 2010).  RDF and SPARQL (SPARQL Protocol and RDF Query Language) are the most prominent standardized semantic web technologies (M. Husain et al., 2011; M. F. Husain et al., 2010).  In 2007, the Data Access Working Group (DAWG) of the World Wide Web Consortium (W3C) recommended SPARQL and provided standards for it as the query language for RDF, a protocol definition for sending SPARQL queries from a client to a query processor, and an XML-based serialization format for results returned by a SPARQL query (Konstantinou, Spanos, Stavrou, & Mitrou, 2010; Sakr & Gaber, 2014).  RDF is regarded as the standard for storing and representing data (M. Husain et al., 2011; M. F. Husain et al., 2010).  SPARQL is the query language used to retrieve data from an RDF triplestore (M. Husain et al., 2011; M. F. Husain et al., 2010; Nicolaidis & Iniewski, 2017; Sakr & Gaber, 2014; Zeng, Yang, Wang, Shao, & Wang, 2013).  Like RDF, SPARQL is built on the "triple pattern," which also contains a "subject," "predicate," and "object" and is terminated with a full stop (Connolly & Begg, 2015).  An RDF triple is itself a valid SPARQL triple pattern (Connolly & Begg, 2015).  URIs identifying resources are written inside angle brackets; literal strings are denoted with either double or single quotes; and properties, like Name, can be identified by their URI or, more commonly, using a QName-style syntax to improve readability (Connolly & Begg, 2015).  Unlike a concrete triple, a triple pattern can include variables: any or all of the values of the subject, predicate, and object in a triple pattern may be replaced by a variable, which indicates data items of interest that will be returned by a query (Connolly & Begg, 2015).
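The following minimal sketch, with hypothetical URIs, illustrates this triple-pattern notation: the predicates are URIs written in angle brackets, one object is a quoted literal, and ?person and ?name are variables whose bindings a query would return. The query text is held in a Java string and parsed with Jena's QueryFactory simply to check that it is syntactically valid SPARQL.

```java
import org.apache.jena.query.Query;
import org.apache.jena.query.QueryFactory;

public class TriplePatternSketch {
    public static void main(String[] args) {
        // Hypothetical vocabulary; ?person and ?name are variables, the
        // predicate URIs are written in angle brackets, and "Berlin" is a literal.
        String queryText =
            "SELECT ?person ?name WHERE { "
          + "  ?person <http://example.org/people#name> ?name . "
          + "  ?person <http://example.org/people#city> \"Berlin\" . "
          + "}";

        // Parsing the string verifies that it is syntactically valid SPARQL.
        Query query = QueryFactory.create(queryText);
        System.out.println(query);
    }
}
```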

The semantic query plays a significant role in the Semantic Web, and the standardization of SPARQL plays a significant role in achieving such semantic queries (Konstantinou et al., 2010).  Unlike traditional query languages, SPARQL does not consider the graph level, but rather models the graph as a set of triples (Konstantinou et al., 2010).  Thus, when using a SPARQL query, a graph pattern is identified, and the nodes which match this pattern are returned (Konstantinou et al., 2010; Zeng et al., 2013).  SPARQL syntax is similar to SQL; the SELECT ... FROM ... WHERE form is the most striking similarity (Konstantinou et al., 2010).  The core syntax of SPARQL is a conjunctive set of triple patterns called a "basic graph pattern" (Zeng et al., 2013).  Table 1 shows the syntax of SPARQL for retrieving data from RDF using a SELECT statement.

Table 1:  Example of SPARQL syntax (Sakr & Gaber, 2014)

Although SPARQL syntax is similar to SQL in the context of SELECT statements for retrieving data, SPARQL is not as mature as SQL (Konstantinou et al., 2010).  The current form of SPARQL allows access to the raw data using URIs from an RDF or OWL graph and leaves result processing to the user (Konstantinou et al., 2010).  However, SPARQL is expected to become the gateway for querying information and knowledge, supporting as many features as SQL does (Konstantinou et al., 2010).  SPARQL does not support aggregate functions such as MAX, MIN, SUM, AVG, and COUNT, nor the GROUP BY operation (Konstantinou et al., 2010).  Moreover, SPARQL supports ORDER BY only at the global level and not solely on the OPTIONAL part of the query (Konstantinou et al., 2010).  For mathematical operations, SPARQL does not extend its support beyond the basic ones (Konstantinou et al., 2010).  SPARQL does not support nested queries, meaning it does not allow a CONSTRUCT query in the FROM part of a query.  Moreover, SPARQL is missing the functionality offered by the SELECT ... WHERE ... LIKE statement in SQL, which allows keyword-based queries (Konstantinou et al., 2010).  While SPARQL offers the regex() function for string pattern matching, it cannot emulate the functionality of the LIKE operator (Konstantinou et al., 2010).  SPARQL allows only unbound variables in the SELECT part, rejecting the use of functions or other operators (Konstantinou et al., 2010).  This limitation positions SPARQL as an elementary query language in which only URIs or literals are returned, while users look for some result processing in practical use cases (Konstantinou et al., 2010).  SPARQL can be enhanced with these missing features and functionalities, including stored procedures, triggers, and operations for data manipulation such as update, insert, and delete (Konstantinou et al., 2010).

The W3C SPARQL Working Group has been working on integrating these missing features into SPARQL.  SPARQL/Update is an extension to SPARQL included in the leading Semantic Web development framework Jena, allowing the update operation and the creation and removal of RDF graphs (Konstantinou et al., 2010).  ARQ is a query engine for Jena which supports the SPARQL RDF query language (Apache, 2017a).  Some of the key features of ARQ include update, GROUP BY, access to and extension of the SPARQL algebra, and support for federated queries (Apache, 2017a).  LARQ integrates SPARQL with Apache's full-text search framework Lucene (Konstantinou et al., 2010), adding free-text searches to SPARQL (Apache, 2017b).  The SPARQL+ extension of the ARC RDF store offers most of the common aggregates and extends SPARUL's INSERT with a CONSTRUCT clause (Konstantinou et al., 2010).  OpenLink's Virtuoso extends SPARQL with aggregate functions, nesting, and subqueries, allowing the user to embed SPARQL queries inside SQL (Konstantinou et al., 2010).  SPASQL offers similar functionality, embedding SPARQL into SQL (Konstantinou et al., 2010).
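As a small, hedged sketch of how ARQ is typically used programmatically (the model contents and the query are hypothetical), a SPARQL query can be executed against an in-memory Jena model as follows.

```java
import org.apache.jena.query.QueryExecution;
import org.apache.jena.query.QueryExecutionFactory;
import org.apache.jena.query.QuerySolution;
import org.apache.jena.query.ResultSet;
import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.ModelFactory;
import org.apache.jena.rdf.model.Property;
import org.apache.jena.rdf.model.Resource;

public class ArqExecutionSketch {
    public static void main(String[] args) {
        String ns = "http://example.org/people#";     // hypothetical namespace

        // Build a tiny in-memory model with one triple.
        Model model = ModelFactory.createDefaultModel();
        Resource alice = model.createResource(ns + "alice");
        Property name  = model.createProperty(ns + "name");
        alice.addProperty(name, "Alice");

        String queryText = "SELECT ?s ?o WHERE { ?s <" + ns + "name> ?o . }";

        // ARQ parses the query and evaluates it against the model.
        try (QueryExecution exec = QueryExecutionFactory.create(queryText, model)) {
            ResultSet results = exec.execSelect();
            while (results.hasNext()) {
                QuerySolution row = results.next();
                System.out.println(row.get("s") + " -> " + row.get("o"));
            }
        }
    }
}
```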

Although SPARQL is missing many SQL features, it offers other features which are not part of SQL (Konstantinou et al., 2010).  One of these is the OPTIONAL operator, which does not modify the results when the optional part does not exist and which can be found in almost all query languages for RDF (Konstantinou et al., 2010).  This feature is equivalent to the LEFT OUTER JOIN in SQL (Konstantinou et al., 2010); however, the SPARQL syntax for it is much more user-friendly and intuitive than the SQL equivalent (Konstantinou et al., 2010).

Techniques Applied on MapReduce

To Improve RDF Query Processing Performance

With the explosive growth of data sizes, the traditional approach of analyzing the data on a centralized server is not adequate (Punnoose et al., 2012; Sakr & Gaber, 2014) and cannot scale with the increasing size of RDF datasets (Sakr & Gaber, 2014).  Although SPARQL is used to query RDF data, querying RDF datasets at web scale is challenging because the computation of SPARQL queries requires several joins between subsets of the dataset (Sakr & Gaber, 2014).  New methods have been introduced to improve parallel computing and to allow the storage and retrieval of RDF across large compute clusters, which enables processing data of unprecedented magnitude (Punnoose et al., 2012).  Various solutions have been introduced to address these challenges and achieve scalable RDF processing using the MapReduce framework, such as PigSPARQL and RDFPath.

1. RDFPath

In (Przyjaciel-Zablocki, Schätzle, Hornung, & Lausen, 2011), RDFPath is proposed as a declarative path query language for RDF which provides a natural mapping to the MapReduce programming model by design, while remaining extensible (Przyjaciel-Zablocki et al., 2011).  It supports the exploration of graph properties such as the shortest connections between two nodes in an RDF graph (Przyjaciel-Zablocki et al., 2011).  RDFPath is regarded as a valuable tool for the analysis of social graphs (Przyjaciel-Zablocki et al., 2011).  RDFPath combines an intuitive syntax for path queries with an effective execution strategy using MapReduce (Przyjaciel-Zablocki et al., 2011), and the work focuses on path queries and studies their implementation based on MapReduce.  RDFPath benefits from the horizontal scaling properties of MapReduce: adding more nodes improves the overall execution time significantly (Przyjaciel-Zablocki et al., 2011).  Using RDFPath, large RDF graphs can be handled while scaling linearly with the size of the graph, and RDFPath can be used to investigate graph properties such as a variant of the famous six degrees of separation paradigm typically encountered in social graphs (Przyjaciel-Zablocki et al., 2011).  There are various RDF query languages such as RQL, SeRQL, RDQL, Triple, N3, Versa, RxPath, RPL, and SPARQL (Przyjaciel-Zablocki et al., 2011).  RDFPath has competitive expressiveness relative to these other RDF query languages (Przyjaciel-Zablocki et al., 2011).  A comparison of RDFPath's capabilities with these other RDF query languages shows that RDFPath has the same capabilities as SPARQL 1.1 for adjacent nodes, adjacent edges, the degree of a node, and fixed-length paths.  However, RDFPath shows more capabilities than SPARQL 1.1 in areas like the distance between two nodes and shortest paths, as it has partial support for these two properties, whereas SPARQL 1.1 shows full support for aggregate functions while RDFPath offers only partial support (Przyjaciel-Zablocki et al., 2011).  Table 2 shows the comparison of RDFPath with other RDF query languages, including SPARQL.

Table 2: Comparison of RDF Query Language, adapted from (Przyjaciel-Zablocki et al., 2011).

2. PigSPARQL

PigSPARQL is regarded as a competitive yet easy-to-use SPARQL query processing system on MapReduce that allows ad-hoc SPARQL query processing on large RDF graphs out of the box (Schätzle, Przyjaciel-Zablocki, Hornung, & Lausen, 2013).  PigSPARQL is described as a system for processing SPARQL queries using the MapReduce framework by translating them into Pig Latin programs, where each Pig Latin program is executed by a series of MapReduce jobs on a Hadoop cluster (Sakr & Gaber, 2014; Schätzle et al., 2013).  PigSPARQL utilizes the query language of Pig, which is a data analysis platform on top of Hadoop MapReduce, as an intermediate layer between SPARQL and MapReduce (Schätzle et al., 2013).  That intermediate layer provides an abstraction level which makes PigSPARQL independent of the Hadoop version and accordingly ensures compatibility with future changes of the Hadoop framework, as they will be covered by the underlying Pig layer (Schätzle et al., 2013).  This intermediate Pig Latin layer provides the sustainability of PigSPARQL and makes it an attractive long-term baseline for comparing various MapReduce-based SPARQL implementations, which is also underpinned by its competitiveness with existing systems such as HadoopRDF (Schätzle et al., 2013).  As illustrated in Figure 3, the PigSPARQL workflow begins with the SPARQL query, which is mapped to Pig Latin by parsing the query into an abstract syntax tree that is then translated into a SPARQL Algebra tree (Schätzle et al., 2013).  Several optimizations are applied at the Algebra level, like the early execution of filters and a re-arrangement of triple patterns by selectivity (Schätzle et al., 2013).  The optimized Algebra tree is traversed bottom-up, and an equivalent sequence of Pig Latin expressions is generated for every SPARQL Algebra operator (Schätzle et al., 2013).  Pig automatically maps the resulting Pig Latin script into a sequence of MapReduce iterations at runtime (Schätzle et al., 2013).
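One of the algebra-level optimizations mentioned above, the re-arrangement of triple patterns by selectivity, can be approximated by a simple heuristic: patterns with more bound (constant) positions are usually more selective and are evaluated first. The sketch below is a minimal, hypothetical illustration of that heuristic, not PigSPARQL's actual implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TriplePatternReorderSketch {

    // A triple pattern position is either a constant (bound) or a variable (unbound).
    record Pattern(String subject, String predicate, String object) {
        // Heuristic selectivity score: the more bound positions, the more selective.
        int boundPositions() {
            int n = 0;
            if (!subject.startsWith("?")) n++;
            if (!predicate.startsWith("?")) n++;
            if (!object.startsWith("?")) n++;
            return n;
        }
    }

    static List<Pattern> reorderBySelectivity(List<Pattern> patterns) {
        List<Pattern> ordered = new ArrayList<>(patterns);
        // Most selective (most bound positions) first.
        ordered.sort(Comparator.comparingInt(Pattern::boundPositions).reversed());
        return ordered;
    }

    public static void main(String[] args) {
        List<Pattern> bgp = List.of(
            new Pattern("?person", "?p", "?o"),                // 0 bound positions
            new Pattern("?person", "ex:name", "?name"),        // 1 bound position
            new Pattern("?person", "ex:city", "\"Berlin\""));  // 2 bound positions
        reorderBySelectivity(bgp).forEach(System.out::println);
    }
}
```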

PigSPARQL is described as an easy-to-use and competitive baseline for the comparison of MapReduce-based SPARQL processing.  PigSPARQL exceeds the functionality of most existing research prototypes by supporting SPARQL 1.0 (Schätzle et al., 2013).

Figure 3: PigSPARQL Workflow From SPARQL to MapReduce, adapted from (Schätzle et al., 2013).

3. Interactive SPARQL Query Processing on Hadoop: Sempala

            In (Schätzle, Przyjaciel-Zablocki, Neu, & Lausen, 2014), an interactive SPARQL query processing technique on Hadoop, "Sempala," is proposed.  Sempala is a SPARQL-over-SQL-on-Hadoop approach designed with selective queries in mind (Schätzle et al., 2014).  It shows significant performance improvements compared to existing approaches (Schätzle et al., 2014).  The approach of Sempala is inspired by the SQL-on-Hadoop trend, where several new systems have been developed for interactive SQL query processing, such as Hive, Shark, Presto, Phoenix, and Impala (Schätzle et al., 2014).  Thus, Sempala, as a SPARQL-over-SQL approach, is introduced to follow this trend and provide interactive-time SPARQL query processing on Hadoop (Schätzle et al., 2014).  With Sempala, the RDF data is stored in a columnar layout on HDFS, and Impala, an open-source massively parallel processing (MPP) SQL query engine for Hadoop, serves as the execution layer on top (Schätzle et al., 2014).  The architecture of Sempala is illustrated in Figure 4.

Figure 4:  Sempala Architecture adapted from (Schätzle et al., 2014).

            The architecture of the proposed Sempala has two main components: the RDF Loader and the Query Compiler (Schätzle et al., 2014).  The RDF Loader converts an RDF dataset into the data layout used by Sempala.  The Query Compiler rewrites a given SPARQL query into the SQL dialect of Impala based on the layout of the data (Schätzle et al., 2014).  The Query Compiler of Sempala is based on the algebraic representation of SPARQL expressions defined by the W3C recommendation (Schätzle et al., 2014).  Jena ARQ is used to parse a SPARQL query into the corresponding algebra tree (Schätzle et al., 2014).  Some basic algebraic optimizations such as filter pushing are applied (Schätzle et al., 2014).  The final step is to traverse the tree bottom-up to generate the equivalent Impala SQL expressions based on the unified property table layout (Schätzle et al., 2014).  Sempala has been compared with other Hadoop-based systems such as Hive, PigSPARQL, MapMerge, and MAPSIN.  Hive is the standard SQL warehouse for Hadoop based on MapReduce (Schätzle et al., 2014).  The same query can run on the same data with minor syntactical modification because Impala is developed to be highly compatible with Hive (Schätzle et al., 2014).  Sempala follows an approach similar to PigSPARQL; however, PigSPARQL uses Pig as the underlying system and intermediate layer between SPARQL and MapReduce (Schätzle et al., 2014).  MapMerge is an efficient map-side merge join implementation for scalable SPARQL BGP ("Basic Graph Pattern" (W3C, 2016)) processing which reduces the shuffling of data between the map and reduce phases in MapReduce (Schätzle et al., 2014).  MAPSIN is an approach that uses HBase, the standard NoSQL database for Hadoop, to store RDF data and applies a map-side index nested loop join which avoids the reduce phase of MapReduce (Schätzle et al., 2014).  The findings of (Schätzle et al., 2014) show that Sempala outperforms Hive and PigSPARQL, while MapMerge and MAPSIN could not be included in the comparison because they only support SPARQL BGP (Schätzle et al., 2014).
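To give an intuition for the SPARQL-to-SQL rewriting step, the following hypothetical sketch shows how a simple basic graph pattern could be expressed against a unified property table, where each subject corresponds to a row and each predicate to a column; the table and column names are invented for illustration and do not reflect Sempala's actual schema.

```java
public class PropertyTableRewriteSketch {
    public static void main(String[] args) {
        // SPARQL (conceptually):
        //   SELECT ?s ?name WHERE { ?s <name> ?name . ?s <city> "Berlin" . }
        //
        // With a unified property table, both triple patterns share the same
        // subject, so they map to one row of the table instead of a self-join.
        String sql =
            "SELECT subject AS s, name "
          + "FROM property_table "             // hypothetical table name
          + "WHERE name IS NOT NULL "          // ?name must be bound
          + "  AND city = 'Berlin'";           // constant object becomes a filter
        System.out.println(sql);
    }
}
```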

4. Map-Side Index Nested Loop Join (MAPSIN JOIN)

            MapReduce faces challenges in processing joins because the datasets involved are very large (Sakr & Gaber, 2014).  Two datasets can be joined directly only when the data to be joined is located on the same machine, which is not practical in general (Sakr & Gaber, 2014).  Thus, solutions such as the reduce-side approach are used and are regarded as the most prominent and flexible join technique in MapReduce (Sakr & Gaber, 2014).  The reduce-side approach is also known as the "Repartition Join" because the datasets are read in the map phase and repartitioned according to the join key in the shuffle phase, while the actual join computation is done in the reduce phase (Sakr & Gaber, 2014).  The problem with this approach is that the datasets are transferred through the network with no regard to the join output, which can consume a lot of network bandwidth and cause a bottleneck (Sakr & Gaber, 2014; Schätzle et al., 2013).  Another solution, called the map-side join, has been introduced, where the actual join processing is done in the map phase to avoid the shuffle and reduce phases and avoid transferring both datasets over the network (Sakr & Gaber, 2014).  The most common variant is the map-side merge join, although it is hard to cascade, and in cascaded plans the advantage of avoiding the shuffle and reduce phases is lost (Sakr & Gaber, 2014).  Thus, the MAPSIN approach, a map-side index nested loop join based on HBase, is proposed (Sakr & Gaber, 2014; Schätzle et al., 2013).  The MAPSIN join leverages the indexing capabilities of HBase, which improve the query performance of selective queries (Sakr & Gaber, 2014; Schätzle et al., 2013).  The approach retains the flexibility of reduce-side joins while utilizing the effectiveness of a map-side join without any modification to the underlying framework (Sakr & Gaber, 2014; Schätzle et al., 2013).
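The general idea of a map-side index nested loop join can be sketched as follows: instead of shuffling both datasets to reducers, each mapper probes an index (here an HBase table) directly with the bindings it has already read. The table, column family, and qualifier names below are hypothetical, and the sketch is only an illustration of the technique, not the MAPSIN implementation.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MapSideIndexJoinSketch
        extends Mapper<LongWritable, Text, Text, Text> {

    private Connection connection;
    private Table indexTable;

    @Override
    protected void setup(Context context) throws IOException {
        Configuration conf = HBaseConfiguration.create(context.getConfiguration());
        connection = ConnectionFactory.createConnection(conf);
        indexTable = connection.getTable(TableName.valueOf("spo_index")); // hypothetical table
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // The input value is assumed to carry a subject binding produced by a
        // previously matched triple pattern.
        String subject = value.toString().trim();

        // Index nested loop step: a point lookup in HBase replaces the shuffle
        // and reduce phases of a repartition join.
        Get get = new Get(Bytes.toBytes(subject));
        Result result = indexTable.get(get);
        byte[] object = result.getValue(Bytes.toBytes("po"), Bytes.toBytes("name")); // hypothetical family/qualifier
        if (object != null) {
            context.write(new Text(subject), new Text(Bytes.toString(object)));
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException {
        indexTable.close();
        connection.close();
    }
}
```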

Comparing MAPSIN with PigSPARQL, MAPSIN performs faster than PigSPARQL when using a sophisticated storage schema based on HBase, which works well for selective queries but diminishes significantly in performance for less selective queries (Schätzle et al., 2013).  However, MAPSIN does not support the queries of LUBM (the Lehigh University Benchmark) (W3C, 2016).  The query runtime of MAPSIN is close to the runtime of the merge join approach (Schätzle et al., 2013).

5. HadoopRDF

            HadoopRDF is proposed by (Tian, Du, Wang, Ni, & Yu, 2012) to combine the advantages of the high fault tolerance and high throughput of the MapReduce distributed framework with a sophisticated indexing and query answering mechanism (Tian et al., 2012).  HadoopRDF is deployed on a Hadoop cluster with many computers, and each node in the cluster runs a Sesame server to supply the service for storing and retrieving the RDF data (Tian et al., 2012).  HadoopRDF is a MapReduce-based RDF system which stores data directly in HDFS and does not require any modification to the Hadoop framework (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014; Tian et al., 2012).  The basic idea is to substitute the rudimentary HDFS storage, which lacks indexes and a query execution engine, with more elaborate RDF stores (Tian et al., 2012).  The architecture of HadoopRDF is illustrated in Figure 5.

Figure 5: HadoopRDF Architecture, adapted from (Tian et al., 2012).

            The architecture of HadoopRDF is similar to the architecture of Hadoop, which scales up to thousands of nodes (Tian et al., 2012).  The Hadoop framework is the core of HadoopRDF (Tian et al., 2012).  Hadoop is built on top of HDFS, which is a replicated key-value store under the control of a central NameNode (Tian et al., 2012).  Files in HDFS are broken into fixed-size chunks, and replicas of these chunks are distributed across a group of DataNodes (Tian et al., 2012).  The NameNode tracks the size and location of each replica (Tian et al., 2012).  Hadoop, as a MapReduce framework, is used for the computational part of data-intensive applications (Tian et al., 2012).  In the architecture of HadoopRDF, the RDF stores are incorporated into the MapReduce framework.  HadoopRDF is an advanced SPARQL engine which splits the original RDF graph according to predicates and objects and utilizes a cost-based query execution plan for reduce-side joins (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014; Schätzle et al., 2013).  HadoopRDF can re-balance automatically when the cluster size changes, but join processing is still done in the reduce phase (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014).  The findings of (M. Husain et al., 2011) indicate that HadoopRDF is more scalable and handles low-selectivity queries more efficiently than RDF-3X.  Moreover, the results show that HadoopRDF is much more scalable than BigOWLIM and provides more efficient queries for large data sets (M. Husain et al., 2011).  Like most systems, HadoopRDF requires a pre-processing phase (Przyjaciel-Zablocki et al., 2013; Sakr & Gaber, 2014).

6. RDF-3X:  RDF Triple eXpress

            RDF-3X is proposed by (Neumann & Weikum, 2008).  The RDF-3X engine is an implementation of SPARQL which achieves excellent performance by pursuing a RISC-style design with a streamlined architecture (Neumann & Weikum, 2008).  RISC stands for Reduced Instruction Set Computer, a type of microprocessor architecture that utilizes a small, highly optimized set of instructions rather than the more specialized sets of instructions often found in other types of architectures (Neumann & Weikum, 2008).  Thus, RDF-3X follows the RISC-style concept with a "reduced instruction set" designed to support RDF.  RDF-3X is described as a generic solution for storing and indexing RDF triples that eliminates the need for physical-design tuning (Neumann & Weikum, 2008).  RDF-3X provides a query optimizer for choosing optimal join orders using a cost model based on statistical synopses for entire join paths (Neumann & Weikum, 2008).  It also provides a powerful and simple query processor which leverages fast merge joins over large-scale data (Neumann & Weikum, 2008).  RDF-3X has three major components: the physical design, the query processor, and the query optimizer.  The physical design is workload-independent, creating appropriate indexes over a single "giant triples table" (Neumann & Weikum, 2008).  The query processor is RISC-style, relying mostly on merge joins over sorted index lists.  The query optimizer focuses on join order in its generation of the execution plan (Neumann & Weikum, 2008).

The findings of (Neumann & Weikum, 2008) showed that RDF-3X addresses the challenge of schema-free data and copes very well with data that exhibit a large diversity of property names (Neumann & Weikum, 2008).  The optimizer of RDF-3X is known to produce efficient query execution plans (Galarraga, Hose, & Schenkel, 2014).  RDF-3X maintains local indexes for all possible orders and combinations of the triple components, as well as for aggregations, which enables efficient local data access (Galarraga et al., 2014).  RDF-3X does not support LUBM.  RDF-3X is a single-node RDF store which builds indexes over all possible permutations of subject, predicate, and object (Huang, Abadi, & Ren, 2011; M. Husain et al., 2011; Schätzle et al., 2014; Zeng et al., 2013).  RDF-3X is regarded as the fastest existing semantic web repository and the state-of-the-art "benchmark" engine for single-machine setups (M. Husain et al., 2011; Przyjaciel-Zablocki et al., 2013).  Thus, it outperforms any other solution for queries with bound objects and for aggregate queries (M. Husain et al., 2011).  However, the performance of RDF-3X diminishes exponentially for unbound queries and for queries with even simple joins if the selectivity factor is low (M. Husain et al., 2011; Przyjaciel-Zablocki et al., 2013).  The experiments of (M. Husain et al., 2011) showed that RDF-3X is not only slower for such queries, it often aborts and cannot complete the query (M. Husain et al., 2011).
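The idea of exhaustive permutation indexing can be illustrated with a rough sketch: each triple is written under all six orderings of its components, so that any triple pattern with bound components can be answered by a prefix scan over one of the orderings. The key encoding below is an arbitrary choice for illustration, not RDF-3X's storage format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PermutationIndexSketch {

    // Build the six ordering keys (SPO, SOP, PSO, POS, OSP, OPS) for one triple.
    static Map<String, String> permutationKeys(String s, String p, String o) {
        Map<String, String> keys = new LinkedHashMap<>();
        keys.put("SPO", s + "\u0000" + p + "\u0000" + o);
        keys.put("SOP", s + "\u0000" + o + "\u0000" + p);
        keys.put("PSO", p + "\u0000" + s + "\u0000" + o);
        keys.put("POS", p + "\u0000" + o + "\u0000" + s);
        keys.put("OSP", o + "\u0000" + s + "\u0000" + p);
        keys.put("OPS", o + "\u0000" + p + "\u0000" + s);
        return keys;
    }

    public static void main(String[] args) {
        // Hypothetical triple; the NUL separator is an arbitrary choice for the sketch.
        permutationKeys("ex:alice", "ex:name", "\"Alice\"")
            .forEach((order, key) -> System.out.println(order + " -> " + key));
    }
}
```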

7. Rya: A Scalable RDF Triple Store for the Clouds

            In (Punnoose et al., 2012), Rya is proposed as a new scalable system for storing and retrieving RDF data across cluster nodes.  In Rya, the OWL model is treated as a set of triples which are stored in the triple store (Punnoose et al., 2012).  Storing all the data in the triple store provides the benefit of using Hadoop MapReduce to run large batch processing jobs against the data set (Punnoose et al., 2012).  The first phase of the process is performed only once, at the time when the OWL model is loaded into Rya (Punnoose et al., 2012).  In phase one, a MapReduce job runs to iterate through the entire graph of relationships and outputs the implicit relationships found as explicit RDF triples into the RDF store (Punnoose et al., 2012).  The second phase of the process is performed every time a query is run; once all explicit and implicit relationships are stored in Rya, the Rya query planner can expand the query at runtime to utilize all these relationships (Punnoose et al., 2012).  A three-table index for RDF triples is used to enhance performance (Punnoose et al., 2012).  The results of (Punnoose, Crainiceanu, & Rapp, 2015) showed that Rya outperformed SHARD.  Moreover, in comparison with the graph-partitioning algorithm introduced by (Huang et al., 2011), as indicated in (Punnoose et al., 2015), Rya showed superior performance in many cases over Graph Partitioning (Punnoose et al., 2015).

Conclusion

This project provided a survey of the state-of-the-art techniques for applying MapReduce to improve RDF data query processing performance.  Tremendous effort from industry and researchers has been exerted to develop efficient and scalable RDF processing systems.  The project discussed the RDF framework and the major building blocks of the semantic web architecture.  The RDF store architecture, the MapReduce Parallel Processing Framework, and Hadoop were also discussed.  Researchers have exerted effort in developing Semantic Web technologies, which have been standardized to address the inadequacy of current traditional analytical techniques.  This paper also discussed the most prominent standardized semantic web technologies, RDF and SPARQL.  The project also discussed and analyzed in detail various techniques applied on MapReduce to improve RDF query processing performance.  These techniques include RDFPath, PigSPARQL, Interactive SPARQL Query Processing on Hadoop (Sempala), the Map-Side Index Nested Loop Join (MAPSIN join), HadoopRDF, RDF-3X (RDF Triple eXpress), and Rya (a scalable RDF triple store for the clouds).

References

Apache, J. (2017a). ARQ – A SPARQL Processor for Jena

Apache, J. (2017b). LARQ – Adding Free Text Searches to SPARQL

Bakshi, K. (2012). Considerations for big data: Architecture and approach. Paper presented at the Aerospace Conference, 2012 IEEE.

Bao, Y., Ren, L., Zhang, L., Zhang, X., & Luo, Y. (2012). Massive sensor data management framework in cloud manufacturing based on Hadoop. Paper presented at the Industrial Informatics (INDIN), 2012 10th IEEE International Conference on.

Boussaid, O., Tanasescu, A., Bentayeb, F., & Darmont, J. (2007). Integration and dimensional modeling approaches for complex data warehousing. Journal of Global Optimization, 37(4), 571. doi:10.1007/s10898-006-9064-6

Brickley, D., & Guha, R. V. (Eds.). (2014). RDF schema 1.1. Retrieved from the W3C Web site: http://www.w3.org/TR/2014/REC-rdf-schema-20140225/.

Choi, H., Son, J., Cho, Y., Sung, M. K., & Chung, Y. D. (2009). SPIDER: a system for scalable, parallel/distributed evaluation of large-scale RDF data. Paper presented at the Proceedings of the 18th ACM conference on Information and knowledge management.

Cloud Security Alliance. (2013). Big Data Analytics for Security Intelligence. Big Data Working Group.

Connolly, T., & Begg, C. (2015). Database Systems: A Practical Approach to Design, Implementation, and Management (6th Edition ed.): Pearson.

De Mauro, A., Greco, M., & Grimaldi, M. (2015). What is big data? A consensual definition and a review of key research topics. Paper presented at the AIP Conference Proceedings.

Fadzil, A. F. A., Khalid, N. E. A., & Manaf, M. (2012). Performance of scalable off-the-shelf hardware for data-intensive parallel processing using MapReduce. Paper presented at the Computing and Convergence Technology (ICCCT), 2012 7th International Conference on.

Firat, M., & Kuzu, A. (2011). Semantic web for e-learning bottlenecks: disorientation and cognitive overload. International Journal of Web & Semantic Technology, 2(4), 55.

Galarraga, L., Hose, K., & Schenkel, R. (2014). Partout: a distributed engine for efficient RDF processing. Paper presented at the Proceedings of the 23rd International Conference on World Wide Web.

Hu, H., Wen, Y., Chua, T., & Li, X. (2014). Toward Scalable Systems for Big Data Analytics: A Technology Tutorial. IEEE Access, 2, 652-687. doi:10.1109/ACCESS.2014.2332453

Hu, P., & Dai, W. (2014). Enhancing fault tolerance based on Hadoop cluster. International Journal of Database Theory and Application, 7(1), 37-48.

Huang, J., Abadi, D. J., & Ren, K. (2011). Scalable SPARQL querying of large RDF graphs. Proceedings of the VLDB Endowment, 4(11), 1123-1134.

Husain, M., McGlothlin, J., Masud, M. M., Khan, L., & Thuraisingham, B. M. (2011). Heuristics-based query processing for large RDF graphs using cloud computing. IEEE Transactions on Knowledge and Data Engineering, 23(9), 1312-1327.

Husain, M. F., Khan, L., Kantarcioglu, M., & Thuraisingham, B. (2010). Data intensive query processing for large RDF graphs using cloud computing tools. Paper presented at the Cloud Computing (CLOUD), 2010 IEEE 3rd International Conference on.

Inukollu, V. N., Arsi, S., & Ravuri, S. R. (2014). Security Issues Associated with Big Data in Cloud Computing. International Journal of Network Security & Its Applications, 6(3), 45. doi:10.5121/ijnsa.2014.6304

Jinquan, D., Jie, H., Shengsheng, H., Yan, L., & Yuanhao, S. (2012). The Hadoop Stack: New Paradigm for Big Data Storage and Processing. Intel Technology Journal, 16(4), 92-110.

Khan, N., Yaqoob, I., Hashem, I. A. T., Inayat, Z., Ali, M. W. K., Alam, M., . . . Gani, A. (2014). Big Data: Survey, Technologies, Opportunities, and Challenges. The Scientific World Journal, 1-18. doi:10.1155/2014/712826

Konstantinou, N., Spanos, D.-E., Stavrou, P., & Mitrou, N. (2010). Technically approaching the semantic web bottleneck. International Journal of Web Engineering and Technology, 6(1), 83-111.

Krishnan, K. (2013). Data warehousing in the age of big data: Newnes.

Lee, K.-H., Lee, Y.-J., Choi, H., Chung, Y. D., & Moon, B. (2012). Parallel data processing with MapReduce: a survey. ACM SIGMOD Record, 40(4), 11-20.

Mishra, B. S. P., Dehuri, S., & Kim, E. (2016). Techniques and Environments for Big Data Analysis: Parallel, Cloud, and Grid Computing (Vol. 17): Springer.

Modoni, G. E., Sacco, M., & Terkaj, W. (2014). A survey of RDF store solutions. Paper presented at the Engineering, Technology and Innovation (ICE), 2014 International ICE Conference on.

Myung, J., Yeon, J., & Lee, S.-g. (2010). SPARQL basic graph pattern processing with iterative MapReduce. Paper presented at the Proceedings of the 2010 Workshop on Massive Data Analytics on the Cloud.

Neumann, T., & Weikum, G. (2008). RDF-3X: a RISC-style engine for RDF. Proceedings of the VLDB Endowment, 1(1), 647-659.

Nicolaidis, I., & Iniewski, K. (2017). Building Sensor Networks: CRC Press.

Polato, I., Ré, R., Goldman, A., & Kon, F. (2014). A comprehensive view of Hadoop research—A systematic literature review. Journal of Network and Computer Applications, 46, 1-25.

Przyjaciel-Zablocki, M., Schätzle, A., Hornung, T., & Lausen, G. (2011). RDFPath: Path query processing on large RDF graphs with MapReduce. Paper presented at the Extended Semantic Web Conference.

Przyjaciel-Zablocki, M., Schätzle, A., Skaley, E., Hornung, T., & Lausen, G. (2013). Map-side merge joins for scalable SPARQL BGP processing. Paper presented at the Cloud Computing Technology and Science (CloudCom), 2013 IEEE 5th International Conference on.

Punnoose, R., Crainiceanu, A., & Rapp, D. (2012). Rya: a scalable RDF triple store for the clouds. Paper presented at the Proceedings of the 1st International Workshop on Cloud Intelligence.

Punnoose, R., Crainiceanu, A., & Rapp, D. (2015). SPARQL in the cloud using Rya. Information Systems, 48, 181-195.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Schätzle, A., Przyjaciel-Zablocki, M., Hornung, T., & Lausen, G. (2013). PigSPARQL: a SPARQL query processing baseline for big data. Paper presented at the Proceedings of the 12th International Semantic Web Conference (Posters & Demonstrations Track)-Volume 1035.

Schätzle, A., Przyjaciel-Zablocki, M., Neu, A., & Lausen, G. (2014). Sempala: interactive SPARQL query processing on hadoop. Paper presented at the International Semantic Web Conference.

Sun, Y., & Jara, A. J. (2014). An extensible and active semantic model of information organizing for the Internet of Things. Personal and Ubiquitous Computing, 18(8), 1821-1833. doi:10.1007/s00779-014-0786-z

Tian, Y., Du, J., Wang, H., Ni, Y., & Yu, Y. (2012). HadoopRDF: A scalable RDF data analysis system. 8th ICIC, 633-641.

Tiwana, A., & Balasubramaniam, R. (2001). Integrating knowledge on the web. IEEE Internet Computing, 5(3), 32-39.

W3C. (2016). RDF Store Benchmarking. Retrieved from https://www.w3.org/wiki/RdfStoreBenchmarking.

White, T. (2012). Hadoop: The definitive guide. O'Reilly Media, Inc.

Yang, H.-c., Dasdan, A., Hsiao, R.-L., & Parker, D. S. (2007). Map-reduce-merge: simplified relational data processing on large clusters. Paper presented at the Proceedings of the 2007 ACM SIGMOD international conference on Management of data.

Zeng, K., Yang, J., Wang, H., Shao, B., & Wang, Z. (2013). A distributed graph engine for web scale RDF data. Paper presented at the Proceedings of the VLDB Endowment.

Various Efforts to Improve Performance of Incremental Computation

Dr. Aly, O.
Computer Science

Hadoop was developed by Yahoo and Apache to run jobs over hundreds of terabytes of data (Yan, Yang, Yu, Li, & Li, 2012).  Various large corporations such as Facebook and Amazon have used Hadoop because it offers high efficiency, high scalability, and high reliability (Yan et al., 2012).  Hadoop has faced various limitations, such as its low-level programming paradigm and schema, strictly batch processing, time skew, and incremental computation (Alam & Ahmed, 2014).  Incremental computation is regarded as one of the major shortcomings of the Hadoop technology (Alam & Ahmed, 2014).  Efficient handling of incremental data typically comes at the expense of losing compatibility with the programming models offered by non-incremental systems such as MapReduce, because it requires the implementation of incremental algorithms and increases the complexity of the algorithms and the code (Alam & Ahmed, 2014).  A caching technique is proposed by (Alam & Ahmed, 2014) as a solution.  This caching solution operates at three levels: the Job, the Task, and the Hardware (Alam & Ahmed, 2014).

Incoop is another solution, proposed by (Bhatotia, Wieder, Rodrigues, Acar, & Pasquin, 2011).  Incoop extends Hadoop, the open-source implementation of the MapReduce programming paradigm, to run unmodified MapReduce programs in an incremental manner (Bhatotia et al., 2011; Sakr & Gaber, 2014).  Incoop allows MapReduce programs to be made incremental automatically, without any modification to the code (Bhatotia et al., 2011; Sakr & Gaber, 2014).  Moreover, information about previously executed MapReduce tasks is recorded by Incoop so that it can be reused in subsequent MapReduce computations when possible (Bhatotia et al., 2011; Sakr & Gaber, 2014).

Incoop is not a perfect solution, and it has some shortcomings which are addressed in (Sakr & Gaber, 2014; Zhang, Chen, Wang, & Yu, 2015).  Several enhancements have been made to Incoop, including an incremental HDFS called Inc-HDFS, a Contraction phase, and a "Memoization-aware Scheduler" (Sakr & Gaber, 2014).  Inc-HDFS computes the delta between the inputs of two consecutive job runs and splits the input based on its content, while maintaining compatibility with HDFS.  The Contraction phase is a new phase in the MapReduce framework that breaks Reduce tasks into smaller sub-computations forming an inverted tree, so that a small change in the input requires recomputing only the path from the corresponding leaf to the root (Sakr & Gaber, 2014).  The Memoization-aware Scheduler is a modified version of the Hadoop scheduler that takes advantage of the locality of memoized results (Sakr & Gaber, 2014).
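The memoization idea can be pictured with a simplified sketch, under assumed names rather than Incoop's actual code: cached task outputs are keyed by a content hash of the task's input split, so that if a split's content is unchanged between job runs, the previous output is reused instead of re-executing the task.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.Map;
import java.util.function.Function;

public class TaskMemoizationSketch {

    // Cache of previously computed task results, keyed by a hash of the split content.
    private final Map<String, String> resultCache = new HashMap<>();

    String runTask(String splitContent, Function<String, String> task) throws Exception {
        MessageDigest digest = MessageDigest.getInstance("SHA-256");
        String key = HexFormat.of().formatHex(
                digest.digest(splitContent.getBytes(StandardCharsets.UTF_8)));

        // Unchanged input split: reuse the memoized result and skip re-execution.
        return resultCache.computeIfAbsent(key, k -> task.apply(splitContent));
    }

    public static void main(String[] args) throws Exception {
        TaskMemoizationSketch sketch = new TaskMemoizationSketch();
        Function<String, String> task = input -> "processed:" + input.length();
        System.out.println(sketch.runTask("same split content", task)); // computed
        System.out.println(sketch.runTask("same split content", task)); // reused from cache
    }
}
```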

Another solution, i2MapReduce, is proposed by (Zhang et al., 2015), who also compared it with Incoop.  i2MapReduce does not perform task-level recomputation but rather key-value pair level incremental processing.  It also supports more complex iterative computation, which is used in data mining, and reduces the I/O overhead by applying various techniques (Zhang et al., 2015).  IncMR is an enhanced framework for large-scale incremental data processing (Yan et al., 2012).  It inherits the simplicity of standard MapReduce, does not modify HDFS, and utilizes the same MapReduce APIs (Yan et al., 2012).  When using IncMR, all programs can perform incremental data processing without any modification (Yan et al., 2012).
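The contrast between task-level and key-value-pair-level incrementality can be pictured with a small sketch using assumed data structures (this is not i2MapReduce's implementation): per-key aggregated state is preserved from the previous run, and when a delta of key-value pairs arrives, only the affected keys are updated.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyValueIncrementSketch {

    // State preserved from the previous run: one aggregated value per key.
    private final Map<String, Integer> reducedState = new HashMap<>();

    // Apply a delta of (key, value) pairs: only keys present in the delta are re-reduced.
    void applyDelta(List<Map.Entry<String, Integer>> delta) {
        for (Map.Entry<String, Integer> kv : delta) {
            reducedState.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
    }

    public static void main(String[] args) {
        KeyValueIncrementSketch sketch = new KeyValueIncrementSketch();
        sketch.applyDelta(List.of(Map.entry("a", 3), Map.entry("b", 1)));
        // Later run: only key "a" changed, so only "a" is touched.
        sketch.applyDelta(List.of(Map.entry("a", 2)));
        System.out.println(sketch.reducedState); // a=5, b=1 (iteration order may vary)
    }
}
```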

In conclusion, various efforts have been exerted by researchers to overcome the incremental computation limitation of Hadoop, such as Incoop, Inc-HDFS, i2MapReduce, and IncMR.  Each proposed solution is an attempt to enhance and extend standard Hadoop to avoid overheads such as I/O and to increase efficiency, without increasing the complexity of the computation and without requiring any modification to the code.

References

Alam, A., & Ahmed, J. (2014). Hadoop architecture and its issues. Paper presented at the Computational Science and Computational Intelligence (CSCI), 2014 International Conference on.

Bhatotia, P., Wieder, A., Rodrigues, R., Acar, U. A., & Pasquin, R. (2011). Incoop: MapReduce for incremental computations. Paper presented at the Proceedings of the 2nd ACM Symposium on Cloud Computing.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Yan, C., Yang, X., Yu, Z., Li, M., & Li, X. (2012). IncMR: Incremental data processing based on MapReduce. Paper presented at the Cloud Computing (CLOUD), 2012 IEEE 5th International Conference on.

Zhang, Y., Chen, S., Wang, Q., & Yu, G. (2015). i^2MapReduce: Incremental MapReduce for Mining Evolving Big Data. IEEE Transactions on Knowledge and Data Engineering, 27(7), 1906-1919.