Implement Column Storage with esProc on Hadoop

Column storage is a good fit when a table has many fields, which is quite common. In a query, the amount of data to traverse is far less than with row storage, and less data to traverse means lighter I/O and faster queries; this matters because Hadoop applications spend most of their time on hard-disk I/O.

Both Hive and Impala support column storage, but only through a basic SQL interface. More complex computations are difficult to express in the MapReduce framework, yet they are much easier to implement with esProc.

The sample data is a big file, sales.txt, stored on HDFS. The file has twenty fields and 100 million records, and is 14 GB in size. Let's use a GROUP computation to demonstrate the whole procedure: summing up the total sales amount for each sales person. The algorithm involves only two fields, empID and amount. A minimal single-machine sketch of this computation is given below for reference.
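The sketch below is plain Python rather than esProc, purely to make the target result concrete; the tab-separated layout without a header row and the column positions of empID and amount are assumptions, not details taken from the article.

    # Single-machine baseline of the target computation: total amount per empID.
    # Assumed layout: tab-separated, no header, empID in column 0, amount in column 19.
    from collections import defaultdict

    totals = defaultdict(float)
    with open("sales.txt") as f:
        for line in f:
            fields = line.rstrip("\n").split("\t")
            totals[fields[0]] += float(fields[19])

    for emp, amount in sorted(totals.items()):
        print(emp, amount)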

First, for comparison, here is the classic code for grouping and summarizing on row storage:

Code for summary machine:

[Figure: esProc_column_storage_hadoop_1]

Code for node machine:

[Figure: esProc_column_storage_hadoop_2]

The above algorithm follows this line of thought: decompose the large task into forty smaller tasks by file bytes, distribute them to the node machines for the initial summarization, and then perform a secondary summarization on the summary machine. The idea is similar to MapReduce, but it is simpler and more intuitive because the task granularity can be customized, and most people can understand it easily. An illustrative sketch of this two-phase pattern follows.
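The real code is the esProc script in the screenshots above; the Python sketch below only illustrates the same two-phase pattern for row storage: split the file into 40 byte ranges, let each node task produce partial group sums, and merge them on the summary side. The tab-separated layout and the column positions of empID and amount are assumptions.

    # Two-phase group-sum over row storage, sketched in Python for illustration.
    import os
    from collections import defaultdict

    PATH, TASKS = "sales.txt", 40
    EMP_COL, AMOUNT_COL = 0, 19          # assumed column positions of empID and amount

    def node_task(start, end):
        """First phase (one node-machine task): partial group-sum over one byte range."""
        partial = defaultdict(float)
        with open(PATH, "rb") as f:
            f.seek(start)
            if start:
                f.readline()             # discard the record cut by the segment boundary
            while f.tell() <= end:
                line = f.readline()
                if not line:
                    break
                fields = line.decode().rstrip("\n").split("\t")
                partial[fields[EMP_COL]] += float(fields[AMOUNT_COL])
        return partial

    # Second phase (summary machine): merge the forty partial results.
    size = os.path.getsize(PATH)
    bounds = [size * i // TASKS for i in range(TASKS + 1)]
    total = defaultdict(float)
    for i in range(TASKS):
        for emp, amount in node_task(bounds[i], bounds[i + 1]).items():
            total[emp] += amount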

As can be seen, with row storage all the fields of every record are stored together in one file. Therefore, whether a query uses two fields or twenty, we still have to traverse all the data, i.e. the whole 14 GB file. Column storage is different: the data of each field is stored in its own file, so if a query only uses two fields, only the files for those two fields need to be retrieved, about 1.4 GB. Please note that 1.4 GB is an average estimate; the total volume of these two particular fields is slightly higher.

As mentioned above, to implement column storage we must first split the file field by field, using f.cursor() to retrieve the data and f.export() to write each field out. The detailed procedure is explained at the end of this article.

Code for grouping and summarizing once column storage is adopted:

Code for summary machine:

[Figure: esProc_column_storage_hadoop_3]

Code for node machine:

[Figure: esProc_column_storage_hadoop_4]

As can be seen, the greatest change is in the code for the node machine. Cell A3 takes a list of field files and uses cursor() to merge them into a cursor over a two-dimensional table. The rest of the grouping and summarizing code is the same as before. A sketch of this column-wise reading follows.
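Again as a plain illustration rather than the esProc code in the screenshot, the Python sketch below reads the two column files of one segment in step, treats each pair of values as one two-field record, and group-sums as before; the segment file names and the one-value-per-line layout are assumptions.

    # Group-sum over one segment stored column-wise (illustrative sketch).
    from collections import defaultdict

    def group_sum_segment(emp_path, amount_path):
        partial = defaultdict(float)
        # zip() walks the two column files in parallel, reconstructing the records.
        with open(emp_path) as emp_col, open(amount_path) as amount_col:
            for emp, amount in zip(emp_col, amount_col):
                partial[emp.strip()] += float(amount)
        return partial

    # e.g. the first of the forty segments
    partial = group_sum_segment("empID1.dat", "amount1.dat")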

Another change concerns the task decomposition, and this stage can cause real trouble. With row storage, tasks can be decomposed by byte count: a 14 GB file can be divided evenly into 40 segments, and each segment is assigned to a node machine as a task. This method does not work with column storage, because the byte boundaries of the different column files do not fall on the same records, so the result would be wrong. The right method is to divide by the number of records (any suggestions on dividing it evenly are welcome; please leave a comment, thanks). 100 million records split into 40 segments gives 2,500,000 records per segment; a small sketch of the boundary computation follows.
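To make the record-count decomposition concrete, here is a small Python sketch that computes the per-segment record ranges; spreading the remainder over the first segments, when the total does not divide evenly, is just one possible answer to the even-division question above.

    # Record-count decomposition: 100,000,000 records into 40 tasks.
    TOTAL, TASKS = 100_000_000, 40

    base, extra = divmod(TOTAL, TASKS)
    # Give the first `extra` segments one extra record when TOTAL % TASKS != 0.
    sizes = [base + (1 if i < extra else 0) for i in range(TASKS)]

    # (start, end) record-number ranges for each task, end exclusive.
    bounds, start = [], 0
    for n in sizes:
        bounds.append((start, start + n))
        start += n

    print(sizes[0], bounds[0], bounds[-1])
    # 2500000 (0, 2500000) (97500000, 100000000)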

For example, the empID column will ultimately be divided into 40 smaller files: "empID1.dat", "empID2.dat" … "empID40.dat". The code for this algorithm is shown below:

[Figure: esProc_column_storage_hadoop_5]

To prevent memory overflow, the above algorithm retrieves 100,000 records from the cursor at a time and appends them to the current file, so a new file is started after every 2,500,000 records. In the script, A3 holds the file count and C3 tracks whether the number of records in the current file has reached 2.5 million. A sketch of the same splitting loop follows.
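For illustration only (the actual script is the esProc code in the screenshot above), the Python sketch below performs the same splitting loop for the empID column: fetch 100,000 records at a time to keep memory use low, append the empID values to the current segment file, and roll over to a new file every 2,500,000 records. The file names and the column position are assumptions.

    # Split the empID column of sales.txt into empID1.dat ... empID40.dat.
    from itertools import islice

    BATCH, SEGMENT = 100_000, 2_500_000
    EMP_COL = 0                              # assumed position of the empID field

    seg_no, in_segment = 1, 0                # current output file number / records in it
    out = open(f"empID{seg_no}.dat", "w")
    with open("sales.txt") as src:
        while True:
            batch = list(islice(src, BATCH))        # fetch at most 100,000 records
            if not batch:
                break
            for line in batch:
                if in_segment == SEGMENT:           # segment is full: start the next file
                    out.close()
                    seg_no, in_segment = seg_no + 1, 0
                    out = open(f"empID{seg_no}.dat", "w")
                out.write(line.split("\t")[EMP_COL] + "\n")
                in_segment += 1
    out.close()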

Decomposing the data is surprisingly troublesome, but the column-wise computation after splitting is quite simple.
