Paper of the week: HAIL

Only Aggressive Elephants are Fast Elephants

The main character of this post's paper is poor Bob, an analyst, and the story follows his adventures with elephants, i.e. Hadoop MapReduce. Bob's queries often reveal interesting information or anomalies that need to be examined further, spawning new queries and therefore triggering new MapReduce jobs. In such a scenario, the problem stems from slow query runtimes, due to the lack of proper schemas and data indexing. Poor Bob ends up drinking too much coffee waiting for his queries to finish, while also having to deal with his angry boss! The paper proposes inexpensive index creation on Hadoop data attributes, in order to reduce execution times in exploratory MapReduce use-cases.

The main idea comes from the observation that HDFS keeps three replicas of each block by default. Why not keep each one of these copies in a different sort order and with a different index? If our query happens to filter data by an attribute for which an index exists, we gain a great speedup!

As usual, it's not as easy as it sounds. How can indexes be created during upload time, so that no additional scan is required? How does HDFS need to be modified in order to distinguish the different replicas? How can MapReduce applications exploit the sort orders and indexes effectively? And how can we make the scheduler aware?

HAIL (Hadoop Aggressive Indexing Library) answers all these questions.

 

System Overview

HAIL is a Hadoop modification which keeps each physical replica of an HDFS block in a different sort order and creates clustered indexes on HDFS blocks during data upload. The HAIL client converts each HDFS block to a binary PAX format and sends it to three datanodes. Each node then sorts the data contained in the block using a different sort order. Sorting and indexing happen in main memory. Each datanode creates a different clustered index for each data block and stores it with the sorted data. The process is shown below:

[Figure: HAIL architecture]

 

Upload Pipeline

HAIL modifies the upload pipeline of HDFS to support index creation and sorting. One major difference is how the HAIL client parses each file into rows. In HDFS, each block is defined by a constant number of bytes, while HAIL parses the file based on content, never splitting a row between two blocks. After the blocks have been defined, each one of them is converted to a binary PAX representation.

HAIL also modifies the checksum mechanism of HDFS. In HDFS, only the datanode receiving the last replica verifies the checksums and then acknowledges the packets back to the other datanodes. In HAIL, however, no checksum or data is flushed to disk immediately, as sorting is required first. A typical block size is 64 MB, which allows HAIL to perform several block sortings and index creations entirely in memory. After sorting, indexing and metadata creation, each datanode computes its own checksums and the data is flushed to disk.
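To make the upload step concrete, here is a minimal sketch of what each datanode conceptually does with a block buffered in memory: lay the rows out column-wise (PAX-style), sort them by a replica-specific attribute, and build a sparse clustered index over the sort column before flushing. The function and attribute names below are made up for illustration; HAIL itself does this in Java on binary PAX pages inside HDFS.

import bisect

def build_replica(rows, sort_attr, index_granularity=4):
    # Sort the block's rows by the attribute assigned to this replica and
    # store each attribute as its own array (a simplified PAX layout).
    sorted_rows = sorted(rows, key=lambda r: r[sort_attr])
    columns = {attr: [r[attr] for r in sorted_rows] for attr in sorted_rows[0]}
    # Sparse clustered index: remember the key at every index_granularity-th row.
    index = [(columns[sort_attr][i], i) for i in range(0, len(sorted_rows), index_granularity)]
    return columns, index

def lookup(columns, index, sort_attr, key):
    # Use the index to jump close to the first candidate row instead of scanning the block.
    keys = [k for k, _ in index]
    start = index[max(bisect.bisect_right(keys, key) - 1, 0)][1]
    col = columns[sort_attr]
    matches = []
    for i in range(start, len(col)):
        if col[i] > key:
            break
        if col[i] == key:
            matches.append(i)
    return matches

rows = [{"url": "/a", "ts": 3}, {"url": "/c", "ts": 1}, {"url": "/a", "ts": 2}]
cols, idx = build_replica(rows, sort_attr="url")
print(lookup(cols, idx, "url", "/a"))  # row positions within the block where url == "/a"

Each of the three replicas would run build_replica with a different sort_attr, which is exactly what lets a query pick whichever replica carries an index on its filter attribute.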

At this point, I have to make clear that there is no impact on the fault-tolerance properties of HDFS. Data is only reorganized inside the same block, so in case of failure, no information is lost except for the index of that particular replica.

 

Query Pipeline

From the application perspective, a MapReduce program requires only a few changes in order to use HAIL. Bob needs to annotate his map function with a selection predicate and the projected attributes, i.e. the values used to filter the input records and the fields he wants the query to return as a result. This information helps the runtime schedule tasks on nodes containing the appropriate indexes. The annotation also simplifies the map function, since Bob no longer has to include the filtering logic in it.
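As an illustration only (HAIL's real API consists of Java annotations on the map class; the decorator below is a hypothetical Python stand-in), the key idea is that the predicate and projection are declared to the framework instead of being coded inside map:

def hail_query(selection, projection):
    # Hypothetical stand-in for HAIL's map annotation: the framework reads the
    # predicate and projected attributes, picks a replica whose index matches,
    # and feeds map() only the qualifying records.
    def wrap(map_fn):
        map_fn.selection = selection    # e.g. (attribute, operator, constant)
        map_fn.projection = projection  # attributes the job actually needs
        return map_fn
    return wrap

@hail_query(selection=("visitDate", ">=", "2012-01-01"), projection=["url", "visitDate"])
def my_map(record, emit):
    # No filtering here: records reaching map() already satisfy the predicate.
    emit(record["url"], 1)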

From HAIL's perspective, the record splitting policy needs to be modified, as each replica now has a different size due to its indexes. HAIL groups several data blocks inside one input split, resulting in fewer map tasks. If the job requires a full scan or no relevant index exists, HAIL falls back to standard Hadoop mechanisms.

 

Evaluation

Extensive evaluation is provided in the paper. The system is compared to Hadoop and Hadoop++ (previous work of the same group). Experiments test upload performance, the impact of index creation, scaling, query performance and the impact of the new splitting mechanism. Six different clusters and two different datasets are used. Applications include queries with varying degrees of selectivity.

The results show that HAIL matches or often outperforms Hadoop in data uploading, even though it has to sort blocks and create indexes, and they demonstrate the efficiency of the PAX representation. Query execution times decrease significantly when appropriate indexes are present, and having fewer map tasks reduces end-to-end job runtimes. However, in the case of a map task failure, more data needs to be re-processed, slowing down recovery.

 

Links

 

Happy reading and coding,

V.

Paper of the week: Themis

An I/O Efficient MapReduce

This week's paper is named after Themis, a Titaness of Greek mythology and protector of order and law. However, in our context, Themis is a MapReduce-inspired system whose main goal is to minimize I/O operations.

It is true that many MapReduce jobs are I/O-bound and it would certainly benefit performance if we could find a mechanism to limit I/O operations. At this point, I need to clarify that Themis aims to reduce disk I/O, i.e. find a way to process records in memory and spill records to disk as rarely as possible.

In order to achieve this goal, Themis needs to make substantially different design choices from common MapReduce systems. The three most important features that enable disk I/O elimination are:

1. Relaxation of fault-tolerance guarantees

Classical MapReduce implementations offer fault-tolerance at the task level, i.e. if a node fails, the tasks that were running on this node will be re-scheduled and executed on another node. The rest of the tasks, running or completed, are unaffected. This model allows for fast recovery and works well in very large clusters where failures are common. However, the creators of Themis argue that clusters with thousands of machines are not that common in the real world. Surely Google, Facebook, Yahoo! and a couple of others are running MapReduce jobs on clusters of such scale; the majority of Hadoop adopters, however, have much smaller deployments. Under the assumption of a small to mid-size cluster, Themis offers fault-tolerance at the job level only, i.e. if a node fails during execution, the whole job needs to be re-executed. This decision greatly simplifies the implementation, allows for data pipelining and eliminates time-consuming checkpoints.

2. Dynamic memory management

In order to minimize disk operations, Themis aims to process a record entirely as soon as it is read from disk. To achieve this, the system needs a mechanism to ensure that there will be enough memory throughout the duration of the record processing. Themis implements a dynamic memory manager with the following pluggable policies:

  • Pool-based management

This policy views the available memory as a pool of fixed-size, pre-allocated buffers. The application can reserve a buffer from the pool and return it when it is no longer needed. If a worker requests a buffer from an empty pool, it blocks until a buffer is returned by another worker. The obvious advantage of this policy is its simplicity: all memory allocation and pool setup happen at startup. On the other hand, this static partitioning limits the maximum record size supported and sacrifices flexibility. (A minimal sketch of such a pool follows this list.)

  • Quota-based management

The second policy is designed to better handle variable-sized records. It controls the flow of data among computational stages, so that stages receiving input do not get overwhelmed. To achieve this, the manager uses queues between stages. A quota is the amount of memory allocated between a producer and a consumer stage. If the producer is about to exceed its quota, it is stalled until the corresponding consumer has processed enough data.

  • Constraint-based management

The third and most sophisticated policy dynamically adjusts memory allocation based on worker requests and currently available memory. The manager monitors memory usage by each worker and accepts requests for memory allocation. If enough memory is available, the request is satisfied. Worker requests are prioritized based on their position in the execution graph. This policy adds a significant overhead to the execution time and performs well when the number of allocation requests is relatively small.

3. Per-node disk-I/O management

A disk scheduler, running on every Themis node, manages the way data is grouped and written to disk. The scheduler organizes data in large batches in order to reduce disk accesses.
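As a rough illustration of the pool-based policy mentioned above (a sketch under simplified assumptions, not Themis code), the pool hands out fixed-size buffers allocated at startup and blocks a requesting worker until a buffer is returned:

import queue

class BufferPool:
    # Pool-based policy sketch: fixed-size buffers are pre-allocated at startup;
    # a worker asking for a buffer blocks until another worker returns one.
    def __init__(self, num_buffers, buffer_size):
        self._pool = queue.Queue()
        for _ in range(num_buffers):
            self._pool.put(bytearray(buffer_size))  # all allocation happens up front

    def get(self):
        return self._pool.get()  # blocks while the pool is empty

    def put(self, buf):
        self._pool.put(buf)

pool = BufferPool(num_buffers=4, buffer_size=64 * 1024)
buf = pool.get()   # a worker reserves a buffer, fills it with a record and processes it
pool.put(buf)      # returning it unblocks any waiting worker

The quota- and constraint-based policies replace the fixed pool with per-stage quotas and a central manager, but the blocking behaviour when memory runs out is the same.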

 

System Overview

Like many other systems, Themis is implemented as a dataflow graph consisting of stages, which are grouped into phases and executed in parallel by workers.

Phase Zero samples input data in a distributed fashion and extracts information about the distribution of records and keys. This phase is necessary in order to ensure that partitions in each of the following stages will be small enough to be processed and sorted in memory. Figuring out the distribution of input data is fairly simple, however, intermediate data also need to be considered. Themis applies the map function to a subset of the input data in order to generate a subset of intermediate data, which can be used to approximate the intermediate distribution. Both range and hash partitioning mechanisms are supported.
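A minimal sketch of the idea behind Phase Zero (assuming a simple range-partitioning scheme; the real sampling in Themis is distributed and more careful about skew):

import random

def phase_zero_boundaries(input_records, map_fn, num_partitions, sample_size=1000):
    # Apply the map function to a sample of the input to approximate the
    # intermediate key distribution, then pick range boundaries so that each
    # partition is small enough to be sorted entirely in memory.
    sample = random.sample(input_records, min(sample_size, len(input_records)))
    intermediate_keys = sorted(k for rec in sample for k, _ in map_fn(rec))
    step = max(len(intermediate_keys) // num_partitions, 1)
    return intermediate_keys[step::step][:num_partitions - 1]  # boundary keys

def map_fn(line):  # a word-count style map function, used only for this example
    return [(w, 1) for w in line.split()]

boundaries = phase_zero_boundaries(["a b c", "b c d", "c d e"] * 10, map_fn, num_partitions=3)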

Phase One implements the mapping and shuffling, but it is broken down into more fine-grained stages, shown in the diagram below:

 

[Figure: Themis Phase One stages]

 

The Reader, Mapper and Sender form the "producer side" of the phase and their functionality is as simple as their names suggest. The rest of the stages form the "consumer side" of the phase, which is where the disk scheduler described in the previous section comes into the picture.

Phase Two consists of sorting and reducing. Each partition is sorted by key, always keeping the result in memory. The stages of Phase Two are shown below:

[Figure: Themis Phase Two stages]

 

Evaluation

Themis is evaluated using several applications, such as sorting, WordCount, PageRank and CloudBurst. Both synthetic and real data are used for the evaluation. Experiments are run on a fairly small, 20-node cluster, or even on a single node when the data is small enough. The experiments test disk performance and the memory management policies, and compare Themis with Hadoop. The comparison with Hadoop shows that I/O-bound applications can greatly benefit from Themis' design. However, Themis does not use a distributed file system or any kind of replication, and it is not clear whether those factors were taken into account when taking measurements.

 

Thoughts

In my opinion, Themis is a rather immature system with some important limitations that need to be addressed. However, it clearly shows that Hadoop is not the best fit for all cluster sizes and types and that many deployments could benefit greatly from alternative designs.

 

Happy coding and reading,

V.

Paper of the week: HaLoop

Efficient Iterative Data Processing on Large Clusters

In this post I'm presenting HaLoop, yet another modified version of Hadoop, as the name reveals. HaLoop aims to overcome the limitations of Hadoop MapReduce that make iterative applications inefficient and hard to develop.

The typical way of writing an iterative application in Hadoop MapReduce is to develop one MapReduce job for each iteration and a driver program to manage the jobs. There are several disadvantages to this approach, including:

  • manual orchestration of the jobs
  • no explicit way to define a termination condition. For simplicity, many developers specify the number of iterations beforehand. However, it is often very difficult to predict how many iterations are enough to reach a sufficiently accurate solution, especially when the termination condition is some kind of convergence criterion.
  • re-loading and re-processing of data that remain invariant across iterations

HaLoop extends Hadoop's API to support loops and easy expression of termination conditions. It also proposes caching and indexing mechanisms and a novel loop-aware scheduler which can exploit cached data.

 

System Overview

As seen in the figure below, HaLoop reuses most of Hadoop's master-slave architecture, including HDFS as its distributed file system. HaLoop adds a loop control module to the master node, which is responsible for launching new jobs as iterations and for checking the termination condition, if one is specified. The task scheduler and task tracker are modified to become loop-aware, and indexing and caching modules are added.

[Figure: HaLoop architecture]

Programming Model

HaLoop's API was built to support recursive programs of the following general form:

Ri+1 = R0 ∪ (Ri ⋈ L)

where R0 contains the initial values and L is a loop-invariant dataset.

The API provides ways to specify a termination condition. A loop can finish either when the maximum number of iterations has been reached or when the difference between the results of two consecutive iterations is below some threshold. HaLoop does not specify any high-level language for writing applications. A developer needs to specify the loop body, consisting of one or more map-reduce pairs. Optionally, they can also define a termination condition and loop-invariant data.
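Conceptually, the driver that HaLoop takes off the developer's hands looks like the sketch below (plain Python standing in for HaLoop's Java API; run_mapreduce and distance are hypothetical placeholders for the loop body and the convergence measure):

def iterate(r0, invariant, run_mapreduce, distance, max_iterations=20, epsilon=1e-3):
    # HaLoop-style driver: one MapReduce job per iteration, stopping either
    # after max_iterations or when two consecutive results are close enough.
    current = r0
    for _ in range(max_iterations):
        new = run_mapreduce(current, invariant)  # loop body: one or more map/reduce pairs
        if distance(new, current) < epsilon:     # convergence-based termination
            return new
        current = new
    return current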

 

Scheduling

HaLoop's scheduler tries to place tasks that access the same data in different iterations on the same physical machines. This is what the authors call inter-iteration locality. Assuming a program of the general form above, the mappers that were assigned partitions of the L dataset only need to compute their results once. Their results also need to be sent to the corresponding reducers only after the first iteration. The only requirement for the scheduler to work is that the number of reduce tasks remains constant throughout the program execution.

 

Caching and Indexing

HaLoop's caching and indexing mechanisms are built in three levels:

  • Reducer Input Cache

If a reducer receives data from a table that has been declared loop-invariant and its input cache is enabled, it stores this data on its local disk for future use and creates an index on it. In a subsequent iteration, when the reducer receives tuples destined for the user-specified reduce function, it also searches its cache for values corresponding to the same key. Both cached and new tuples are then passed to the reduce function (see the sketch after this list).

  • Reducer Output Cache

The reducer output cache stores the latest local result of the computation on this node. The aim of this cache is to reduce the costs of computing termination conditions. When all reducers have their local result cached, they can send them over to the master node, which can decide if the termination condition is fulfilled.

  • Mapper Input Cache

The goal of the mapper input cache is to avoid as many non-local reads as possible during non-initial iterations. If during the first iteration a mapper reads a remote split, it will cache it for possible future use.
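A rough sketch of what the reducer input cache does in a later iteration (the names are illustrative, not HaLoop's API):

def reduce_with_input_cache(new_tuples, cache, reduce_fn):
    # Merge freshly shuffled tuples with cached loop-invariant tuples for the
    # same key, then call the user reduce function on the combined group.
    results = {}
    for key, new_values in new_tuples.items():
        cached_values = cache.get(key, [])  # indexed lookup into the local cache
        results[key] = reduce_fn(key, cached_values + new_values)
    return results

cache = {"k1": [3, 4]}  # values from the invariant dataset L, cached in an earlier iteration
out = reduce_with_input_cache({"k1": [5]}, cache, lambda k, vals: sum(vals))  # {'k1': 12}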

 

Evaluation

PageRank, Descendant Query and K-means are the applications used for evaluation, compared against plain Hadoop implementations. Each cache type was evaluated separately. The evaluation shows that the Reducer Input Cache significantly reduces the I/O of the MapReduce shuffling phase. This is very important, as for a lot of applications the most time-consuming step is communicating data from mappers to reducers. The Reducer Output Cache is also beneficial and reduces the cost of computing the termination condition by up to 40%. The Mapper Input Cache only marginally benefits execution.

 

Links

 

Happy coding and reading,

V.

Sector and Sphere

Design and Implementation of a High-Performance Data Cloud

This week's post discusses Sector and Sphere, two systems built at the University of Illinois, which together provide a framework for executing data-intensive applications in the cloud.

The vision of the project is to provide a computing cloud which can be geographically distributed. The authors build on the assumption that available clusters are connected by high-speed networks and that data inputs and query outputs are relatively large.

 

Sector

Sector provides the storage layer of the system. It assumes a large number of well-connected nodes, located in the same or different datacenters. Datasets are divided into Sector slices, which are also the units of replication in the system. An overview of the Sector architecture is shown below:

[Figure: Sector architecture]

Sector is one of the very few large-scale data storage services that mention security issues. Security is provided by an external, dedicated security server, which maintains user credentials, file access information and the IP addresses of authorized nodes. The actual service is provided by a master node and a set of slave nodes. The master stores metadata, monitors the slaves and handles client requests. Clients connect to the master over SSL. The master then contacts the security server in order to verify the user's credentials and obtain their access privileges. When a client requests to read a file, the master checks permissions and selects a slave node to serve the request. Communication between clients and slaves, as well as among slaves, is always coordinated by the master node.

Sector relies on the native local file system of each node to store files. Each Sector slice is stored as a separate file in the native file system and is not split further (e.g. into blocks, as in Hadoop). Apart from metadata, the master also stores information about the current topology and the available resources of the slaves. The master also periodically checks the number of existing copies of each file in the system. If this falls below the specified replication factor, it chooses a slave node to create a copy.

Sector uses UDP (with a reliable message-passing library on top) for messages and UDT for data transfers. 

 

Sphere

Sphere provides the computing layer of the system. It transparently manages data movements, message-passing, scheduling and fault-tolerance.

Sphere's computing paradigm is based on stream-processing. In this context, each slave corresponds to an ALU in a GPU or a core in a CPU. A Sphere program takes a stream as input and produces one or more streams as output. Each stream is divided into segments, which are assigned to different Sphere Processing Engines (SPEs) for execution. The computing paradigm is shown in the following diagram:

[Figure: Sphere computing paradigm]

One could easily implement MapReduce-like computations in Sphere. When writing a Sphere UDF, one can specify a bucket ID for each record of the output. This way, one can imitate the shuffling phase of MapReduce.
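To illustrate the point (the names below are made up; real Sphere UDFs are written in C++ against the Sector/Sphere API), a UDF that tags each output record with a bucket ID effectively reproduces MapReduce's shuffle, since records in the same bucket end up at the same destination:

def word_count_udf(segment, num_buckets=4):
    # Sphere-style UDF sketch: emit (bucket_id, record) pairs so the framework
    # can route records with the same key to the same bucket, like a shuffle.
    out = []
    for line in segment:
        for word in line.split():
            bucket_id = hash(word) % num_buckets  # same word -> same bucket
            out.append((bucket_id, (word, 1)))
    return out

buckets = word_count_udf(["to be or not to be"])
# Each bucket can later be aggregated independently, as in MapReduce's reduce phase.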

When a client sends a data-processing request to the master, it receives a list of slaves available for computation. The client selects the nodes it needs and starts an SPE on each of them. While an SPE is running, it periodically reports its progress back to the client.

The Sphere client is responsible for splitting the input into segments and distributing them uniformly to the available SPEs. Data locality and data access concurrency are the main goals of the scheduling rules. In order to detect failures, the Sphere client monitors the running SPEs and uses timeouts to declare them failed. There is no checkpointing mechanism, so a failed SPE has to be re-scheduled and re-computed completely. Sphere also uses a technique very similar to MapReduce's in order to deal with stragglers.

 

Thoughts

Overall, the design of Sector and Sphere is straightforward, and a number of decisions seem to have been taken in order to avoid implementation complexity. Sector performs best when dealing with a small number of relatively large files. The programming model is general and more flexible than MapReduce's, though lower-level. The Terasort benchmark is used for evaluation and there is a comparison with Hadoop/MR, although not an extensive one.

 

Links

 

Happy reading and coding,

V.

Paper of the week: Incoop

MapReduce for Incremental Computations

This week I am writing about Incoop, a system developed to support incremental computations. Incoop transparently extends Hadoop MapReduce, i.e. existing applications can be executed on it without changing any code. 

Incoop's motivation comes from the observation that a large number of MapReduce jobs need to be run repeatedly with slightly different (most often augmented) input. This obviously leads to redundant computations and inefficiencies. To overcome this problem, one has to specially design their MapReduce application for this case, storing and reusing state across multiple runs. Incoop's goal is to handle such cases without demanding any extra effort from the programmer.

Incoop extends Hadoop to support incremental computations by making three important modifications:

  • Inc-HDFS: A modified HDFS which splits data depending on file contents instead of size
  • Contraction Phase: An additional computation phase added before the Reduce phase, used to control task granularity
  • Memoization-aware Scheduler: An improved scheduler which takes into account data locality of previously computed results while also using a work-stealing algorithm.

 

System Overview

The main goal of Incoop is supporting incremental computations while offering transparency and efficiency. Transparency means being backwards compatible, which puts a limitation on the modifications one can make on the initial MapReduce model. On the other hand, efficiency is mainly determined by the granularity of the elements which can be reused.

To achieve these goals, Incoop modifies HDFS and the MapReduce scheduler and also adds a memoization server to the architecture. The memoization server simply stores a mapping from the input of a computation to its output. Whenever a task runs, it queries the server to find out if the result it is going to produce already exists. A high-level view of the system is shown below:

[Figure: Incoop architecture]

 

Inc-HDFS

Incremental HDFS mainly differs from the original HDFS in the way data is partitioned into chunks. In the original HDFS, data is divided into fixed-size blocks and these blocks are also the units of computation for a map task. This design is not suitable for incremental computations, as the slightest change in the input could affect all subsequent blocks. One could propose computing differences between the entire input files, but that choice would be too expensive in this case.

What Inc-HDFS does instead is divide data into chunks based on content rather than size. The idea comes from the data deduplication area and uses fingerprints to match patterns and mark chunk boundaries. With this approach, a change in the input will most probably affect only the specific chunk of data, while the rest can be reused.
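A toy version of content-defined chunking (a sketch of the idea only; Incoop's Inc-HDFS fingerprints binary data, e.g. with Rabin fingerprints, rather than hashing Python tuples):

def content_defined_chunks(records, window=8, mask=0x3F):
    # Place a chunk boundary wherever a rolling hash of the last `window` records
    # matches a pattern. Inserting or removing a record then shifts boundaries
    # only inside the chunk it falls in, so the other chunks keep their identity.
    chunks, current = [], []
    for rec in records:
        current.append(rec)
        fingerprint = hash(tuple(current[-window:]))
        if len(current) >= window and fingerprint & mask == 0:
            chunks.append(current)
            current = []
    if current:
        chunks.append(current)
    return chunks

chunks = content_defined_chunks(["line-%d" % i for i in range(1000)])  # roughly 64 records per chunk on average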

 

Incremental MapReduce

On the computation side, map task results are stored persistently and references to them are inserted into the memoization server. If a running map task is part of an incremental computation, it first queries the memoization server and retrieves any result that already exists.

Making the reduce phase support incremental computations is a bit more complicated than the map case. That is because we do not have control over the input to the reducers, as it is defined directly by the mappers' output. For this reason, Incoop adds an additional phase to the model, right before the reduce phase. This Contraction phase leverages the idea of combiners to "break" the reduce task into a tree hierarchy of smaller tasks. The process runs recursively until the last level, where the reduce function is applied. In order to produce a data partitioning suitable for reuse, content-based partitioning is again performed at every level of combiners.
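A rough sketch of the Contraction idea (helper names are illustrative, and fixed-size sub-groups stand in here for Incoop's content-based partitioning at each level): combine sub-groups of a reduce task's input in a tree, memoizing each sub-result so that an unchanged sub-group can be reused in the next run.

def contract(values, combine_fn, fan_in=4, memo=None):
    # Recursively combine sub-groups of a reduce task's input; each sub-group's
    # result is memoized, so only sub-groups that actually changed are recomputed.
    memo = {} if memo is None else memo
    if len(values) <= fan_in:
        return combine_fn(values)
    partials = []
    for i in range(0, len(values), fan_in):
        group = tuple(values[i:i + fan_in])
        if group not in memo:            # lookup in the memoization server
            memo[group] = combine_fn(group)
        partials.append(memo[group])
    return contract(partials, combine_fn, fan_in, memo)

total = contract(list(range(100)), combine_fn=sum)  # e.g. an additive reduce function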

 

Scheduler

The default MapReduce scheduler assigns tasks to nodes based on data locality and availability of execution slots. Data locality is a desirable factor in the case of Incoop as well, but memoized data need to be taken into account. The memoization-aware scheduler tries to schedule tasks on the nodes that contain data which can be reused. However, this approach might create load imbalance, in case some data is very popular, and lead to straggler tasks. To avoid this situation, the scheduler implements a simple work-stealing algorithm. When a node runs out of work, the scheduler will locate the node with the largest task queue and delegate a task to the idle node.
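In pseudocode-ish Python (illustrative only; the Node fields are stand-ins for what the Hadoop scheduler actually tracks), the two rules amount to:

from dataclasses import dataclass, field

@dataclass
class Node:
    name: str
    queue: list = field(default_factory=list)   # pending tasks
    memoized: set = field(default_factory=set)  # fingerprints of results stored locally

def schedule(task_fingerprint, nodes):
    # Memoization-aware placement: prefer a node that already holds results
    # for this task's input; otherwise fall back to the least-loaded node.
    for node in nodes:
        if task_fingerprint in node.memoized:
            return node
    return min(nodes, key=lambda n: len(n.queue))

def steal_work(idle_node, nodes):
    # Work stealing: an idle node takes a task from the node with the largest
    # queue, avoiding stragglers caused by popular memoized data.
    victim = max(nodes, key=lambda n: len(n.queue))
    if victim is not idle_node and victim.queue:
        idle_node.queue.append(victim.queue.pop())

nodes = [Node("n1", memoized={"chunk-42"}), Node("n2"), Node("n3")]
target = schedule("chunk-42", nodes)  # lands on n1, which cached the earlier result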

 

Thoughts

Incoop's design decisions can spark a lot of discussion on several levels, and a thorough evaluation and profiling is needed to measure overheads and locate bottlenecks. For example, the content-based partitioning in Inc-HDFS could possibly lead to overloaded map tasks. Moreover, performance gains are questionable when even one map task needs to be re-executed: the reduce phase cannot start before all map tasks have finished, so the speedup inevitably depends on how slow that map task is.

The paper contains an extensive evaluation section which I will not describe here. The authors use five applications, both data and computation intensive, and try to answer the above questions and study the tradeoffs of their approach.

 

Happy reading and coding,

V.

Paper of the week: Nectar

Automatic Management of Data and Computation in Datacenters

This week's paper, Nectar, comes from Microsoft Research and provides a way to address some very fundamental problems that occur in modern datacenters. Nectar is fundamentally built to interact with Dryad and LINQ...

Wait! Don't stop reading!

I know you know that Microsoft dropped Dryad a while ago and decided to admit Hadoop's victory, blah blah blah. But trust me, the ideas in this paper go beyond the specific platform and it's worth reading nevertheless :)

 

Motivation

Nectar basically deals with data management in large datacenters. Manual data management can cause a lot of pain, including data loss and waste of resources due to careless or oblivious users. Especially in big data scenarios, where user applications generate large amounts of output data, it is common for this data to be accessed only once and then never deleted. Consequently, large parts of the datacenter's storage are occupied by data that is never or rarely accessed.

 

System Overview

Nectar automates data management by associating the data with the computation which created it. Datasets can be uniquely identified by the computations that produced them. All successfully completed programs run on the datacenter are saved in a special store and can be used in order to re-create deleted data. In this way, data and computations can be used interchangeably: High-value data is cached and can be used instead of computation when possible and low-value data is garbage-collected and can be retrieved in case it is needed, by re-running the computation that produced it.

Nectar's input is a LINQ program. Like many of the high-level analysis languages, LINQ's operators are functional, i.e. they accept a dataset as input, do a transformation on it and output a different dataset. This property allows Nectar to create the association between datasets and programs. 

Datasets are classified in Nectar as primary or derived. Primary datasets are created once and are not garbage-collected. Derived datasets are created from primary or other derived datasets. A mapping is created between them and the program that produced them and they can be automatically deleted and re-created. In a scenario where we want to analyze website click logs, the raw click logs files would be primary, while all datasets produced by analysis on the click logs, e.g. most popular URLs, would be derived.

The Nectar architecture consists of a client-side program rewriter, a cache server, a garbage collection service and a datastore on the underlying distributed file system, TidyFS. When a DryadLINQ program is submitted for execution, it first goes through a rewriting step. The rewriter consults the cache and tries to transform the submitted program into a more efficient one, using a cost estimator to choose the best alternative. The rewriter also decides which intermediate datasets to cache and always creates a cache entry for the final output of a computation. Rewriting is useful in two scenarios: (a) sub-expression replacement by cached datasets and (b) incremental computations, where cached data can be reused to save computation time. After rewriting, the optimized program is compiled into a Dryad job and executed.
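A toy illustration of the association Nectar maintains (the key scheme and names below are made up; the real system fingerprints LINQ sub-expressions and dataset versions):

import hashlib

cache = {}  # cache server: key -> location of the cached derived dataset

def cache_key(program_text, input_fingerprint):
    # A derived dataset is identified by the computation that produced it:
    # a fingerprint of the (sub-)program plus a fingerprint of its input.
    return hashlib.sha256((program_text + "|" + input_fingerprint).encode()).hexdigest()

def run_or_reuse(program_text, input_fingerprint, execute):
    # Program rewriter sketch: replace a sub-expression with its cached result
    # if one exists; otherwise execute it and register the result in the cache.
    key = cache_key(program_text, input_fingerprint)
    if key in cache:
        return cache[key]   # reuse: no computation needed
    result = execute()      # run the Dryad job for this sub-expression
    cache[key] = result
    return result

out = run_or_reuse("clicks.Where(c => c.Url == 'x').Count()", "clicklogs-v3", lambda: 42)

Deleting a derived dataset is then safe: as long as the program and its inputs are still around, the same key can be recomputed on demand.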

The cache service is distributed and datacenter-wide. It serves lookup requests and, in the case of a miss, has a mechanism for tracing back all necessary computations in order to recreate the requested dataset. It also implements a cache replacement policy, keeping track of the least frequently accessed datasets.

The garbage collector runs in the background without interfering with program execution or other datacenter jobs. It identifies datasets that are not referenced by any cache entry and deletes them. 

 

Evaluation Results

Nectar is deployed and evaluated on an internal 240-node cluster of Microsoft Research and was also evaluated using execution logs from 25 other production clusters. It was discovered that after its deployment, it could eliminate up to 7000 hours of daily computation.

Overall, we could summarize Nectar's advantages as follows:

  • Space savings by caching, garbage collection and on-demand re-creation of datasets
  • Execution time savings by sub-computation sharing and support for incremental computations
  • Easy content management and dataset retrieval

Implementation details and an extensive evaluation can be found in the paper.

 

Thoughts

Personally, I love caching. I believe it's a brilliant idea and I think we should use it more often, wherever applicable. And especially in big data systems. This recent (and enlightening) study on MapReduce workloads clearly shows that we can only benefit from caching.

 

Happy coding and reading,

V.

 

Paper of the week: SkewTune

Mitigating Skew in MapReduce Applications

This week's paper is about SkewTune, a Hadoop extension built at the University of Washington, which addresses the problem of skew in MapReduce applications.

In general, skew refers to the case where task runtimes vary to a high degree, i.e. one or more of a set of tasks running in parallel take much longer to finish than the rest. Skew can appear in a parallel execution for several reasons, including computational load imbalance, characteristics of the algorithm or the specific dataset, or degraded performance of individual nodes. In a parallel system, execution time is limited by the speed of the slowest task, so skew problems have always been a headache and have been extensively researched in certain domains, such as parallel databases. In the MapReduce context, the straggler problem was immediately identified and addressed by its creators using speculative execution. More specifically, when a task execution is detected to take longer than expected, a copy of it is scheduled on a different node and the result of the fastest node is reported. This approach works well on a cluster with thousands of nodes, where failures are common or hard to distinguish from degraded performance, but suffers from the fact that work already done must be repeated.

SkewTune addresses skew at a different level and from different sources, specifically uneven distribution of the input data and skew caused by some parts of the input requiring longer processing times than others.

 

Types of Skew in MapReduce

  • Map phase: Expensive Record

One would not expect skew in map phases to exist, since in MapReduce applications data is equally divided among mappers, i.e. each map task processes roughly the same amount of data (in bytes). However, there exist cases where some records may require significantly more CPU or memory. The reason for this could be that the value of the key-value pair is simply larger or that the algorithm's runtime depends on the value itself. The PageRank algorithm is an example where this type of skew can be encountered, since its runtime depends on the out-degree of each vertex of the graph being processed.

  • Map phase: Heterogeneous Map

Map and reduce functions typically accept one input. Since this is an important limitation in expressing several algorithms, MapReduce developers often emulate n-ary operations by concatenating multiple input datasets and adding special tags to the records in order to distinguish their source. Although this technique has proved very useful, it can also become an important source of skew, as mappers will be assigned different record types which may require different processing.

  • Reduce phase: Partitioning skew

In the reduce phase, skew causes are more obvious and more common. The first type occurs when the partitioning function does not succeed in distributing the load fairly. It is common in many applications for a small set of keys to be much more popular than others, e.g. frequently visited URLs or frequently used words in text.

  • Reduce phase: Expensive Key Group

This case is analogous to the Expensive Record case in the map phase.

For a more detailed description of skew types and their causes, I suggest you read this short study.

 

SkewTune Overview

SkewTune is built as a Hadoop extension and aims to be transparent to the user, so that no existing code needs to be modified and no guarantees of the MapReduce model are violated. SkewTune has mechanisms to detect stragglers and mitigate skew by repartitioning their remaining unprocessed input data.

In order to decide when a task should be treated as a straggler, while avoiding unnecessary overhead and false positives, SkewTune uses Late Skew Detection. Simply put, SkewTune delays any skew mitigation as long as all slots in the cluster are busy. Only when a resource is detected to be idle does SkewTune trigger straggler detection. At this point, the system has to decide which task, if any, to identify as a straggler. The team's experience showed that skew mitigation is only beneficial when considering one straggler task at a time. SkewTune selects the task with the highest remaining-time estimate and only initiates repartitioning if half of this estimate is greater than the repartitioning overhead.
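The decision logic looks roughly like this (a sketch; in SkewTune the time-remaining estimates come from task progress reports, and the overhead is a measured constant):

def pick_straggler(running_tasks, repartition_overhead):
    # Late skew detection sketch: called only when a slot becomes idle.
    # Pick the task with the largest estimated time remaining, and mitigate it
    # only if half of that estimate exceeds the repartitioning overhead.
    if not running_tasks:
        return None
    candidate = max(running_tasks, key=lambda t: t["time_remaining"])
    if candidate["time_remaining"] / 2 > repartition_overhead:
        return candidate
    return None

tasks = [{"id": "r3", "time_remaining": 600}, {"id": "r7", "time_remaining": 90}]
straggler = pick_straggler(tasks, repartition_overhead=30)  # picks r3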

Skew mitigation happens in three steps. First, the task identified as a straggler is stopped. Next, its remaining input data is scanned in order to collect information about it. SkewTune collects a compressed summary of the remaining data, which takes the form of a series of approximately equal key intervals. Depending on the size of the remaining data, SkewTune may decide to scan it locally or in parallel. Finally, the coordinator of the system plans the repartitioning strategy and schedules the additional tasks.

In Hadoop, SkewTune implements skew mitigation as a separate MapReduce job for each parallel data scan and for each mitigation. When repartitioning a map task, a map-only job is executed and the job tracker broadcasts all information about the mitigated map to all the reducers in the system. When repartitioning a reduce task, an identity map phase needs to run before the actual additional reduce task, due to the inflexibility of the static MapReduce pipeline.

 

Performance

The paper contains an extensive performance evaluation based on popular applications, such as PageRank, Inverted Index and CloudBurst. It is shown that SkewTune successfully manages to avoid delays due to skew and re-balances the load in the cluster. Most importantly, it makes no assumptions about the cause of the skew and it is also effective in case of job misconfigurations. According to the published results, it can result in applications running up to 4 times faster in the presence of skew, while imposing negligible overhead otherwise.

 

Links

 

Happy reading and coding,

V.

Paper of the week: Spark Streaming

Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters

This week's paper comes from Berkeley and the creators of Spark, a MapReduce-like cluster computing framework optimized for iterative jobs. In order to deal with real-time big data, they extended Spark with a new programming model, discretized streams (D-Streams), and built Spark Streaming.

A lot of applications that need to deal with "big data" also need to deal with it the moment it arrives, i.e. data is mostly valuable when it is received. Identifying trending topics on Twitter or processing server log files in real time to detect a failure as fast as possible are only a couple of examples.

Apart from enabling real-time big data processing, D-Streams also aim to address three of the most important challenges of stream processing systems: fault tolerance, consistency and easy integration with batch processing. The system is built on the idea of treating stream computation as a series of deterministic batch computations over small time intervals. This approach directly inherits the advantages of batch processing and also means that all existing Spark operations can be reused in Spark Streaming. However, in order to minimize latency, intermediate state is kept in memory rather than on disk, as in batch systems. D-Streams also borrow from Spark the idea of RDDs (Resilient Distributed Datasets), a storage abstraction that can be used to rebuild lost data without using replication. This idea is extended in Spark Streaming to support parallel recovery in case of failures.

 

D-Streams

During an interval, all input data received is stored in the cluster and forms the input dataset for this particular interval. When the interval completes, data can be processed using common operations like map and reduce. Results and intermediate state are stored using RDDs. A D-Stream is a group of such RDDs.


Example

pageViews = readStream("http://...", "1s")
ones = pageViews.map(event => (event.url, 1))
counts = ones.runningReduce((a, b) => a + b)

In this example, we group the page views data into D-Streams of 1 sec intervals. Each dataset for each interval is then transformed into a (URL, 1) set of tuples and feeds a running count.

 

Operators

D-Streams provide two types of operators. Transformation operators produce a new D-Stream when applied to a stream. Stateless transformation operators act independently on each interval, while stateful operators can operate on multiple intervals and may produce intermediate RDDs as a result. Aggregation over a sliding window is an example of a stateful operator. Output operators allow the program to write data to external systems, like HDFS. In addition to Spark's operators, Spark Streaming also offers windowing, incremental aggregation and time-skew join operators.
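As a toy model of a stateful windowed operator (plain Python lists stand in for RDDs; this is not the Spark Streaming API), a sliding count over the last few intervals could look like:

from collections import Counter, deque

def windowed_counts(interval_datasets, window_length=3):
    # Each element of interval_datasets is the batch of (url, 1) pairs produced
    # in one interval; the window covers the last window_length intervals.
    window = deque(maxlen=window_length)
    results = []
    for batch in interval_datasets:
        window.append(batch)
        counts = Counter()
        for b in window:           # naive: recompute over every interval in the window
            for url, n in b:
                counts[url] += n
        results.append(dict(counts))
    return results

batches = [[("/a", 1), ("/b", 1)], [("/a", 1)], [("/b", 1), ("/b", 1)]]
print(windowed_counts(batches, window_length=2))

Spark Streaming's incremental aggregation operators avoid this recomputation by adding the newest interval's contribution and subtracting the oldest one's.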


Fault Tolerance

One of the biggest challenges of stream processing is fault tolerance and recovery. Since data is mostly valuable at the time of its arrival and most applications do not have strong accuracy requirements, some systems simply allow data loss and resume execution from a later point. Other approaches include replication or upstream backup, both expensive solutions, either in capacity or in latency. Spark Streaming introduces the idea of parallel recovery. RDDs are periodically checkpointed and asynchronously replicated. RDDs track their lineage, i.e. they can be rebuilt using a set of deterministic operations. Exploiting this property, when a node in Spark Streaming fails, the missing RDD partitions are detected and parallel tasks are launched to recover them.

Notes

I found this paper cited by Muppet, another system for processing streaming data. I enjoyed reading the Muppet paper a lot and found most of its design decisions very interesting. It could have been a very good candidate for this week's "paper of the week" post, but I was disappointed by the lack of an evaluation section and source code. On the other hand, it provided me with an excellent list of references to look into, one of which is the paper discussed in this post.

 

Links:

 

Happy coding and reading,

V.

Paper of the week: ASTERIX

Towards a scalable, semistructured data platform for evolving-world models

ASTERIX is a data-intensive storage, management and analysis platform resulting from a collaboration among UC Irvine, UC Riverside and UC San Diego.

ASTERIX was designed according to the requirements of an "evolving world model". In this kind of model, think of the system as a place where we store a small portion of the real world in the form of data. As the world changes, the data changes, capturing the world's evolution. We can store and analyze information about present and past events, while allowing data to change and new types of data to be added in the future.

In such a model we can ask several interesting questions about the current or the past state of the world, or "as-of" queries:

What is the best route to get to the Olympic Stadium right now?
What is the traffic situation like on Saturday nights close to the city center?
How many visitors that visited the City Hall during the past year also went for dinner in that nearby restaurant?

ASTERIX is based on and extends work from three main areas.
Its data model is inspired by work on semi-structured data, e.g. XML, JSON, Avro and XQuery.
It is greatly influenced by parallel database systems, to which it adds support for more complex data and fuzzy queries, as well as scalability to larger clusters and fault tolerance.
Inevitably, ASTERIX also borrows from the area of data-intensive computing, while preserving its database ideas. Its difference from other data-intensive frameworks, e.g. Hadoop, is that its runtime was built with a high-level analysis language in mind and not the other way around. Consider the following: Map-Reduce is an over-simplified programming model, which is in fact such a bad fit for high-level analytics that it quickly spawned projects like Pig and Hive. But what if Pig had been developed before Hadoop? Would Hadoop then be a reasonable choice as its runtime?
The answer is most probably no.

Data Model

In the "evolving world model" a system needs to provide support for various types of data, i.e. structured, semi-structured and also unstructured. Updates can be frequent, including not only values, but also schema changes. And of course, the amount of data is large and data can be arrive in the system in high rates. In other words, all three V's of "Big Data" need to be handled.

ASTERIX defines its own data model, the ASTERIX Data Model (ADM). In this model, data is semi-structured, typed and self-describing. Data is organized into datasets, which can be accessed in parallel, indexed, partitioned and possibly replicated across the cluster. Datasets are entities close to tables in the relational world. By default, data schemas are "open", meaning that they can be extended with new types in the future. Datasets related to an application can be grouped into a collection called a dataverse, which corresponds to a database in a relational context. External datasets are also supported in a Hive-like way. Users create a dataverse using a data definition language (DDL), in which they specify a primary key and an optional partitioning key for each dataset.

Query Language

The ASTERIX Query Language (AQL), inspired by XQuery, is designed to fit the needs of the data model described above. AQL has a declarative syntax and offers expressive power similar to that of XQuery and Jaql. AQL queries form a logical plan of operators, a DAG (Directed Acyclic Graph), which then goes through a static optimization process. The goal is to provide a reusable algebra layer, which would make it easy to support other popular languages, such as Pig and Hive. After optimization, the logical plan is transformed into a Hyracks job, i.e. a job that can be executed on the ASTERIX runtime system, described in a following section.

System Architecture

Similar to Map-Reduce, ASTERIX targets large shared-nothing cluster architectures. Users submit AQL queries to the system, which compiles them and generates a runtime execution plan. ASTERIX's execution layer, Hyracks, is responsible for determining the degree of parallelism for each level of the submitted job. In order to make these decisions, Hyracks takes into consideration the characteristics of the operators being used, as well as the current state of the cluster. Hyracks is multi-purpose and general enough to even support Map-Reduce-style computation and other kinds of operator graphs.

There exist two kinds of nodes in an ASTERIX cluster. The Metadata Nodes store information about the datasets and also about the system resources. They are also responsible for query compilation and planning. The rest of the nodes are Compute Nodes and they only contain what is necessary to manage dataset partitions and participate in the execution of Hyracks jobs.

Hyracks Overview

Hyracks is the execution engine of ASTERIX, analogous to Nephele in Stratosphere. In a similar way, a Hyracks job is a DAG of operator nodes called HODs (Hyracks Operator Descriptors) connected by CDs (Connector Descriptors). CDs expose data dependencies and encapsulate the data distribution logic that should be used at runtime; examples are (1:1) or (M:N hash-merge) connectors. A HOD creates several HADs (Hyracks Activity Nodes), which provide information to Hyracks on how a job can be divided into stages. Hyracks is then able to decide the evaluation order of each stage and set its degree of parallelism. Finally, HONs (Hyracks Operator Nodes) are created and are responsible for the actual execution.

The first open-source release of ASTERIX is expected later this year, while Hyracks is already available on Google Code.

Links

V.