Advanced Processing Techniques for Big Data

Dr. Aly, O.
Computer Science

Abstract

The purpose of this project is to discuss and analyze advanced processing techniques for Big Data.  There are various processing systems such as Iterative Processing, Graph Processing, Stream Processing (also known as Event Processing or Real-Time Processing), and Batch Processing.  A MapReduce-based framework such as Hadoop supports Batch-Oriented Processing.  MapReduce lacks built-in support for Iterative Processing, which requires parsing datasets iteratively, as well as for large Graph Processing and Stream Processing.  Thus, various models such as Twister and iMapReduce are introduced to improve the Iterative Processing of MapReduce, and Surfer, Apache Hama, Pregel, and GraphLab are introduced for large Graph Processing.  Other models, such as Aurora, Borealis, and IBM InfoSphere Streams, are introduced to support Stream Processing.  This project focuses on the discussion and analysis of the Stream Processing models Aurora and Borealis.  The discussion and analysis of the Aurora model include an overview of Aurora as a Stream Processing Engine (SPE), followed by the Aurora framework and the fundamental components of the Aurora topology.  The Query Model of Aurora, known as the Stream Query Algebra "SQuAl," supports seven operators that construct the Aurora network and queries for expressing its stream processing requirements.  The discussion and analysis also include SQuAl and the Query Model, the Run-Time Framework, and the optimization systems used to overcome bottlenecks in the network.  Aurora* and Medusa, as Distributed Stream Processing extensions, are also discussed and analyzed.  The second SPE is Borealis, which is a Distributed SPE.  The discussion and analysis of Borealis involve the framework, the query model, and the optimization techniques used to overcome bottlenecks in the network.  A comparison between Aurora and Borealis is also discussed and analyzed.

Keywords: Stream Processing, Event Processing, Aurora, Borealis.

Introduction

When dealing with Big Data, its different characteristics and attributes such as volume, velocity, variety, veracity, and value must be taken into consideration (Chandarana & Vijayalakshmi, 2014).  Thus, different types of frameworks are required to run different types of analytics (Chandarana & Vijayalakshmi, 2014).  Large-scale data processing involves different types of workloads (Chandarana & Vijayalakshmi, 2014).  Organizations deploy a combination of these workload types to achieve their business goals (Chandarana & Vijayalakshmi, 2014).  These workload types involve Batch-Oriented Processing, Online Transaction Processing (OLTP), Stream Processing, Interactive ad-hoc Query and Analysis (Chandarana & Vijayalakshmi, 2014), and Online Analytical Processing (OLAP) (Erl, Khattak, & Buhler, 2016).

For Batch-Oriented Processing, a MapReduce-based framework such as Hadoop can be deployed for recurring tasks such as large-scale Data Mining or Aggregation (Chandarana & Vijayalakshmi, 2014; Erl et al., 2016; Sakr & Gaber, 2014).  For OLTP, such as user-facing e-commerce transactions, Apache HBase can be deployed (Chandarana & Vijayalakshmi, 2014).  The OLTP system processes transaction-oriented data (Erl et al., 2016).  For Stream Processing, the Storm framework can be deployed to handle stream sources such as social media feeds or sensor data (Chandarana & Vijayalakshmi, 2014).  For Interactive ad-hoc Query and Analysis, the Apache Drill framework can be deployed (Chandarana & Vijayalakshmi, 2014).  OLAP systems, which form an integral part of Business Intelligence, Data Mining, and Machine Learning, are used for processing data analysis queries (Erl et al., 2016).

The Apache Hadoop framework allows distributed processing of large data sets across clusters of computers using simple programming models (Chandarana & Vijayalakshmi, 2014).  The Apache Hadoop framework involves four major modules: Hadoop Core, the Hadoop Distributed File System (HDFS), Hadoop YARN, and Hadoop MapReduce.  Hadoop Core provides the common utilities which support the other modules.  The HDFS module provides high-throughput access to application data.  The Hadoop YARN module handles job scheduling and resource management.  Hadoop MapReduce supports parallel processing of large-scale datasets (Chandarana & Vijayalakshmi, 2014).
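
To make the MapReduce programming model concrete, the following is a minimal word-count sketch in the Hadoop Streaming style, where a mapper and a reducer communicate over standard input and output.  It is illustrative only: it assumes Hadoop Streaming's line-oriented contract and simulates the shuffle/sort step locally so the script can run standalone.

```python
#!/usr/bin/env python3
# Minimal word-count mapper/reducer sketch in the Hadoop Streaming style.
# Hadoop Streaming pipes input lines to the mapper on stdin and feeds the
# sorted mapper output to the reducer, grouped by key.
import sys
from itertools import groupby


def mapper(lines):
    """Emit (word, 1) pairs for every word in the input lines."""
    for line in lines:
        for word in line.strip().split():
            yield word, 1


def reducer(pairs):
    """Sum the counts for each word; assumes pairs arrive sorted by key."""
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)


if __name__ == "__main__":
    # Local simulation of the map -> shuffle/sort -> reduce pipeline.
    mapped = sorted(mapper(sys.stdin), key=lambda kv: kv[0])
    for word, total in reducer(mapped):
        print(f"{word}\t{total}")
```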

Moreover, there are various processing systems such as Iterative Processing (Schwarzkopf, Murray, & Hand, 2012), Graph Processing, and Stream Processing (Sakr & Gaber, 2014; Schwarzkopf et al., 2012).  Iterative Processing systems utilize in-memory caching (Schwarzkopf et al., 2012).  Many data analysis applications, including algorithms for text-based search and machine learning, require iterative processing of the data.  However, because MapReduce lacks built-in support for iterative processing, which requires parsing datasets iteratively (Zhang, Chen, Wang, & Yu, 2015; Zhang, Gao, Gao, & Wang, 2012), various models such as Twister, HaLoop, and iMapReduce are introduced to improve the iterative processing of MapReduce (Zhang et al., 2015).  With regard to Graph Processing, MapReduce is suitable for processing flat data structures and vertex-oriented tasks, whereas propagation-based approaches are optimized for edge-oriented tasks on partitioned graphs.  To improve the programming models for large graph processing, various models are introduced, such as Surfer (Chen, Weng, He, & Yang, 2010; Chen et al., 2012), GraphX (Gonzalez et al., 2014), Apache Hama, GoldenOrb, Giraph, Phoebus, GPS (Cui, Mei, & Ooi, 2014), Pregel (Cui et al., 2014; Hu, Wen, Chua, & Li, 2014; Sakr & Gaber, 2014), and GraphLab (Cui et al., 2014; Hu et al., 2014; Sakr & Gaber, 2014).  With regard to Stream Processing, because MapReduce is designed for Batch-Oriented Computation such as log analysis and text processing (Chandarana & Vijayalakshmi, 2014; Cui et al., 2014; Erl et al., 2016; Sakr & Gaber, 2014; Zhang et al., 2015; Zhang et al., 2012) and is not adequate for supporting real-time stream processing tasks (Sakr & Gaber, 2014), various Stream Processing models are introduced, such as DEDUCE, Aurora, Borealis, IBM Spade, StreamCloud, Stormy (Sakr & Gaber, 2014), Twitter Storm (Grolinger et al., 2014; Sakr & Gaber, 2014), Spark Streaming, Apache Storm (Fernández et al., 2014; Gupta, Gupta, & Mohania, 2012; Hu et al., 2014; Scott, 2015), StreamMapReduce (Grolinger et al., 2014), the Simple Scalable Streaming System (S4) (Fernández et al., 2014; Grolinger et al., 2014; Gupta et al., 2012; Hu et al., 2014; Neumeyer, Robbins, Nair, & Kesari, 2010), and IBM InfoSphere Streams (Gupta et al., 2012).
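
As an illustration of why iteration is awkward on plain MapReduce, the following sketch shows a driver loop that submits one job per iteration and re-materializes its data each round, which is the overhead that iteration-aware frameworks such as Twister, HaLoop, and iMapReduce aim to remove.  The job function below is a hypothetical stand-in, not a real Hadoop API.

```python
# Illustrative sketch (no real Hadoop API is used): an iterative algorithm
# driven from outside MapReduce must launch a full job per iteration and
# write intermediate data back to storage each round.

def run_mapreduce_job(input_path: str, output_path: str, params: dict) -> dict:
    # Stand-in for submitting one batch job; here it just simulates a
    # computation whose result ("delta") shrinks every iteration.
    return {"iteration": params["iteration"] + 1,
            "delta": params.get("delta", 1.0) / 2.0}


def iterative_driver(input_path: str, max_iterations: int = 10,
                     tolerance: float = 1e-3) -> dict:
    params = {"iteration": 0}
    for i in range(max_iterations):
        output_path = f"/tmp/job-output-{i}"   # intermediate data hits disk each round
        params = run_mapreduce_job(input_path, output_path, params)
        if params["delta"] < tolerance:        # convergence is checked by the driver
            break
        input_path = output_path               # the next job re-reads from storage
    return params


if __name__ == "__main__":
    print(iterative_driver("/data/input"))
```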

The project focuses on two Stream Processing models: the Aurora and Borealis stream processing systems.  The discussion and analysis address their characteristics, architectures, performance optimization capabilities, and scalability.  The project also discusses and analyzes performance bottlenecks, their causes, and the strategies used to remove them.  The project begins with a general discussion of Stream Processing.

Stream Processing Engines

Stream Processing is defined in (Manyika et al., 2011) as technologies designed to process large real-time streams of event data.  Stream Processing enables applications such as algorithmic trading in financial services, RFID event processing, fraud detection (Manyika et al., 2011; Scott, 2015), process monitoring, and location-based services in telecommunications (Manyika et al., 2011).  Stream Processing reflects Real-Time Streaming and is also known as "Event Stream Processing" (Manyika et al., 2011).  "Event Stream Processing" is also known as "Streaming Analytics," which is used to process customer-centric data "on the fly" without the need for long-term storage (Spiess, T'Joens, Dragnea, Spencer, & Philippart, 2014).

In the Real-Time mode, the data is processed in-memory because it is captured before it is persisted to disk (Erl et al., 2016).  The response time ranges from sub-second to under a minute (Erl et al., 2016).  The Real-Time mode reflects the velocity characteristic of Big Data datasets (Erl et al., 2016).  When Big Data is processed using Real-Time or Event Stream Processing, the data arrives continuously in a stream, or at intervals as events (Erl et al., 2016).  Individual streamed data items are small; however, their continuous nature results in very large datasets (Erl et al., 2016; Gradvohl, Senger, Arantes, & Sens, 2014).  The Real-Time mode also involves an "Interactive Mode" (Erl et al., 2016), which refers to query processing in real time (Erl et al., 2016).

Event Stream Processing (ESP) systems are designed to provide high-performance analysis of streams with low latency (Gradvohl et al., 2014).  The first ESP systems, developed in the early 2000s, include Aurora, Borealis, STREAM, TelegraphCQ, NiagaraCQ, and Cougar (Gradvohl et al., 2014).  At that time, the systems were centralized, running on a single server, and aimed to overcome the problems that traditional databases have with stream processing (Gradvohl et al., 2014).  Tremendous efforts have since been made to improve data stream processing, moving from centralized stream processing systems to stream processing engines with the ability to distribute queries among a cluster of nodes (Sakr & Gaber, 2014).  The next discussion focuses on two scalable engines for processing streaming data: Aurora and Borealis.

  1. Aurora Stream Processing Engine

Aurora was introduced through a joint project of Brandeis University, Brown University, and MIT (Abadi et al., 2003; Sakr & Gaber, 2014).  The prototype Aurora implementation was introduced in 2003 (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora GUI is based on Java and supports the construction and execution of arbitrary Aurora networks and queries (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora system is described as a processing model for managing data streams for monitoring applications, which are distinguished substantially from traditional business data processing (Abadi et al., 2003; Sakr & Gaber, 2014).  The main aim of monitoring applications is to monitor continuous streams of data (Abadi et al., 2003; Sakr & Gaber, 2014).  One example of these monitoring applications is military applications which monitor readings from sensors worn by soldiers, such as blood pressure, heart rate, and position.  Another example is financial analysis applications which monitor the stock data streams reported from various stock exchanges (Abadi et al., 2003; Sakr & Gaber, 2014).  Tracking applications, which monitor the location of large numbers of objects, are another type of monitoring application (Abadi et al., 2003; Sakr & Gaber, 2014).

Due to their nature, monitoring applications could benefit from a Database Management System (DBMS) because of the high volume of monitored data and their query requirements (Abadi et al., 2003; Sakr & Gaber, 2014).  However, existing DBMS systems are unable to fully support such applications because they target business applications rather than monitoring applications (Abadi et al., 2003; Sakr & Gaber, 2014).  A DBMS gets its data from humans issuing transactions, while monitoring applications get their data from external sources such as sensors (Abadi et al., 2003; Sakr & Gaber, 2014).  The role of a DBMS supporting monitoring applications is to detect abnormal activity and alert humans (Abadi et al., 2003; Sakr & Gaber, 2014).  This model is described as the DBMS-Active, Human-Passive (DAHP) model (Abadi et al., 2003; Sakr & Gaber, 2014).  It differs from the traditional DBMS model, described as the Human-Active, DBMS-Passive (HADP) model, where humans initiate queries and transactions on a passive DBMS repository (Abadi et al., 2003; Sakr & Gaber, 2014).

In addition, monitoring applications require not only the latest value of an object but also its historical values (Abadi et al., 2003; Sakr & Gaber, 2014).  Monitoring applications are trigger-oriented, sending alert messages when abnormal activities are detected (Abadi et al., 2003; Sakr & Gaber, 2014).  Monitoring applications must also accept approximate answers, due to the nature of data stream processing where data can be lost or omitted for processing reasons.  The last characteristic of monitoring applications involves real-time requirements and Quality of Service (QoS).  Table 1 summarizes these five major characteristics of the monitoring applications for which Aurora is designed to manage data streams.

Table 1: Monitoring Applications Characteristics.

1.1 Aurora Framework

A traditional DBMS could not be used to implement these monitoring applications with their challenging characteristics (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  The prevalent requirements of these monitoring applications are data and information streams, triggers, imprecise data, and real-time processing (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  Thus, Aurora is designed to support these monitoring applications with their challenging characteristics and requirements (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  The underlying concept of the Aurora system model is that an application administrator specifies how incoming data streams are processed using a boxes-and-arrows paradigm, a data-flow system in which tuples flow through a loop-free, directed graph of processing operations (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  The output streams are presented to applications, which are programmed to deal with the asynchronous tuples in an output stream (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora system model also maintains historical storage to support ad-hoc queries (Abadi et al., 2003; Sakr & Gaber, 2014).  Aurora handles data from a variety of sources, such as computer programs which generate values at regular or irregular intervals, or hardware sensors (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).  Figure 1 illustrates the Aurora system model, reflecting the input data streams, the operator boxes, the continuous and ad-hoc queries, and the output to applications.
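
The following is a minimal sketch of the boxes-and-arrows idea: operator "boxes" applied to tuples flowing along the arcs of a loop-free, directed graph.  The names and operators below are illustrative and are not Aurora's actual implementation.

```python
# Minimal sketch of a boxes-and-arrows dataflow: each "box" is an operator
# applied to tuples flowing along arcs of a loop-free directed graph.
from typing import Callable, Iterable, Iterator, List


def filter_box(predicate: Callable) -> Callable[[Iterable], Iterator]:
    """Box that passes through only the tuples satisfying the predicate."""
    def box(stream: Iterable) -> Iterator:
        return (t for t in stream if predicate(t))
    return box


def map_box(fn: Callable) -> Callable[[Iterable], Iterator]:
    """Box that transforms every tuple on its input arc."""
    def box(stream: Iterable) -> Iterator:
        return (fn(t) for t in stream)
    return box


def run_network(source: Iterable, boxes: List[Callable]) -> Iterator:
    """Push the input stream through a linear chain of boxes."""
    stream = source
    for box in boxes:
        stream = box(stream)
    return stream


if __name__ == "__main__":
    # Toy monitoring stream: (soldier_id, heart_rate); alert on high readings.
    readings = [("s1", 72), ("s2", 140), ("s3", 95)]
    network = [filter_box(lambda t: t[1] > 120),
               map_box(lambda t: {"soldier": t[0], "alert": "high heart rate"})]
    for output_tuple in run_network(readings, network):
        print(output_tuple)
```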

Figure 1:  Overview of Aurora System Model and Architecture.  Adapted from (Abadi et al., 2003; Carney et al., 2002; Cherniack et al., 2003; Sakr & Gaber, 2014).

1.2  Aurora Query Model: SQuAl Using Seven Primitive Operations

The Aurora Stream Query Algebra (SQuAl) supports seven operators which are used to construct Aurora networks and queries for expressing stream processing requirements (Abadi et al., 2003; Sakr & Gaber, 2014).  Many of these operators have analogs among the relational query operators.  For instance, the "filter" operator in the Aurora query algebra, which applies any number of predicates to each incoming tuple and routes tuples based on the satisfied predicates, is like the relational operator "select" (Abadi et al., 2003; Sakr & Gaber, 2014).  The "aggregate" operator in the Aurora query algebra computes stream aggregations to address the fundamental push-based nature of data streams, applying a function such as a moving average across a window of values in a stream (Abadi et al., 2003; Sakr & Gaber, 2014).  Windowed operations are required when the data is stale or time-imprecise (Abadi et al., 2003; Sakr & Gaber, 2014).  The application administrator in the Aurora system model can connect the output of one box to the inputs of several others, which implements an "implicit split" rather than the "explicit split" of relational operations (Abadi et al., 2003; Sakr & Gaber, 2014).  In addition, the Aurora system model contains an "explicit union" operation where two streams can be put together (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora system model also represents a collection of streams with a common schema on its arcs (Abadi et al., 2003; Sakr & Gaber, 2014).  An arc does not carry any specific number of streams, which makes it easy for streams to come and go without any modification to the Aurora network (Abadi et al., 2003; Sakr & Gaber, 2014).
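
The following is a minimal sketch, in Python, of the windowed-aggregation idea behind SQuAl's aggregate operator: a moving average computed over a sliding window of the most recent values.  The function name and window size are illustrative, not part of Aurora.

```python
# Sketch of a windowed aggregate in the spirit of SQuAl's "aggregate" operator:
# a moving average over a sliding window of the most recent values.
from collections import deque
from typing import Iterable, Iterator, Tuple


def moving_average(stream: Iterable[Tuple[float, float]],
                   window_size: int = 5) -> Iterator[Tuple[float, float]]:
    """Yield (timestamp, average of the last `window_size` values)."""
    window = deque(maxlen=window_size)
    for timestamp, value in stream:
        window.append(value)
        yield timestamp, sum(window) / len(window)


if __name__ == "__main__":
    # Toy stock ticks: (timestamp, price)
    ticks = list(enumerate([10.0, 10.5, 11.0, 10.8, 12.0]))
    for ts, avg in moving_average(ticks, window_size=3):
        print(ts, round(avg, 3))
```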

In the Aurora query model, a stream is an append-only sequence of tuples with a uniform schema, where each tuple carries a timestamp used for QoS calculations (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora query model assumes no arrival order, which provides latitude for producing outputs out of order so that high-priority tuples can be served first (Abadi et al., 2003; Sakr & Gaber, 2014).  Moreover, this lack of an arrival-order assumption also helps in redefining windows over attributes and in merging multiple streams (Abadi et al., 2003; Sakr & Gaber, 2014).  Some operators are described as "order-agnostic," such as Filter, Map, and Union.  Other operators are described as "order-sensitive," such as BSort, Aggregate, Join, and Resample; these can only be guaranteed to execute with finite buffer space and in finite time if they can assume some ordering over their input streams (Abadi et al., 2003; Sakr & Gaber, 2014).  Thus, the order-sensitive operators require order-specification arguments which indicate the expected arrival order of tuples (Abadi et al., 2003; Sakr & Gaber, 2014).

The Aurora query model supports three main operation modes: (1) continuous queries for real-time processing, (2) views, and (3) ad-hoc queries (Abadi et al., 2003; Sakr & Gaber, 2014).  These three operation modes use the same conceptual building blocks, each processing flows based on QoS specifications (Abadi et al., 2003; Sakr & Gaber, 2014).  In the Aurora query model, each output is associated with two-dimensional QoS graphs which specify the utility of the output with regard to several performance-related and quality-related attributes (Abadi et al., 2003; Sakr & Gaber, 2014).  The stream-oriented operators which constitute the Aurora network and queries are designed to operate in a data-flow mode, where data elements are processed as they appear on the input (Abadi et al., 2003; Sakr & Gaber, 2014).
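
As a rough illustration of a two-dimensional QoS graph, the following sketch maps output latency to a utility value with a piecewise-linear function.  The breakpoints are invented for the example and are not Aurora's actual QoS specification.

```python
# Sketch of a two-dimensional QoS graph: utility as a piecewise-linear
# function of output latency (full utility up to a "good" latency, then
# decaying linearly to zero). The breakpoints are illustration values only.

def latency_qos(latency_ms: float,
                good_until_ms: float = 100.0,
                zero_after_ms: float = 1000.0) -> float:
    """Return a utility in [0, 1] for a given output latency."""
    if latency_ms <= good_until_ms:
        return 1.0
    if latency_ms >= zero_after_ms:
        return 0.0
    # Linear decay between the two breakpoints.
    span = zero_after_ms - good_until_ms
    return 1.0 - (latency_ms - good_until_ms) / span


if __name__ == "__main__":
    for latency in (50, 100, 550, 2000):
        print(latency, latency_qos(latency))
```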

1.3 Aurora Run-Time Framework and Optimization

The main purpose of the Aurora run-time operations is to process data flows through a potentially large workflow diagram (Abadi et al., 2003; Sakr & Gaber, 2014).  The Aurora Run-Time Architecture involves five main techniques: (1) the QoS data structure, (2) the Aurora Storage Management (ASM), (3) the Run-Time Scheduling (RTS), (4) the Introspection, and (5) the Load Shedding.   

The QoS is a multi-dimensional function which involves response times, tuple drops, and the values produced (Abadi et al., 2003; Sakr & Gaber, 2014).  The ASM is designed to store all tuples required by the Aurora network.  The ASM performs two main operations: one manages storage for the tuples being passed through an Aurora network, and the other maintains the extra tuple storage which may be required at connection points.  Thus, the ASM involves two main management operations: (1) queue management, and (2) connection point management (Abadi et al., 2003; Sakr & Gaber, 2014).

The RTS in Aurora is challenging because of the need to simultaneously address several issues, such as large system scale, real-time performance requirements, and dependencies between box executions (Abadi et al., 2003; Sakr & Gaber, 2014).  In addition, the processing of a tuple in Aurora spans many scheduling and execution steps, where an input tuple passes through many boxes before potentially contributing to an output stream, and may require secondary storage (Abadi et al., 2003; Sakr & Gaber, 2014).  Aurora reduces the overall processing cost by exploiting two main non-linearities in tuple processing: "interbox non-linearity" and "intrabox non-linearity."  Aurora takes advantage of both through "train scheduling" (Abadi et al., 2003; Sakr & Gaber, 2014).  Train scheduling is a set of scheduling heuristics which attempt (1) to have boxes queue as many tuples as possible without processing, thus generating long tuple trains, (2) to process complete trains at once, thus exploiting intrabox non-linearity, and (3) to pass them to subsequent boxes without going to disk, thus exploiting interbox non-linearity (Abadi et al., 2003; Sakr & Gaber, 2014).  The primary goal of train scheduling is to minimize the number of I/O operations performed per tuple; its secondary goal is to minimize the number of box calls made per tuple.  With regard to introspection, Aurora employs static and dynamic (run-time) introspection techniques to predict and detect overload situations (Abadi et al., 2003; Sakr & Gaber, 2014).  The purpose of static introspection is to determine whether the hardware running the Aurora network is sized correctly.  The dynamic analysis, based on run-time introspection, uses the timestamps carried by all tuples (Abadi et al., 2003; Sakr & Gaber, 2014).  With regard to load shedding, Aurora reduces the volume of tuple processing if an overload is detected by the static or dynamic analysis, either by dropping tuples or by filtering them (Abadi et al., 2003; Sakr & Gaber, 2014).  Figure 2 illustrates the Aurora run-time architecture, adapted from (Abadi et al., 2003; Sakr & Gaber, 2014).
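
The train-scheduling idea can be sketched as follows: tuples accumulate into a train at a box's input queue, the whole train is processed in one box call, and the output train is handed directly to the next box.  The class names and thresholds below are illustrative only, not Aurora's scheduler.

```python
# Sketch of "train scheduling": process a whole train of queued tuples in one
# box call (intrabox non-linearity) and hand the output train straight to the
# next box without going to disk (interbox non-linearity).
from collections import deque
from typing import Callable, Deque, List


class Box:
    def __init__(self, fn: Callable[[List], List], train_size: int = 64):
        self.fn = fn                      # processes a whole batch of tuples
        self.queue: Deque = deque()
        self.train_size = train_size

    def enqueue(self, tuples: List) -> None:
        self.queue.extend(tuples)

    def ready(self) -> bool:
        return len(self.queue) >= self.train_size

    def process_train(self) -> List:
        count = min(self.train_size, len(self.queue))
        train = [self.queue.popleft() for _ in range(count)]
        return self.fn(train)             # one box call for the whole train


def schedule(boxes: List[Box]) -> None:
    """Run boxes whose trains are full; push each output train downstream."""
    for i, box in enumerate(boxes):
        if box.ready():
            output_train = box.process_train()
            if i + 1 < len(boxes):
                boxes[i + 1].enqueue(output_train)


if __name__ == "__main__":
    upstream = Box(lambda train: [t * 2 for t in train], train_size=4)
    downstream = Box(lambda train: [t + 1 for t in train], train_size=8)
    upstream.enqueue([1, 2, 3, 4])
    schedule([upstream, downstream])
    print(list(downstream.queue))         # the train handed downstream
```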

Figure 2:  Aurora Run-Time Framework. Adapted from (Abadi et al., 2003).

The Aurora optimization techniques involve two main optimization systems: (1) dynamic continuous-query optimization, and (2) ad-hoc query optimization.  The dynamic continuous-query optimization involves inserting projections, combining boxes, and reordering boxes (Abadi et al., 2003).  The ad-hoc query optimization involves historical information, because Aurora semantics require the historical sub-network to be run first.  This historical information is organized in a B-tree (Abadi et al., 2003).  The initial boxes in an ad-hoc query can pull information from the B-tree associated with the corresponding connection point (Abadi et al., 2003).  When the historical operation is finished, the Aurora optimizer switches the implementation to the standard push-based data structures and continues processing in the conventional mode (Abadi et al., 2003).

1.4 Aurora* and Medusa for Distributed Stream Processing

The Aurora system is a centralized stream processor.  However, in (Cherniack et al., 2003), Aurora* and Medusa are proposed for distributed processing.  Several architectural issues must be addressed to build a large-scale distributed version of a stream processing system such as Aurora.  In (Cherniack et al., 2003), the problem is divided into two categories: intra-participant distribution and inter-participant distribution.  Intra-participant distribution involves small-scale distribution within one administrative domain, which is handled by the proposed Aurora* model (Cherniack et al., 2003).  Inter-participant distribution involves large-scale distribution across administrative boundaries, which is handled by the proposed Medusa model (Cherniack et al., 2003).

2. Borealis Stream Processing Engine

Borealis is described as a second-generation distributed SPE, also developed at Brandeis University, Brown University, and MIT (Abadi et al., 2005; Sakr & Gaber, 2014).  The Borealis streaming model inherits its core stream processing functionality from the Aurora model and its core distribution functionality from the Medusa model (Abadi et al., 2005; Sakr & Gaber, 2014).  The Borealis model expands and extends both models to provide more advanced capabilities and functionalities which are commonly required by newly emerging stream processing applications (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis is regarded as the successor to Aurora (Abadi et al., 2005; Sakr & Gaber, 2014).

The second generation of SPEs has three main requirements which are critical and at the same time challenging.  The first requirement involves the "Dynamic Revision of Query Results" (Abadi et al., 2005; Sakr & Gaber, 2014).  Applications are forced to live with imperfect results because corrections or updates to previously processed data only become available after the fact, unless the system has techniques to revise its processing and results to take newly available data or updates into account (Abadi et al., 2005; Sakr & Gaber, 2014).  The second requirement involves "Dynamic Query Modification," which allows fast, automatic query modification at run-time with low overhead (Abadi et al., 2005; Sakr & Gaber, 2014).  The third requirement involves "Flexible and Highly Scalable Optimization," where the optimization problem is more balanced between sensor-heavy and server-heavy optimization.  A more flexible optimization structure is needed to deal with a large number of devices and to perform cross-network, sensor-heavy and server-heavy resource management and optimization (Abadi et al., 2005; Sakr & Gaber, 2014).  However, this requirement for such an optimization framework brings two additional challenges.  The first challenge is the ability to simultaneously optimize different QoS metrics such as processing latency, throughput, or sensor lifetime (Abadi et al., 2005; Sakr & Gaber, 2014).  The second challenge is the ability to perform optimizations at different levels of granularity: at the node level, the sensor network level, the level of a cluster of sensors and servers, and so forth (Abadi et al., 2005; Sakr & Gaber, 2014).  These advanced challenges, capabilities, and requirements for the second generation of SPEs are added to the classical SPE architecture to form the Borealis framework (Abadi et al., 2005; Sakr & Gaber, 2014).

2.1 The Borealis Framework

            The Borealis framework is a distributed stream processing engine where the collection of continuous queries submitted to Borealis can be seen as a giant network of operators whose processing is distributed to multiple sites (Abadi et al., 2005; Sakr & Gaber, 2014). There is a sensor proxy interface which acts as another Borealis site (Abadi et al., 2005; Sakr & Gaber, 2014).  The sensor networks can participate in query processing behind that sensor proxy interface (Abadi et al., 2005; Sakr & Gaber, 2014).

A Borealis server runs on each node, with the Global Catalog (GC), High Availability (HA) modules, Neighborhood Optimizer (NHO), Local Monitor (LM), Admin, and Query Processor (QP) at the top of the framework, and meta-data, control, and data at the bottom.  The GC can be centralized or distributed across a subset of processing nodes and holds information about the complete query network and the location of all query fragments (Abadi et al., 2005; Sakr & Gaber, 2014).  The HA modules monitor each node to handle failures (Abadi et al., 2005; Sakr & Gaber, 2014).  The NHO uses local information together with information from other NHOs to improve load balance between nodes (Abadi et al., 2005; Sakr & Gaber, 2014).  The LM collects performance-related statistics as the local system runs, reporting to the local optimizer as well as to the NHOs (Abadi et al., 2005; Sakr & Gaber, 2014).  The QP is the core component of the Borealis framework; the actual execution of queries takes place in the QP (Abadi et al., 2005; Sakr & Gaber, 2014).  The QP, which is a single-site processor, receives the input data streams, and results are pulled through the I/O Queues, which route tuples to and from remote Borealis nodes and clients (Abadi et al., 2005; Sakr & Gaber, 2014).  The Admin module controls the QP and issues system control messages (Abadi et al., 2005; Sakr & Gaber, 2014).  These messages are pushed to the Local Optimizer (LO), which communicates with the major run-time components of the QP to enhance performance.  These run-time components of Borealis include (1) the Priority Scheduler, (2) the Box Processors, and (3) the Load Shedder.  The Priority Scheduler determines the order of box execution based on tuple priorities.  The Box Processors can change their behavior at run-time based on messages received from the LO.  The Load Shedder discards low-priority tuples when the node is overloaded (Abadi et al., 2005; Sakr & Gaber, 2014).  The Storage Manager is part of the QP and is responsible for storing and retrieving the data which flows through the arcs of the local query diagram.  The Local Catalog is another component of the QP; it stores the query diagram description and metadata and is accessible by all components.  Figure 3 illustrates the Borealis framework, adapted from (Abadi et al., 2005; Sakr & Gaber, 2014).
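
As a toy illustration of two of these run-time components, the following sketch pairs a priority scheduler, which serves the most important queued tuple first, with a load shedder, which discards the least important tuples under overload.  It is a conceptual model only, not the Borealis implementation.

```python
# Toy model of a priority scheduler plus load shedder, not Borealis code.
import heapq
from typing import List, Tuple

# (priority, timestamp, payload) -- a lower priority number is more important.
QueuedTuple = Tuple[int, float, str]


class PriorityScheduler:
    """Pops the most important queued tuple first."""

    def __init__(self) -> None:
        self._heap: List[QueuedTuple] = []

    def submit(self, item: QueuedTuple) -> None:
        heapq.heappush(self._heap, item)

    def next_tuple(self) -> QueuedTuple:
        return heapq.heappop(self._heap)

    def shed_load(self, max_queue: int) -> int:
        """Load shedder: drop the least important tuples under overload."""
        dropped = 0
        while len(self._heap) > max_queue:
            worst = max(self._heap)       # largest priority value = least important
            self._heap.remove(worst)
            dropped += 1
        heapq.heapify(self._heap)         # restore the heap property once
        return dropped


if __name__ == "__main__":
    sched = PriorityScheduler()
    for prio, ts in [(2, 0.1), (1, 0.2), (3, 0.3), (1, 0.4)]:
        sched.submit((prio, ts, f"tuple@{ts}"))
    print("dropped:", sched.shed_load(max_queue=2))
    print("next:", sched.next_tuple())
```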

Figure 3.  Borealis’ Framework, adapted from (Abadi et al., 2005; Sakr & Gaber, 2014).

2.2 Borealis’ Query Model and Comparison with Aurora’s Query Model

Borealis inherits the Aurora boxes-and-arrows model for specifying continuous queries, where the boxes reflect the query operators and the arrows reflect the data flow between the boxes (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis extends the Aurora data model by supporting three types of messages: insertion, deletion, and replacement (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis queries use an extended version of Aurora's operators that supports revision messages (Abadi et al., 2005; Sakr & Gaber, 2014).  The Borealis query model also supports the modification of box semantics at run-time (Abadi et al., 2005; Sakr & Gaber, 2014).  The QoS in Borealis, as in Aurora, forms the basis of resource management decisions.  However, while in Aurora each query output is provided with a QoS function, Borealis allows QoS to be predicted at any point in the data flow (Abadi et al., 2005; Sakr & Gaber, 2014).  Thus, Borealis attaches a vector of metrics to messages to allow such prediction of QoS.
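
The three message kinds can be sketched as a simple tagged record applied to a keyed view of a stream.  The field names and helper below are illustrative, not Borealis' actual message format.

```python
# Sketch of insertion, deletion, and replacement messages as tagged records.
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict


class MessageType(Enum):
    INSERTION = "insertion"
    DELETION = "deletion"
    REPLACEMENT = "replacement"


@dataclass
class StreamMessage:
    msg_type: MessageType
    key: Any                                  # identifies the tuple being revised
    values: Dict[str, Any] = field(default_factory=dict)


def apply_message(state: Dict[Any, Dict[str, Any]], msg: StreamMessage) -> None:
    """Apply an insertion, deletion, or replacement to a keyed view of the stream."""
    if msg.msg_type is MessageType.INSERTION:
        state[msg.key] = msg.values
    elif msg.msg_type is MessageType.DELETION:
        state.pop(msg.key, None)
    else:                                     # REPLACEMENT revises an earlier tuple
        state[msg.key] = msg.values


if __name__ == "__main__":
    view: Dict[Any, Dict[str, Any]] = {}
    apply_message(view, StreamMessage(MessageType.INSERTION, "t1", {"price": 10.0}))
    apply_message(view, StreamMessage(MessageType.REPLACEMENT, "t1", {"price": 10.5}))
    print(view)
```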

In the context of query result revision, Borealis supports a "replayable" query diagram and a revised processing scheme.  While Aurora has an append-only model, where a message cannot be modified once it is placed on a stream and results may therefore be approximate or imperfect, the Borealis model supports the modification of messages so that queries can be processed intelligently and correct query results provided (Abadi et al., 2005; Sakr & Gaber, 2014).  The query diagram must be replayable when messages are revised, because processing a modified message requires replaying a portion of the past with the modified value (Abadi et al., 2005; Sakr & Gaber, 2014).  This replaying process is also useful for recovery and high availability (Abadi et al., 2005; Sakr & Gaber, 2014).  Dynamic revision with replay can add overhead; thus, a "closed" model is used which generates deltas showing the effects of the revisions instead of the entire result.

In the context of query modification, Borealis provides online modification of continuous queries by supporting control lines and time travel features.  Control lines extend the basic Aurora query model to change operator parameters, and the operators themselves, at run-time (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis boxes contain the standard data input lines as well as special control lines which carry messages with revised box parameters and new box functions (Abadi et al., 2005; Sakr & Gaber, 2014).  Borealis provides a function called "bind" to bind new parameters to free variables within a function definition, which leads to a new function being created (Abadi et al., 2005; Sakr & Gaber, 2014).  Aurora's connection points are leveraged to enable time travel in Borealis.  The original purpose of connection points was to support ad-hoc queries, which can query historical and run-time data.  This concept is extended in the Borealis model with connection point views, which enable time travel applications, ad-hoc queries, and the query diagram to access the connection points independently and in parallel (Abadi et al., 2005; Sakr & Gaber, 2014).  Connection point views include two operations to enable time travel: the replay operation and the undo operation.
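
The "bind" idea, producing a new function by fixing free parameters of an existing definition, can be approximated in Python with functools.partial.  This is only an analogy for the concept, not Borealis' actual Bind facility.

```python
# Analogy for "bind": fix a free parameter of a box function to obtain a new
# function, as a control message carrying revised parameters would.
from functools import partial


def threshold_filter(tuple_value: float, threshold: float) -> bool:
    """Generic box function with a free parameter `threshold`."""
    return tuple_value > threshold


# Binding a revised parameter value yields a new box function:
high_alert_filter = partial(threshold_filter, threshold=120.0)

if __name__ == "__main__":
    print(high_alert_filter(95.0))    # False: below the bound threshold
    print(high_alert_filter(140.0))   # True: exceeds the bound threshold
```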

2.3  The Optimization Model of Borealis

Borealis has an optimizer framework to optimize processing across a combined sensor and server network, to deal with QoS in stream-based applications, and to support scalability, both in size and geographically, for stream-based applications.  The optimization model contains multiple collaborating monitoring and optimization components.  The monitoring components include a local monitor at every site and an end-point monitor at output sites.  The optimization components include the global optimizer, the neighborhood optimizer, and the local optimizer (Abadi et al., 2005).  While Aurora evaluated QoS only at outputs and had difficulty inferring QoS at upstream nodes, Borealis can evaluate a predicted-QoS score function on each message by using the values of the metrics vector (Abadi et al., 2005).  Borealis uses Aurora's concept of train scheduling of boxes and tuples to reduce scheduling overhead.  While Aurora processes messages in order of arrival, Borealis has box scheduling flexibility which allows messages to be processed out of order, because the revision technique can be used to process them later as insertions (Abadi et al., 2005).  Borealis offers a load shedding technique superior to Aurora's (Abadi et al., 2005).  The Load Shedder in Borealis detects and handles overload situations by adding "drop" operators to the processing network (Ahmad et al., 2005).  A "drop" operator filters out messages, either based on tuple values or in a randomized fashion, to overcome the overload (Ahmad et al., 2005).  Higher-quality outputs can be achieved by allowing nodes in a chain to coordinate in choosing where and how much load to shed.  Distributed load shedding algorithms collect local statistics from nodes and pre-compute potential drop plans at compilation time (Ahmad et al., 2005).  Figure 4 illustrates the optimization components of Borealis, adapted from (Abadi et al., 2005).

Figure 4.  The Optimization Components of Borealis, adapted from (Abadi et al., 2005).

Borealis provides fault-tolerance techniques for a distributed SPE, such as replication, running multiple copies of the same query network on distinct processing nodes (Abadi et al., 2005; Balazinska, Balakrishnan, Madden, & Stonebraker, 2005).  When a node experiences a failure on one of its input streams, the node tries to find an alternate upstream replica.  All replicas must be consistent.  To ensure such consistency, the data-serializing operator "SUnion" is used; it takes multiple streams as input and produces one output stream with deterministically ordered tuples, ensuring that all operators of the replicas process the same input in the same order (Abadi et al., 2005; Balazinska et al., 2005).  To provide high availability, each SPE ensures that input data is processed and results are forwarded within a user-specified time threshold of the data's arrival (Abadi et al., 2005; Balazinska et al., 2005).  When the failure is corrected, each SPE which produced tentative data reconciles its state and stabilizes its output by replacing the previously tentative output with stable data tuples forwarded to downstream clients, reconciling the SPE state based on checkpoint/redo, undo/redo, and the new concept of revision tuples (Abadi et al., 2005; Balazinska et al., 2005).
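
The SUnion idea of deterministically ordering tuples from multiple input streams can be sketched with a timestamp-based merge.  The tuple layout and tie-breaking rule below are illustrative, not the actual SUnion operator.

```python
# Sketch of the SUnion idea: merge several input streams into one stream with
# a deterministic tuple order (by timestamp, then stream id), so every replica
# processes the same input in the same order.
import heapq
from typing import Iterable, Iterator, Tuple

# (timestamp, stream_id, payload)
StreamTuple = Tuple[float, int, str]


def s_union(*streams: Iterable[StreamTuple]) -> Iterator[StreamTuple]:
    """Deterministically order tuples from multiple already-sorted streams."""
    return heapq.merge(*streams)   # ties broken by stream_id, then payload


if __name__ == "__main__":
    stream_a = [(1.0, 0, "a1"), (3.0, 0, "a2")]
    stream_b = [(2.0, 1, "b1"), (3.0, 1, "b2")]
    for t in s_union(stream_a, stream_b):
        print(t)
```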

Conclusion

This project discussed and analyzed advanced processing techniques for Big Data.  There are various processing systems such as Iterative Processing, Graph Processing, Stream Processing (also known as Event Processing or Real-Time Processing), and Batch Processing.  A MapReduce-based framework such as Hadoop supports Batch-Oriented Processing.  MapReduce lacks built-in support for Iterative Processing, which requires parsing datasets iteratively, as well as for large Graph Processing and Stream Processing.  Thus, various models such as Twister, HaLoop, and iMapReduce are introduced to improve the Iterative Processing of MapReduce.  With regard to Graph Processing, MapReduce is suitable for processing flat data structures and vertex-oriented tasks, whereas propagation-based approaches are optimized for edge-oriented tasks on partitioned graphs.  Various models are introduced to improve the programming models for large graph processing, such as Surfer, Apache Hama, GoldenOrb, Giraph, Pregel, and GraphLab.  With regard to Stream Processing, various models are introduced to overcome the limitation of the MapReduce framework, which deals only with batch-oriented processing.  These Stream Processing models include Aurora, Borealis, IBM Spade, StreamCloud, Stormy, Twitter Storm, Spark Streaming, Apache Storm, StreamMapReduce, the Simple Scalable Streaming System (S4), and IBM InfoSphere Streams.  This project focused on the discussion and analysis of the Stream Processing models Aurora and Borealis.  The discussion and analysis of the Aurora model included an overview of Aurora as a Stream Processing Engine (SPE), followed by the Aurora framework and the fundamental components of the Aurora topology.  The Query Model of Aurora, known as the Stream Query Algebra "SQuAl," supports seven operators that construct the Aurora network and queries for expressing its stream processing requirements.  The discussion and analysis also included SQuAl and the Query Model, the Run-Time Framework, and the optimization systems.  Aurora* and Medusa for Distributed Stream Processing were also discussed and analyzed.  The second SPE is Borealis, which is a Distributed SPE.  The discussion and analysis of Borealis involved the framework, the query model, and the optimization techniques.  Borealis expands Aurora's SPE to include and support features required for Distributed Real-Time Streaming.  The comparison between Aurora and Borealis was also discussed and analyzed at all levels: the network, the query model, and the optimization techniques.

References

Abadi, D. J., Ahmad, Y., Balazinska, M., Cetintemel, U., Cherniack, M., Hwang, J.-H., . . . Ryvkina, E. (2005). The Design of the Borealis Stream Processing Engine.

Abadi, D. J., Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., . . . Zdonik, S. (2003). Aurora: a new model and architecture for data stream management. The VLDB Journal, 12(2), 120-139.

Ahmad, Y., Berg, B., Cetintemel, U., Humphrey, M., Hwang, J.-H., Jhingran, A., . . . Tatbul, N. (2005). Distributed operation in the Borealis stream processing engine. Paper presented at the Proceedings of the 2005 ACM SIGMOD international conference on Management of data.

Balazinska, M., Balakrishnan, H., Madden, S., & Stonebraker, M. (2005). Fault-tolerance in the Borealis distributed stream processing system. Paper presented at the Proceedings of the 2005 ACM SIGMOD international conference on Management of data.

Carney, D., Çetintemel, U., Cherniack, M., Convey, C., Lee, S., Seidman, G., . . . Stonebraker, M. (2002). Monitoring streams—a new class of data management applications. Paper presented at the VLDB’02: Proceedings of the 28th International Conference on Very Large Databases.

Chandarana, P., & Vijayalakshmi, M. (2014, 4-5 April 2014). Big Data analytics frameworks. Paper presented at the Circuits, Systems, Communication and Information Technology Applications (CSCITA), 2014 International Conference on.

Chen, R., Weng, X., He, B., & Yang, M. (2010). Large graph processing in the cloud. Paper presented at the Proceedings of the 2010 ACM SIGMOD International Conference on Management of data.

Chen, R., Yang, M., Weng, X., Choi, B., He, B., & Li, X. (2012). Improving large graph processing on partitioned graphs in the cloud. Paper presented at the Proceedings of the Third ACM Symposium on Cloud Computing.

Cherniack, M., Balakrishnan, H., Balazinska, M., Carney, D., Cetintemel, U., Xing, Y., & Zdonik, S. B. (2003). Scalable Distributed Stream Processing.

Cui, B., Mei, H., & Ooi, B. C. (2014). Big data: the driver for innovation in databases. National Science Review, 1(1), 27-30.

Erl, T., Khattak, W., & Buhler, P. (2016). Big Data Fundamentals: Concepts, Drivers & Techniques: Prentice Hall Press.

Fernández, A., del Río, S., López, V., Bawakid, A., del Jesus, M. J., Benítez, J. M., & Herrera, F. (2014). Big Data with Cloud Computing: an insight on the computing environment, MapReduce, and programming frameworks. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 4(5), 380-409.

Gonzalez, J. E., Xin, R. S., Dave, A., Crankshaw, D., Franklin, M. J., & Stoica, I. (2014). GraphX: Graph Processing in a Distributed Dataflow Framework.

Gradvohl, A. L. S., Senger, H., Arantes, L., & Sens, P. (2014). Comparing distributed online stream processing systems considering fault tolerance issues. Journal of Emerging Technologies in Web Intelligence, 6(2), 174-179.

Grolinger, K., Hayes, M., Higashino, W. A., L’Heureux, A., Allison, D. S., & Capretz, M. A. (2014). Challenges for mapreduce in big data. Paper presented at the Services (SERVICES), 2014 IEEE World Congress on.

Gupta, R., Gupta, H., & Mohania, M. (2012). Cloud computing and big data analytics: what is new from databases perspective? Paper presented at the International Conference on Big Data Analytics.

Hu, H., Wen, Y., Chua, T.-S., & Li, X. (2014). Toward scalable systems for big data analytics: A technology tutorial. IEEE Access, 2, 652-687.

Manyika, J., Chui, M., Brown, B., Bughin, J., Dobbs, R., Roxburgh, C., & Byers, A. H. (2011). Big data: The next frontier for innovation, competition, and productivity.

Neumeyer, L., Robbins, B., Nair, A., & Kesari, A. (2010). S4: Distributed stream computing platform. Paper presented at the 2010 IEEE International Conference on Data Mining Workshops.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Schwarzkopf, M., Murray, D. G., & Hand, S. (2012). The seven deadly sins of cloud computing research. Paper presented at the 4th USENIX Workshop on Hot Topics in Cloud Computing (HotCloud '12).

Scott, J. A. (2015). Getting Started with Spark: MapR Technologies, Inc.

Spiess, J., T'Joens, Y., Dragnea, R., Spencer, P., & Philippart, L. (2014). Using big data to improve customer experience and business performance. Bell Labs Technical Journal, 18, 13-17. doi:10.1002/bltj.21642

Zhang, Y., Chen, S., Wang, Q., & Yu, G. (2015). i2MapReduce: Incremental MapReduce for mining evolving big data. IEEE Transactions on Knowledge and Data Engineering, 27(7), 1906-1919.

Zhang, Y., Gao, Q., Gao, L., & Wang, C. (2012). iMapReduce: A distributed computing framework for iterative computation. Journal of Grid Computing, 10(1), 47-68.

Extract Knowledge from a Large-Scale Dataset

Dr. Aly, O.
Computer Science

Introduction

The purpose of this discussion is to analyze one research effort which involves knowledge extraction from a large-scale dataset with specific real-world semantics by applying machine learning.  The discussion begins with an overview of Linked Open Data, Semantic Web applications, and Machine Learning.  The discussion also addresses knowledge extraction from Linked Data, its eight-phase lifecycle, and examples of Linked Data-driven applications.  The discussion focuses on DBpedia as an example of such applications.

Linked Open Data, Semantic Web Application, and Machine Learning

The term Linked Data, as indicated in (Ngomo, Auer, Lehmann, & Zaveri, 2014), refers to a set of best practices for publishing and interlinking structured data on the Web.  Linked Open Data (LOD) is regarded as the next generation of systems recommended for the web (Sakr & Gaber, 2014).  LOD follows the principles laid out by Tim Berners-Lee to publish and connect data on the web (Bizer, Heath, & Berners-Lee, 2011; Sakr & Gaber, 2014).  These Linked Data principles include the use of URIs as names for things, HTTP URIs, RDF, SPARQL, and RDF links to other URIs (Bizer et al., 2011; Ngomo et al., 2014; Sakr & Gaber, 2014).  The LOD cloud, as indicated in (Sakr & Gaber, 2014), covers more than an estimated fifty billion facts from various domains such as geography, media, biology, chemistry, economy, and energy.  The Semantic Web offers cost-effective techniques to publish data in a distributed environment (Sakr & Gaber, 2014).  Semantic Web technologies are used to publish structured data on the web and to set links from data in one data source to data within other data sources (Bizer et al., 2011).  LOD implements the Semantic Web concepts, allowing organizations to upload their data to the web for open use of their information (Sakr & Gaber, 2014).  The underlying technologies behind LOD involve Uniform Resource Identifiers (URIs), the HyperText Transfer Protocol (HTTP), and the Resource Description Framework (RDF).  The properties of LOD include that anyone can publish any data on the Web of Data, that data publishers are not constrained in their choice of vocabularies to represent data, and that entities are connected by RDF links (Bizer et al., 2011).  LOD applications include Linked Data browsers, search engines, and domain-specific applications (Bizer et al., 2011).  Examples of Linked Data browsers are the Tabulator Browser (MIT, USA), Marbles (FU Berlin, DE), the OpenLink RDF Browser (OpenLink, UK), the Zitgist RDF Browser (Zitgist, USA), Humboldt (HP Labs, UK), the Disco Hyperdata Browser (FU Berlin, DE), and Fenfire (DERI, Ireland) (Bizer et al., 2011).  The search engines include two types: (1) human-oriented search engines, and (2) application-oriented indexes.  Examples of human-oriented search engines are Falcons (IWS, China) and Sig.ma (DERI, Ireland).  Examples of application-oriented indexes include Swoogle (UMBC, USA), VisiNav (DERI, Ireland), and Watson (Open University, UK).  Domain-specific applications include "mashing up" data from various Linked Data sources, as in Revyu, DBpedia Mobile, Talis Aspire, BBC Programmes and Music, and DERI Pipes (Bizer et al., 2011).  There are various challenges for LOD applications, such as user interfaces and interaction paradigms; application architectures; schema mapping and data fusion; link maintenance; licensing; trust, quality, and relevance; and privacy (Bizer et al., 2011).
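
As a small illustration of these principles, the following sketch uses the Python rdflib library to name a resource with an HTTP URI, describe it in RDF, and link it to an external Linked Data source.  The example namespace and resource are invented for the illustration.

```python
# Sketch of the Linked Data principles with rdflib: HTTP URIs name things,
# RDF describes them, and RDF links point to other URIs (here, to DBpedia).
from rdflib import Graph, Literal, Namespace
from rdflib.namespace import FOAF, RDF
from rdflib import URIRef

EX = Namespace("http://example.org/resource/")   # illustrative namespace

g = Graph()
g.bind("foaf", FOAF)

person = EX["alice"]
g.add((person, RDF.type, FOAF.Person))
g.add((person, FOAF.name, Literal("Alice")))
# An RDF link from our data to an external Linked Data source (DBpedia):
g.add((person, FOAF.based_near, URIRef("http://dbpedia.org/resource/Berlin")))

print(g.serialize(format="turtle"))
```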

Machine Learning is described by (Holzinger, 2017) as the fastest-growing technical field.  The primary goal of Machine Learning (ML) is to develop software which can learn from previous experience, so that a usable intelligence can be reached.  ML is categorized into "supervised" and "unsupervised" learning techniques, depending on whether the output values are required to be present in the training data (Baştanlar & Özuysal, 2014).  "Unsupervised" learning techniques require only the input feature values in the training data, and the learning algorithm discovers hidden structure in the training data based on them (Baştanlar & Özuysal, 2014).  "Clustering" techniques, which partition the data into coherent groups, fall into this category of "unsupervised" learning (Baştanlar & Özuysal, 2014).  "Unsupervised" techniques are used in bioinformatics for problems such as microarray and gene expression analysis (Baştanlar & Özuysal, 2014).  Market segment analysis, grouping people according to their social behavior, and categorizing articles according to their topics are common tasks which involve clustering and "unsupervised" learning (Baştanlar & Özuysal, 2014).  Typical clustering algorithms are K-means, hierarchical clustering, and spectral clustering (Baştanlar & Özuysal, 2014).  "Supervised" learning techniques require the value of the output variable to be known for each training sample (Baştanlar & Özuysal, 2014).
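
As a brief illustration of unsupervised clustering with K-means, one of the algorithms named above, the following sketch uses scikit-learn on a small synthetic dataset.  The data and parameters are illustrative only.

```python
# Sketch of "unsupervised" learning with K-means on a tiny synthetic dataset.
import numpy as np
from sklearn.cluster import KMeans

# Two obvious groups of 2-D points (e.g., a toy market-segmentation feature space).
X = np.array([[1.0, 2.0], [1.5, 1.8], [1.2, 2.2],
              [8.0, 8.0], [8.5, 7.7], [7.8, 8.3]])

kmeans = KMeans(n_clusters=2, n_init=10, random_state=0).fit(X)
print("labels:", kmeans.labels_)            # cluster assignment per point
print("centers:", kmeans.cluster_centers_)  # discovered group centroids
```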

To improve the structure, semantic richness, and quality of Linked Data, existing ML algorithms need to be extended from basic Description Logics such as ALC to expressive ones such as SROIQ(D), which serves as the basis of OWL 2 (Auer, 2010).  The algorithms need to be optimized for processing very large-scale knowledge bases (Auer, 2010).  Moreover, tools and algorithms must be developed for user-friendly knowledge representation, maintenance, and repair which enable detecting and fixing inconsistencies and modeling errors (Auer, 2010).

Knowledge Extraction from Linked Data

There is a difference between Data Mining (DM) and Knowledge Discovery (KD) (Holzinger & Jurisica, 2014).  Data Mining is described in (Holzinger & Jurisica, 2014) as "methods, algorithms, and tools to extract patterns from data by combining methods from computational statistics and machine learning: Data mining is about solving problems by analyzing data present in databases."  Knowledge Discovery, on the other hand, is described in (Holzinger & Jurisica, 2014) as the "exploratory analysis and modeling of data and the organized process of identifying valid, novel, useful and understandable patterns from these data sets."  However, some researchers argue that there is no difference between DM and KD.  In (Holzinger & Jurisica, 2014), Knowledge Discovery and Data Mining (KDD) are of equal importance and necessary in combination.

Linked Data has a lifecycle with eight phases: (1) Extraction, (2) Storage and Querying, (3) Authoring, (4) Linking, (5) Enrichment, (6) Quality Analysis, (7) Evolution and Repair, and (8) Search, Browsing, and Exploration (Auer, 2010; Ngomo et al., 2014).  Information represented in unstructured or semi-structured forms must first be mapped to the RDF data model, which reflects the first phase, Extraction (Auer, 2010; Ngomo et al., 2014).  Once there is a critical mass of RDF data, techniques must be implemented to store, index, and query this RDF data efficiently, which reflects the second phase, Storage and Querying.  New structured information, or corrections and extensions to existing information, can be introduced in the third phase, Authoring (Auer, 2010; Ngomo et al., 2014).  If information about the same or related entities is published by different publishers, links between those different information assets have to be established in the Linking phase.  After the Linking phase there can still be a lack of classification, structure, and schema information, which is tackled in the Enrichment phase by enriching the data with higher-level structure in order to aggregate and query it more efficiently.  The Data Web, similar to the Document Web, can contain information of varying quality, and hence strategies for quality evaluation and assessment of the published data must be established; this is the role of the Quality Analysis phase.  When a quality problem is detected, strategies to repair the problem and to support the evolution of the Linked Data must be implemented in the Evolution and Repair phase.  The last phase of the Linked Data lifecycle enables users to browse, search, and explore the structured information available on the Data Web quickly and in a user-friendly fashion (Auer, 2010; Ngomo et al., 2014).

Linked Data-driven applications fall into four categories: (1) content reuse applications, such as BBC's Music store, which reuses metadata from DBpedia and MusicBrainz; (2) semantic tagging and rating applications, such as Faviki, which employs unambiguous identifiers from DBpedia; (3) integrated question-answering systems, such as DBpedia Mobile, with the ability to indicate locations from the DBpedia dataset in the user's vicinity; and (4) event data management systems, such as Virtuoso's ODS-Calendar, with the ability to organize events, tasks, and notes (Konstantinou, Spanos, Stavrou, & Mitrou, 2010).

Integrative and interactive Machine Learning is the future for Knowledge Discovery and Data Mining.  This concept is demonstrated by (Sakr & Gaber, 2014) using the environmental domain, and in (Holzinger & Jurisica, 2014) using the biomedical domain.  Other applications of LOD include DBpedia, which reflects the Linked Data version of Wikipedia, and the BBC's platforms for the World Cup 2010 and the 2012 Olympic Games (Kaoudi & Manolescu, 2015).  An RDF dataset can involve a large volume of data; for example, data.gov contains more than five billion triples, while the latest version of DBpedia corresponds to more than 2 billion triples (Kaoudi & Manolescu, 2015).  The Linked Cancer Genome Atlas dataset consists of 7.36 billion triples and is estimated to reach 20 billion, as cited in (Kaoudi & Manolescu, 2015).  Such triple atoms have been reported to be frequent in real-world SPARQL queries, making up about 78% of the DBpedia query log and 99.5% of the Semantic Web Dog Food query log, as cited in (Kaoudi & Manolescu, 2015).  The focus of this discussion is on DBpedia, to extract knowledge from a large-scale dataset with real-world semantics by applying machine learning.

DBpedia

DBpedia is a community project which extracts structured, multilingual knowledge from Wikipedia and makes it available on the Web using Semantic Web and Linked Data technologies (Lehmann et al., 2015; Morsey, Lehmann, Auer, Stadler, & Hellmann, 2012).  DBpedia is interlinked with several external data sets following the Linked Data principles (Lehmann et al., 2015).  It develops a large-scale, multilingual knowledge base by extracting structured data from Wikipedia editions (Lehmann et al., 2015; Morsey et al., 2012) in 111 languages (Lehmann et al., 2015).  Using the DBpedia knowledge base, several tools have been developed, such as DBpedia Mobile, Query Builder, Relation Finder, and Navigator (Morsey et al., 2012).  DBpedia is used in several commercial applications such as Muddy Boots, Open Calais, Faviki, Zemanta, LODr, and TopBraid Composer (Bizer et al., 2009; Morsey et al., 2012).

The technical framework of DBpedia extraction involves four phases: Input, Parsing, Extraction, and Output (Lehmann et al., 2015).  The Input phase involves Wikipedia pages which are read from an external source, using either a Wikipedia dump or the MediaWiki API (Lehmann et al., 2015).  The Parsing phase parses Wikipedia pages using a wiki parser which transforms the source code of a Wikipedia page into an Abstract Syntax Tree (Lehmann et al., 2015).  In the Extraction phase, the Abstract Syntax Tree of each Wikipedia page is forwarded to the extractors to extract items such as labels, abstracts, or geographical coordinates (Lehmann et al., 2015).  Each extractor consumes an Abstract Syntax Tree and yields a set of RDF statements (Lehmann et al., 2015).  The Output phase writes the collected RDF statements to a sink supporting different formats such as N-Triples (Lehmann et al., 2015).  The DBpedia extraction framework uses various extractors for translating different parts of Wikipedia pages to RDF statements (Lehmann et al., 2015).  The extractors are divided into four categories: Mapping-Based Infobox Extraction, Raw Infobox Extraction, Feature Extraction, and Statistical Extraction (Lehmann et al., 2015).  The DBpedia extraction framework involves two workflows: Dump-Based Extraction and Live Extraction.  The Dump-Based Extraction workflow employs the Wikipedia page collection from database dumps as the source of article texts, and the N-Triples serializer as the output destination (Bizer et al., 2009).  The resulting knowledge base is made available as Linked Data, for download, and via DBpedia's main SPARQL endpoint (Bizer et al., 2009).  The Live Extraction workflow employs the Wikipedia update stream to extract new RDF whenever an article is changed (Bizer et al., 2009).  The live Wikipedia page collection accesses the article text to obtain the current version of the article, encoded according to the OAI-PMH protocol (Open Archives Initiative Protocol for Metadata Harvesting) (Bizer et al., 2009).  The SPARQL-Update destination removes the existing information and inserts new triples into a separate triple store (Bizer et al., 2009).  The time for DBpedia to reflect the latest Wikipedia update is reported to be between one and two minutes (Bizer et al., 2009).  The update stream causes a performance issue and a bottleneck, as changes need more than one minute to arrive from Wikipedia (Bizer et al., 2009).
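
As a small illustration of querying DBpedia's public SPARQL endpoint mentioned above, the following sketch uses the SPARQLWrapper library to retrieve an English label for a single resource.  Availability of the public endpoint is assumed, and the query is only an example.

```python
# Sketch of a query against the DBpedia SPARQL endpoint using SPARQLWrapper.
from SPARQLWrapper import SPARQLWrapper, JSON

sparql = SPARQLWrapper("https://dbpedia.org/sparql")
sparql.setReturnFormat(JSON)
sparql.setQuery("""
    PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
    SELECT ?label WHERE {
        <http://dbpedia.org/resource/Berlin> rdfs:label ?label .
        FILTER (lang(?label) = "en")
    }
""")

results = sparql.query().convert()
for binding in results["results"]["bindings"]:
    print(binding["label"]["value"])
```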

References

Auer, S. (2010). Towards creating knowledge out of interlinked data.

Baştanlar, Y., & Özuysal, M. (2014). Introduction to machine learning. In miRNomics: MicroRNA Biology and Computational Analysis (pp. 105-128). Springer.

Bizer, C., Heath, T., & Berners-Lee, T. (2011). Linked data: The story so far.

Bizer, C., Lehmann, J., Kobilarov, G., Auer, S., Becker, C., Cyganiak, R., & Hellmann, S. (2009). DBpedia-A crystallization point for the Web of Data. Web Semantics: Science, Services, and Agents on the World Wide Web, 7(3), 154-165.

Holzinger, A. (2017). Introduction To MAchine Learning & Knowledge Extraction (MAKE). Machine Learning & Knowledge Extraction.

Holzinger, A., & Jurisica, I. (2014). Knowledge discovery and data mining in biomedical informatics: The future is in integrative, interactive machine learning solutions. In Interactive knowledge discovery and data mining in biomedical informatics (pp. 1-18). Springer.

Kaoudi, Z., & Manolescu, I. (2015). RDF in the clouds: a survey. The VLDB Journal, 24(1), 67-91.

Konstantinou, N., Spanos, D.-E., Stavrou, P., & Mitrou, N. (2010). Technically approaching the semantic web bottleneck. International Journal of Web Engineering and Technology, 6(1), 83-111.

Lehmann, J., Isele, R., Jakob, M., Jentzsch, A., Kontokostas, D., Mendes, P. N., . . . Auer, S. (2015). DBpedia–a large-scale, multilingual knowledge base extracted from Wikipedia. Semantic Web, 6(2), 167-195.

Morsey, M., Lehmann, J., Auer, S., Stadler, C., & Hellmann, S. (2012). DBpedia and the live extraction of structured data from Wikipedia. Program, 46(2), 157-181.

Ngomo, A.-C. N., Auer, S., Lehmann, J., & Zaveri, A. (2014). Introduction to linked data and its lifecycle on the web. Paper presented at the Reasoning Web International Summer School.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Graph Dataset Partitioning Methods

Dr. Aly, O.
Computer Science

Introduction

Cloud Computing programs are designed to serve certain purposes.  They can be tailored for Graph or Data Parallelism, which require the utilization of data striping and distribution or graph partitioning and mapping (Sakr & Gaber, 2014).  Data Parallelism (DPLL) is distinguished from Graph Parallelism (GPLL). DPLL is described as a form of parallelizing computation by distributing data across multiple nodes and running corresponding tasks in parallel on those machines.  DPLL is achieved when each machine runs one or many tasks over different partitions of data (Sakr & Gaber, 2014).  MapReduce uses DPLL: the Hadoop Distributed File System (HDFS) partitions the input datasets into blocks, allowing MapReduce to effectively utilize DPLL by running a map task per one or more blocks. GPLL, on the other hand, is described as another form of parallelism which focuses on distributing graphs as opposed to data (Sakr & Gaber, 2014).  GPLL is widely used in many domains such as Machine Learning (ML), Data Mining (DM), Physics, and many others.

The graph can be distributed over machines or nodes in a distributed system using various types of Graph Partitioning (GPRT) (Hendrickson & Kolda, 2000; Sakr & Gaber, 2014).  When using GPRT techniques, the work and the vertices are divided over distributed nodes for efficient distributed computation (Hendrickson & Kolda, 2000; Sakr & Gaber, 2014).  As in DPLL, the underlying concept of GPRT is to process different parts of the graph in parallel by distributing a large graph across many nodes in the cluster (Sakr & Gaber, 2014).  Thus, GPRT allows and enables GPLL (Sakr & Gaber, 2014).  The main objectives of GPRT are to distribute the work uniformly over many processors by partitioning the vertices into equally weighted partitions, one per processor, while minimizing inter-processor communication, reflected by edges crossing between partitions (Hendrickson & Kolda, 2000; Sakr & Gaber, 2014).  This measure is called the "Edge Cut Metric" (Hendrickson & Kolda, 2000; Sakr & Gaber, 2014). Pregel and GraphLab are good examples of systems using the GPRT technique.   
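The edge cut is simple to compute once every vertex has been assigned a partition.  The small sketch below (toy graph and assignment chosen only for illustration) counts the edges whose endpoints fall in different partitions.

```python
# Illustrative sketch of the edge-cut metric: vertices are assigned to
# partitions, and the cut counts the edges whose endpoints land in
# different partitions.
edges = [(1, 2), (2, 3), (3, 4), (4, 1), (2, 4)]    # undirected edges
partition = {1: 0, 2: 0, 3: 1, 4: 1}                # vertex -> partition id

edge_cut = sum(1 for u, v in edges if partition[u] != partition[v])
print("edge cut:", edge_cut)                        # edges crossing partitions
```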

There are Standard GPRT techniques which have various flaws and limitations, as discussed below.  There are alternative models such as the Bipartite Graph Model, the Hypergraph Model, the Multi-Constraint Partitioning Model, and the Skewed Partitioning Model.  These models are only viable if efficient and effective algorithms can be developed to partition them (Hendrickson & Kolda, 2000).   Various research studies have demonstrated that the Multi-Level paradigm for partitioning is robust and general. Several researchers developed this paradigm independently in the early 1990s, and it became popular through partitioning tools such as Chaco and METIS.  Regarding graph processing, there are various Cloud-Based Graph Processing Platforms such as Pregel, PEGASUS, HADI, Surfer, Trinity, and GraphLab (Sakr & Gaber, 2014). 

This discussion and analysis of the alternative Graph Partitioning Models are limited to the Bipartite Graph Model and the Multi-Constraint Partitioning Model.  The discussion begins with the Standard Graph Partitioning techniques and an analysis of their flaws and limitations.  Moreover, the discussion and analysis of the Cloud-Based Graph Processing Platforms are limited to Pregel and GraphLab.

The Standard Graph Partitioning Approach

A graph is a set of vertices and edges, where the vertices can represent units of computation and the edges can encode data dependencies (Hendrickson & Kolda, 2000).  The Standard Graph Partitioning approach partitions the vertices of the graph into equally weighted sets in such a way that the weight of the edges crossing between sets is minimized (Hendrickson & Kolda, 2000).  Chaco and METIS are well-known software packages which use this standard approach.  There are shortcomings and limitations to the standard Graph Partitioning approach; the "Edge Cut Metric" has four major flaws (Hendrickson & Kolda, 2000).  The first flaw is that edge cuts are not proportional to the total communication volume: the metric overcounts the true volume of communication because two or more edges that represent the same information flow are not recognized as such.  The second flaw involves the time to send a message on a parallel computer, which is a function of the latency and the size of the message (Hendrickson & Kolda, 2000).  The Graph Partitioning approaches try to minimize the total volume but not the total number of messages, and in certain scenarios the message volume can be less important than the message latencies (Hendrickson & Kolda, 2000).  The third flaw involves the performance of a parallel application, which is limited by the slowest processor.  The communication effort can be out of balance although the computational work is in balance.  Thus, the focus should be on minimizing the maximum volume and number of messages handled by any single processor instead of minimizing the total communication volume or even the total number of messages (Hendrickson & Kolda, 2000).   The last flaw involves the number of switches a message is routed through as it travels.  Communication should be restricted to processors which are close to each other to prevent the contention of messages and enhance the overall throughput of the message traffic (Hendrickson & Kolda, 2000). 
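The first flaw can be made concrete with a tiny example.  In the sketch below (graph and partition chosen only for illustration), a single vertex has three neighbors in a remote partition: the edge-based count charges three units of communication, although the vertex's value actually crosses the partition boundary only once.

```python
# Illustrative sketch of the first flaw: edge cut vs. true communication volume.
edges = [(1, 2), (1, 3), (1, 4)]            # vertex 1 has three neighbors
partition = {1: 0, 2: 1, 3: 1, 4: 1}        # all three neighbors live in partition 1

# Naive per-edge count of the data flowing from partition 0 to partition 1.
per_edge = sum(1 for u, v in edges
               if partition[u] == 0 and partition[v] == 1)

# Actual volume: vertex 1's value is sent to partition 1 only once,
# regardless of how many of its neighbors live there.
actual = len({u for u, v in edges
              if partition[u] == 0 and partition[v] == 1})

print(per_edge, actual)   # 3 vs. 1: the edge cut overcounts the real flow
```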

The Standard Graph Partitioning technique suffers from four limitations because of the lack of "expressibility" in the model (Hendrickson & Kolda, 2000).  The emphasis in the discussion of (Hendrickson & Kolda, 2000) is on two standard models, the "Directed" and "Undirected" Graph Models.  The first limitation involves symmetric and un-symmetric data dependencies.  The "Undirected Graph" model can only express symmetric data dependencies (Hendrickson & Kolda, 2000).  If the dependencies are un-symmetric, they can be represented using a "Directed Graph" model, in which the edges are directed from the data-producing vertex to the data-consuming vertex (Hendrickson & Kolda, 2000).  The "Standard" Model cannot represent un-symmetric data dependencies.  There are two solutions to make the "Standard" Model represent un-symmetric dependencies (Hendrickson & Kolda, 2000).  The first solution is to convert the directed edges to undirected edges (un-symmetric to symmetric).  The second solution extends the first by weighting edges according to whether the communication is one-way or two-way: if an edge represents only one-way communication, it gets a weight of one, and if it represents two-way communication, it gets a weight of two (Hendrickson & Kolda, 2000).  The second limitation is that the symmetric model forces the partition of the input and output data to be identical (Hendrickson & Kolda, 2000).    Although this approach might be desirable in certain cases, it poses an unnecessary restriction in many others (Hendrickson & Kolda, 2000).   The third limitation of the Standard Model involves the assumption that the input and output of the calculation are of the same size.  The last limitation of the Standard Model involves calculations with several distinct phases, such as applying a matrix and a preconditioner in an iterative method, solving a differential equation and applying boundary conditions, simulating different phenomena in a multi-physics calculation, or advancing a grid and detecting contacts in a transient dynamics computation (Hendrickson & Kolda, 2000).  Such a combination of phases cannot be described using the "Undirected Graph" Model.
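The second workaround described above can be sketched in a few lines.  In the illustrative snippet below (edge set chosen arbitrarily), a directed dependency graph is folded into a weighted undirected graph, with weight one for one-way communication and weight two when both directions are present.

```python
# Sketch of the weighting workaround: fold directed dependencies into a
# weighted undirected graph (weight 1 = one-way, weight 2 = two-way).
directed_edges = {("a", "b"), ("b", "a"), ("a", "c")}   # (producer, consumer)

undirected = {}
for u, v in directed_edges:
    key = tuple(sorted((u, v)))                 # canonical undirected edge
    two_way = (v, u) in directed_edges          # is the reverse edge present?
    undirected[key] = 2 if two_way else 1

print(undirected)   # {('a', 'b'): 2, ('a', 'c'): 1}
```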

Graph Partitioning Models Alternatives – Bipartite Model, and Multi-Constraint Partitioning Model

To overcome some of the above flaws and challenges of the Standard Graph Models, some models are proposed and discussed in (Hendrickson & Kolda, 2000).  “Bipartite Graph Model” is proposed to solve the un-symmetric data dependencies and un-symmetric partitions which cannot be encoded by the Undirected Graph Model, because this standard model can encode only symmetric data dependencies and symmetric partitions as discussed in the above flaws section. 

The "Bipartite" Model can be applied to other applications involving un-symmetric dependencies and multiple phases (Hendrickson & Kolda, 2000).  There are three advantages of the "Bipartite" Model.  The first advantage is the capability of encoding a class of un-symmetric problems that cannot be encoded by the standard "Undirected" Graph Model.  The second advantage is that the "Bipartite" Model allows the initial partition to be different from the final partition by representing each vertex twice, once as an initial vertex and once as a final vertex, allowing for a reduction in communication (Hendrickson & Kolda, 2000).  However, this model cannot provide a symmetric partition, which can be preferred in many applications (Hendrickson & Kolda, 2000).  The third advantage involves the load balance which is offered by partitioning both the initial and the final vertices (Hendrickson & Kolda, 2000).  Although the "Bipartite" Graph Model covers the "expressibility" which is missing from the Standard Graph Models, it still faces the Edge Cut Metric challenge and shares the other associated problems from which the Standard Models suffer (Hendrickson & Kolda, 2000).   The Edge Cut Metric issue can be addressed by quantifying the partition quality using measures other than the cut edges (Hendrickson & Kolda, 2000). Moreover, the "Bipartite" Model cannot encode more than two main computational operations (Hendrickson & Kolda, 2000).

A solution to this two-operation limitation involves a k-partite graph in which the first set of vertices is connected to a second set, which is connected to a third set, and so on.  This technique is called the Multi-Constraint methodology.   The Multi-Constraint technique is not an alternative to the other models but rather an enhancement to them (Hendrickson & Kolda, 2000).  The goal of this model is to partition the vertices of the graph in a way that minimizes the communication while balancing the k weights assigned to each vertex, resulting in balancing each computation phase (Hendrickson & Kolda, 2000).  Moreover, this model was developed to minimize the edge cuts (Hendrickson & Kolda, 2000).  Using this model, the edges represent the data dependencies in every phase of all computations (Hendrickson & Kolda, 2000).  The Multi-Constraint Model includes the "Bipartite" and k-partite techniques as special cases.  Although this model is powerful, it is difficult to implement (Hendrickson & Kolda, 2000).
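The balancing goal of the Multi-Constraint Model can be checked with a short sketch.  In the illustrative snippet below (weights, phases, and partition are assumed, not taken from the cited paper), every vertex carries k weights, one per computational phase, and the imbalance of each constraint is reported separately.

```python
# Illustrative check of the multi-constraint goal: each of the k per-phase
# weight sums must be balanced across partitions.
vertex_weights = {                    # vertex -> (phase-1 work, phase-2 work)
    1: (4, 1), 2: (1, 4), 3: (4, 1), 4: (1, 4),
}
partition = {1: 0, 2: 0, 3: 1, 4: 1}
k = 2

totals = {p: [0] * k for p in set(partition.values())}
for v, weights in vertex_weights.items():
    for i, w in enumerate(weights):
        totals[partition[v]][i] += w

# Imbalance per constraint: max partition load divided by average load.
for i in range(k):
    loads = [totals[p][i] for p in totals]
    imbalance = max(loads) / (sum(loads) / len(loads))
    print(f"constraint {i}: loads={loads}, imbalance={imbalance:.2f}")
```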

Cloud-Based Graph Processing Platforms of Pregel and GraphLab

Pregel is known as a vertex-oriented graph processing engine which implements the Bulk Synchronous Parallel (BSP) model, where programs are expressed as a sequence of iterations (Sakr & Gaber, 2014). In each iteration, a vertex can receive messages which were sent in the previous iteration, send messages to other vertices, and modify its state.  Google introduced the Pregel system as a scalable platform for the implementation of graph algorithms (Sakr & Gaber, 2014).  Pregel implements the vertex-oriented PageRank algorithm under the Message Passing paradigm (Sakr & Gaber, 2014).  Pregel passes the results of the computation between workers, executing Compute() on all vertices in parallel (Sakr & Gaber, 2014).  In Pregel, messages are passed over the network, and vertices can vote to halt if there is no work to do (Sakr & Gaber, 2014).  Each vertex in a graph is assigned a unique ID, and partitioning of the graph is accomplished using a hash(ID) mod N function, where N is the number of partitions.  The hash function can be modified by users.  After the graph is partitioned, partitions are mapped to cluster machines using a mapping function of the user's choice (Sakr & Gaber, 2014).  In Pregel, if the graph or the cluster size is modified after partitioning, the entire graph needs to be re-partitioned before any processing (Sakr & Gaber, 2014). 
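The two ideas described above, hash(ID) mod N partitioning and superstep-based vertex computation, can be sketched in a single-process toy.  The snippet below is only an assumed, simplified illustration of the BSP flow (the graph, the toy update rule, and the message format are not from the cited sources).

```python
# A single-process sketch of Pregel-style partitioning and supersteps.
N = 4
def partition_of(vertex_id):
    return hash(vertex_id) % N          # Pregel's default hash(ID) mod N scheme

graph = {"a": ["b", "c"], "b": ["c"], "c": ["a", "b"]}   # adjacency lists
value = {v: 1.0 for v in graph}
inbox = {v: [] for v in graph}

for superstep in range(3):
    outbox = {v: [] for v in graph}
    for v in graph:                                      # Compute() on every vertex
        if inbox[v]:
            value[v] = 0.15 + 0.85 * sum(inbox[v])       # toy PageRank-style update
        for neighbor in graph[v]:
            outbox[neighbor].append(value[v] / len(graph[v]))
        # In real Pregel, a vertex with no remaining work would vote to halt.
    inbox = outbox                                       # messages arrive next superstep

print({v: partition_of(v) for v in graph})
print(value)
```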

GraphLab, in contrast to Pregel, employs a two-phase partitioning strategy (Sakr & Gaber, 2014).  In the first phase, the input graph is partitioned into a number of partitions using a hash-based random algorithm, where the number of partitions must be larger than the number of cluster nodes (Sakr & Gaber, 2014). A partition in GraphLab is called an "atom," and it stores the commands required to generate its vertices and edges (Sakr & Gaber, 2014).  GraphLab maintains in each atom some information about the atom's neighboring vertices and edges, denoted as ghosts, which serve as a caching capability for efficient access to adjacent data (Sakr & Gaber, 2014).  In the second phase, GraphLab stores the connectivity structure and the locations of atoms in an atom index file which is referred to as the meta-graph.  This meta-graph contains the number of vertices and the edges encoding connectivity among atoms, and it is split uniformly across the cluster nodes (Sakr & Gaber, 2014).  Atoms are then loaded by cluster machines, and each machine constructs its partitions by executing the commands in each of its assigned atoms (Sakr & Gaber, 2014).  By generating partitions through executing the commands stored in the atoms, GraphLab allows future modifications to the graph to be appended as additional commands in the atoms without any requirement to repartition the entire graph (Sakr & Gaber, 2014).  Moreover, the same graph atoms can be reused for different cluster sizes by simply re-dividing the corresponding atom index file and re-executing the atom commands (Sakr & Gaber, 2014).     
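A rough sketch of this two-phase strategy, based only on the description above, is shown below; the data structures and assignment rule are assumptions for illustration and do not reflect GraphLab's actual implementation.

```python
# Rough sketch of GraphLab-style two-phase partitioning: over-partition into
# "atoms" with ghost bookkeeping, then assign atoms to machines via an index.
edges = [("a", "b"), ("b", "c"), ("c", "d"), ("d", "a")]
NUM_ATOMS, NUM_MACHINES = 4, 2                 # atoms > machines, as required

atom_of = lambda v: hash(v) % NUM_ATOMS        # phase 1: hash-based partition
atoms = {i: {"edges": [], "ghosts": set()} for i in range(NUM_ATOMS)}
for u, v in edges:
    au, av = atom_of(u), atom_of(v)
    atoms[au]["edges"].append((u, v))
    if au != av:                               # remote endpoint cached as a ghost
        atoms[au]["ghosts"].add(v)
        atoms[av]["ghosts"].add(u)

# Phase 2: the atom index (meta-graph) is split uniformly across machines,
# and each machine materializes its assigned atoms.
assignment = {atom_id: atom_id % NUM_MACHINES for atom_id in atoms}
print(assignment)
```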

Moreover, GraphLab is designed and developed for ML and DM algorithms, which are not naturally supported by MapReduce (Sakr & Gaber, 2014).  The GraphLab abstraction allows asynchronous, dynamic, graph-parallel computation while ensuring data consistency and achieving a high degree of parallel performance in the Shared-Memory setting (Sakr & Gaber, 2014).  This asynchronous parallel model of GraphLab differs from Pregel's BSP model (Sakr & Gaber, 2014).  The GraphLab framework has been extended to the distributed setting while ensuring strong data consistency (Sakr & Gaber, 2014).

How To Handle the Unevenness of the Cloud Network Bandwidth

A large graph can reach hundreds of gigabytes, which requires data-intensive processing (Chen et al., 2012; Hendrickson & Kolda, 2000; Sakr & Gaber, 2014).  The Cloud Computing environment is described as a good fit for processing large graphs (Chen et al., 2012; Hendrickson & Kolda, 2000; Sakr & Gaber, 2014).  Graph processing systems support a vertex-oriented execution model which allows users to develop custom logic on vertices (Chen et al., 2012).  However, the random access of vertex-oriented computation can generate a significant amount of network traffic (Chen et al., 2012).  The GPRT technique is known to be effective in reducing the network traffic in graph processing (Chen et al., 2012; Hendrickson & Kolda, 2000; Sakr & Gaber, 2014).  However, in the Cloud environment, GPRT needs to be effectively integrated into large graph processing (Chen et al., 2012; Sakr & Gaber, 2014).  The network bandwidth of the Cloud environment is uneven among various machine pairs, where the machines are first grouped into "pods" which are connected by high-level switches (Chen et al., 2012; Sakr & Gaber, 2014).  The intra-pod bandwidth is higher than the cross-pod bandwidth (Chen et al., 2012; Sakr & Gaber, 2014).  Moreover, users are not aware of the topology information due to the virtualization techniques employed by the Cloud (Chen et al., 2012; Sakr & Gaber, 2014).   Thus, the unevenness of the Cloud network bandwidth requires network optimizations and tuning of graph partitioning and processing (Chen et al., 2012; Sakr & Gaber, 2014).   A framework is proposed by (Chen et al., 2012; Sakr & Gaber, 2014) which is described as network-performance-aware graph partitioning, with the aim of improving the network performance of the graph partitioning process (Chen et al., 2012; Sakr & Gaber, 2014).  The underlying concept of the framework is to partition, store, and process the graph partitions based on their number of cross-partition edges (Chen et al., 2012; Sakr & Gaber, 2014).  Thus, partitions with a large number of cross-partition edges are stored on machines with high network bandwidth between them, because the network traffic for those graph partitions requires high bandwidth (Chen et al., 2012; Sakr & Gaber, 2014).  This framework partitions both the data graph and the machine graph and is known as the "Machine Graph" framework (Chen et al., 2012; Sakr & Gaber, 2014).  In this framework, a complete weighted undirected graph models the machines chosen for graph partitioning to capture the unevenness of the network bandwidth (Chen et al., 2012; Sakr & Gaber, 2014).  Each chosen machine is modeled as a vertex, and an edge represents the connectivity between two machines (Chen et al., 2012; Sakr & Gaber, 2014).  The bandwidth between any two machines is represented as the weight of an edge (Chen et al., 2012; Sakr & Gaber, 2014).   The machine graph can be built with no knowledge or control of the physical network topology (Chen et al., 2012; Sakr & Gaber, 2014). 
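The placement idea behind the Machine Graph framework can be illustrated with a simplified greedy sketch; the numbers, the pairing rule, and the greedy strategy below are assumptions for illustration and are not the authors' exact algorithm.

```python
# Simplified greedy sketch: partition pairs with many cross-partition edges
# are placed on machine pairs with high measured bandwidth.
cross_edges = {("P0", "P1"): 500, ("P2", "P3"): 40}          # data-graph side
bandwidth  = {("m0", "m1"): 900, ("m0", "m2"): 100,          # machine graph side
              ("m0", "m3"): 100, ("m1", "m2"): 100,
              ("m1", "m3"): 100, ("m2", "m3"): 800}

placement, used_machines = {}, set()
for (p, q), _ in sorted(cross_edges.items(), key=lambda kv: -kv[1]):
    # Pick the highest-bandwidth machine pair that is still free.
    for (m, n), _ in sorted(bandwidth.items(), key=lambda kv: -kv[1]):
        if m not in used_machines and n not in used_machines:
            placement[p], placement[q] = m, n
            used_machines |= {m, n}
            break

print(placement)   # {'P0': 'm0', 'P1': 'm1', 'P2': 'm2', 'P3': 'm3'}
```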

Partition Sketch is another framework proposed by (Chen, Weng, & He; Sakr & Gaber, 2014) for handling the unevenness of the network bandwidth in the Cloud environment.  Using the Partition Sketch, the process of a multi-level graph partitioning algorithm is modeled as a tree structure (Sakr & Gaber, 2014).  In this framework, each node represents the graph that serves as the input for the partition operation at a given level of the entire graph partitioning process (Chen et al.; Sakr & Gaber, 2014).  The root node represents the input graph, the non-leaf nodes at level (i + 1) represent the partitions of the ith iteration, and the leaf nodes represent the graph partitions generated by the multi-level graph partitioning algorithm.  The Partition Sketch is described as a binary tree, since graph partitioning is often implemented using bisection (Chen et al.; Sakr & Gaber, 2014).  The "Machine Graph" and "Partition Sketch" frameworks are proposed as network-bandwidth-aware graph partitioning techniques for the Cloud (Chen et al.; Sakr & Gaber, 2014).   These frameworks enhance a popular multi-level graph partitioning algorithm with network performance awareness (Chen et al.; Sakr & Gaber, 2014). 

References

Chen, R., Weng, X., & He, B. On the efficiency and programmability of large graph processing in the cloud.

Chen, R., Yang, M., Weng, X., Choi, B., He, B., & Li, X. (2012). Improving large graph processing on partitioned graphs in the cloud. Paper presented at the Proceedings of the Third ACM Symposium on Cloud Computing.

Hendrickson, B., & Kolda, T. G. (2000). Graph partitioning models for parallel computing. Parallel computing, 26(12), 1519-1534.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Network Traffic Anomalies

Dr. Aly, O.
Computer Science

Introduction

Fraud detection is a critical component for the integrity of Internet services (Huang, Al-Azzawi, & Brani, 2014; Sakr & Gaber, 2014; Soldo & Metwally, 2012).  The fraudulent activities include generating charges for online advertisers without a real interest in the products advertised (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  These fraudulent activities are referred to as hit inflation attacks or abusive ad clicks (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  They are classified into attacks from publishers and attacks from advertisers.   The goal of the attacks from publishers is to increase the publisher's revenues, while the goal of the attacks from advertisers is to increase the activities, such as impressions or clicks, associated with the advertisements of their competitors in order to deplete the competitors' advertising budgets (Sakr & Gaber, 2014; Soldo & Metwally, 2012). 

Such abusive ad clicks and hit inflation attacks must be detected to preserve the integrity of numerous Internet services (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  There are three approaches for generating abusive click attacks, each associated with a type of traffic.  The first approach is through ads on the publisher site which are clicked by legitimate users, called white pointers.  The second approach is through ads which are clicked by both legitimate users and fraudsters, called gray pointers.  The third approach uses a large botnet to generate fake traffic (Sakr & Gaber, 2014; Soldo & Metwally, 2012).   Preventing such abusive traffic is critical and is described as being "at the heart of analyzing large data sets" (Sakr & Gaber, 2014; Soldo & Metwally, 2012) because it involves developing statistical models at a global scale (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  However, preventing or combating abusive traffic is not a trivial task and remains a very challenging issue. 

IP Estimated Size Challenges

The IP size is defined in (Sakr & Gaber, 2014; Soldo & Metwally, 2012) as the number of users sharing the same IP address.  The size of any IP can change abruptly as new users join the local network and share the same public IP, as other users leave, or as the IP address gets reassigned to a different host.  The same host can also be shared among several users.  Moreover, users can be connected through the same Network Address Translation (NAT) device.  The sizes of a significant portion of the IP space must be estimated, which is a very challenging task (Sakr & Gaber, 2014; Soldo & Metwally, 2012).

To preserve users' privacy, the sizes of IPs are estimated using application-level log files (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  However, this technique is not straightforward: counting distinct user identifiers per IP, e.g., cookie IDs or user agents (UAs), fails to accurately estimate sizes (Sakr & Gaber, 2014; Soldo & Metwally, 2012).   Thus, estimating IP sizes from distinct user identifiers can either overestimate or underestimate the result (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  When the traffic is filtered based on these inaccurate sizes, the result can be high false-negative and false-positive rates (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  Statistical models can instead be developed from the log files to estimate the IP sizes (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  However, this approach has its own challenges because the log files contain not only legitimate traffic but also abusive traffic, which can degrade the quality of the statistical models and the estimated sizes (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  To overcome this challenge, the models should be developed only from the traffic of trusted users, which can introduce sampling bias in the traffic (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  To decrease such sampling bias in the estimation phase, only the trusted traffic of each IP during a period is used to estimate the size for that period (Sakr & Gaber, 2014; Soldo & Metwally, 2012). This period-specific IP size estimation can reduce the accuracy of the filtering (Sakr & Gaber, 2014; Soldo & Metwally, 2012).

To overcome the above challenges, (Soldo & Metwally, 2012) propose statistical models that estimate IP sizes autonomously, passively, and in a privacy-preserving manner from aggregated log files, and that predict sizes using time-series analysis on the estimated IP sizes (Soldo & Metwally, 2012). To estimate the IP size, a regression model is developed for the number of trusted cookies behind the IPs, which serves as the baseline of measured sizes, using two types of features: (1) the activity rate, and (2) the diversity of the observed traffic (Soldo & Metwally, 2012).  Models are developed from the log files aggregated at the IP level.  The majority of IPs have very few trusted cookies behind them, so stratified sampling is used to avoid underrepresenting IPs with large measured sizes (Soldo & Metwally, 2012).   Estimates are calculated based on each IP's traffic rate and diversity (Soldo & Metwally, 2012).  Regression-based models are developed using four main regression techniques: (1) Linear Regression (LR), (2) Quantile Regression (QR), (3) Principal Component Analysis (PCA) with Multivariate Adaptive Regression Splines (MARS), and (4) Percentage Regression (PR) (Soldo & Metwally, 2012).  The LR model is a commonly used regression technique which minimizes the root mean square error.  The QR model is a more robust regression model that handles outliers better.  The PCA+MARS technique develops regression models for IP size estimation using a non-parametric regression technique which captures non-linear behavior through a piecewise model (Soldo & Metwally, 2012).  The PR model is a regression technique which minimizes the relative error (Soldo & Metwally, 2012).  The accuracy of each model is evaluated and analyzed by comparing its result against the baseline.  The results showed no single winning model.  Other alternative approaches can be used to predict the IP size, such as the autoregressive moving average (ARMA) model, which models time series with both an autoregressive and a moving-average part (Soldo & Metwally, 2012).  However, this model also faces many challenges. 
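The regression step described above can be sketched with a simple linear fit; the snippet below is an assumed, minimal illustration in which the two features named in the text (activity rate and diversity) are used with fabricated example values, and scikit-learn is chosen only for convenience.

```python
# Minimal sketch: predict the measured IP size (trusted cookies behind an IP)
# from aggregate, privacy-preserving features. Data values are illustrative.
import numpy as np
from sklearn.linear_model import LinearRegression

# Each row: [activity_rate, diversity]; target: measured IP size.
X = np.array([[10, 2], [50, 5], [200, 12], [400, 20], [800, 35]], dtype=float)
y = np.array([1, 4, 15, 28, 60], dtype=float)

model = LinearRegression().fit(X, y)
print("coefficients:", model.coef_, "intercept:", model.intercept_)
print("estimated size for a new IP:", model.predict([[120.0, 8.0]])[0])
```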

PredictSizes Algorithm Proposed Solution

Due to the challenges mentioned above, an efficient prediction algorithm is required.  In (Soldo & Metwally, 2012), the PredictSizes algorithm is proposed, which employs some concepts from seasonal autoregressive integrated moving average (ARIMA) models.  The ARIMA model is a generalization of the ARMA model and is applied to non-stationary data with some trend or seasonality (Soldo & Metwally, 2012).  The algorithm analyzes the periodicity of the size estimates (Soldo & Metwally, 2012).  For each periodicity, it analyzes a sliding window of estimates and seeks their stable representative size by iteratively reducing the variance until the estimates lie within an acceptable confidence interval.  The algorithm then combines the estimates of all periodicities (Soldo & Metwally, 2012).  PredictSizes deals with each periodicity of each IP separately; thus, it can be massively parallelized using the MapReduce framework, as the size estimates are stored in files sharded by period IDs and IPs (Soldo & Metwally, 2012).  PredictSizes combines all the stable sizes of all the periodicities using a "Combiner" function which implements sanity checks on the predicted sizes (Soldo & Metwally, 2012).  The experimentation of (Soldo & Metwally, 2012) showed that these sanity checks are very effective in detecting abrupt legitimate size changes early, leading to a reduction of the false positives caused by over-filtering legitimate traffic (Soldo & Metwally, 2012).  The accuracy of the prediction is assessed and evaluated using a random sample of the collected ten million IPs.  The results showed that the accuracy of the predictions increases as the estimated size increases, which is where accuracy is most operationally desirable (Soldo & Metwally, 2012). 
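The iterative variance-reduction idea can be sketched loosely as follows; this is an assumed illustration, not the published PredictSizes code, and the window, threshold, and minimum number of points are arbitrary.

```python
# Loose sketch of iterative variance reduction over a sliding window of size
# estimates for one IP and one periodicity: drop the estimate farthest from
# the mean until the remaining estimates are acceptably tight, then report
# their mean as the stable size.
import statistics

def stable_size(window, max_stdev=2.0, min_points=3):
    estimates = list(window)
    while len(estimates) > min_points and statistics.pstdev(estimates) > max_stdev:
        mean = statistics.mean(estimates)
        estimates.remove(max(estimates, key=lambda x: abs(x - mean)))
    return statistics.mean(estimates)

daily_estimates = [12, 11, 13, 40, 12, 11, 14]    # one IP, daily periodicity
print(stable_size(daily_estimates))               # the outlier 40 is discarded
```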

Machine-Generated Attacks Detection and IP Size Distribution

In (Sakr & Gaber, 2014; Soldo & Metwally, 2012), the emphasis is on publisher hit inflation and abusive ad click attacks; however, the techniques apply to advertisers as well. These techniques are used to detect machine-generated traffic.  A framework is proposed by (Sakr & Gaber, 2014; Soldo & Metwally, 2012) which automatically detects and classifies the anomalous deviations using statistical tests.  The framework has low complexity and is said to be easy to parallelize, making it suitable for large-scale detection.  It is based on fundamental characteristics of machine-generated traffic such as DHCP re-assignment.  The main goal of this proposed framework is to automatically detect and filter out fraudulent clicks, since the click traffic received by a publisher can be a mixture of both legitimate and fraudulent clicks (Sakr & Gaber, 2014; Soldo & Metwally, 2012).

The IP size is used as a discriminative feature for detecting machine-generated traffic (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  Two main IP size distributions are considered: (1) a website which receives average desktop traffic, and (2) a website which receives average mobile traffic (Sakr & Gaber, 2014; Soldo & Metwally, 2012).   The researchers of (Soldo & Metwally, 2012) observed that the IP size is small when a website receives mainly desktop traffic, where a small number of users share the same IP address; in this case the IP size distribution is highly skewed toward the left (Sakr & Gaber, 2014; Soldo & Metwally, 2012).   When the website receives mainly mobile traffic, the IP size distribution exhibits two distinguished modes, because mobile users access the Internet using either a public IP address or large proxies (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  Two attacks are identified by (Soldo & Metwally, 2012) as diverse examples within the wide spectrum of possible machine-generated traffic attacks regarding the resources required and the sophistication level: Botnet-based attacks and Proxy-based attacks.  Botnet-based attacks occur when the attacker controls a large number of hosts through a botnet, where the attack can be highly distributed across the available hosts to maximize the overall amount of traffic generated while maintaining a low activity profile for each individual host (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  Proxy-based attacks, in contrast, occur when the attacker controls only a few hosts but still wants to generate a large amount of traffic; in this case, the attacker can use anonymizing proxies such as TOR (The Onion Router) nodes to hide the actual source IPs involved in the attack traffic (Sakr & Gaber, 2014; Soldo & Metwally, 2012). 

Machine-generated traffic attacks induce an anomalous IP size distribution (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  Thus, a detection system based on the IP size histogram is implemented, which automatically filters fraudulent clicks associated with any publisher (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  The first step is to identify a proper grouping of publishers who share a similar IP size distribution (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  The grouping is implemented by categorizing publishers that offer the same type of service, such as web search, services for mobile users, content sites, and parked domain websites (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  This approach provides a fine-grained grouping of publishers which takes into account the various factors that can affect the IP size (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  A statistical threshold model of the click traffic is computed for each group of publishers by aggregating the click traffic, followed by setting a minimum quality score (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  A percentile threshold is computed for each group and each bucket.  This technique considers the observed empirical distributions, the number of available IP size samples, and the desired confidence level (Sakr & Gaber, 2014; Soldo & Metwally, 2012).  The results showed that the proposed model can accurately estimate the fraud scores with a three percent average prediction error. 
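The per-group, per-bucket thresholding step can be sketched as follows; the group names, buckets, sample counts, and the 99th percentile are illustrative assumptions rather than the published configuration.

```python
# Hedged sketch of percentile thresholds computed per publisher group and
# per IP-size bucket; traffic above its group/bucket threshold is flagged.
import numpy as np

clicks_per_bucket = {          # group -> bucket -> observed clicks per IP
    "mobile_sites": {"small_ip": [3, 4, 2, 5, 3], "large_ip": [40, 55, 48, 60, 52]},
}
PERCENTILE = 99

thresholds = {
    group: {bucket: np.percentile(samples, PERCENTILE)
            for bucket, samples in buckets.items()}
    for group, buckets in clicks_per_bucket.items()
}

def is_suspicious(group, bucket, clicks):
    return clicks > thresholds[group][bucket]

print(thresholds)
print(is_suspicious("mobile_sites", "small_ip", 25))   # True: far above the group norm
```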

Other Network Anomaly Detection Approaches

In (Huang et al., 2014), other network anomaly detection approaches are discussed, focusing on non-signature-based approaches.  Three main non-signature-based approaches are covered: PCA-based, Sketch-based, and Signal-Analysis-based.  In (Mahoney, 2003), a two-stage anomaly detection system for identifying suspicious traffic is presented.  The traffic is filtered to pass only the packets of most interest, such as the first few packets of incoming service requests.  The most common protocols, including IP, TCP, Telnet, FTP, SMTP, and HTTP, are modeled at the packet byte level to flag events using the byte values (Mahoney, 2003). 

References

Huang, H., Al-Azzawi, H., & Brani, H. (2014). Network traffic anomaly detection. arXiv preprint arXiv:1402.0856.

Mahoney, M. V. (2003). Network traffic anomaly detection based on packet bytes. Paper presented at the Proceedings of the 2003 ACM symposium on Applied computing.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.

Soldo, F., & Metwally, A. (2012). Traffic anomaly detection based on the IP size distribution. Paper presented at the INFOCOM, 2012 Proceedings IEEE.

Traditional Distributed Models and Cloud Computing

Dr. Aly, O.
Computer Science

Introduction

The traditional distributed models involve the Shared Memory (SM) and the Message Passing (MP) programming models.  The SM and MP techniques are two major parallel computing architecture models (Coulouris, Dollimore, & Kindberg, 2005; Manoj, Manjunath, & Govindarajan, 2004).  They provide the basic interaction model for distributed tasks and lack any facility to automatically parallelize and distribute tasks or to tolerate faults (Hammoud, Rehman, & Sakr, 2012; Sakr & Gaber, 2014). Advanced models such as MapReduce, Pregel, and GraphLab are introduced to overcome the inefficiencies and challenges of these traditional distributed models, especially when porting them to the Cloud environment (Fernández et al., 2014; Low et al., 2012; Sakr & Gaber, 2014).  These new models are built upon the two traditional models of SM and MP and offer various key properties for the Cloud environment (Sakr & Gaber, 2014).  These models are also referred to as "distributed analytics engines" (Sakr & Gaber, 2014). 

Shared Memory Distributed Programming

The SM system has a global memory that is accessible to all processors in the system (Manoj et al., 2004).  The SM techniques include two models based on the nature of the sharing of this global memory across processors. The first model is the Uniform Memory Access (UMA) model (Manoj et al., 2004; Pulipaka, 2016). The second model is the Non-Uniform Memory Access (NUMA) model (Coulouris et al., 2005; Dean & Ghemawat, 2004; Hennessy & Patterson, 2011; Manoj et al., 2004; Mishra, Dehuri, & Kim, 2016).   In the UMA model, the memory access time is equal for any two processors, while in the NUMA model the access time varies for different processors (Manoj et al., 2004).   Hardware Distributed Shared Memory (DSM) systems, such as Stanford DASH, SCI, DDM, and KSR1, are examples of NUMA (Manoj et al., 2004). Programming in SM systems involves updating shared data, which is regarded as a simple programming process, much as in a uniprocessor environment (Manoj et al., 2004).  However, with an increasing number of processors, the environment experiences increased contention and long latencies in accessing the shared memory (Manoj et al., 2004).  The contention and latencies diminish the performance and limit scalability (Manoj et al., 2004).  Additional issues with SM systems involve data access synchronization, cache coherency, and memory consistency (Manoj et al., 2004).  The developers must ensure appropriate memory access order through synchronization primitives (Manoj et al., 2004).  Moreover, when hardware is used to implement the SM abstraction, the cost of the system increases (Manoj et al., 2004).

In the SM programming model, data is not explicitly communicated but implicitly exchanged via sharing, which entails the use of synchronization techniques within distributed programs (Sakr & Gaber, 2014).  The tasks can communicate by reading and writing to shared memory or disk locations.   The tasks can access any location in the distributed memories/disks, similar to the threads of a single process in operating systems, where all threads share the process address space and communicate by reading and writing to that space (Sakr & Gaber, 2014).  Synchronization is required to control the order in which read/write operations are performed by various tasks and to prevent distributed tasks from simultaneously writing to shared data, which would otherwise produce inconsistent or corrupted data (Sakr & Gaber, 2014).  The communication between two tasks is implicit, through reads and writes to shared arrays and variables, while synchronization is explicit, through locks and barriers (Sakr & Gaber, 2014). 
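The contrast between implicit communication and explicit synchronization can be shown with a tiny thread-based sketch; the counter, thread count, and iteration count below are arbitrary illustrative choices.

```python
# Tiny sketch of the shared-memory style: tasks (threads) communicate
# implicitly by writing a shared variable, and synchronization is made
# explicit with a lock so concurrent writes cannot corrupt the value.
import threading

shared_counter = 0
lock = threading.Lock()

def task(iterations):
    global shared_counter
    for _ in range(iterations):
        with lock:                     # explicit synchronization (lock)
            shared_counter += 1        # implicit communication via shared data

threads = [threading.Thread(target=task, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(shared_counter)                  # always 400000 thanks to the lock
```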

The SM programming model must meet two main requirements: developers do not need to explicitly encode functions to send/receive messages, and the underlying storage layer provides a shared view to all tasks.  MapReduce satisfies these two requirements using map and reduce functions, with communications occurring only between the map and reduce phases under the full control of the MapReduce engine. Moreover, the synchronization is also handled by the MapReduce engine (Sakr & Gaber, 2014).   For the storage layer, MapReduce utilizes the Hadoop Distributed File System (HDFS), which provides a shared abstraction for all tasks, where any task can transparently access any location in HDFS (Sakr & Gaber, 2014).  Thus, MapReduce provides the shared-memory abstraction, which is provided internally by Hadoop through the MapReduce engine and HDFS (Sakr & Gaber, 2014).  GraphLab also offers a shared-memory abstraction by eliminating the need for users to explicitly send/receive messages in update functions and by providing a shared view among vertices in a graph (Fernández et al., 2014; Low et al., 2012; Sakr & Gaber, 2014).  GraphLab allows the scopes of vertices to overlap and allows vertices to read from and write to their scopes, which can cause read-write and write-write conflicts between vertices sharing a scope (Fernández et al., 2014; Low et al., 2012; Sakr & Gaber, 2014). 

Message Passing Distributed Programming

In the MP programming model, the distributed tasks communicate by sending and receiving messages; the distributed tasks do not share an address space through which they can access each other's memories (Sakr & Gaber, 2014).  The MP programming model provides an abstraction similar to that of processes, not threads, in an operating system (Sakr & Gaber, 2014).  This model incurs communication overheads for explicitly sending and receiving messages that contain data, such as variable network latency and potentially excessive data transfer (Hammoud et al., 2012; Sakr & Gaber, 2014).  When using the MP programming model, no explicit synchronization is needed because every send operation has a corresponding receive operation (Sakr & Gaber, 2014).  Moreover, no illusion of a single shared address space is required for the distributed tasks to interact (Sakr & Gaber, 2014). 

The Message Passing Interface (MPI) is a popular example of the MP programming model; it is an industry-standard library for writing message-passing programs (Hammoud et al., 2012; Sakr & Gaber, 2014).  Pregel is regarded as a common analytics engine which utilizes the message-passing programming model (Malewicz et al., 2010; Sakr & Gaber, 2014). In Pregel, vertices can communicate only by sending and receiving messages, which must be explicitly encoded (Malewicz et al., 2010; Sakr & Gaber, 2014). SM programs are easier to develop than MP programs (Manoj et al., 2004; Sakr & Gaber, 2014).  The code structure of an SM program is often not much different from its sequential counterpart, with only additional directives added to specify parallel/distributed tasks, the scope of variables, and synchronization points (Manoj et al., 2004; Sakr & Gaber, 2014).  Large-scale distributed systems such as the Cloud imply non-uniform access latencies, since accessing remote data takes more time than accessing local data, thus forcing programmers to lay out data close to the relevant tasks (Sakr & Gaber, 2014). 
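A minimal message-passing sketch is shown below using the mpi4py binding; the text names MPI but not a specific binding, so the library choice, the payload, and the script name in the comment are assumptions for illustration.

```python
# Minimal message-passing sketch with mpi4py: each rank owns its data and
# communicates only through explicit, matching send/receive pairs, with no
# shared address space. Run with, e.g.: mpiexec -n 2 python mp_example.py
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    comm.send({"partial_sum": 42}, dest=1, tag=0)   # explicit send
elif rank == 1:
    msg = comm.recv(source=0, tag=0)                # matching receive
    print("rank 1 received:", msg)
```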

References

Coulouris, G. F., Dollimore, J., & Kindberg, T. (2005). Distributed systems: concepts and design: pearson education.

Dean, J., & Ghemawat, S. (2004). MapReduce: Simplified data processing on large clusters. Paper presented at the 6th Symposium on Operating Systems Design and Implementation (OSDI '04).

Fernández, A., del Río, S., López, V., Bawakid, A., del Jesus, M. J., Benítez, J. M., & Herrera, F. (2014). Big Data with Cloud Computing: an insight on the computing environment, MapReduce, and programming frameworks. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery, 4(5), 380-409.

Hammoud, M., Rehman, M. S., & Sakr, M. F. (2012). Center-of-gravity reduce task scheduling to lower mapreduce network traffic. Paper presented at the Cloud Computing (CLOUD), 2012 IEEE 5th International Conference on.

Hennessy, J. L., & Patterson, D. A. (2011). Computer architecture: a quantitative approach: Elsevier.

Low, Y., Bickson, D., Gonzalez, J., Guestrin, C., Kyrola, A., & Hellerstein, J. M. (2012). Distributed GraphLab: a framework for machine learning and data mining in the cloud. Proceedings of the VLDB Endowment, 5(8), 716-727.

Malewicz, G., Austern, M. H., Bik, A. J., Dehnert, J. C., Horn, I., Leiser, N., & Czajkowski, G. (2010). Pregel: a system for large-scale graph processing. Paper presented at the Proceedings of the 2010 ACM SIGMOD International Conference on Management of data.

Manoj, N., Manjunath, K., & Govindarajan, R. (2004). CAS-DSM: A compiler assisted software distributed shared memory. International Journal of Parallel Programming, 32(2), 77-122.

Mishra, B. S. P., Dehuri, S., & Kim, E. (2016). Techniques and Environments for Big Data Analysis: Parallel, Cloud, and Grid Computing (Vol. 17): Springer.

Pulipaka, G. (2016). Distributed Shared Memory Programming for Hadoop, MapReduce, and HPC Architecture. Retrieved from  https://medium.com/@gp_pulipaka/distributed-shared-memory-programming-for-hadoop-mapreduce-and-hpc-357a1b226ff6.

Sakr, S., & Gaber, M. (2014). Large Scale and big data: Processing and Management: CRC Press.