Paper of the week: HaLoop
Efficient Iterative Data Processing on Large Clusters
In this post I'm presenting HaLoop, yet another modified version of Hadoop, as the name reveals. HaLoop aims to overcome the limitations of Hadoop MapReduce that make iterative applications inefficient and hard to develop.
The typical way of writing an iterative application in Hadoop MapReduce is to develop one MapReduce job for each iteration and to write a driver program that manages the jobs. This approach has several disadvantages, including:
- manual orchestration of the jobs
- no explicit way to define a termination condition. For simplicity, many developers fix the number of iterations beforehand. However, it is often very difficult to predict how many iterations are needed to reach a sufficiently accurate solution, especially when the termination condition is some kind of convergence criterion.
- re-loading and re-processing of data that remain invariant across iterations
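To make the driver pattern concrete, here is a minimal sketch in plain Python. It is not Hadoop code: `run_iteration` is a hypothetical stand-in for one MapReduce job (a single PageRank step), and `driver`, `max_iterations` and `tolerance` are names made up for this illustration. The toy graph assumes every page has at least one outgoing link.

```python
def run_iteration(ranks, links, damping=0.85):
    """Stand-in for one MapReduce job: a single PageRank step."""
    n = len(ranks)
    new_ranks = {page: (1 - damping) / n for page in ranks}
    # The invariant link structure is read again in every iteration.
    for page, outgoing in links.items():
        share = damping * ranks[page] / len(outgoing)
        for target in outgoing:
            new_ranks[target] += share
    return new_ranks


def driver(links, max_iterations=200, tolerance=1e-6):
    """Hand-written driver: launches one 'job' per iteration and checks convergence itself."""
    ranks = {page: 1.0 / len(links) for page in links}
    iteration = 0
    for iteration in range(1, max_iterations + 1):
        new_ranks = run_iteration(ranks, links)
        # Convergence test: total absolute change between consecutive iterations.
        delta = sum(abs(new_ranks[p] - ranks[p]) for p in ranks)
        ranks = new_ranks
        if delta < tolerance:
            break
    return ranks, iteration


# Toy graph: every page has at least one outgoing link.
links = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
ranks, iterations = driver(links)
```

All three disadvantages show up even at this scale: the loop is written by hand, the stopping rule lives in the driver, and `links` is traversed in full on every pass although it never changes.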
HaLoop extends Hadoop's API to support loops and easy expression of termination conditions. It also proposes caching and indexing mechanisms and a novel loop-aware scheduler that can exploit the cached data.
System Overview
As seen in the figure below, HaLoop reuses most of Hadoop's master-slave architecture, including HDFS as its distributed file system. HaLoop adds a control module to the master node, which is responsible for launching new jobs as iterations and for checking the termination condition, if one is specified. The task scheduler and task tracker are modified to become loop-aware, and an indexing module and a caching module are added.
Programming Model
HaLoop's API was built to support recursive programs of the following general form:
R_{i+1} = R_0 ∪ (R_i ⋈ L)
where R_0 contains the initial values and L is a relation that stays invariant across iterations.
The API provides ways to specify a termination condition. A loop can finish either when the maximum number of iterations has been reached or when the difference between the results of two consecutive iterations falls below some threshold. HaLoop does not specify any high-level language for writing applications. A developer needs to specify the loop body, which consists of one or more map-reduce pairs. Optionally, they can also define a termination condition and the loop-invariant data.
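As an illustration of the recursive form above, here is a small sketch of a descendant query in plain Python. It does not use HaLoop's API. The names `join` and `iterate`, the tuple layout and the toy data are assumptions made for this example. L holds (parent, child) pairs, R holds (ancestor, descendant) pairs, and the loop stops at a fixpoint or after a maximum number of iterations.

```python
def join(r, l):
    """R ⋈ L on r.descendant == l.parent, projected to (ancestor, child)."""
    children = {}
    for parent, child in l:
        children.setdefault(parent, set()).add(child)
    return {(ancestor, child)
            for ancestor, descendant in r
            for child in children.get(descendant, ())}


def iterate(r0, l, max_iterations=100):
    """Compute R_{i+1} = R_0 ∪ (R_i ⋈ L) until nothing changes."""
    r = set(r0)
    for i in range(1, max_iterations + 1):
        r_next = set(r0) | join(r, l)
        if r_next == r:  # termination condition: the result stopped changing
            return r, i
        r = r_next
    return r, max_iterations


# Invariant relation L: (parent, child) pairs.
L = {("ann", "bob"), ("bob", "cat"), ("cat", "dan")}
# Initial values R_0: direct children of "ann".
R0 = {("ann", "bob")}

descendants, iterations = iterate(R0, L)
```

Here the termination condition is the special case where the difference between two consecutive results is empty. A threshold on a numeric distance would take the place of the equality test for problems such as PageRank.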
Scheduling
HaLoop's scheduler tries to send tasks that access the same data in different iterations to the same physical machines. This is what the authors call inter-iteration locality. For a problem of the general form above, the mappers that were assigned partitions of the invariant dataset L only need to compute their results once, and those results only need to be sent to the corresponding reducers once, during the first iteration. The only requirement for the scheduler to work is that the number of reducer tasks remains constant throughout the program execution.
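The idea of inter-iteration locality can be sketched in a few lines of Python. This is only a toy model under strong assumptions, not HaLoop's scheduler: `LoopAwareScheduler`, its round-robin first placement and the node names are all made up for illustration, and node failures and load balancing are ignored.

```python
class LoopAwareScheduler:
    """Toy scheduler that remembers where each partition ran."""

    def __init__(self, nodes):
        self.nodes = list(nodes)
        self.placement = {}  # partition id -> node that holds its cached data

    def assign(self, partition):
        # First time: pick a node round-robin. Later: reuse the same node,
        # so the task lands where the partition's data is already cached.
        if partition not in self.placement:
            node = self.nodes[len(self.placement) % len(self.nodes)]
            self.placement[partition] = node
        return self.placement[partition]

    def schedule_iteration(self, num_partitions):
        return {p: self.assign(p) for p in range(num_partitions)}


scheduler = LoopAwareScheduler(["node-1", "node-2"])
first = scheduler.schedule_iteration(4)
second = scheduler.schedule_iteration(4)
```

The sketch also shows why the number of reducer tasks has to stay constant: the remembered placement is keyed by partition id, so changing the number of partitions would change which keys each id covers and make the cached data useless.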
Caching and Indexing
HaLoop's caching and indexing mechanisms are built in three levels:
- Reducer Input Cache
If a reducer receives data from a table that has been defined as loop-invariant and its input cache is enabled, it stores this data on its local disk for future use and creates an index on it. In a subsequent iteration, when the reducer receives tuples that need to be passed to the user-specified reduce function, it also searches its cache for values corresponding to the same key. Both the cached and the new tuples are passed to the reduce function.
- Reducer Output Cache
The reducer output cache stores the latest local result of the computation on this node. The aim of this cache is to reduce the costs of computing termination conditions. When all reducers have their local result cached, they can send them over to the master node, which can decide if the termination condition is fulfilled.
- Mapper Input Cache
The goal of the mapper input cache is to avoid as many non-local reads as possible during non-initial iterations. If during the first iteration a mapper reads a remote split, it will cache it for possible future use.
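The two reducer-side caches can be sketched as follows. This is a simplified in-memory model, not HaLoop code. The class and function names, the "L"/"R" tags on shuffled tuples, and the idea that each reducer reports a local distance to the master are assumptions made for this illustration. HaLoop keeps the cached data on local disk.

```python
from collections import defaultdict


class ReducerInputCache:
    """Loop-invariant tuples, indexed by key (in memory here for simplicity)."""

    def __init__(self):
        self.index = defaultdict(list)

    def store(self, key, value):
        self.index[key].append(value)

    def lookup(self, key):
        return self.index.get(key, [])


def reduce_with_cache(shuffled, cache, reduce_fn, invariant_tag="L"):
    """Cache invariant tuples on arrival; feed new and cached tuples to reduce_fn."""
    fresh = defaultdict(list)
    for key, tag, value in shuffled:
        if tag == invariant_tag:
            cache.store(key, value)  # expected to arrive only in the first iteration
        else:
            fresh[key].append(value)
    return {key: reduce_fn(key, values, cache.lookup(key))
            for key, values in fresh.items()}


class ReducerOutputCache:
    """Keeps the latest local result so the next one can be compared against it."""

    def __init__(self):
        self.previous = {}

    def local_distance(self, current):
        keys = set(current) | set(self.previous)
        distance = sum(abs(current.get(k, 0.0) - self.previous.get(k, 0.0))
                       for k in keys)
        self.previous = dict(current)
        return distance


def should_terminate(local_distances, threshold):
    """Master-side check over the values reported by all reducers."""
    return sum(local_distances) < threshold


def join_fn(key, ancestors, children):
    """Example reduce function: pair every new ancestor with every cached child."""
    return [(a, c) for a in ancestors for c in children]


cache = ReducerInputCache()
# Iteration 1: the invariant tuple ("bob" has child "cat") is shuffled and cached.
first = reduce_with_cache([("bob", "L", "cat"), ("bob", "R", "ann")], cache, join_fn)
# Iteration 2: only the new tuple is shuffled; the child comes from the cache.
second = reduce_with_cache([("bob", "R", "zed")], cache, join_fn)
```

In the second call nothing tagged "L" crosses the network, which is the saving the reducer input cache is after. The output cache plays the same role for the termination check: each reducer compares its new result with the cached one locally, and only a small summary needs to reach the master.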
Evaluation
PageRank, Descendant Query and K-means are the applications used for the evaluation, and each is compared against a Hadoop implementation. Each cache type was evaluated separately. The evaluation shows that the Reducer Input Cache significantly reduces the I/O of the MapReduce shuffling phase. This is very important because, for many applications, the most time-consuming step is communicating data from mappers to reducers. The Reducer Output Cache is also beneficial and reduces the cost of calculating the termination condition by up to 40%. The Mapper Input Cache only marginally benefits execution.
Happy coding and reading,
V.


