Hadoop: Manageable Size of Data.

Dr. O. Aly
Computer Science

Abstract

The purpose of this project is to discuss how data should be prepared before Hadoop can break it into manageable sizes.  The discussion begins with an overview of Hadoop, providing a brief history and the differences between Hadoop 1.x and Hadoop 2.x.  It then covers the Big Data Analytics process using Hadoop, which involves six significant steps, including the data pre-processing and ETL step in which data must be converted and cleaned before processing.  Before data is processed, careful attention must be given to data preprocessing, modeling, and schema design in Hadoop for better processing and data retrieval, because these choices affect how data can be split among the nodes of the distributed environment, and not all tools can split the data.  This consideration begins with the data storage format, followed by Hadoop file types and the challenges that XML and JSON formats present in Hadoop.  Compression must also be chosen carefully because not all compression types are “splittable.” The discussion closes with schema design considerations for HDFS and HBase, since both are used often in the Hadoop ecosystem. 

Keywords: Big Data Analytics; Hadoop; Data Modelling in Hadoop; Schema Design in Hadoop.

Introduction

In the age of Big Data, dealing with large datasets in terabytes and petabytes is a reality, and it requires specific technology because traditional technology was found inappropriate for it (Dittrich & Quiané-Ruiz, 2012).  Hadoop was developed to store and process such large datasets efficiently, and it is becoming a data processing engine for Big Data (Dittrich & Quiané-Ruiz, 2012).  One of the significant advantages of Hadoop MapReduce is that it allows non-expert users to easily run analytical tasks over Big Data (Dittrich & Quiané-Ruiz, 2012). However, before the analytical process takes place, schema design and data modeling considerations must be addressed so that data processing in Hadoop can be efficient (Grover, Malaska, Seidman, & Shapira, 2015).  Hadoop requires splitting the data; some tools can split the data natively, while others cannot and require integration (Grover et al., 2015). 

This project discusses these considerations to ensure an appropriate schema design for Hadoop and its components HDFS and HBase, where the data gets stored in a distributed environment.   The discussion begins with an overview of Hadoop, followed by the data analytics process, and ends with the data modeling techniques and considerations for Hadoop that can assist in splitting the data appropriately for better data processing performance and better data retrieval.

Overview of Hadoop

            Google published its MapReduce technique and implementation around 2004 (Karanth, 2014).  It also introduced the Google File System (GFS), which is associated with the MapReduce implementation.  MapReduce has since become the most common technique to process massive data sets in parallel and distributed settings across many companies (Karanth, 2014).  In 2008, Yahoo released Hadoop as an open-source implementation of the MapReduce framework (Karanth, 2014; sas.com, 2018). Hadoop and its file system, HDFS, are inspired by Google’s MapReduce and GFS (Ankam, 2016; Karanth, 2014).  

The Apache Hadoop project is the parent project for all subsequent Hadoop projects (Karanth, 2014).  It contains three essential branches: the 0.20.1 branch, the 0.20.2 branch, and the 0.21 branch.  The 0.23 branch is often termed MapReduce v2.0, MRv2, or Hadoop 2.0.  Two additional releases, Hadoop-0.20-append and Hadoop-0.20-Security, introduced HDFS append and security-related features into Hadoop, respectively.  The timeline for Hadoop technology is outlined in Figure 1.


Figure 1.  Hadoop Timeline from 2003 until 2013 (Karanth, 2014).

Hadoop version 1.0 marked the inception and evolution of Hadoop as a simple MapReduce job-processing framework (Karanth, 2014).  It exceeded expectations with wide adoption for massive data processing.  The stable 1.x release includes features such as append and security.  The Hadoop 2.0 release came out in 2013 to increase the efficiency and mileage of existing Hadoop clusters in enterprises.  Hadoop has grown from being limited to MapReduce into a common cluster-computing and storage platform, moving beyond MapReduce to stay at the leading edge of massive-scale data processing while facing the challenge of remaining backward compatible (Karanth, 2014). 

            In Hadoop 1.x, the JobTracker was responsible for resource allocation and job execution (Karanth, 2014).  MapReduce was the only supported model because the computing model was tied to the resources in the cluster. Yet Another Resource Negotiator (YARN) was developed to separate the concerns of resource management and application execution, which enables other application paradigms to be added to a Hadoop computing cluster. The support for diverse applications results in efficient and effective utilization of the resources and integrates well with the infrastructure of the business (Karanth, 2014).  YARN maintains backward compatibility with the Hadoop 1.x APIs (Karanth, 2014).  Thus, an old MapReduce program can still execute in YARN with no code changes, but it has to be recompiled (Karanth, 2014).

            YARN abstracts out the resource management functions to form a platform layer called the ResourceManager (RM) (Karanth, 2014).  Every cluster must have an RM to keep track of cluster resource usage and activity.  The RM is also responsible for allocating resources and resolving contention among resource seekers in the cluster.  The RM uses a generalized resource model and is agnostic to application-specific resource needs; for example, it does not need to know the resources corresponding to a single Map or Reduce slot (Karanth, 2014). Figure 2 shows Hadoop 1.x and Hadoop 2.x with the YARN layer.   


Figure 2. Hadoop 1.x vs. Hadoop 2.x (Karanth, 2014).

Hadoop 2.x also involves various enhancements at the storage layer.   These enhancements include a high-availability feature that maintains a hot standby of the NameNode (Karanth, 2014); when the active NameNode fails, the standby can become the active NameNode in a matter of minutes.  ZooKeeper or another HA monitoring service can be used to track NameNode failure (Karanth, 2014), and the failover process that promotes the hot standby to active NameNode is triggered with the assistance of ZooKeeper.  HDFS federation is another enhancement in Hadoop 2.x; it is a more generalized storage model in which block storage has been generalized and separated from the filesystem layer (Karanth, 2014).  HDFS snapshots are another enhancement, providing a read-only image of the entire filesystem or a particular subset of it to protect against user errors and to support backup and disaster recovery.   Other enhancements in Hadoop 2.x include Protocol Buffers: the wire protocol for RPCs within Hadoop is based on Protocol Buffers.  Hadoop 2.x is also aware of the type of storage and exposes this information to the application to optimize data fetch and placement strategies (Karanth, 2014).  HDFS append support is another enhancement in Hadoop 2.x.

Hadoop is regarded as the de facto open-source framework for large-scale, massively parallel, and distributed data processing (Karanth, 2014).  The Hadoop framework includes two layers: a computation layer and a data layer (Karanth, 2014).  The computation layer is used for parallel and distributed processing, while the data layer provides highly fault-tolerant data storage associated with the computation layer.  These two layers run on commodity hardware, which is inexpensive, readily available, and compatible with other similar hardware (Karanth, 2014).

Hadoop Architecture

Apache Hadoop has four projects: Hadoop Common, the Hadoop Distributed File System (HDFS), Yet Another Resource Negotiator (YARN), and MapReduce (Ankam, 2016).  HDFS is used to store data, MapReduce is used to process data, YARN is used to manage cluster resources such as CPU and memory, and Hadoop Common provides the utilities that support the Hadoop framework (Ankam, 2016; Karanth, 2014).  Apache Hadoop integrates with other tools such as Avro, Hive, Pig, HBase, Zookeeper, and Apache Spark (Ankam, 2016; Karanth, 2014).

            Hadoop has three significant components for Big Data Analytics.  HDFS is a framework for reliable distributed data storage (Ankam, 2016; Karanth, 2014), and some considerations must be taken into account when storing data in it (Grover et al., 2015).  The multiple frameworks for parallel processing of data include MapReduce, Crunch, Cascading, Hive, Tez, Impala, Pig, Mahout, Spark, and Giraph (Ankam, 2016; Karanth, 2014). The Hadoop architecture includes NameNodes and DataNodes.  It also includes Oozie for workflow, Pig for scripting, Mahout for machine learning, Hive for the data warehouse, Sqoop for data exchange, and Flume for log collection.  YARN, introduced in Hadoop 2.0 as discussed earlier, handles distributed computing, while HCatalog handles Hadoop metadata management, HBase serves as the columnar database, and Zookeeper provides coordination (Alguliyev & Imamverdiyev, 2014).  Figure 3 shows the Hadoop ecosystem components.


Figure 3.  Hadoop Architecture (Alguliyev & Imamverdiyev, 2014).

Big Data Analytics Process Using Hadoop

The process of Big Data Analytics involves six essential steps (Ankam, 2016). The first step is the identification of the business problem and the desired outcomes.  Examples of business problems include declining sales, shopping carts abandoned by customers, a sudden rise in call volumes, and so forth.  Examples of outcomes include improving the buying rate by 10%, decreasing shopping cart abandonment by 50%, and reducing call volume by 50% by the next quarter while keeping customers happy.  The second step is to identify the required data; depending on the case and the problem, data sources can include a data warehouse using online analytical processing, an application database using online transactional processing, log files from servers, documents from the internet, sensor-generated data, and so forth.  Data collection is the third step (Ankam, 2016).  The Sqoop tool can be used to collect data from relational databases, and Flume can be used for streaming data, with Apache Kafka serving as reliable intermediate storage.  The data collection design should be implemented using a fault-tolerance strategy (Ankam, 2016).  Data preprocessing and ETL make up the fourth step.  The collected data comes in various formats, and data quality can be an issue; before processing, the data must be converted to the required format and cleaned of inconsistent, invalid, or corrupted records.  Apache Hive, Apache Pig, and Spark SQL can be used for preprocessing massive amounts of data.  The fifth step is the analytics implementation, which should answer the business questions and problems. The analytical process requires understanding the data and the relationships between data points.  Descriptive and diagnostic analytics present past and current views of the data to answer questions such as what happened and why it happened, while predictive analytics answers questions such as what would happen based on a hypothesis. Apache Hive, Pig, Impala, Drill, Tez, Apache Spark, and HBase can be used for data analytics in batch processing mode.  Real-time analytics tools including Impala, Tez, Drill, and Spark SQL can be integrated into traditional business intelligence (BI) workflows using BI tools such as Tableau, QlikView, and others for interactive analytics. The last step is the visualization of the data, presenting the analytics output in a graphical or pictorial format so the analysis is better understood for decision making.  The finished data is either exported from Hadoop to a relational database using Sqoop for integration with visualization systems, or visualization tools such as Tableau, QlikView, Excel, and so forth are integrated directly.  Web-based notebooks such as Jupyter, Zeppelin, and Databricks Cloud are also used to visualize data by integrating Hadoop and Spark components (Ankam, 2016). 
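As a concrete illustration of the preprocessing and ETL step, the sketch below uses the Spark SQL Java API to load raw JSON orders from HDFS, drop incomplete and duplicate records, and write the result in a columnar format for later analysis.  The paths, column names, and choice of Parquet output are illustrative assumptions rather than part of the process described by Ankam (2016).

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class OrdersPreprocessing {
        public static void main(String[] args) {
            // Start a Spark session; on a cluster this would typically run under YARN.
            SparkSession spark = SparkSession.builder()
                    .appName("orders-preprocessing")
                    .getOrCreate();

            // Load raw, semi-structured order data collected into HDFS (hypothetical path).
            Dataset<Row> raw = spark.read().json("hdfs:///data/raw/orders");

            // Basic cleansing: drop records with missing keys and remove duplicate orders.
            Dataset<Row> cleaned = raw.na().drop(new String[]{"order_id", "customer_id"})
                    .dropDuplicates(new String[]{"order_id"});

            // Write the cleaned data in a splittable, compressed columnar format for analytics.
            cleaned.write().mode("overwrite").parquet("hdfs:///data/processed/orders");

            spark.stop();
        }
    }

The resulting Parquet directory can then be queried by batch or interactive tools such as Hive or Impala in the analytics step.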

Data Preprocessing, Modeling and Design Consideration in Hadoop

            Before processing any data, and before collecting any data for storage, some considerations must be taken into account for data modeling and design in Hadoop to achieve better processing and retrieval (Grover et al., 2015).  The traditional data management system is referred to as a Schema-on-Write system, which requires the schema of the data store to be defined before the data is loaded (Grover et al., 2015).  This approach results in long cycles of analysis, data modeling, data transformation, loading, testing, and so forth before the data can be accessed (Grover et al., 2015).   In addition, if anything changes or a wrong decision was made, the cycle must start from the beginning, which takes even longer (Grover et al., 2015).   This section addresses the various considerations that come before processing data in Hadoop for analytical purposes.

Data Pre-Processing Consideration

A dataset may have varying levels of quality with respect to noise, redundancy, and consistency (Hu, Wen, Chua, & Li, 2014).  Preprocessing techniques to improve data quality must therefore be in place in Big Data systems (Hu et al., 2014; Lublinsky, Smith, & Yakubovich, 2013).  Data pre-processing involves three techniques: data integration, data cleansing, and redundancy elimination.

Data integration techniques are used to combine data residing in different sources and provide users with a unified view of the data (Hu et al., 2014).  The traditional database approach has well-established data integration methods, including the data warehouse method and the data federation method (Hu et al., 2014).  The data warehouse approach is also known as ETL, consisting of extraction, transformation, and loading (Hu et al., 2014).  The extraction step involves connecting to the source systems and selecting and collecting the required data to be processed for analytical purposes.  The transformation step involves applying a series of rules to the extracted data to convert it into a standard format.  The load step involves importing the extracted and transformed data into a target storage infrastructure (Hu et al., 2014).  The federation approach creates a virtual database to query and aggregate data from various sources (Hu et al., 2014).  The virtual database contains information or metadata about the actual data and its location, and does not contain the data itself (Hu et al., 2014).  These two approaches are store-and-pull techniques, which are not appropriate for Big Data processing with its high computation demands, high-volume streaming, and dynamic nature (Hu et al., 2014).  

The data cleansing process is vital for keeping data consistent and up to date, and it is widely used in fields such as banking, insurance, and retailing (Hu et al., 2014).  Cleansing is required to identify incomplete, inaccurate, or unreasonable data and then remove or correct it to improve data quality (Hu et al., 2014). The data cleansing process includes five steps (Hu et al., 2014).  The first step is to define and determine the error types.  The second step is to search for and identify error instances.  The third step is to correct the errors, the fourth is to document the error instances and error types, and the last step is to modify data entry procedures to reduce future errors.  Various types of checks must be done during cleansing, including format checks, completeness checks, reasonableness checks, and limit checks (Hu et al., 2014).  Data cleansing is required to improve the accuracy of the analysis (Hu et al., 2014), but it depends on a complex relationship model and carries extra computation and delay overhead (Hu et al., 2014).  Organizations must therefore seek a balance between the complexity of the data-cleansing model and the resulting improvement in analysis accuracy (Hu et al., 2014). 
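The sketch below illustrates, in plain Java, the kinds of format, completeness, and limit checks described above applied to a single record.  The field names, date layout, and amount limit are illustrative assumptions rather than rules taken from the cited sources.

    import java.util.regex.Pattern;

    // Minimal sketch of record-level cleansing checks; field names and limits are assumptions.
    public class RecordValidator {
        private static final Pattern DATE_FORMAT = Pattern.compile("\\d{8}");   // e.g., 20181107

        public static boolean isValid(String orderId, String orderDate, double amount) {
            // Completeness check: required fields must be present.
            if (orderId == null || orderId.isEmpty() || orderDate == null) {
                return false;
            }
            // Format check: the date must match the expected yyyyMMdd layout.
            if (!DATE_FORMAT.matcher(orderDate).matches()) {
                return false;
            }
            // Limit/reasonableness check: reject negative or implausibly large amounts.
            return amount >= 0 && amount <= 1_000_000;
        }
    }

Records that fail such checks would be routed to a quarantine location and documented, matching the third and fourth cleansing steps above.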

Redundancy elimination is the third data pre-processing step.  Redundant, repeated data increases the overhead of data transmission and causes problems for storage systems, including wasted space, data inconsistency, data corruption, and reduced reliability (Hu et al., 2014).  Redundancy reduction methods include redundancy detection and data compression (Hu et al., 2014).  The data compression method imposes an extra computational burden during the compression and decompression processes (Hu et al., 2014).

Data Modeling and Design Consideration

A Schema-on-Write system is used when the application or structure is well understood and the data is frequently accessed through queries and reports on high-value data (Grover et al., 2015).  The term Schema-on-Read is used in the context of the Hadoop data management system (Ankam, 2016; Grover et al., 2015). It refers to loading raw, unprocessed data into Hadoop and applying the required structure at processing time, based on the requirements of the processing application (Ankam, 2016; Grover et al., 2015).  Schema-on-Read is used when the application or the structure of the data is not well understood (Ankam, 2016; Grover et al., 2015).  This approach makes the process agile and provides valuable insights into data that was not previously accessible (Grover et al., 2015).

            Five factors must be considered before storing data in Hadoop for processing (Grover et al., 2015).  The first is the data storage format, as there are a number of file formats and compression formats supported on Hadoop, and each has strengths that make it better suited to specific applications.   Although the Hadoop Distributed File System (HDFS) is the building block of the Hadoop ecosystem used for storing data, several commonly used systems are implemented on top of HDFS, such as HBase for additional data access functionality and Hive for additional data management functionality, and these systems must be taken into consideration before storing data in Hadoop (Grover et al., 2015). The second factor is multitenancy, since it is common for clusters to host multiple users, groups, and application types, and multi-tenant clusters involve essential considerations for data storage.  The third factor is schema design, which should be considered even though Hadoop is schema-less (Grover et al., 2015); it involves the directory structures for data loaded into HDFS and for the output of data processing and analysis, as well as the schemas of objects stored in systems such as HBase and Hive.  The fourth factor is metadata management.  Metadata related to the stored data is often regarded as being as important as the data itself, and understanding metadata management plays a significant role because it can affect the accessibility of the data.  The last factor is security, which involves decisions about authentication, fine-grained access control, and encryption; these security measures should be considered for data at rest, when it gets stored, as well as in motion during processing (Grover et al., 2015).  Figure 4 summarizes these considerations before storing data into the Hadoop system. 


Figure 4.  Considerations Before Storing Data into Hadoop.

Data Storage Format Considerations

            When architecting a solution on Hadoop, the method of storing the data is one of the essential decisions. Primary considerations for data storage in Hadoop involve the file format, the compression, and the data storage system (Grover et al., 2015).  The standard file formats involve three types:  text data, structured text data, and binary data.  Figure 5 summarizes these three standard file formats.


Figure 5.  Standard File Formats.

Text data is in widespread use in Hadoop, including log files such as weblogs and server logs (Grover et al., 2015).  Text data can come in many forms, such as CSV files, or unstructured data such as emails.  Compression of the files is recommended, and the selection of the compression format is influenced by how the data will be used (Grover et al., 2015).  For instance, if the data is for archival, the most compact compression method can be used, while if the data is used in processing jobs such as MapReduce, a splittable format should be used (Grover et al., 2015).  A splittable format enables Hadoop to split files into chunks for processing, which is essential to efficient parallel processing (Grover et al., 2015).

In most cases, the use of container formats such as SequenceFiles or Avro provides benefits that make them the preferred choice for most file types, including text (Grover et al., 2015).  Among other benefits, these container formats provide functionality to support splittable compression (Grover et al., 2015).   Binary data, such as images, can also be stored in Hadoop, and a container format such as SequenceFile is preferred for it.  If the splittable unit of binary data is larger than 64 MB, the data should be put into its own file without using a container format (Grover et al., 2015).

XML and JSON Format Challenges with Hadoop

Structured text data includes formats such as XML and JSON, which present unique challenges in Hadoop because splitting XML and JSON files for processing is not straightforward, and Hadoop does not provide a built-in InputFormat for either (Grover et al., 2015).  JSON presents more challenges than XML because no token is available to mark the beginning or end of a record.  When using these file formats, two primary considerations apply.  First, a container format such as Avro should be used, because transforming the data into Avro provides a compact and efficient way to store and process it (Grover et al., 2015).  Second, a library designed for processing XML or JSON should be used; the XMLLoader in the PiggyBank library for Pig is an example for XML data, and the Elephant Bird project provides utilities for working with JSON data (Grover et al., 2015). 

Hadoop File Types Considerations

            Several Hadoop-specific file formats have been created to work well with MapReduce (Grover et al., 2015).  These include file-based data structures such as SequenceFiles, serialization formats like Avro, and columnar formats such as RCFile and Parquet (Grover et al., 2015).  These file types share two essential characteristics that are important for Hadoop applications: splittable compression and agnostic compression.  The ability to split files plays a significant role during data processing and should not be underestimated when storing data in Hadoop, because it allows large files to be split for input to MapReduce and other types of jobs, which is a fundamental part of parallel processing and a key to leveraging Hadoop's data locality feature (Grover et al., 2015).  Agnostic compression is the ability to compress with any compression codec without readers having to know the codec, because the codec is stored in the header metadata of the file format (Grover et al., 2015).  Figure 6 summarizes these Hadoop-specific file formats with the common characteristics of splittable compression and agnostic compression.


Figure 6. Three Hadoop File Types with the Two Common Characteristics.  

1.      SequenceFiles Format Consideration

The SequenceFile format is the most widely used of the Hadoop file-based formats.  It stores data as binary key-value pairs (Grover et al., 2015) and supports three formats for records stored within it:  uncompressed, record-compressed, and block-compressed.  Every SequenceFile uses a standard header format containing necessary metadata about the file, such as the compression codec used, key and value class names, user-defined metadata, and a randomly generated sync marker.  SequenceFiles are well supported within Hadoop; however, they have limited support outside the Hadoop ecosystem, since only a Java API is available.  A frequent use case for SequenceFiles is as a container for smaller files: storing a large number of small files in Hadoop can cause memory issues and excessive overhead in processing, and packing smaller files into a SequenceFile makes their storage and processing more efficient because Hadoop is optimized for large files (Grover et al., 2015).   Other file-based formats include MapFiles, SetFiles, ArrayFiles, and BloomMapFiles.  Because they were designed to work with MapReduce, these formats offer a high level of integration for all forms of MapReduce jobs, including those run via Pig and Hive (Grover et al., 2015).  Figure 7 summarizes the three formats for records stored within SequenceFiles.


Figure 7.  Three Formats for Records Stored within SequenceFile.
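The following minimal sketch, written against the Hadoop 2.x Java API, packs a small file into a block-compressed SequenceFile keyed by file name, the small-files use case described above.  The output path, codec choice, and file content are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.SnappyCodec;

    // Minimal sketch: packing a small file into a block-compressed SequenceFile.
    public class SmallFilePacker {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Path output = new Path("/data/packed/smallfiles.seq");

            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(output),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(BytesWritable.class),
                    SequenceFile.Writer.compression(CompressionType.BLOCK, new SnappyCodec()))) {
                // The key is the original file name; the value is the raw file content.
                byte[] content = "example file content".getBytes("UTF-8");
                writer.append(new Text("file-0001.log"), new BytesWritable(content));
            }
        }
    }

In practice, a loop over a directory of small files would append one key-value pair per file, producing a single large, splittable file that MapReduce can process efficiently.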

2.      Serialization Formats Consideration

Serialization is the process of turning data structures into a byte stream for storage or for transferring data over the network (Grover et al., 2015).   Deserialization is the opposite process of converting a byte stream back into data structures (Grover et al., 2015).  Serialization is a fundamental building block for distributed processing systems such as Hadoop because it allows data to be converted into a format that can be efficiently stored and transferred across a network connection (Grover et al., 2015).  Figure 8 contrasts the serialization and deserialization processes.


Figure 8.  Serialization Process vs. Deserialization Process.

Serialization appears in two aspects of data processing in a distributed system: interprocess communication through remote procedure calls (RPC), and data storage (Grover et al., 2015).  Hadoop uses Writables as its main serialization format, which is compact and fast but usable only from Java.  Other serialization frameworks are increasingly used within the Hadoop ecosystem, including Thrift, Protocol Buffers, and Avro (Grover et al., 2015).  Avro is a language-neutral data serialization system (Grover et al., 2015) designed to address the main limitation of Hadoop's Writables, namely the lack of language portability.  Like Thrift and Protocol Buffers, Avro is described through a language-independent schema (Grover et al., 2015); unlike Thrift and Protocol Buffers, code generation is optional in Avro.  Table 1 provides a comparison between these serialization formats.

Table 1:  Comparison between Serialization Formats.
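To make the Avro characteristics above concrete, the sketch below uses the Avro Java library to define a schema at runtime and write a record without code generation; because the schema is embedded in the resulting container file, a reader in any supported language can deserialize it.  The schema, field names, and output file are illustrative assumptions.

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.file.DataFileWriter;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;

    // Minimal sketch: writing one Avro record using the generic (no code generation) API.
    public class AvroOrderWriter {
        private static final String SCHEMA_JSON =
                "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
                + "{\"name\":\"order_id\",\"type\":\"string\"},"
                + "{\"name\":\"amount\",\"type\":\"double\"}]}";

        public static void main(String[] args) throws Exception {
            Schema schema = new Schema.Parser().parse(SCHEMA_JSON);

            GenericRecord order = new GenericData.Record(schema);
            order.put("order_id", "A-1001");
            order.put("amount", 42.5);

            // The Avro container file embeds the schema, so any reader can deserialize it.
            try (DataFileWriter<GenericRecord> writer =
                         new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
                writer.create(schema, new File("orders.avro"));
                writer.append(order);
            }
        }
    }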

3.      Columnar Format Consideration

Row-oriented storage has traditionally been used to fetch data from databases (Grover et al., 2015).  This type of data retrieval suits analysis that relies on fetching all fields of the records belonging to a specific time range, and it is efficient when all columns of a record are available at write time, because the record can be written with a single disk seek.  Columnar storage has more recently been used to fetch data, and it has four main benefits over row-oriented systems (Grover et al., 2015).  First, it skips I/O and decompression on columns that are not part of the query.  Second, columnar storage works better for queries that access only a small subset of columns, whereas row-oriented storage is appropriate when many columns of a record are retrieved.  Third, compression is more efficient on columns, because data is more similar within the same column than within a block of rows.  Fourth, columnar storage is better suited to data warehousing-style applications, where aggregations are computed over specific columns rather than over entire records (Grover et al., 2015).  Hadoop applications use columnar file formats including the RCFile format, Optimized Row Columnar (ORC), and Parquet.  The RCFile format has been used as a Hive format; it was developed to provide fast data loading, fast query processing, and highly efficient storage space utilization.  It breaks files into row splits and, within each split, uses column-oriented storage.  Despite its advantages in query and compression performance compared to SequenceFiles, it has limitations that prevent optimal query times and compression.  The newer columnar formats, ORC and Parquet, are designed to address many of the limitations of RCFile (Grover et al., 2015). 
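As an illustration of producing a columnar file from Hadoop code, the sketch below assumes the parquet-avro library and writes an Avro-described record into a Snappy-compressed Parquet file.  The library choice, schema, and path are assumptions for illustration and not prescribed by Grover et al. (2015).

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    // Minimal sketch (assuming parquet-avro): writing one record to a compressed Parquet file.
    public class ParquetOrderWriter {
        public static void main(String[] args) throws Exception {
            Schema schema = new Schema.Parser().parse(
                    "{\"type\":\"record\",\"name\":\"Order\",\"fields\":["
                    + "{\"name\":\"order_id\",\"type\":\"string\"},"
                    + "{\"name\":\"amount\",\"type\":\"double\"}]}");

            GenericRecord order = new GenericData.Record(schema);
            order.put("order_id", "A-1001");
            order.put("amount", 42.5);

            try (ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(
                            new Path("/data/processed/orders/part-00000.parquet"))
                    .withSchema(schema)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .build()) {
                writer.write(order);   // columns are encoded and compressed independently
            }
        }
    }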

Compression Consideration

Compression is another data storage consideration because it plays a crucial role in reducing storage requirements and in improving data processing performance (Grover et al., 2015).  Some compression formats supported on Hadoop are not splittable (Grover et al., 2015).  Because the MapReduce framework splits data for input to multiple tasks, a non-splittable compression format is an obstacle to efficient processing; splittability is therefore a critical consideration in selecting the compression format and file format for Hadoop.  Compression types for Hadoop include Snappy, LZO, Gzip, and bzip2.  Google developed Snappy for speed, so it does not offer the best compression size; it is not inherently splittable, is designed to be used with a container format like SequenceFile or Avro, and is distributed with Hadoop.  Like Snappy, LZO is optimized for speed rather than size, but unlike Snappy it supports splitting of compressed files, although this requires indexing.  LZO, unlike Snappy, is not distributed with Hadoop because of its license and requires a separate installation.  Gzip provides good compression performance, with read speed similar to Snappy but slower write performance; it is not splittable and should be used with a container format, and using smaller blocks with Gzip can result in better performance.   Bzip2 also provides good compression performance but can be slower than other codecs such as Snappy, so it is not an ideal codec for Hadoop storage. Unlike Snappy and Gzip, bzip2 is inherently splittable because it inserts synchronization markers between blocks, and it can be used for active archival purposes (Grover et al., 2015).

A compression format can become splittable when it is used with a container file format such as Avro or SequenceFile, which compresses blocks of records or each record individually (Grover et al., 2015).  If compression is applied to the entire file without a container file format, a compression format that inherently supports splitting, such as bzip2, must be used.  Three recommendations apply to the use of compression with Hadoop (Grover et al., 2015).  The first is to enable compression of MapReduce intermediate output, which improves performance by decreasing the amount of intermediate data that needs to be read from and written to disk.  The second is to pay attention to the order of the data: data that is close together compresses better, and since data in Hadoop file formats is compressed in chunks, the organization of those chunks determines the final compression.   The last recommendation is to consider a compact file format with support for splittable compression, such as Avro.  Avro and SequenceFiles support splittability even with non-splittable compression formats: a single HDFS block can contain multiple Avro or SequenceFile blocks, and each of these blocks can be compressed and decompressed individually and independently of the others, which makes the data splittable.  Figure 9 shows the Avro and SequenceFile splittability support (Grover et al., 2015).  


Figure 9.  Compression Example Using Avro (Grover et al., 2015).
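The sketch below shows, in a hypothetical MapReduce driver, how the first and last recommendations above might be applied: compressing intermediate map output and writing block-compressed SequenceFile output so the result stays splittable.  The job name and codec choice are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.SequenceFile.CompressionType;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.SnappyCodec;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

    // Minimal sketch: compression settings for a MapReduce job (mapper/reducer defined elsewhere).
    public class CompressionConfigExample {
        public static Job configure(Configuration conf) throws Exception {
            // Compress intermediate map output to reduce disk and network I/O.
            conf.setBoolean("mapreduce.map.output.compress", true);
            conf.setClass("mapreduce.map.output.compress.codec",
                    SnappyCodec.class, CompressionCodec.class);

            Job job = Job.getInstance(conf, "compressed-output-job");

            // Write block-compressed SequenceFile output so the result remains splittable.
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
            SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
            return job;
        }
    }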

Design Consideration for HDFS Schema

HDFS and HBase are the most commonly used storage managers in the Hadoop ecosystem.  Organizations can store data directly in HDFS or in HBase, which internally stores its data on HDFS (Grover et al., 2015).  When storing data in HDFS, some design techniques must be taken into consideration.  The schema-on-read model of Hadoop does not impose any requirements when loading data: data can be ingested into HDFS by one of many methods without associating a schema or preprocessing the data.  Although Hadoop is used to load many types of data, including unstructured and semi-structured data, some order is still required, because Hadoop often serves as a central location for the entire organization and the data stored in HDFS is intended to be shared across various departments and teams (Grover et al., 2015).  A carefully structured and organized data repository provides several benefits to the organization (Grover et al., 2015).   When there is a standard directory structure, it becomes easier to share data among teams working with the same data sets.  Data can be staged in a separate location before processing, and a standard staging convention helps ensure that data is not processed before it has been completely and appropriately staged.  Standard organization of data also allows reuse of the code that processes it (Grover et al., 2015), and assumptions about data placement help simplify the loading of data into Hadoop.   An HDFS data model design for a project such as a data warehouse implementation is likely to use structured fact and dimension tables similar to a traditional schema (Grover et al., 2015), while a design for unstructured and semi-structured data is likely to focus on directory placement and metadata management (Grover et al., 2015). 

Grover et al. (2015) suggested three key considerations when designing the schema, regardless of the data model design project.  The first is to develop standard practices that can be followed by all teams.  The second is to ensure the design works well with the chosen tools; for instance, if the version of Hive in use supports table partitions only on directories that are named a certain way, this will affect the schema design and the names of the table subdirectories.  The last consideration is to keep usage patterns in mind, because different data processing and querying patterns work better with different schema designs (Grover et al., 2015). 

HDFS Files Location Consideration

            The first step in designing an HDFS schema is determining the location of the files.  Standard file locations play a significant role in finding and sharing data among various departments and teams.  They also help in assigning permissions on files to various groups and users.  The recommended file locations are summarized in Table 2.


Table 2.  Standard Files Locations.

HDFS Schema Design Consideration

The HDFS schema design involves techniques for organizing data into files (Grover et al., 2015).   A few strategies are recommended for organizing a data set: partitioning, bucketing, and denormalization.  Partitioning a data set is a common technique used to reduce the amount of I/O required to process it.  Unlike a traditional data warehouse, HDFS does not store indexes on the data.  This lack of indexes speeds up data ingest, but it comes at the cost of a full table scan: every query has to read the entire dataset even when it processes only a small subset of the data.  Breaking the data set into smaller subsets, or partitions, addresses this problem by allowing queries to read only the specific partitions they need, reducing the amount of I/O and improving query times significantly (Grover et al., 2015). When data is placed in the filesystem, the directory format for partitions should be as shown below.  In the example, the medication orders data set is partitioned by date because a large number of orders is placed daily, so the partitions contain files large enough to be handled efficiently by HDFS.  Various tools such as HCatalog, Hive, Impala, and Pig understand this directory structure and leverage the partitioning to reduce the amount of I/O required during data processing (Grover et al., 2015).

  • <data set name>/<partition_column_name=partition_column_value>/
  • e.g. medication_orders/date=20181107/[order1.csv, order2.csv]
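A minimal sketch of this layout using the HDFS Java FileSystem API is shown below; it creates the date partition directory and copies a staged file into it.  The paths and the local staging file are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Minimal sketch: placing an ingested file into a date partition directory.
    public class PartitionedIngest {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            // Partition directory follows <data set name>/<column>=<value>/ as described above.
            Path partition = new Path("/data/medication_orders/date=20181107");
            fs.mkdirs(partition);

            // Copy the staged file into its partition; a query for this date reads only this directory.
            fs.copyFromLocalFile(new Path("file:///staging/order1.csv"),
                    new Path(partition, "order1.csv"));
        }
    }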

Bucketing is another technique for breaking a large data set into manageable subsets (Grover et al., 2015).  Bucketing is similar to the hash partitioning used in relational databases.   In the partitioning example above, partitioning by date produces large data files that HDFS handles well (Grover et al., 2015).  If the data set were instead partitioned by an attribute such as the physician, the result would be too many small files.  This small-files problem can lead to excessive memory use on the NameNode, since the metadata for each file stored in HDFS is held in memory (Grover et al., 2015), and many small files also lead to many processing tasks, causing excessive overhead in processing.  The solution in this example is to bucket by physician, which uses a hashing function to map physicians into a specified number of buckets (Grover et al., 2015).
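The sketch below shows the kind of hashing a bucketing scheme relies on: a physician identifier is mapped deterministically to one of a fixed number of buckets.  It is a conceptual illustration only; in practice tools such as Hive manage bucketing themselves, and the bucket count here is an arbitrary assumption.

    // Minimal sketch: hash-based bucket assignment for a physician identifier.
    public class BucketAssigner {
        private static final int NUM_BUCKETS = 16;   // a power of two, as noted below

        public static int bucketFor(String physicianId) {
            // Mask the sign bit so the bucket index is always non-negative.
            return (physicianId.hashCode() & Integer.MAX_VALUE) % NUM_BUCKETS;
        }

        public static void main(String[] args) {
            System.out.println(bucketFor("dr-jones"));   // always maps to the same bucket
        }
    }

Because the same key always lands in the same bucket, records for one physician stay together, which is what makes bucket-wise joins and sampling possible.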

The bucketing technique controls the size of the data subsets and optimizes query speed (Grover et al., 2015).  The recommended average bucket size is a few multiples of the HDFS block size, and using a power of two for the number of buckets is common.  An even distribution of the data when hashed on the bucketing column is essential because it results in consistent bucketing (Grover et al., 2015).   Bucketing also helps when joining two data sets.  The join, in this case, represents the general idea of combining two data sets to retrieve a result, and joins can be implemented through SQL-on-Hadoop systems as well as in MapReduce, Spark, or other programming interfaces to Hadoop.  When two data sets are bucketed on the join key, corresponding buckets can be joined individually without joining the entire data sets, which helps minimize the cost of the reduce-side join, a computationally expensive operation (Grover et al., 2015).   If the buckets are small enough to fit easily into memory, the join can be implemented in the map stage of a MapReduce job by loading the smaller of the corresponding buckets in memory; this is called a map-side join and improves join performance compared to a reduce-side join.  Hive recognizes that tables are bucketed and optimizes the join process accordingly.

Further optimization is possible if the data in each bucket is sorted: a merge join can then be used, and the entire bucket does not have to be held in memory when joining, resulting in a faster process that uses much less memory than a simple bucket join.  Hive supports this optimization as well.  Using both sorting and bucketing on large tables that are frequently joined together, with the join key as the bucketing key, is recommended (Grover et al., 2015).

The schema design depends on how the data will be queried (Grover et al., 2015).  Thus, the columns to be used for joining and filtering must be identified before the partitioning and bucketing of the data is implemented.   In some cases, when identifying a single partitioning key is challenging, the same data set can be stored multiple times, each copy with a different physical organization.  This would be regarded as an anti-pattern in a relational database, but it works with Hadoop because Hadoop data is write-once and few updates are expected, so the overhead of keeping the duplicated data sets in sync is reduced, and the cost of storage in Hadoop clusters is low (Grover et al., 2015). Duplicating the data set with different organizations provides better query speed in such cases (Grover et al., 2015). 

Denormalization is another technique that trades disk space for query performance by minimizing the need to join complete data sets (Grover et al., 2015).   In the relational database model, data is stored in third normal form (3NF), where redundancy is minimized and data integrity is enforced by splitting data into smaller tables, each holding a particular entity.  In this relational model, most queries require joining a large number of tables together to produce the desired final result (Grover et al., 2015).  In Hadoop, however, joins are often the slowest operations and consume the most resources from the cluster.  In particular, the reduce-side join requires sending entire tables over the network, which is computationally costly.  While sorting and bucketing help reduce this cost, another solution is to create data sets that are pre-joined or pre-aggregated (Grover et al., 2015): the data is joined once and stored in that form instead of running the join operations every time the data is queried.  A Hadoop schema often consolidates many of the small dimension tables into a few larger dimensions by joining them during the ETL process (Grover et al., 2015).  Other techniques to speed up processing include aggregation and data type conversion.  Because duplication of data is of less concern in Hadoop, when a computation is performed frequently across a large number of queries it is recommended to do it once and reuse the result, as with a materialized view in a relational database; in Hadoop, a new dataset is created that contains the same data in its aggregated form (Grover et al., 2015).

To summarize, partitioning is used to reduce the I/O overhead of processing by selectively reading and writing data in particular partitions.  Bucketing can be used to speed up queries that involve joins or sampling, again by reducing I/O.  Denormalization can be implemented to speed up Hadoop jobs.   This section reviewed techniques for organizing data into files, including the preference for a small number of large files over a large number of small files, since Hadoop works better with the former, and the difference between reduce-side and map-side join techniques, where the reduce-side join is computationally costly and the map-side join is therefore preferred and recommended. 

HBase Schema Design Consideration

HBase is not a relational database (Grover et al., 2015; Yang, Liu, Hsu, Lu, & Chu, 2013).  HBase is similar to a large hash table, which allows the association of values with keys and performs a fast lookup of the value based on a given key  (Grover et al., 2015). The operations of hash tables involve put, get, scan, increment and delete.  HBase provides scalability and flexibility and is useful in many applications, including fraud detection, which is a widespread application for HBase (Grover et al., 2015).

The framework of HBase involves the Master Server, Region Servers, the Write-Ahead Log (WAL), the Memstore, HFiles, the API, and Hadoop HDFS (Bhojwani & Shah, 2016).  Each component of the HBase framework plays a significant role in data storage and processing.  Figure 10 illustrates the HBase framework.


Figure 10.  HBase Architecture (Bhojwani & Shah, 2016).

            The following considerations must be taken into account when designing the schema for HBase (Grover et al., 2015).

  • Row Key Consideration.
  • Timestamp Consideration.
  • Hops Consideration.
  • Tables and Regions Consideration.
  • Columns Use Consideration.
  • Column Families Use Consideration.
  • Time-To-Live Consideration.

The row key is one of the most critical factors in a well-architected HBase schema design (Grover et al., 2015).  Row key considerations involve record retrieval, distribution, block cache, the ability to scan, size, readability, and uniqueness.  The row key is critical for retrieving records from HBase. Where a relational database can use a composite key combining multiple primary-key columns, HBase combines multiple pieces of information into a single key: for instance, customer_id, order_id, and timestamp, three separate columns in a relational database, can be concatenated into a single unique row key describing an order.  Another consideration in selecting the row key is the get operation, because retrieving a single record with get is the fastest operation in HBase.  Designing the row key so that a single get retrieves the most commonly used data improves performance, which requires putting much of the information into a single record; this is called a denormalized design.    For instance, while a relational database would spread customer information across various tables, in HBase all of a customer's information can be stored in a single record and retrieved with one get. Distribution is another consideration for HBase schema design: the row key determines how the records of a given table are scattered across the regions of the HBase cluster (Grover et al., 2015; Yang et al., 2013).   The row keys are sorted, each region stores a range of these sorted row keys, and each region is pinned to a region server, namely a node in the cluster (Grover et al., 2015).  A combination of device ID and timestamp or reverse timestamp is commonly used to “salt” the key for machine data (Grover et al., 2015).  The block cache is a least recently used (LRU) cache that holds data blocks in memory (Grover et al., 2015).  HBase reads records from disk in chunks of 64 KB by default; each of these chunks is called an HBase block, and when a block is read from disk it is placed in the block cache (Grover et al., 2015).   The choice of row key also affects scan operations: HBase scan rates are about eight times slower than HDFS scan rates, so reducing I/O requirements provides a significant performance advantage.  The size of the row key affects workload performance; a short row key is better than a long one because it has lower storage overhead and faster read/write performance.  The readability of the row key also matters, so it is advisable to start with a human-readable row key.  Finally, the uniqueness of the row key is critical, since a row key is the equivalent of a key in the hash-table analogy; if the row key is based on a non-unique attribute, the application should handle such cases and only put data into HBase with a unique row key (Grover et al., 2015).
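The sketch below illustrates the composite row key idea with the HBase Java client: a key is built from customer_id, order_id, and a reverse timestamp, the denormalized record is written with a put, and a single get retrieves it.  The table name, column family, and key separator are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    // Minimal sketch: composite row key plus a single-get read of a denormalized record.
    public class OrderRowKeyExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = HBaseConfiguration.create();
            try (Connection connection = ConnectionFactory.createConnection(conf);
                 Table table = connection.getTable(TableName.valueOf("orders"))) {

                // Reverse timestamp so the newest orders for a customer sort first.
                long reverseTs = Long.MAX_VALUE - System.currentTimeMillis();
                String rowKey = "cust42" + "|" + "order1001" + "|" + reverseTs;

                // Denormalized write: the commonly read fields live in one row.
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes("o"), Bytes.toBytes("amount"), Bytes.toBytes("42.5"));
                table.put(put);

                // A single get fetches the whole record, the fastest access path in HBase.
                Result result = table.get(new Get(Bytes.toBytes(rowKey)));
                System.out.println(Bytes.toString(
                        result.getValue(Bytes.toBytes("o"), Bytes.toBytes("amount"))));
            }
        }
    }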

The timestamp is the second essential consideration for a good HBase schema design (Grover et al., 2015).  The timestamp determines which record is newer when a put operation modifies a record, and it determines the order in which records are returned when multiple versions of a single record are requested. The timestamp is also used to remove out-of-date records: the time-to-live (TTL) feature compares it against the configured TTL to determine when a record value should be aged out, for example after it has been overwritten by another put or deleted (Grover et al., 2015).

The term hop refers to the number of synchronized get requests required to retrieve a particular piece of data from HBase (Grover et al., 2015). The fewer hops, the better, because of the overhead each one carries.  Although multi-hop requests can be made with HBase, it is best to avoid them through better schema design, for example by leveraging denormalization, because every hop is a round-trip to HBase with a significant performance cost (Grover et al., 2015).

The number of tables and the number of regions per table in HBase can have a negative impact on performance and on the distribution of the data (Grover et al., 2015).  If the number of tables and regions is not set correctly, the load can become unevenly distributed.  Important points include the following: there is one region server per node, a region server hosts many regions, a given region is pinned to a particular region server, and tables are split into regions and scattered across region servers.  A table must have at least one region.  All regions in a region server receive put requests and share the region server's memstore, a cache structure present on every HBase region server that buffers and sorts the writes sent to that region server before flushing them when certain memory thresholds are reached. Thus, the more regions a region server hosts, the less memstore space is available per region.  The default configuration sets the ideal flush size to 100 MB, so dividing the memstore size by 100 MB gives the maximum number of regions that should be placed on that region server.   Very large regions also take a long time to compact; the practical upper limit on the size of a region is around 20 GB, although there are successful HBase clusters with regions upward of 120 GB.  Regions can be assigned to an HBase table using one of two techniques: create the table with a single default region, which auto-splits as data increases, or create the table with a given number of regions and set the region size to a high enough value, e.g., 100 GB per region, to avoid auto-splitting (Grover et al., 2015).  Figure 11 shows a topology of region servers, regions, and tables. 


Figure 11.  The Topology of Region Servers, Regions, and Tables (Grover et al., 2015).
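As a hedged example of the second technique, the sketch below uses the HBase 1.x Java admin API to create a table pre-split into a fixed set of regions.  The table name, column family, and split points are illustrative assumptions.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.util.Bytes;

    // Minimal sketch (HBase 1.x API): creating a pre-split table to control region count.
    public class PreSplitTableExample {
        public static void main(String[] args) throws Exception {
            try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Admin admin = connection.getAdmin()) {

                HTableDescriptor table = new HTableDescriptor(TableName.valueOf("orders"));
                table.addFamily(new HColumnDescriptor("o"));

                // Four explicit split points yield five regions spread across region servers.
                byte[][] splitPoints = {
                        Bytes.toBytes("2"), Bytes.toBytes("4"),
                        Bytes.toBytes("6"), Bytes.toBytes("8")};
                admin.createTable(table, splitPoints);
            }
        }
    }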

The use of columns in HBase is different from that in a traditional relational database (Grover et al., 2015; Yang et al., 2013).  In HBase, unlike a traditional database, one record can have a million columns and the next record can have a million completely different columns; this is not recommended, but it is possible (Grover et al., 2015).   HBase stores data in a format called HFile, where each column value gets its own row in the HFile (Grover et al., 2015; Yang et al., 2013). That row holds fields such as the row key, timestamp, column name, and value. This file format provides functionality such as versioning and sparse column storage (Grover et al., 2015). 

HBase also includes the concept of column families (Grover et al., 2015; Yang et al., 2013).  A column family is a container for columns, and a table in HBase can have one or more column families.  Each column family has its own set of HFiles and is compacted independently of the other column families in the same table.  In many cases, no more than one column family is needed per table; more than one is justified when the operations performed on a subset of the columns, or their rate of change, differ from those on the other columns (Grover et al., 2015; Yang et al., 2013).  The last consideration for HBase schema design is the use of TTL, a built-in feature of HBase that ages out data based on its timestamp (Grover et al., 2015).  If TTL is not used but an aging requirement exists, a much more I/O-intensive operation would be needed instead.   The objects in HBase begin with the table object, followed by the regions for the table, a store per column family for each region, the memstore, store files, and blocks (Yang et al., 2013).  Figure 12 shows the hierarchy of objects in HBase.

Figure 12.  The Hierarchy of Objects in HBase (Yang et al., 2013).
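A minimal sketch of declaring a column family with a TTL using the HBase 1.x Java API is shown below; cells older than the configured TTL are aged out automatically.  The table name, family name, and 30-day TTL are assumptions for illustration.

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;

    // Minimal sketch: a single column family whose cells expire after 30 days via TTL.
    public class TtlColumnFamilyExample {
        public static HTableDescriptor describe() {
            HColumnDescriptor family = new HColumnDescriptor("o");
            family.setTimeToLive(30 * 24 * 60 * 60);   // TTL is specified in seconds
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf("orders"));
            table.addFamily(family);
            return table;
        }
    }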

To summarize this section, HBase schema design requires seven key considerations, starting with the row key, which should be selected carefully with record retrieval, distribution, block cache, ability to scan, size, readability, and uniqueness in mind.  The timestamp and hops are further schema design considerations for HBase.  Tables and regions must be considered for put performance and compaction time.  The use of columns and column families should also be considered when designing the schema for HBase, and TTL, which removes aged data, is the final consideration for HBase schema design. 

Metadata Consideration

The discussion so far has concerned the data itself and the techniques for storing it in Hadoop; metadata is as essential as the data.  Metadata is data about the data (Grover et al., 2015), and the Hadoop ecosystem has various forms of it.   Metadata about a logical dataset, usually stored in a separate metadata repository, includes information such as the location of the data set (a directory in HDFS or an HBase table name), the schema associated with the dataset, the partitioning and sorting properties of the data set, and the format of the data set, e.g., CSV or SequenceFile (Grover et al., 2015). Metadata about files on HDFS includes the permissions and ownership of those files and the location of the various blocks on data nodes, and is usually stored and managed by the Hadoop NameNode (Grover et al., 2015).  Metadata about tables in HBase includes information such as table names, associated namespaces, associated attributes (e.g., MAX_FILESIZE, READONLY), and the names of column families, and is usually stored and managed by HBase itself (Grover et al., 2015).  Metadata about data ingest and transformation includes information such as which user generated a given dataset, where the dataset came from, how long it took to generate it, how many records there are, and the size of the data load (Grover et al., 2015).  Metadata about dataset statistics includes information such as the number of rows in a dataset, the number of unique values in each column, a histogram of the distribution of the data, and maximum and minimum values (Grover et al., 2015).  Figure 13 summarizes these various kinds of metadata.


Figure 13.  Various Metadata in Hadoop.

Apache Hive was the first project in the Hadoop ecosystem to store, manage, and leverage metadata (Antony et al., 2016; Grover et al., 2015).  Hive stores this metadata in a relational database called the Hive metastore (Antony et al., 2016; Grover et al., 2015), and it provides a metastore service that interfaces with the Hive metastore database (Antony et al., 2016; Grover et al., 2015).  When a query is processed, Hive consults the metastore to obtain the metadata for the query, generates an execution plan, executes the job on the Hadoop cluster, and returns the fetched result to the user (Antony et al., 2016; Grover et al., 2015).  Figure 14 shows the query process and the role of the metastore in the Hive framework.


Figure 14.  Query Process and the Role of Metastore in Hive (Antony et al., 2016).

The concept of metadata introduced by Hive proved useful to other projects, and a separate project called HCatalog was created to make the Hive metastore usable outside of Hive (Grover et al., 2015).  HCatalog is part of Hive and allows other tools, like Pig and MapReduce, to integrate with the Hive metastore.  It also opens access to the Hive metastore to other tools through a REST API via the WebHCat server.  MapReduce, Pig, and standalone applications can talk directly to the Hive metastore through its APIs, but HCatalog allows easy access through its WebHCat REST APIs and allows cluster administrators to lock down access to the Hive metastore to address security concerns. Other ways to store metadata include embedding metadata in file paths and names, or storing it in HDFS in a hidden file, e.g., .metadata.  Figure 15 shows HCatalog as an accessibility veneer around the Hive metastore (Grover et al., 2015). 


Figure 15.  HCatalog acts as an accessibility veneer around the Hive metastore (Grover et al., 2015).
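As a rough sketch of the REST access path, the example below issues an HTTP GET against a WebHCat endpoint to list the tables registered in the Hive metastore for the default database.  The host name and user are placeholders, and the port and resource path follow the conventional WebHCat (Templeton) defaults; an actual deployment, particularly a secured one, may differ.

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class WebHCatSketch {
      public static void main(String[] args) throws Exception {
        // WebHCat conventionally listens on port 50111; the path below follows the
        // documented v1 DDL resource for listing tables in a database.  Host, port,
        // and the user.name parameter are deployment-specific assumptions here.
        String url = "http://webhcat.example.com:50111/templeton/v1/"
            + "ddl/database/default/table?user.name=analyst";

        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url)).GET().build();
        HttpResponse<String> response =
            client.send(request, HttpResponse.BodyHandlers.ofString());

        // The response body is JSON describing the tables registered in the metastore.
        System.out.println(response.statusCode());
        System.out.println(response.body());
      }
    }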

Hive Metastore and HCatalog Limitations

The Hive metastore and HCatalog have some limitations, including problems with high availability (HA) (Grover et al., 2015).  HA database cluster solutions can be used to bring HA to the Hive metastore database.  For the Hive metastore service, there is support for running multiple metastores concurrently on more than one node in the cluster.  However, concurrency issues related to data definition language (DDL) operations can occur, and the Hive community is working on fixing these issues. 
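As a rough illustration of running more than one metastore service, the sketch below points a metastore client at two metastore URIs through a comma-separated hive.metastore.uris value, so the client can fail over between them.  The host names are placeholders, and the client classes shown are the standard Hive metastore client libraries as commonly used; exact constructor signatures can vary between Hive versions.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;

    public class MetastoreFailoverSketch {
      public static void main(String[] args) throws Exception {
        HiveConf conf = new HiveConf();
        // Two metastore services running on different nodes; the client fails over between them.
        conf.setVar(HiveConf.ConfVars.METASTOREURIS,
            "thrift://metastore1.example.com:9083,thrift://metastore2.example.com:9083");

        HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
        try {
          // A simple metadata read served by whichever metastore instance answered.
          System.out.println(client.getAllTables("default"));
        } finally {
          client.close();
        }
      }
    }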

The fixed schema for metadata is another limitation.  Hadoop provides much flexibility in the types of data that can be stored, mainly because of the Schema-on-Read concept, but the Hive metastore imposes a fixed schema on the metadata itself and provides only a tabular abstraction for the data sets.  In addition, the metastore is another moving part in the infrastructure that must be kept running and secured as part of the Hadoop deployment (Grover et al., 2015).

Conclusion

This project has discussed essential topics related to Hadoop technology.  It began with an overview of Hadoop, providing a history of Hadoop and the differences between Hadoop 1.x and Hadoop 2.x.  The discussion then covered the Big Data Analytics process using Hadoop technology.  The process involves six significant steps, starting with identifying the business problem, identifying the required data, and collecting the data.  The data pre-processing and ETL steps must be implemented before performing the analytics, and the last step is the visualization of the data for decision making.  Before any data is collected or processed, considerations must be made for data preprocessing, modeling, and schema design in Hadoop to achieve better processing and better data retrieval, given that some tools can split the data while others cannot.  These considerations begin with the data storage format, followed by the Hadoop file types and the challenges that XML and JSON formats present in Hadoop.  Compression must also be considered when designing the schema for Hadoop.  Since HDFS and HBase are commonly used in Hadoop for data storage, the discussion addressed the schema design considerations for both HDFS and HBase.  In summary, the design of the schema for Hadoop, HDFS, and HBase makes a difference in how data is stored across the nodes and whether the right tools can split the data.  Organizations must therefore pay attention to the process and the design requirements before storing data in Hadoop to achieve better computational processing. 

References

Alguliyev, R., & Imamverdiyev, Y. (2014). Big data: big promises for information security. Paper presented at the 2014 IEEE 8th International Conference on Application of Information and Communication Technologies (AICT).

Ankam, V. (2016). Big Data Analytics: Packt Publishing Ltd.

Antony, B., Boudnik, K., Adams, C., Lee, C., Shao, B., & Sasaki, K. (2016). Professional Hadoop: John Wiley & Sons.

Armstrong, D. (n.d.). R: Learning by Example: Lattice Graphics. Retrieved from https://quantoid.net/files/rbe/lattice.pdf.

Bhojwani, N., & Shah, A. P. V. (2016). A Survey on Hadoop HBase System. Development, 3(1).

Dittrich, J., & Quiané-Ruiz, J.-A. (2012). Efficient big data processing in Hadoop MapReduce. Proceedings of the VLDB Endowment, 5(12), 2014-2015.

Grover, M., Malaska, T., Seidman, J., & Shapira, G. (2015). Hadoop Application Architectures: Designing Real-World Big Data Applications: O’Reilly Media, Inc.

Hu, H., Wen, Y., Chua, T.-S., & Li, X. (2014). Toward scalable systems for big data analytics: A technology tutorial. IEEE Access, 2, 652-687.

Karanth, S. (2014). Mastering Hadoop: Packt Publishing Ltd.

Lublinsky, B., Smith, K. T., & Yakubovich, A. (2013). Professional hadoop solutions: John Wiley & Sons.

sas.com. (2018). Hadoop: What it is and why it matters. Retrieved from https://www.sas.com/en_us/insights/big-data/hadoop.html.

Yang, C. T., Liu, J. C., Hsu, W. H., Lu, H. W., & Chu, W. C. C. (2013, December). Implementation of Data Transform Method into NoSQL Database for Healthcare Data. Paper presented at the 2013 International Conference on Parallel and Distributed Computing, Applications and Technologies.

 

Abstract

The purpose of this project is to discuss how data can be handled before Hadoop can take action on breaking data into manageable sizes.  The discussion begins with an overview of Hadoop providing a brief history of Hadoop and the difference between Hadoop 1.x and Hadoop 2.x. The discussion involves the Big Data Analytics process using Hadoop which involves six significant steps including the pre-processing data and ETL process where the data must be converted and cleaned before processing it.  Before data processing, some consideration must be taken for data preprocessing, modeling and schema design in Hadoop for better processing and data retrieval as it will affect how data can be split among various nodes in the distributed environment because not all tools can split the data.  This consideration begins with the data storage format, followed by Hadoop file types consideration and XML and JSON format challenges in Hadoop.  The compression of the data must be considered carefully because not all compression types are “splittable.” The discussion also involves the schema design consideration for HDFS and HBase since they are used often in the Hadoop ecosystem. 

Keywords: Big Data Analytics; Hadoop; Data Modelling in Hadoop; Schema Design in Hadoop.

Introduction

In the age of Big Data, dealing with large datasets in terabytes and petabytes is a reality and requires specific technology as the traditional technology was found inappropriate for it (Dittrich & Quiané-Ruiz, 2012).  Hadoop is developed to store, and process such large datasets efficiently.  Hadoop is becoming a data processing engine for Big Data (Dittrich & Quiané-Ruiz, 2012).  One of the significant advantages of Hadoop MapReduce is allowing non-expert users to run easily analytical tasks over Big Data (Dittrich & Quiané-Ruiz, 2012). However, before the analytical process takes place, some schema design and data modeling consideration must be taken for Hadoop so that the data process can be efficient (Grover, Malaska, Seidman, & Shapira, 2015).  Hadoop requires splitting the data. Some tools can split the data while others cannot split the data natively and requires integration (Grover et al., 2015). 

This project discusses these considerations to ensure the appropriate schema design for Hadoop and its components of HDFS, HBase where the data gets stored in a distributed environment.   The discussion begins with an overview of Hadoop first, followed by the data analytics process and ends with the data modeling techniques and consideration for Hadoop which can assist in splitting the data appropriately for better data processing performance and better data retrieval.

Overview of Hadoop

            Google published and disclosed its MapReduce technique and implementation early around 2004 (Karanth, 2014).  It also introduced the Google File System (GFS) which is associated with MapReduce implementation.  The MapReduce, since then, has become the most common technique to process massive data sets in parallel and distributed settings across many companies (Karanth, 2014).  In 2008, Yahoo released Hadoop as an open-source implementation of the MapReduce framework (Karanth, 2014; sas.com, 2018). Hadoop and its file system HDFS are inspired by Google’s MapReduce and GFS (Ankam, 2016; Karanth, 2014).  

The Apache Hadoop is the parent project for all subsequence projects of Hadoop (Karanth, 2014).  It contains three essential branches 0.20.1 branch, 0.20.2 branch, and 0.21 branch.  The 0.20.2 branch is often termed MapReduce v2.0, MRv2, or Hadoop 2.0.  Two additional releases for Hadoop involves the Hadoop-0.20-append and Hadoop-0.20-Security, introducing HDFS append and security-related features into Hadoop respectively.  The timeline for Hadoop technology is outlined in Figure 1.


Figure 1.  Hadoop Timeline from 2003 until 2013 (Karanth, 2014).

Hadoop version 1.0 was the inception and evolution of Hadoop as a simple MapReduce job-processing framework (Karanth, 2014).  It exceeded its expectations with wide adoption of massive data processing.  The stable version of the 1.x release includes features such as append and security.  Hadoop version 2.0 release came out in 2013 to increase efficiency and mileage from existing Hadoop clusters in enterprises.  Hadoop is becoming a common cluster-computing and storage platform from being limited to MapReduce only, because it has been moving faster than MapReduce to stay leading in massive scale data processing with the challenge of being backward compatible (Karanth, 2014). 

            In Hadoop 1.x, the JobTracker was responsible for the resource allocation and job execution (Karanth, 2014).  MapReduce was the only supported model since the computing model was tied to the resources in the cluster. The yet another resource negotiator (YARN) was developed to separate concerns relating to resource management and application execution, which enables other application paradigms to be added into Hadoop computing cluster. The support for diverse applications result in the efficient and effective utilization of the resources and integrates well with the infrastructure of the business (Karanth, 2014).  YARN maintains backward compatibility with Hadoop version 1.x APIs  (Karanth, 2014).  Thus, the old MapReduce program can still execute in YARN with no code changes, but it has to be recompiled (Karanth, 2014).

            YARN abstracts out the resource management functions to form a platform layer called ResourceManager (RM) (Karanth, 2014).  Every cluster must have RM to keep track of cluster resource usage and activity.  RM is also responsible for allocation of the resources and resolving contentions among resource seekers in the cluster.  RM utilizes a generalized resource model and is agnostic to application-specific resource needs.  RM does not need to know the resources corresponding to a single Map or Reduce slot (Karanth, 2014). Figure 2 shows Hadoop 1.x and Hadoop 2.x with YARN layer.   


Figure 2. Hadoop 1.x vs. Hadoop 2.x (Karanth, 2014).

Hadoop 2.x involves various enhancement at the storage layer as well.   These enhancements include the high availability feature to have a hot standby of NameNode (Karanth, 2014), when the active NameNode fails, the standby can become active NameNode in a matter of minutes.  The Zookeeper or any other HA monitoring service can be utilized to track NameNode failure (Karanth, 2014).  The failover process to promote the hot standby as the active NameNode is triggered with the assistance of the Zookeeper.  The HDFS federation is another enhancement in Hadoop 2.x, which is a more generalized storage model, where the block storage has been generalized and separated from the filesystem layer (Karanth, 2014).  The HDFS snapshots is another enhancement to the Hadoop 2.x which provides a read-only image of the entire or a particular subset of a filesystem to protect against user errors, backup, and disaster recovery.   Other enhancements added in Hadoop 2.x include the Protocol Buffers (Karanth, 2014). The wire protocol for RPCs within Hadoop is based on Protocol Buffers.  Hadoop 2.x is aware of the type of storage and expose this information to the application, to optimize data fetch and placement strategies (Karanth, 2014).  HDFS append support has been another enhancement in Hadoop 2.x.

Hadoop is regarded to be the de facto open-source framework for dealing with large-scale, massively parallel, and distributed data processing (Karanth, 2014).  The framework of Hadoop includes two layers for computation and data layer (Karanth, 2014).  The computation layer is used for parallel and distributed computation processing, while the data layer is used for a highly fault-tolerant data storage layer which is associated with the computation layer.  These two layers run on commodity hardware, which is not expensive, readily available, and compatible with other similar hardware (Karanth, 2014).

Hadoop Architecture

Apache Hadoop has four projects: Hadoop Common, Hadoop Distributed File System, Yet Another Resource Negotiator (YARN), and MapReduce (Ankam, 2016).  The HDFS is used to store data, MapReduce is used to process data, and YARN is used to manage the resources such as CPU and memory of the cluster and common utilities that support Hadoop framework (Ankam, 2016; Karanth, 2014).  Apache Hadoop integrates with other tools such as Avro, Hive, Pig, HBase, Zookeeper, and Apache Spark (Ankam, 2016; Karanth, 2014).

            Hadoop three significant components for Big Data Analytics.  The HDFS is a framework for reliable distributed data storage (Ankam, 2016; Karanth, 2014).  Some considerations must be taken when storing data into HDFS (Grover et al., 2015).  The multiple frameworks for parallel processing of data include MapReduce, Crunch, Cascading, Hive, Tez, Impala, Pig, Mahout, Spark, and Giraph (Ankam, 2016; Karanth, 2014). The Hadoop architecture includes NameNodes and DataNodes.  It also includes Oozie for workflow, Pig for scripting, Mahout for machine learning, Hive for the data warehouse.  Sqoop for data exchange, and Flume for log collection.  YARN is in Hadoop 2.0 as discussed earlier for distributed computing, while HCatalog for Hadoop metadata management.  HBase is for columnar database and Zookeeper for coordination (Alguliyev & Imamverdiyev, 2014).  Figure 3 shows the Hadoop ecosystem components.


Figure 3.  Hadoop Architecture (Alguliyev & Imamverdiyev, 2014).

Big Data Analytics Process Using Hadoop

The process of Big Data Analytics involves six essential steps (Ankam, 2016). The identification of the business problem and outcomes is the first step.  Examples of business problems include sales are going down, or shopping carts are abandoned by customers, a sudden rise in the call volumes, and so forth.  Examples of the outcome include improving the buying rate by 10%, decreasing shopping cart abandonment by 50%, and reducing call volume by 50% by next quarter while keeping customers happy.  The required data must be identified where data sources can be data warehouse using online analytical processing, application database using online transactional processing, log files from servers, documents from the internet, sensor-generated data, and so forth, based on the case and the problem.  Data collection is the third step in analyzing the Big Data (Ankam, 2016).  Sqoop tool can be used to collect data from the relational database, and Flume can be used for stream data.  Apache Kafka can be used for reliable intermediate storage.  The data collection and design should be implemented using the fault tolerance strategy (Ankam, 2016).  The preprocessing data and ETL process is the fourth step in the analytical process.  The collected data comes in various formats, and the data quality can be an issue. Thus, before processing it, it needs to be converted to the required format and cleaned from inconsistent, invalid or corrupted data.  Apache Hive, Apache Pig, and Spark SQL can be used for preprocessing massive amounts of data.  The analytics implementation is the fifth steps which should be in order to answer the business questions and problems. The analytical process requires understanding the data and relationships between data points.  The types of data analytics include descriptive and diagnostic analytics to present the past and current views of the data, to answer questions such as what and why happened.  The predictive analytics is performed to answer questions such as what would happen based on a hypothesis. Apache Hive, Pig, Impala, Drill, Tez, Apache Spark, and HBase can be used for data analytics in batch processing mode.  Real-time analytics tools including Impala, Tez, Drill, and Spark SQL can be integrated into the traditional business intelligence (BI) using any of BI tools such as Tableau, QlikView, and others for interactive analytics. The last step in this process involves the visualization of the data to present the analytics output in a graphical or pictorial format to understand the analysis better for decision making.  The finished data is exported from Hadoop to a relational database using Sqoop, for integration into visualization systems or visualizing systems are directly integrated into tools such as Tableau, QlikView, Excel, and so forth.  Web-based notebooks such as Jupyter, Zeppelin, and Data bricks cloud are also used to visualize data by integrating Hadoop and Spark components (Ankam, 2016). 

Data Preprocessing, Modeling and Design Consideration in Hadoop

            Before processing any data, and before collecting any data for storage, some considerations must be taken for data modeling and design in Hadoop for better processing and better retrieval (Grover et al., 2015).  The traditional data management system is referred to as Schema-on-Write system which requires the definition of the schema of the data store before the data is loaded (Grover et al., 2015).  This traditional data management system results in long analysis cycles, data modeling, data transformation loading, testing, and so forth before the data can be accessed (Grover et al., 2015).   In addition to this long analysis cycle, if anything changes or wrong decision was made, the cycle must start from the beginning which will take longer time for processing (Grover et al., 2015).   This section addresses various types of consideration before processing the data from Hadoop for analytical purpose.

Data Pre-Processing Consideration

The dataset may have various levels of quality regarding noise, redundancy, and consistency (Hu, Wen, Chua, & Li, 2014).  Preprocessing techniques must be used to improve data quality should be in place in Big Data systems (Hu et al., 2014; Lublinsky, Smith, & Yakubovich, 2013).  The data pre-processing involves three techniques: data integration, data cleansing, and redundancy elimination.

The data integration techniques are used to combine data residing in different sources and provide users with a unified view of the data (Hu et al., 2014).  The traditional database approach has well-established data integration system including the data warehouse method, and the data federation method (Hu et al., 2014).  The data warehouse approach is also known as ETL consisting of extraction, transformation, and loading (Hu et al., 2014).  The extraction step involves the connection to the source systems and selecting and collecting the required data to be processed for analytical purposes.  The transformation step involves the application of a series of rules to the extracted data to convert it into a standard format.  The load step involves importing extracted and transformed data into a target storage infrastructure (Hu et al., 2014).  The federation approach creates a virtual database to query and aggregate data from various sources (Hu et al., 2014).  The virtual database contains information or metadata about the actual data, and its location and does not contain data itself (Hu et al., 2014).  These two data pre-processing are called store-and-pull techniques which is not appropriate for Big Data processing, with high computation and high streaming, and dynamic nature (Hu et al., 2014).  

The data cleansing process is a vital process to keep the data consistent and updated to get widely used in many fields such as banking, insurance, and retailing (Hu et al., 2014).  The cleansing process is required to determine the incomplete, inaccurate, or unreasonable data and then remove these data to improve the quality of the data (Hu et al., 2014). The data cleansing process includes five steps (Hu et al., 2014).  The first step is to define and determine the error types.  The second step is to search and identify error instances.  The third step is to correct the errors, and then document error instances and error types. The last step is to modify data entry procedures to reduce future errors.  Various types of checks must be done at the cleansing process, including the format checks, completeness checks, reasonableness checks, and limit checks (Hu et al., 2014).  The process of data cleansing is required to improve the accuracy of the analysis (Hu et al., 2014).  The data cleansing process depends on the complex relationship model, and it has extra computation and delay overhead (Hu et al., 2014).  Organizations must seek a balance between the complexity of the data-cleansing model and the resulting improvement in the accuracy analysis (Hu et al., 2014). 

The data redundancy is the third data pre-processing step where data is repeated increasing the overhead of the data transmission and causes limitawtions for storage systems, including wasted space, inconsistency of the data, corruption of the dta, and reduced reliability (Hu et al., 2014).  Various redundancy reduction methods include redundancy detection and data compression (Hu et al., 2014).  The data compression method poses an extra computation burden in the data compression and decompression processes (Hu et al., 2014).

Data Modeling and Design Consideration

Schema-on-Write system is used when the application or structure is well understood and frequently accessed through queries and reports on high-value data (Grover et al., 2015).        The term Schema-on-Read is used in the context of Hadoop data management system (Ankam, 2016; Grover et al., 2015). This term refers to the raw data, that is not processed and can be loaded to Hadoop using the required structure at processing time based on the requirement of the processing application (Ankam, 2016; Grover et al., 2015).  The Schema-on-Read is used when the application or structure of data is not well understood (Ankam, 2016; Grover et al., 2015).  The agility of the process is implemented through the schema-on-read providing valuable insights on data not previously accessible (Grover et al., 2015).

            Five factors must be considered before storing data into Hadoop for processing (Grover et al., 2015).  The data storage format must be considered as there are some file formats and compression formats supported on Hadoop.  Each type of format has strengths that make it better suited to specific applications.   Although Hadoop Distributed File System (HDFS) is a building block of Hadoop ecosystem, which is used for storing data, several commonly used systems implemented on top of HDFS such as HBase for traditional data access functionality, and Hive for additional data management functionality (Grover et al., 2015).  These systems of HBase for data access functionality and Hive for data management functionality must be taken into consideration before storing data into Hadoop (Grover et al., 2015). The second factor involves the multitenancy which is a common approach for clusters to host multiple users, groups and application types. The multi-tenant clusters involve essential considerations for data storage.  The schema design factor should also be considered before storing data into Hadoop even if Hadoop is a schema-less (Grover et al., 2015).  The schema design consideration involves directory structures for data loaded into HDFS and the output of the data processing and analysis, including the schema of objects stored in systems such as HBase and Hive.  The last factor for consideration before storing data into Hadoop is represented in the metadata management.  Metadata is related to the stored data and is often regarded as necessary as the data.  The understanding of the metadata management plays a significant role as it can affect the accessibility of the data.  The security is another factor which should be considered before storing data into Hadoop system.  The security of the data decision involves authentication, fine-grained access control, and encryption. These security measures should be considered for data at rest when it gets stored as well as in motion during the processing (Grover et al., 2015).  Figure 4 summarizes these considerations before storing data into the Hadoop system. 


Figure 4.  Considerations Before Storing Data into Hadoop.

Data Storage Format Considerations

            When architecting a solution on Hadoop, the method of storing the data into Hadoop is one of the essential decisions. Primary considerations for data storage in Hadoop involve file format, compression, data storage system (Grover et al., 2015).  The standard file formats involve three types:  text data, structured text data, and binary data.  Figure 5 summarizes these three standard file formats.


Figure 5.  Standard File Formats.

The text data is widespread use of Hadoop including log file such as weblogs, and server logs (Grover et al., 2015).  These text data format can come in many forms such as CSV files, or unstructured data such as emails.  Compression of the file is recommended, and the selection of the compression is influenced by how the data will be used (Grover et al., 2015).  For instance, if the data is for archival, the most compact compression method can be used, while if the data are used in processing jobs such as MapReduce, the splittable format should be used (Grover et al., 2015).  The splittable format enables Hadoop to split files into chunks for processing, which is essential to efficient parallel processing (Grover et al., 2015).

In most cases, the use of container formats such as SequenceFiles or Avro provides benefits making it the preferred format for most file system including text (Grover et al., 2015).  It is worth noting that these container formats provide functionality to support splittable compression among other benefits (Grover et al., 2015).   The binary data involves images which can be stored in Hadoop as well.  The container format such as SequenceFile is preferred when storing binary data in Hadoop.  If the binary data splittable unit is more than 64MB, the data should be put into its file, without using the container format (Grover et al., 2015).

XML and JSON Format Challenges with Hadoop

The structured text data include formats such as XML and JSON, which can present unique challenges using Hadoop because splitting XML and JSON files for processing is not straightforward, and Hadoop does not provide a built-in InputFormat for either (Grover et al., 2015).  JSON presents more challenges to Hadoop than XML because no token is available to mark the beginning or end of a record.  When using these file format, two primary consideration must be taken.  The container format such as Avro should be used because Avro provides a compact and efficient method to store and process the data when transforming the data into Avro (Grover et al., 2015).  A library for processing XML or JSON should be designed.  XMLLoader in PiggyBank library for Pig is an example when using XML data type.  The Elephant Bird project is an example of a JSON data type file (Grover et al., 2015). 

Hadoop File Types Considerations

            Several Hadoop-based file formats created to work well with MapReduce (Grover et al., 2015).  The Hadoop-specific file formats include file-based data structures such as sequence files, serialization formats like Avro, and columnar formats such as RCFile and Parquet (Grover et al., 2015).  These files types share two essential characteristics that are important for Hadoop application: splittable compression and agnostic compression.  The ability of splittable files play a significant role during the data processing, and should not be underestimated when storing data in Hadoop because it allows large files to be split for input to MapReduce and other types of jobs, which is a fundamental part of parallel processing and a key to leveraging data locality feature of Hadoop (Grover et al., 2015).  The agnostic compression is the ability to compress using any compression codec without readers having to know the codec because the codec is stored in the header metadata of the file format (Grover et al., 2015).  Figure 6 summarizes these Hadoop-specific file formats with the typical characteristics of splittable compression and agnostic compression.


Figure 6. Three Hadoop File Types with the Two Common Characteristics.  

1.      SequenceFiles Format Consideration

SequenceFiles format is the most widely used Hadoop file-based formats.  SequenceFile format store data as binary key-value pairs (Grover et al., 2015).  It involves three formats for records stored within SequenceFiles:  uncompressed, record-compressed, and block-compressed.  Every SequenceFile uses a standard header format containing necessary metadata about the file such as the compression codec used, key and value class names, user-defined metadata, and a randomly generated syn marker.  The SequenceFiles arewell supported in Hadoop. However, it has limited support outside the Hadoop ecosystem as it is only supported in Java language.  The frequent use case for SequenceFiles is a container for smaller files.  However, storing a large number of small files in Hadoop can cause memory issue and excessive overhead in processing.  Packing smaller files into a SequenceFile can make the storage and processing of these files more efficient because Hadoop is optimized for large files (Grover et al., 2015).   Other file-based formats include the MapFiles, SetFiles, Array-Files, and BloomMapFiles.  These formats offer a high level of integration for all forms of MapReduce jobs, including those run via Pig and Hive because they were designed to work with MapReduce (Grover et al., 2015).  Figure 7 summarizes the three formats for records stored within SequenceFiles.


Figure 7.  Three Formats for Records Stored within SequenceFile.

2.      Serialization Formats Consideration

Serialization is the process of moving data structures into bytes for storage or for transferring data over the network (Grover et al., 2015).   The de-serialization is the opposite process of converting a byte stream back into a data structure (Grover et al., 2015).  The serialization process is the fundamental building block for distributed processing systems such as Hadoop because it allows data to be converted into a format that can be efficiently stored and transferred across a network connection (Grover et al., 2015).  Figure 8 summarizes the serialization formats when architecting for Hadoop.


Figure 8.  Serialization Process vs. Deserialization Process.

The serialization involves two aspects of data processing in a distributed system of interprocess communication using data storage, and remote procedure calls or RPC (Grover et al., 2015).  Hadoop utilizes Writables as the main serialization format, which is compact and fast but uses Java only.  Other serialization frameworks have been increasingly used within Hadoop ecosystems, including Thrift, Protocol Buffers and Avro (Grover et al., 2015).  Avro is a language-neutral data serialization system (Grover et al., 2015).  It was designed to address the limitation of the Writables of Hadoop which is lack of language portability.  Similar to Thrift and Protocol Buffers, Avro is described through a language-independent schema (Grover et al., 2015).   Avro, unlike Thrift and Protocol Buffers, the code generation is optional.  Table 1 provides a comparison between these serialization formats.

Table 1:  Comparison between Serialization Formats.

3.      Columnar Format Consideration

Row-oriented systems have been used to fetch data stored in the database (Grover et al., 2015).  This type of data retrieval has been used as the analysis heavily relied on fetching all fields for records that belonged to a specific time range.  This process is efficient if all columns of the record are available at the time or writing because the record can be written with a single disk seek.  The column storage has recently been used to fetch data.  The use of columnar storage has four main benefits over the row-oriented system (Grover et al., 2015).  The skips I/O and decompression on columns that are not part of the query is one of the benefits of the columnar storage.  Columnar data storage works better for queries that access a small subset of columns than the row-oriented data storage, which can be used when many columns are retrieved.  The compression on columns provides efficiency because data is more similar within the same column than it is in a block of rows.  The columnar data storage is more appropriate for data warehousing-based applications where aggregations are implemented using specific columns than an extensive collection of records (Grover et al., 2015).  Hadoop applications have been using the columnar file formats including the RCFile format, Optimized Row Columnar (ORC), and Parquet.  The RCFile format has been used as a Hive Format.  It was developed to provide fast data loading, fast query processing, and highly efficient storage space utilization.  It breaks files into row splits, and within each split uses columnar-oriented storage.  Despite its advantages of the query and compression performance compared to SequenceFiles, it has limitations, that prevent the optimal performance for query times and compression.  The newer version of the columnar formats ORC and Parquet are designed to address many of the limitations of the RCFile (Grover et al., 2015). 

Compression Consideration

Compression is another data storage consideration because it plays a crucial role in reducing the storage requirements, and in improving the data processing performance (Grover et al., 2015).  Some compression formats supported on Hadoop are not splittable (Grover et al., 2015).  MapReduce framework splits data for input to multiple tasks; the nonsplittable compression format is an obstacle to efficient processing.  Thus, the splittability is a critical consideration in selecting the compression format and file format for Hadoop.  Various compression types for Hadoop include Snappy, LZO, Gzip, bzip2.  Google developed Snappy for speed. However, it does not offer the best compression size. It is designed to be used with a container format like SequenceFile or Avro because it is not inherently splittable.  It is being distributed with Hadoop. Similar to Snappy, LZO is optimized for speed as opposed to size.  However, LZO, unlike Snappy support splittability of the compressed files, but it requires indexing.  LZO, unlike Snappy, is not distributed with Hadoop and requires a license and separate installation.  Gzip, like Snappy, provides good compression performance, but is not splittable, and it should be used with a container format. The speed read performance of the Gzip is like the Snappy.  Gzip is slower than Snappy for write processing.  Gzip is not splittable and should be used with a container format.  The use of smaller blocks with Gzip can result in better performance.   The bzip2 is another compression type for Hadoop.  It provides good compression performance, but it can be slower than another compression codec such as Snappy.  It is not an ideal codec for Hadoop storage. Bzip2, unlike Snappy and Gzip, is inherently splittable.  It inserts synchronization markers between blocks.  It can be used for active archival purposes (Grover et al., 2015).

The compression format can become splittable when used with container file formats such as Avro, SequenceFile which compress blocks of records or each record individually (Grover et al., 2015).  If the compression is implemented on the entire file without using the container file format, the compression format that inherently supports splittable must be used such as bzip2.  The compression use with Hadoop has three recommendation (Grover et al., 2015).  The first recommendation is to enable compression of MapReduce intermediate output, which improves performance by decreasing the among of intermediate data that needs to be read and written from and to disk.  The second recommendation s to pay attention to the order of the data.  When the data is close together, it provides better compression levels. The data in Hadoop file format is compressed in chunks, and the organization of those chunks determines the final compression.   The last recommendation is to consider the use of a compact file format with support for splittable compression such as Avro.  Avro and SequenceFiles support splittability with non-splittable compression formats.  A single HDFS block can contain multiple Avro or SequenceFile blocks. Each block of the Avro or SequenceFile can be compressed and decompressed individually and independently of any other blocks of Avro or SequenceFile. This technique makes the data splittable because each block can be compressed and decompressed individually.  Figure 9 shows the Avro and SequenceFile splittability support (Grover et al., 2015).  


Figure 9.  Compression Example Using Avro (Grover et al., 2015).

Design Consideration for HDFS Schema

HDFS and HBase are the commonly used storage managers in the Hadoop ecosystem.  Organizations can store the data in HDFS or HBase which internally store it on HDFS (Grover et al., 2015).  When storing data in HDFS, some design techniques must be taken into consideration.  The schema-on-read model of Hadoop does not impose any requirement when loading data into Hadoop, as data can be ingested into HDFS by one of many methods without the requirements to associate a schema or preprocess the data.  Although Hadoop has been used to load many types of data such as the unstructured data, semi-structured data, some order is still required, because Hadoop serves as a central location for the entire organization and the data stored in HDFS is intended to be shared across various departments and teams in the organization (Grover et al., 2015).  The data repository should be carefully structured and organized to provide various benefits to the organization  (Grover et al., 2015).   When there is a standard directory structure, it becomes easier to share data among teams working with the same data set.  The data gets staged in a separate location before processing it.  The standard stage technique can help not processing data that has not been appropriately staged or entirely yet.  The standard organization of data allows for some code reuse that may process the data (Grover et al., 2015).  The placement of data assumptions can help simplify the loading process of the data into Hadoop.   The HDFS data model design for projects such as data warehouse implementation is likely to use structure facts and dimension tables similar to the traditional schema  (Grover et al., 2015).  The HDFS data model design for projects of unstructured and semi-structured data is likely to focus on directory placement and metadata management (Grover et al., 2015). 

Grover et al. (2015) suggested three key considerations when designing the schema, regardless of the data model design project.  The first consideration is to develop standard practices that can be followed by all teams.  The second point is to ensure the design works well with the chosen tools.  For instance, if the version of Hive can support only table partitions on directories that are named a certain way, it will affect the schema design and the names of the table subdirectories.  The last consideration when designing a schema is to keep usage patterns in mind, because different data processing and querying patterns work better with different schema designs (Grover et al., 2015). 

HDFS Files Location Consideration

            The first step when designing an HDFS schema involves the determination of the location of the file.  Standard file location plays a significant role in finding and sharing data among various departments and teams. It also helps in the assignment of permission to access files to various groups and users.  The recommended file locations are summarized in Table 2.


Table 2.  Standard Files Locations.

HDFS Schema Design Consideration

The HDFS schema design involves advanced techniques to organize data into files (Grover et al., 2015).   A few strategies are recommended to organize the data set. These strategies for data organization involve partitioning, bucketing, and denormalizing process.  The partitioning process of the data set is a common technique used to reduce the amount of I/O required to process the data set.  HDFS does not store indexes on the data unlike the traditional data warehouse. Such a lack of indexes in HDFS plays a key role in speeding up data ingest, with a full table scan cost where every query will have to read the entire dataset even when processing a small subset of data. Breaking up the data set into smaller subsets, or partitions can help with the full table scan, allowing queries to read only the specific partitions reducing the amount of I/O and improving the query time processing significantly (Grover et al., 2015). When data is placed in the filesystem, the directory format for partition should be as shown below.  The order data sets are partitioned by date because there are a large number of orders done daily and the partitions will contain large enough files which are optimized by HDFS.  Various tools such as HCatalog, Hive, Impala, and Pig understand this directory structure leveraging the partitioning to reduce the amount of I/O requiring during the data processing (Grover et al., 2015).

  • <data set name>/<partition_column_name=partition_column_value>/(Armstrong)
  • e.g. medication_orders/date=20181107/[order1.csv, order2.csv]

Bucketing is another technique for breaking a large data set into manageable sub-sets (Grover et al., 2015).  The bucketing technique is similar to the hash partitions which is used in the relational database.   Various tools such as HCatalog, Hive, Impala, and Pig understand this directory structure leveraging the partitioning to reduce the amount of I/O requiring during the data processing. The partition example above was implemented using the date which resulted in large data files which can be optimized by HDFS (Grover et al., 2015).  However, if the data sets are partitioned by a the category of the physician, the result will be too many small files, which leads to small file problems, which can lead to excessive memory use for the NameNode, since metadata for each file stored in HDFS is stored in memory (Grover et al., 2015).  Many small files can also lead to many processing tasks, causing excessive overhead in processing.  The solution for too many small files is to use the bucketing process for the physician in this example, which uses the hashing function to map physician into a specified number of buckets (Grover et al., 2015).

The bucketing technique controls the size of the data subsets and optimizes the query speed (Grover et al., 2015).  The recommended average bucket size is a few multiples of the HDFS block size. The distribution of data when hashed on the bucketing column is essential because it results in consistent bucketing (Grover et al., 2015).  The use of the number of buckets as a power of two is every day.   Bucketing allows joining two data sets.  The join, in this case, is used to represent the general idea of combining two data sets to retrieve a result. The joins can be implemented through the SQL-on-Hadoop systems and also in MapReduce, or Spark, or other programming interfaces to Hadoop.  When using join in the bucketing technique, it joins corresponding buckets individually without having to join the entire datasets, which help in minimizing the time complexity for the reduce-side join of the two datasets process, which is computationally expensive (Grover et al., 2015).   The join is implemented in the map stage of a MapReduce job by loading the smaller of the buckets in memory because the buckets are small enough to easily fit into memory, which is called map-side join process.  The map-side join process improves the join performance as compared to a reduce-side join process.  A hive for data analysis recognizes the tables are bucketed and optimize the process accordingly.

Further optimization can be implemented if the data in the bucket is sorted, the merge join can be used, and the entire bucket does not get stored in memory when joining, resulting in the faster process and much less memory than a simple bucket join.  Hive supports this optimization as well.  The use of both sorting and bucketing on large tables that are frequently joined together using the join key for bucketing is recommended (Grover et al., 2015).

The schema design depends on how the data will be queried (Grover et al., 2015).  Thus, the columns to be used for joining and filtering must be identified before the portioning and bucketing of the data is implemented.   In some cases, when the identification of one partitioning key is challenging, storing the same data set multiple times can be implemented, each with the different physical organization, which is regarded to be an anti-pattern in a relational database.  However, this solution can be implemented with Hadoop, because with Hadoop is write-once, and few updates are expected.  Thus, the overhead of keeping duplicated data set in sync is reduced.  The cost of storage in Hadoop clusters is reduced as well  (Grover et al., 2015). The duplicated data set in sync provides better query speed processing in such cases (Grover et al., 2015). 

Regarding the denormalizing process, it is another technique of trading the disk space for query performance, where joining the entire data set need is minimized (Grover et al., 2015).   In the relational database model, the data is stored in the third standard form (NF3), where redundancy is minimized, and data integrity is enforced by splitting data into smaller tables, each holding a particular entity.  In this relational model, most queries require joining a large number of tables together to produce a final result as desired (Grover et al., 2015).  However, in Hadoop, joins are often the slowest operations and consume the most resources from the cluster.  Specifically, the reduce-side join requires sending the entire table over the network, which is computationally costly.  While sorting and bucketing help minimizing this computational cost, another solution is to create data sets that are pre-joined or pre-aggregated (Grover et al., 2015).  Thus, the data can be joined once and store it in this form instead of running the join operations every time there is a query for that data.  Hadoop schema consolidates many of the small dimension tables into a few larger dimensions by joining them during the ETL process  (Grover et al., 2015).  Other techniques to speed up the process include the aggregation or data type conversion.  The duplication of the data is of less concern; thus, when the processing is frequent for a large number of queries, it is recommended to doing it one and reuse as the case with a materialized view in the relational database.  In Hadoop, the new dataset is created that contains the same data in its aggregated form (Grover et al., 2015).

To summarize, the partitioning process is used to reduce the I/O overhead of processing by selectively reading and writing data in particular partitions.  The bucketing can be used to speed up queries that involve joins or sampling, by reducing the I/O as well.  The denormalization can be implemented to speed up Hadoop jobs.   In this section, a review of advanced techniques to organize data into files is discussed.  The discussion includes the use of a small number of large files versus a large number of small files.  Hadoop prefers working with a small number of large files than a large number of small files.  The discussion also addresses the reduce-side join versus map-side join techniques.   The reduce-side join is computationally costly. Hence, the map-side join technique is preferred and recommend. 

HBase Schema Design Consideration

HBase is not a relational database (Grover et al., 2015; Yang, Liu, Hsu, Lu, & Chu, 2013).  HBase is similar to a large hash table, which allows the association of values with keys and performs a fast lookup of the value based on a given key  (Grover et al., 2015). The operations of hash tables involve put, get, scan, increment and delete.  HBase provides scalability and flexibility and is useful in many applications, including fraud detection, which is a widespread application for HBase (Grover et al., 2015).

The framework of HBase involves Master Server, Region Servers, Write-Ahead Log (WAL), Memstore, HFile, API and Hadoop HDFS (Bhojwani & Shah, 2016).  Each component of the HBase framework plays a significant role in data storage and processing.  Figure 10 illustrated the HBase framework.


Figure 10.  HBase Architecture (Bhojwani & Shah, 2016).

            The following consideration must be taken when designing the schema for HBase (Grover et al., 2015).

  • Row Key Consideration.
  • Timestamp Consideration.
  • Hops Consideration.
  • Tables and Regions Consideration.
  • Columns Use Consideration.
  • Column Families Use Consideration.
  • Time-To-Live Consideration.

The row key is one of the most critical factors for well-architected HBase schema design (Grover et al., 2015).  The row key consideration involves record retrieval, distribution, block cache, the ability to scan, size, readability, and uniqueness.  The row key is critical for retrieving records from HBase. In the relational database, the composite key can be used to combine multiple primary keys.  In HBase, multiple pieces of information can be combined in a single key.  For instance, a key of customer_id, order_id, and timestamp will be a row key for a row describing an order. In a relational database, they are three different columns in the relational database, but in HBase, they will be combined into a single unique identifier.  Another consideration for selecting the row key is the get operation because a get operation of a single record is the fasted operation in HBase.  A single get operation can retrieve the most common uses of the data improves the performance, which requires to put much information in a single record which is called denormalized design.    For instance, while in the relational database, customer information will be placed in various tables, in HBase all customer information will be stored in a single record where get operation will be used. The distribution is another consideration for HBase schema design.  The row key determines the regions of HBase cluster for a given table, which will be scattered throughout various regions (Grover et al., 2015; Yang et al., 2013).   The row keys are sorted, and each region stores a range of these sorted row keys  (Grover et al., 2015).  Each region is pinned to a region server namely a node in the cluster  (Grover et al., 2015).  The combination of device ID and timestamp or reverse timestamp is commonly used to “salt” the key in machine data  (Grover et al., 2015).  The block cache is a least recently used (LRU) cache which caches data blocks in memory  (Grover et al., 2015).  HBase reads records in chunks of 64KB from the disk by default. Each of these chunks is called HBase block  (Grover et al., 2015).  When the HBase block is read from disk, it will be put into the block cache  (Grover et al., 2015).   The choice of the row key can affect the scan operation as well.  HBase scan rates are about eight times slower than HSFS scan rates.  Thus, reducing I/O requirements has a significant performance advantage.  The size of the row key determines the performance of the workload.  The short row key is better than, the long row key because it has lower storage overhead and faster read/ writes performance.  The readability of the row key is critical. Thus, it is essential to start with human-readable row key.  The uniqueness of the row key is also critical since a row key is equivalent to a key in hash table analogy.  If the row key is based on the non-unique attribute, the application should handle such cases and only put data in HBase with a unique row key (Grover et al., 2015).

The timestamp is the second essential consideration for good HBase schema design (Grover et al., 2015).  The timestamp provides advantages of determining which records are newer in case of put operation to modify the record.  It also determines the order where records are returned when multiple versions of a single record are requested. The timestamp is also utilized to remove out-of-date records because time-to-live (TTL) operation compared with the timestamp shows the record value has either been overwritten by another put or deleted (Grover et al., 2015).

The hop term refers to the number of synchronized “get” requests to retrieve specific data from HBase (Grover et al., 2015). The less hop, the better because of the overhead.  Although multi-hop requests with HBase can be made, it is best to avoid them through better schema design, for example by leveraging de-normalization, because every hop is a round-trip to HBase which has a significant performance overhead (Grover et al., 2015).

The number of tables and regions per table in HBase can have a negative impact on the performance and distribution of the data (Grover et al., 2015).  If the number of tables and regions are not implemented correctly, it can result in an imbalance in the distribution of the load.  Important considerations include one region server per node, many regions in a region server, a give region is pinned to a particular region server, and tables are split into regions and scattered across region servers.  A table must have at least one region.  All regions in a region server receive “put” requests and share the region server’s “memstore,” which is a cache structure present on every HBase region server. The “memstore” caches the write is sent to that region server and sorts them in before it flushes them when certain memory thresholds are reached. Thus, the more regions exist in a region server; the less memstore space is available per region.  The default configuration sets the ideal flush size to 100MB. Thus, the “memstore” size can be divided by 100MB and result should be the maximum number of regions which can be put on that region server.   The vast region takes a long time to compact.  The upper limit on the size of a region is around 20GB. However, there are successful HBase clusters with upward of 120GB regions.  The regions can be assigned to HBase table using one of two techniques. The first technique is to create the table with a single default region, which auto splits as data increases.  The second technique is to create the table with a given number of regions and set the region size to a high enough value, e.g., 100GB per region to avoid auto splitting (Grover et al., 2015).  Figure 11 shows a topology of region servers, regions and tables. 


Figure 11.  The Topology of Region Servers, Regions, and Tables (Grover et al., 2015).

The columns used in HBase is different from the traditional relational database (Grover et al., 2015; Yang et al., 2013).  In HBase, unlike the traditional database, a record can have a million columns, and the next record can have a million completely different columns, which is not recommended but possible (Grover et al., 2015).   HBase stores data in a format called HFile, where each column value gets its row in HFile (Grover et al., 2015; Yang et al., 2013). The row has files like row key, timestamp, column names, and values. The file format provides various functionality, like versioning and sparse column storage (Grover et al., 2015). 

HBase, include the concept of column families (Grover et al., 2015; Yang et al., 2013).  A column family is a container for columns.  In HBase, a table can have one or more column families.  Each column family has its set of HFiles and gets compacted independently of other column families in the same table.  In many cases, no more than one column family is needed per table.  The use of more than one column family per table can be done when the operation is done, or the rate of change on a subset of the columns of a table is different from the other columns (Grover et al., 2015; Yang et al., 2013).  The last consideration for HBase schema design is the use of TTL, which is a built-in feature of HBase which ages out data based on its timestamp (Grover et al., 2015).  If TTL is not used and an aging requirement is needed, then a much more I/O intensive operation would need to be done.   The objects in HBase begin with table object, followed by regions for the table, store per column family for each region for the table, memstore, store files, and block (Yang et al., 2013).  Figure 12 shows the hierarchy of objects in HBase.

Figure 12.  The Hierarchy of Objects in HBase (Yang et al., 2013).
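
To make the column family and TTL considerations concrete, the following minimal sketch (again using the HBase Java client API; the table and family names are assumptions) creates a table with two column families whose contents change at different rates, one of which ages its data out after seven days.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class ColumnFamilyTtlExample {
        public static void main(String[] args) throws Exception {
            try (Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
                 Admin admin = connection.getAdmin()) {

                HTableDescriptor table = new HTableDescriptor(TableName.valueOf("user_profiles"));

                // "profile" holds slowly changing attributes and keeps data indefinitely.
                table.addFamily(new HColumnDescriptor("profile"));

                // "activity" changes frequently; a TTL of 7 days (in seconds) lets HBase
                // age the data out during compactions instead of requiring bulk deletes.
                HColumnDescriptor activity = new HColumnDescriptor("activity");
                activity.setTimeToLive(7 * 24 * 60 * 60);
                table.addFamily(activity);

                admin.createTable(table);
            }
        }
    }

Because the TTL is enforced during compactions, expired cells are dropped without the I/O-intensive scan-and-delete job that would otherwise be needed.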

To summarize this section, HBase schema design requires seven key considerations, starting with the row key, which should be selected carefully for record retrieval, distribution, block cache usage, ability to scan, size, readability, and uniqueness.  The timestamp and the number of hops are further schema design considerations for HBase.  Tables and regions must be sized with put performance and compaction time in mind.  The use of columns and column families should also be considered when designing the schema for HBase. Finally, TTL can be used to remove data that has aged out.

Metadata Consideration

The above discussion has been about the data and the techniques to store it in Hadoop.  Metadata is as essential as the data itself.  Metadata is data about the data (Grover et al., 2015).  The Hadoop ecosystem has various forms of metadata.   Metadata about a logical dataset, usually stored in a separate metadata repository, includes information such as the location of the dataset (e.g., a directory in HDFS or an HBase table name), the schema associated with the dataset, the partitioning and sorting properties of the dataset, and the format of the dataset, e.g., CSV or SequenceFile (Grover et al., 2015). Metadata about files on HDFS includes the permissions and ownership of such files and the location of the various blocks on the data nodes, usually stored and managed by the Hadoop NameNode (Grover et al., 2015).  Metadata about tables in HBase includes information such as table names, the associated namespace, associated attributes (e.g., MAX_FILESIZE, READONLY), and the names of the column families, usually stored and managed by HBase itself (Grover et al., 2015).  Metadata about data ingest and transformation includes information such as which user generated a given dataset, where the dataset came from, how long it took to generate, and how many records there are or the size of the data loaded (Grover et al., 2015).  Metadata about dataset statistics includes information such as the number of rows in a dataset, the number of unique values in each column, a histogram of the distribution of the data, and the maximum and minimum values (Grover et al., 2015).  Figure 13 summarizes these various kinds of metadata.


Figure 13.  Various Metadata in Hadoop.

Apache Hive was the first project in the Hadoop ecosystem to store, manage, and leverage metadata (Antony et al., 2016; Grover et al., 2015).  Hive stores this metadata in a relational database called the Hive “metastore” (Antony et al., 2016; Grover et al., 2015).  Hive also provides a “metastore” service that interfaces with the Hive metastore database (Antony et al., 2016; Grover et al., 2015).  When a query is submitted, Hive goes to the metastore to get the metadata needed for the query; the metastore returns the metadata, Hive generates an execution plan, the job is executed on the Hadoop cluster, and Hive sends the fetched results back to the user (Antony et al., 2016; Grover et al., 2015).  Figure 14 shows the query process and the role of the metastore in the Hive framework.


Figure 14.  Query Process and the Role of Metastore in Hive (Antony et al., 2016).
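
As a rough illustration of the metastore's role, the sketch below uses the Hive metastore Java client to look up a table's schema and storage location, which is the kind of information a query engine needs before it can build an execution plan.  The thrift URI, database name, and table name are placeholders, and the exact client API varies across Hive versions.

    import java.util.List;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.metastore.api.FieldSchema;
    import org.apache.hadoop.hive.metastore.api.Table;

    public class MetastoreLookupExample {
        public static void main(String[] args) throws Exception {
            HiveConf conf = new HiveConf();
            // Placeholder address of the Hive metastore service.
            conf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://metastore-host:9083");

            HiveMetaStoreClient client = new HiveMetaStoreClient(conf);
            try {
                Table table = client.getTable("default", "events");

                // The metastore returns the schema and the HDFS location of the data,
                // which the query engine uses when planning and executing the job.
                List<FieldSchema> columns = table.getSd().getCols();
                String location = table.getSd().getLocation();
                System.out.println("Columns:  " + columns);
                System.out.println("Location: " + location);
            } finally {
                client.close();
            }
        }
    }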

As more projects wanted to make use of the metadata concept introduced by Hive, a separate project called HCatalog was created to enable the use of the Hive metastore outside of Hive (Grover et al., 2015).  HCatalog is a part of Hive and allows other tools, like Pig and MapReduce, to integrate with the Hive metastore.  It also opens access to the Hive metastore to other tools through a REST API exposed by the WebHCat server.  MapReduce, Pig, and standalone applications can talk directly to the Hive metastore through its APIs, but HCatalog allows easy access through its WebHCat REST APIs and allows cluster administrators to lock down access to the Hive metastore to address security concerns. Other ways to store metadata include embedding metadata in file paths and names.  Another technique is to store the metadata in HDFS in a hidden file, e.g., .metadata.  Figure 15 shows HCatalog acting as an accessibility veneer around the Hive metastore (Grover et al., 2015). 


Figure 15.  HCatalog acts as an accessibility veneer around the Hive metastore (Grover et al., 2015).
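
Returning to the hidden-file technique mentioned before Figure 15, the following sketch writes a small descriptor into a hidden .metadata directory next to a dataset using the HDFS FileSystem API; the path and the JSON fields are purely illustrative assumptions.

    import java.nio.charset.StandardCharsets;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HiddenMetadataExample {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            // Hidden ".metadata" directory stored alongside the dataset it describes.
            Path metaFile = new Path("/data/events/.metadata/descriptor.json");

            String descriptor = "{ \"format\": \"avro\", \"owner\": \"etl\", \"partitioned_by\": \"date\" }";
            try (FSDataOutputStream out = fs.create(metaFile, true)) {
                out.write(descriptor.getBytes(StandardCharsets.UTF_8));
            }
        }
    }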

Hive Metastore and HCatalog Limitations

There are some limitations of the Hive metastore and HCatalog, starting with high availability (Grover et al., 2015).  High-availability (HA) database cluster solutions can be used to bring HA to the Hive metastore database.  For the Hive metastore service, there is support for running multiple metastore services concurrently on more than one node in the cluster.  However, concurrency issues related to data definition language (DDL) operations can occur, and the Hive community is working on fixing these issues. 

The fixed schema for metadata is another limitation.  Hadoop provides much flexibility in the type of data that can be stored, mainly because of the schema-on-read concept, whereas the Hive metastore imposes a fixed schema for the metadata itself and provides only a tabular abstraction for the datasets.   In addition, the metastore is yet another moving part in the infrastructure that must be kept running and secured as part of the Hadoop deployment (Grover et al., 2015).

Conclusion

This project has discussed essential topics related to Hadoop technology.  It began with an overview of Hadoop, providing a history of Hadoop and the difference between Hadoop 1.x and Hadoop 2.x.  The discussion then covered the Big Data Analytics process using Hadoop technology.  The process involves six significant steps, starting with problem identification, determining the data required, and the data collection process. The data pre-processing and ETL steps must be completed before performing the analytics, and the last step is the visualization of the data for decision making.  Before collecting and processing any data for storage, consideration must be given to data preprocessing, modeling, and schema design in Hadoop for better processing and better data retrieval, given that some tools can split the data while others cannot.  These considerations begin with the data storage format, followed by the Hadoop file types and the challenges of the XML and JSON formats in Hadoop.  Compression must also be considered when designing the schema for Hadoop. Since HDFS and HBase are commonly used in Hadoop for data storage, the discussion covered the schema design considerations for both.  In summary, the design of the schema for Hadoop, HDFS, and HBase makes a difference in how data is stored across the various nodes and in choosing the right tools for splitting the data.  Organizations must therefore pay attention to the process and the design requirements before storing data in Hadoop to achieve better computational processing. 

References

Alguliyev, R., & Imamverdiyev, Y. (2014). Big data: big promises for information security. Paper presented at the 2014 IEEE 8th International Conference on Application of Information and Communication Technologies (AICT).

Ankam, V. (2016). Big Data Analytics: Packt Publishing Ltd.

Antony, B., Boudnik, K., Adams, C., Lee, C., Shao, B., & Sasaki, K. (2016). Professional Hadoop: John Wiley & Sons.

Armstrong, D. (n.d.). R: Learning by Example: Lattice Graphics. Retrieved from https://quantoid.net/files/rbe/lattice.pdf.

Bhojwani, N., & Shah, A. P. V. (2016). A survey on Hadoop HBase system. Development, 3(1).

Dittrich, J., & Quiané-Ruiz, J.-A. (2012). Efficient big data processing in Hadoop MapReduce. Proceedings of the VLDB Endowment, 5(12), 2014-2015.

Grover, M., Malaska, T., Seidman, J., & Shapira, G. (2015). Hadoop Application Architectures: Designing Real-World Big Data Applications: O’Reilly Media, Inc.

Hu, H., Wen, Y., Chua, T.-S., & Li, X. (2014). Toward scalable systems for big data analytics: A technology tutorial. IEEE Access, 2, 652-687.

Karanth, S. (2014). Mastering Hadoop: Packt Publishing Ltd.

Lublinsky, B., Smith, K. T., & Yakubovich, A. (2013). Professional hadoop solutions: John Wiley & Sons.

sas.com. (2018). Hadoop – what it is and why it matters. Retrieved from https://www.sas.com/en_us/insights/big-data/hadoop.html.

Yang, C. T., Liu, J. C., Hsu, W. H., Lu, H. W., & Chu, W. C. C. (2013, 16-18 Dec. 2013). Implementation of Data Transform Method into NoSQL Database for Healthcare Data. Paper presented at the 2013 International Conference on Parallel and Distributed Computing, Applications and Technologies.