Apache ZooKeeper Cluster
The ZooKeeper cluster stores the metadata of the cluster: the cluster configuration and the heartbeat information of all the machines in the cluster. It also stores some meta-information about the files stored in the Hungry Hippos file system. Visit http://zookeeper.apache.org to understand its architecture.
Apache Spark Cluster
The Spark cluster acts as a computation engine on top of the Hungry Hippos file system. Spark driver programs can use the Spark SQL / DataFrame API to run queries on the data stored in the Hungry Hippos file system. The Spark cluster runs on the Hungry Hippos file system just as it would run on the Hadoop Distributed File System (HDFS). Visit https://spark.apache.org to understand its architecture.
Hungry Hippos Cluster
Each machine in the Hungry Hippos cluster is called a node. The Hungry Hippos cluster stores the actual data and its metadata. The data to be stored in the cluster is first processed and then stored in a distributed, sharded manner. To reduce disk and network I/O during computation, the final sharded data is stored in compressed form.
A key feature of Hungry Hippos is multiway sharding. Multiway sharding is a sharding mechanism that keeps the data sharded by every sharding key at the same time.
For example, if your data contains transaction details and you want to shard it by geographical area, customer, and product, multiway sharding allows you to shard the same data by each of these keys, and each sharding strategy works independently of the others.
The usual way of doing this would be to make three different tables, each sharded by one of the keys mentioned. That way, whenever we need to query based on a particular sharding strategy, we can run the query against the table sharded by the corresponding key. However, this requires us to maintain three copies of the data, which increases the storage cost.
In Hungry Hippos, we realized that multiple copies of the data must be stored anyway to provide resilience and availability. Availability is the ability to run queries when part of the infrastructure is not available. Resilience is the ability to retain the data even when part of the infrastructure cannot be recovered. With Hungry Hippos, we are able to shard different replication copies by different keys. When the full infrastructure is available, we use the replica sharded by the key that the query needs. Otherwise, the other replication copies can be used to run the query.
Overview of mechanism
A single copy of the rows can be partitioned in only one way. Hence, we must partition different replication copies differently. This requires some details to be resolved. To ease the discussion, let’s consider only two sharding keys and a replication factor of 2.
Suppose we partition each replication copy differently: copy A is sharded by customer and copy B is sharded by product. Suppose all rows of customer X in copy A end up in node 2 and all rows of product Y in copy B end up in the same node. If customer X buys product Y, then both replication copies of that row would end up in node 2. If node 2 goes down, this row is unavailable for queries. The solution? The row containing the transaction of customer X buying product Y needs to be stored in node 2 only once. If proper storage principles are maintained, this single copy can be used efficiently in queries that require either of the sharding keys (customer or product). The other copy can be stored in any other node to ensure availability and resilience. If node 2 goes down, we will still be able to query this row, albeit with some more processing.
Storing the replicas
With the above principle in mind, we require a storage strategy that allows us to query efficiently by each dimension. The first step of sharding is to partition the range of key values into buckets. This is done either by hashing (if the key is unique) or by value (based on the frequency of occurrence). Going ahead, we will only talk about buckets and not individual keys. Each bucket is allocated to a shard, and each shard represents a particular physical node holding a partition of the data.
Each sharding key will have some n buckets, numbered from 1 to n. Each row of data will be a member of a single bucket for each sharding key. With two sharding keys, this means that each row is a member of two independent buckets, one for each sharding key. For example, if customer X is a member of shard 3 in copy A and product Y is a member of shard 5 in copy B, then the row recording customer X buying product Y would be a member of shard 3 in copy A and shard 5 in copy B. In short, we can assign a sharding coordinate of (3,5) to this row.
NOTE: Every row of data has a bucket coordinate indicating which bucket it belongs to according to each of the sharding keys. Because of this coordinate system, we call each sharding key a dimension.
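The coordinate idea can be illustrated with a minimal Python sketch. It assumes hash-based bucketing for both keys; the function names, the CRC32 hash, and the bucket count of 8 are illustrative assumptions and are not taken from the Hungry Hippos implementation.

```python
import zlib

def bucket_of(key_value: str, num_buckets: int) -> int:
    """Map a key value to a bucket numbered 1..num_buckets using a stable hash."""
    # CRC32 is an arbitrary choice here; any stable hash would do.
    return zlib.crc32(key_value.encode("utf-8")) % num_buckets + 1

def bucket_coordinate(row: dict, shard_keys: list, num_buckets: int) -> tuple:
    """Return one bucket number per sharding key (dimension) for a row."""
    return tuple(bucket_of(str(row[key]), num_buckets) for key in shard_keys)

# Hypothetical transaction row with two sharding keys.
row = {"customer": "X", "product": "Y", "amount": 42}
coord = bucket_coordinate(row, ["customer", "product"], num_buckets=8)
print(coord)  # a pair such as (i, j), one entry per dimension
```

Every row with the same customer gets the same first entry, and every row with the same product gets the same second entry, which is what lets each dimension be queried independently.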
Figure 1. Sharding Coordinates
Figure 1 illustrates bucket coordinates. If we imagine the outermost square as the entire set of data, then the horizontal and vertical partitions can represent shards by the keys customer and product, respectively. Now, if the sharding coordinate is (3,5), one copy of this data must be stored in node 3, and another copy must be stored in node 5. This fulfills the requirement that both node 3 and node 5 have this data.
Suppose a query needs to run using the sharding by customer. It expects this data to be present in node 3, which is ensured. On the other hand, if a query needs to use the sharding by product, it will look for this data in node 5, which also works. In case of a node failure, the query can still process most of the data according to the chosen sharding. The shards that are lost must be processed using the other copy of the data.
If the coordinate is (5,5), however, queries using either dimension would look for this part of the data in node 5. So, storing only one copy of the data in node 5 satisfies this requirement. The other copy must be stored in a different node to maintain resilience and availability. Things are a little more complicated with more dimensions and a higher replication factor. For example, with a replication factor of 3, any two of the coordinate values may be the same (for example (3,3,5)), or all the coordinate values may be the same (for example (5,5,5)). In the case of (3,3,5), node 3 and node 5 get one copy each, and the third copy is stored in a different node. In the case of (5,5,5), node 5 gets one copy and the other two copies are stored in two different nodes.
While storing the data, each small box in Figure 1 is stored in a different file. This lets us use the file system as an index and fetch only the relevant parts of the data depending on the query.
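The placement rule described above can be sketched in a few lines of Python. This is only an illustration under stated assumptions: bucket numbers are treated as node numbers (as in the examples in the text), the replication factor equals the number of dimensions, and the "different node" for a spare copy is simply the lowest-numbered node not already used. The source does not say how that node is actually chosen, and `place_replicas` is a hypothetical name.

```python
def place_replicas(coordinate, all_nodes):
    """Return the list of nodes holding the copies of a row, one node per copy."""
    placement = []
    # One copy goes to each distinct node named by the coordinate.
    for node in coordinate:
        if node not in placement:
            placement.append(node)
    # Repeated coordinate values free up copies; put them on other nodes.
    missing = len(coordinate) - len(placement)
    spare_nodes = [n for n in all_nodes if n not in placement]
    if missing > len(spare_nodes):
        raise ValueError("not enough nodes to keep every copy on a distinct node")
    placement.extend(spare_nodes[:missing])  # assumption: lowest-numbered spare nodes
    return placement

nodes = list(range(1, 7))                 # hypothetical six-node cluster
print(place_replicas((3, 5), nodes))      # [3, 5]
print(place_replicas((5, 5), nodes))      # [5, 1]
print(place_replicas((3, 3, 5), nodes))   # [3, 5, 1]
print(place_replicas((5, 5, 5), nodes))   # [5, 1, 2]
```

In every case the number of copies equals the replication factor, and no node holds the same row twice.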
Figure 1.1 illustrates data publishing. The input data from the client is uploaded in chunks of approximately 128 MB. Each chunk is sent to exactly one of the nodes.
In the nodes, the data is converted to a binary format and segregated into multiple intermediate files (Figure 1.2). Each intermediate file corresponds to a node. The node of a record is determined using the sharding table. Every record belongs to a particular bucket combination of the sharding table. Each bucket combination is assigned a unique number according to the sharding table. Every record is stored with its unique number. After processing the 128MB chunk, the segregated intermediate files are sent to the respective nodes (Figure 1.3).
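One way to give each bucket combination a unique number is a mixed-radix encoding of its coordinate. The sketch below is an assumption made for illustration: the text only says the number is assigned according to the sharding table and does not describe the actual scheme. The function names are hypothetical.

```python
def combination_number(coordinate, buckets_per_dimension):
    """Encode a bucket coordinate (buckets numbered from 1) as a single integer."""
    number = 0
    for bucket, size in zip(coordinate, buckets_per_dimension):
        if not 1 <= bucket <= size:
            raise ValueError(f"bucket {bucket} is outside 1..{size}")
        number = number * size + (bucket - 1)
    return number

def coordinate_of(number, buckets_per_dimension):
    """Decode the integer back into its bucket coordinate."""
    coordinate = []
    for size in reversed(buckets_per_dimension):
        coordinate.append(number % size + 1)
        number //= size
    return tuple(reversed(coordinate))

sizes = (8, 8)                                   # two dimensions, 8 buckets each
print(combination_number((3, 5), sizes))         # 20
print(coordinate_of(20, sizes))                  # (3, 5)
```

Because the encoding is reversible, a record stored with its number still carries its full bucket coordinate, which a later stage can use to route it to a part file.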
The data in the intermediate files is further segregated according to the number assigned to each record and stored in part files (Figure 1.4). Each part file is then appended to the final compressed part file (Figure 1.5). The max and min of each column in a compressed part file are computed and stored in a metadata file. The metadata file also stores the size of each compressed part file. A file contains multiple blocks, and each block has a certain number of records. The max and min of each column in each block are also stored. If the Hungry Hippos file has multiple shard columns, the uncompressed part file data is also sent to the other nodes, where it is appended to the corresponding compressed part files.
Once all the input data has been uploaded from the client, the client sends a signal to each node to share the metadata among themselves (Figure 1.6). Once all the records have reached their respective compressed files, the column statistics are shared among all the nodes.
Figure 1.1 Sending data in chunks from client to the nodes for processing
Figure 1.2 Data is converted to binary format and segregated to be delivered to each node on the basis of first dimension of the sharding table
Figure 1.3 Segregated data is sent to its corresponding nodes for further processing
Figure 1.4 The data in the files received from the other nodes is further grouped into part files according to the shard columns. Metadata of each file is also computed and stored.
Figure 1.5 The data present in the part files are appended to their corresponding final compressed part files. Also the data in the part files are sent to the corresponding nodes according to the sharding logic where they are appended to their corresponding compressed part files.
Figure 1.6 All the data in the part files are appended in their corresponding compressed part files. The metadata of all the part files are shared with all the other nodes in the cluster.
Hungry Hippos Interface for Spark
Spark driver programs can access data stored in the Hungry Hippos cluster using the Hungry Hippos API. A Spark driver program reads the sharding table and the metadata of a Hungry Hippos file to generate Hungry Hippos Resilient Distributed Datasets (RDDs). These RDDs are used by the Spark SQL API to generate Datasets, which in turn are used to run queries.
The queries written in the driver program are parsed by Spark SQL. After parsing the queries, Spark SQL generates filters and projections (a list of required columns). The filters are used by the Hungry Hippos API to discard unnecessary files from the set of part files of the Hungry Hippos file.
One of the shard columns is chosen for creating a Hungry Hippos RDD (the driver program can choose a suitable shard column using the dimension property). That shard column decides which part files are assigned to which partition. The column statistics of all the files are checked against the filters, and the names of the eligible part files are selected.
An empty partition instance is created and a bucket of the shard column is chosen. The eligible part files corresponding to the bucket are selected. Each time a part file name is added to a partition, its size is added to the partition size. Along with each part file name, its location information is also added. If adding a part file would push the partition size beyond 128 MB, the existing partition is added to a partition list and a new empty partition is created. Each partition is therefore at most approximately 128 MB, or larger when a single part file is itself larger than 128 MB. This procedure is repeated for all the buckets of the shard column.
A maximum of 3 preferred node locations are assigned to each partition according to the presence of its part files on those nodes. A node is preferred for a partition when it holds more of that partition's data than other nodes do.
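The packing procedure above can be sketched as a simple greedy loop. The data layout (a dict of buckets mapping to `(name, size, locations)` tuples) and the function name are assumptions made for this illustration. The sketch also assumes a partition is closed at the end of each bucket, which the text does not state explicitly.

```python
MAX_PARTITION_BYTES = 128 * 1024 * 1024  # the 128 MB limit from the text

def build_partitions(files_by_bucket, max_bytes=MAX_PARTITION_BYTES):
    """Greedily pack eligible part files into partitions, bucket by bucket.

    files_by_bucket: {bucket: [(file_name, size_bytes, locations), ...]}
    """
    partitions = []
    for bucket in sorted(files_by_bucket):
        current = {"files": [], "size": 0}
        for name, size, locations in files_by_bucket[bucket]:
            # Close the partition if this file would push it past the limit.
            if current["files"] and current["size"] + size > max_bytes:
                partitions.append(current)
                current = {"files": [], "size": 0}
            current["files"].append((name, locations))
            current["size"] += size
        if current["files"]:
            partitions.append(current)  # assumption: partitions do not span buckets
    return partitions

# Small limit so the behaviour is easy to follow by hand.
example = {
    1: [("a", 60, ["n1"]), ("b", 50, ["n1", "n2"]), ("c", 30, ["n2"])],
    2: [("d", 150, ["n3"])],
}
parts = build_partitions(example, max_bytes=100)
print([p["size"] for p in parts])  # [60, 80, 150]
```

The last partition exceeds the limit because a single part file is larger than the limit, matching the exception described in the text.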
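A minimal sketch of this ranking, assuming each part file in a partition is described by its size and the list of nodes that hold it. The input shape, the function name, and the tie-breaking by node name are illustrative assumptions.

```python
from collections import Counter

def preferred_nodes(part_files, max_locations=3):
    """Rank nodes by how many bytes of the partition they hold locally.

    part_files: iterable of (size_bytes, [nodes holding the file])
    """
    bytes_per_node = Counter()
    for size, nodes in part_files:
        for node in nodes:
            bytes_per_node[node] += size
    # Most local data first; ties broken by node name for determinism.
    ranked = sorted(bytes_per_node.items(), key=lambda item: (-item[1], item[0]))
    return [node for node, _ in ranked[:max_locations]]

partition_files = [
    (100, ["n1", "n2"]),
    (50, ["n2", "n3"]),
    (10, ["n4"]),
    (30, ["n3"]),
]
print(preferred_nodes(partition_files))  # ['n2', 'n1', 'n3']
```

Scheduling a partition on a node that already holds most of its part files reduces the number of files that have to be downloaded before reading.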
The Spark application uses the Hungry Hippos Iterator to read the data from Hungry Hippos partitions. The Hungry Hippos partition, the list of filters, and the list of required columns are passed to the Hungry Hippos Iterator. Before reading a partition on a node, the iterator checks whether all the compressed part files in the partition are present locally. Any missing part files are downloaded using their location information.
Before reading each compressed part file, the column statistics of all the blocks of the part file are checked against the filters to determine which blocks are eligible. The required columns of the records in the eligible blocks are then read by the Iterator to create rows for the Spark application.
NOTE: The Spark driver program reads the sharding table and column statistics from the nodes, so it must be run from inside the cluster.
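The block eligibility check can be illustrated with min/max pruning. The sketch below assumes filters are simple `(column, operator, value)` comparisons and that block statistics are `(min, max)` pairs per column; these shapes and names are assumptions for illustration, not the Hungry Hippos API.

```python
# For each operator: can any value in [low, high] satisfy "column <op> value"?
CAN_MATCH = {
    "==": lambda low, high, value: low <= value <= high,
    "<": lambda low, high, value: low < value,
    "<=": lambda low, high, value: low <= value,
    ">": lambda low, high, value: high > value,
    ">=": lambda low, high, value: high >= value,
}

def block_is_eligible(block_stats, filters):
    """Return False only when the statistics prove that no record can match."""
    for column, op, value in filters:
        low, high = block_stats[column]
        if not CAN_MATCH[op](low, high, value):
            return False
    return True

# Hypothetical per-block statistics: {column: (min, max)}
blocks = [
    {"amount": (0, 10)},
    {"amount": (11, 50)},
    {"amount": (51, 99)},
]
filters = [("amount", ">", 10), ("amount", "<=", 51)]
eligible = [i for i, stats in enumerate(blocks) if block_is_eligible(stats, filters)]
print(eligible)  # [1, 2]
```

An eligible block may still contain records that fail the filters, so the filters are applied again to individual records; the statistics only allow whole blocks to be skipped without being read.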
Figure 2.1 The driver program reads the metadata and the sharding table data to generate a Hungry Hippos RDD having Hungry Hippos Partitions. These RDDs can be used for running Spark SQL Queries.
Figure 2.2 Information in each Hungry Hippos partition
Figure 2.3 Information in the Hungry Hippos RDD
Figure 2.4 Reading data from Hungry Hippos and providing it to the Spark application