Paper of the week: SkewTune
Mitigating Skew in MapReduce Applications
This week's paper is about SkewTune, a Hadoop extension built at the University of Washington, which addresses the problem of skew in MapReduce applications.
In general, skew refers to the case when task runtimes vary to a high degree, i.e. one or more of a set of tasks running in parallel take much longer to finish than the rest. Skew can appear in a parallel execution for several reasons, including computational load imbalance, characteristics of the algorithm or the specific dataset, or degraded performance of single nodes. In a parallel system, execution time is limited by the speed of the slowest task, therefore skew problems have always been a headache and have been extensively researched in certain domains, such as parallel databases. In the MapReduce context, the straggler problem was immediately identified and addressed by its creators, using speculative execution. More specifically, when a task execution is detected to take longer than expected, a copy of it is scheduled on a different node and the result of the fastest node is reported. This approach works well on a cluster with thousands of nodes, where failures are common or hard to distinguish from degraded performance, but suffers from the fact that work already done must be repeated.
SkewTune addresses skew on a different level and originating from different sources, specifically uneven distribution of the input data and parts of the input requiring longer processing times than others.
Types of Skew in MapReduce
- Map phase: Expensive Record
One would not expect skew to exist in map phases, since in MapReduce applications data is equally divided among mappers, i.e. each map task processes roughly the same amount of data (in bytes). However, there are cases where some records require significantly more CPU or memory. The reason could be that the value of the key-value pair is simply larger, or that the algorithm's runtime depends on the value itself. The PageRank algorithm is an example where this type of skew can be encountered, since its runtime depends on the out-degree of each vertex of the graph being processed.
- Map phase: Heterogeneous Map
Map and reduce functions typically accept a single input. Since this is an important limitation in expressing several algorithms, MapReduce developers often emulate n-ary operations by concatenating multiple input datasets and adding special tags to the records, in order to distinguish their source. Although this technique has proved very useful, it can also become an important source of skew, as mappers will be assigned different record types which may require different processing.
- Reduce phase: Partitioning skew
In the reduce phase, skew causes are more obvious and more common. The first type occurs when the partitioning function fails to distribute the load fairly. In many applications, a small set of keys is much more popular than the others, e.g. frequently visited URLs or frequently used words in text.
- Reduce phase: Expensive Key Group
This is the case analogous to the Expensive Record in the map phase.
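To make the partitioning-skew case concrete, here is a tiny Python simulation of hash partitioning over an invented hot-key distribution (the keys and counts are made up, but the effect is the one described above):

```python
from collections import Counter

# Simulate Hadoop's default partitioning, hash(key) mod num_reducers,
# on an invented hot-key distribution (a couple of very popular keys,
# as with frequent URLs or common words). All records of one key land
# on the same reducer, so the hot keys overload whichever reducers
# they hash to.
keys = ["the"] * 50 + ["a"] * 30 + ["zebra"] * 2 + ["quark"]
num_reducers = 4

load = Counter(hash(k) % num_reducers for k in keys)
print(sorted(load.values(), reverse=True))   # heavily imbalanced
```

No matter how evenly the distinct keys hash out, the reducer that receives "the" gets at least 50 of the 83 records.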
For a more detailed description of skew types and their causes, I suggest you read this short study.
SkewTune Overview
SkewTune is built as a Hadoop extension and aims to be transparent to the user, so that no existing code needs to be modified and no guarantees of the MapReduce model are violated. SkewTune has mechanisms to detect stragglers and mitigate skew by repartitioning its remaining unprocessed input data.
In order to decide when a task should be treated as a straggler, while avoiding unnecessary overhead and false positives, SkewTune uses Late Skew Detection. Simply put, SkewTune delays any skew mitigation as long as all slots in the cluster are busy. Only when a resource is detected to be idle will SkewTune trigger straggler detection. At that point, the system has to decide which task, if any, to identify as a straggler. The team's experience showed that skew mitigation is only beneficial when considering one straggler task at a time. SkewTune selects the task with the highest remaining time estimate and only initiates repartitioning if half of this time estimate is greater than the repartitioning overhead.
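The selection rule can be sketched in a few lines of Python (the task names, time estimates and overhead value below are invented for illustration; the "half the remaining time" threshold is the one described in the paper):

```python
# Sketch of SkewTune's straggler selection, as described above.
# Task names, time estimates and the repartitioning overhead are
# invented example values.
def pick_straggler(remaining_estimates, repartition_overhead):
    """Return the task to repartition, or None if not worthwhile.

    remaining_estimates: dict of task id -> estimated remaining seconds.
    Repartitioning is only triggered if half the largest remaining time
    exceeds the repartitioning overhead.
    """
    if not remaining_estimates:
        return None
    task = max(remaining_estimates, key=remaining_estimates.get)
    if remaining_estimates[task] / 2 > repartition_overhead:
        return task
    return None

estimates = {"map_03": 120.0, "map_07": 45.0, "reduce_01": 20.0}
print(pick_straggler(estimates, repartition_overhead=30.0))  # map_03
print(pick_straggler(estimates, repartition_overhead=90.0))  # None
```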
Skew mitigation happens in three steps. First, the task identified as a straggler is stopped. Next, its remaining input data is scanned in order to collect information about it. SkewTune collects a compressed summary of the remaining data, which takes the form of a series of approximately equal key intervals. Depending on the size of the remaining data, SkewTune may decide to scan the data locally or in parallel. Finally, the coordinator of the system plans the repartitioning strategy and schedules the additional tasks.
In Hadoop, skew mitigation is implemented by SkewTune as a separate MapReduce job for each parallel data scan and for each mitigation. When repartitioning a map task, a map-only job is executed and the job tracker broadcasts all information about the mitigated map to all the reducers in the system. When repartitioning a reduce task, due to the inflexibility of MapReduce's static pipeline, an identity map phase needs to run before the actual additional reduce task.
Performance
The paper contains an extensive performance evaluation based on popular applications, such as PageRank, Inverted Index and CloudBurst. It is shown that SkewTune successfully manages to avoid delays due to skew and re-balances the load in the cluster. Most importantly, it makes no assumptions about the cause of the skew and it is also effective in case of job misconfigurations. According to the published results, it can result in applications running up to 4 times faster in the presence of skew, while imposing negligible overhead otherwise.
Links
Happy reading and coding,
V.
Paper of the week: Spark Streaming
Discretized Streams: An Efficient and Fault-Tolerant Model for Stream Processing on Large Clusters
This week's paper comes from Berkeley and the creators of Spark, a Map-Reduce-like cluster computing framework, optimized for iterative jobs. In order to deal with real-time big data, they extended Spark and built a new programming model, discretized streams (D-Streams) and Spark Streaming.
A lot of applications that need to deal with "big data" also need to deal with it the moment it arrives, i.e. data is most valuable when it is received. Identifying trending topics on Twitter or processing server log files in real time to detect a failure as fast as possible are only a couple of examples.
Apart from enabling real-time big data processing, D-Streams also aim to provide solutions to three of the most important challenges of streaming processing systems: fault-tolerance, consistency and easy integration with batch processing. The system is built on the idea of treating stream computation as a series of deterministic batch computations on small time intervals. This approach directly benefits from the advantages of batch processing and also implies that all existing Spark operations can be reused in Spark Streaming, too. However, in order to minimize latency, intermediate state is kept in memory and not on disk as in batch systems. D-Streams also borrow from Spark the idea of RDDs (Resilient Distributed Datasets), a storage abstraction that can be used to rebuild lost data without using replication. This idea is extended in Spark Streaming to support parallel recovery in case of failures.
D-Streams
During an interval, all input data received is stored in the cluster and forms the input dataset for this particular interval. When the interval completes, data can be processed using common operations like map and reduce. Results and intermediate state are stored using RDDs. A D-Stream is a group of such RDDs.
Example
ones = pageViews.map(event => (event.url, 1))
counts = ones.runningReduce((a, b) => a + b)
In this example, we group the page-view data into a D-Stream of 1-second intervals. The dataset of each interval is then transformed into a set of (URL, 1) tuples, which feeds a running count.
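For intuition, here is a toy Python simulation of the same computation, batching invented page-view events into 1-second intervals and carrying the running counts across batches (this mimics the model, not Spark's actual implementation):

```python
from collections import Counter

# Toy simulation of the example above: invented page-view events are
# bucketed into 1-second intervals, each interval is processed as a
# small batch (the map step), and the running counts are the state
# carried from batch to batch (the runningReduce step).
events = [
    (0.1, "/home"), (0.4, "/about"), (0.9, "/home"),   # interval 0
    (1.2, "/home"), (1.8, "/contact"),                 # interval 1
]

intervals = {}
for t, url in events:
    intervals.setdefault(int(t), []).append(url)

running = Counter()
for interval in sorted(intervals):
    ones = [(url, 1) for url in intervals[interval]]   # map step
    for url, n in ones:                                # runningReduce
        running[url] += n
    print(interval, dict(running))
```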
Operators
Notes
I found this paper cited by Muppet, another processing system for streaming data. I enjoyed reading Muppet a lot and found most of its design decisions very interesting. It could have been a very good candidate for this week's "paper of the week" post, but I was disappointed by the lack of an evaluation section or source code. On the other hand, it provided me with an excellent list of references to look into, one of which is the paper discussed in this post.
Links:
Happy coding and reading,
V.
Paper of the week: ASTERIX
Towards a scalable, semistructured data platform for evolving-world models
V.
Post Series: Paper of the week
For the next couple of months I will be conducting a survey on the state of the art of my research area, i.e. data-intensive applications and parallel programming frameworks for data analytics, or if you prefer the "catchy" way, Big Data.
After a quick calculation I realized that in order to achieve a satisfactory coverage of the area, I need to read and *understand* about 15-20 papers per week... I'm expecting only a small percentage of these to be relevant to what I'm working on.
As a motivation to keep me going and following the idea of my MSc thesis progress posts, I have decided to write one blog post per week dedicated to my favorite paper of that week.
I hope to create a nice collection of useful information by the end of this journey :-)
(or yet another excuse to make me feel productive?)
http://www.phdcomics.com/comics/archive.php?comicid=762
V.
A List of My Favorite Readings on Distributed Systems
Inspired by this wonderful list of Distributed Systems readings and nostalgic about my now-completed Master's, I decided to put together my own list of favorite readings, slides, videos, opinions and misc things I came across during the past 2 years.
I am certainly missing a lot of classics, but this is not supposed to be a top-x readings list. Some of it is not even directly DS-related. This is a list of things that I personally found interesting, inspiring and motivating :-)
Papers
The Seven Deadly Sins of Distributed Systems
Consensus: the Big Misunderstanding
Brewer's Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services
The Byzantine Generals Problem
Should Computer Scientists Experiment More?
Why File Sharing Networks Are Dangerous
Who Says What To Whom on Twitter
How NOT to review a paper: The tools and techniques of the adversarial reviewer
Articles / Blog Posts
Amazon Architecture by High Scalability
Google and Microsoft Cheat on Slow-Start. Should You?
MapReduce Patterns, Algorithms and Use Cases
The Game of Distributed Systems Programming. Which Level Are You?
Misc
Big Data Analytics Beyond Map/Reduce
Selected EMDC Presentations
The Master's is over, the chapter is closed, and here I am digging into old folders trying to tidy up the mess that these past 2 years have brought over my hard disks...
I'm posting here a short list of presentations I prepared during these past 2 years, both at KTH in Stockholm and at UPC in Barcelona.
I have avoided including presentations on projects and coursework, focusing instead on presentations of papers, articles and studies.
Thesis Presentation @TU Berlin
An Intro to Pig
On Cloud Operating Systems
Dremel
Meridian: A Lightweight Approach to Network Positioning
Nautilus: A Testbed for Green Scientific Computing
Fighting Over the Future of the Internet
Managing Virtual Resources: Fly Through the Sky
P2P VoIP Applications: A Skype Case Study
Basic Pig Operators
In this post I will present some of the basic and most common and useful Pig operators. I will explain how they operate on data and what results they produce, but also how they are internally translated into Map-Reduce jobs and executed on the Hadoop execution engine.
I should remind here how the compilation to Map-Reduce works. The compiler that transforms the Physical Plan into a DAG of Map-Reduce operators uses a predecessor depth-first traversal to generate the graph. When compiling an operator, the goal is to try and merge it into the existing Map-Reduce operator, i.e. into the current Map or Reduce phase. However, some operators, such as group, require the data to be shuffled or sorted, so they cause the creation of a new Map-Reduce operator. The new operator is connected to the previous one with a store-load combination.
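The merge-or-cut decision can be sketched as follows (a deliberately simplified Python model; the operator names and the BLOCKING set are invented stand-ins for Pig's physical operators):

```python
# Toy model of the merge-or-cut decision: non-blocking operators are
# folded into the current Map-Reduce job, while blocking ones (which
# need a shuffle) close the job, connected to the next by a
# store/load pair.
BLOCKING = {"group", "cogroup", "order"}

def to_mr_jobs(pipeline):
    jobs, current = [], []
    for op in pipeline:
        current.append(op)
        if op in BLOCKING:          # shuffle boundary: close this job
            jobs.append(current)
            current = []
    if current:
        jobs.append(current)
    return jobs

print(to_mr_jobs(["load", "filter", "group", "foreach", "order", "store"]))
# [['load', 'filter', 'group'], ['foreach', 'order'], ['store']]
```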
- FOREACH
FOREACH takes as input a record and generates a new one by applying a set of expressions to it. It is essentially a projection operator. It selects fields from a record, applies some transformations on them and outputs a new record. FOREACH is a non-blocking operator, meaning it can be included inside the current Map-Reduce operator.
- FILTER
FILTER selects those records from a dataset for which a predicate is true. Predicates contain equality expressions, regular expressions, boolean operators and user-defined functions. FILTER is also non-blocking and can be merged into the current Map or Reduce plan.
- GROUP BY
GROUP collects all records with the same key inside a bag. A bag is a Pig data structure which can be described as an unordered set of tuples. GROUP generates records with two fields: the corresponding key, which is assigned the alias "group", and a bag with the collected records for this key.
We can group on multiple keys and we can also GROUP "all". GROUP all will use the literal "all" as a key and will generate one and only one record, containing all the data. This can be useful if we would like to apply some kind of aggregation function over all our records, e.g. COUNT.
GROUP is a blocking operator and it compiles down to three new operators in the Physical Plan: Local Rearrange, Global Rearrange and Package. It requires repartitioning and shuffling, which will force a Reduce phase to be created in the Map-Reduce plan. If we are currently inside a Map phase, then this is not a big problem. However, if we are currently inside a Reduce phase, a GROUP will cause the pipeline to go through Map-Shuffle-Reduce.
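The observable effect of GROUP can be sketched in a few lines of Python, with a dict standing in for the shuffle (the records are invented):

```python
from collections import defaultdict

# Sketch of what GROUP produces: records of (group_key, bag). The
# dict below stands in for the Local Rearrange / Global Rearrange /
# Package pipeline described above.
records = [("apple", 3), ("pear", 1), ("apple", 7), ("plum", 2)]

bags = defaultdict(list)
for rec in records:
    bags[rec[0]].append(rec)                  # rearrange: route by key

grouped = [(key, bag) for key, bag in bags.items()]   # package
print(grouped)

# GROUP ... ALL instead uses the literal "all" as the single key:
grouped_all = [("all", records)]
```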
- ORDER BY
The ORDER BY operator orders records by one or more keys, in ascending or descending order. However, what happens behind the scenes is much more interesting than you may imagine. ORDER is not implemented as simply as Sorting-Shuffling-Reduce. Instead, it forces the creation of two Map-Reduce jobs. The reason is that datasets often suffer from skew. That means that most of the values are concentrated around a few keys, while other keys have far fewer corresponding values. This phenomenon would cause only a few of the reducers to be assigned most of the workload, slowing down the overall execution. The first Map-Reduce job that Pig creates is used to perform a fast random sampling of the keys in the dataset. This job will figure out the key distribution and balance the load among reducers in the second job. However, just like in the case of Skew Join, this technique breaks the Map-Reduce convention that all records with the same key will be processed by the same reducer.
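The two-job idea can be sketched in Python: a sampling pass picks range boundaries, and the partitioner spreads records of a key spanning several ranges round-robin, which is exactly the convention-breaking behaviour mentioned above (the dataset, sample size and reducer count are invented):

```python
import itertools
import random

# Sketch of Pig's two-job ORDER BY: job 1 samples the keys to choose
# balanced range boundaries; job 2 range-partitions on them, spreading
# a key that spans several ranges round-robin across those reducers.
random.seed(7)
keys = ["the"] * 500 + ["a"] * 300 + [f"rare{i:03d}" for i in range(200)]

sample = sorted(random.sample(keys, 100))        # job 1: random sample
num_reducers = 4
cuts = [sample[len(sample) * i // num_reducers] for i in range(1, num_reducers)]
print(cuts)

rr = itertools.count()   # round-robin counter for keys spanning ranges

def reducer_for(key):
    # Reducers lo..hi can all own `key` when it equals one or more cut
    # points; spreading its records among them balances the load but
    # breaks the "same key -> same reducer" convention noted above.
    lo = sum(1 for c in cuts if c < key)
    hi = sum(1 for c in cuts if c <= key)
    return lo + next(rr) % (hi - lo + 1)
```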
- JOIN
JOIN has been extensively discussed in this post.
- COGROUP
COGROUP is a generalization of the GROUP operator, as it can group more than one input based on a key. Of course, it is a blocking operator and is compiled in a way similar to that of GROUP.
- UNION
UNION is an operator that concatenates two or more inputs without joining them. It does not require a separate Reduce phase to be created. An interesting point about UNION in Pig is that it does not require the input records to share the same schema. If they do, then the output will also have this schema. If the schemas are different, then the output will have no schema and different records will have different fields. Also, it does not eliminate duplicates.
- CROSS
CROSS will receive two or more inputs and will output the cartesian product of their records. This means that it will match each record from one input with every record of all other inputs. If we have an input of size n records and an input of size m records, CROSS will generate an output with n*m records. The output of CROSS usually results in very large datasets and it should be used with care. CROSS is implemented in a quite complicated way. A CROSS logical operator is in reality equivalent to four operators:
The GFCross function is an internal Pig function and its behaviour depends on the number of inputs, as well as the number of reducers available (specified by the "parallel 10" in the script). It generates artificial keys and tags the records of each input in a way that only one match of the keys is guaranteed and all records of one input will match all records of the other. If you are interested in more details, you can read the corresponding part of this book.
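Here is a simplified Python illustration of the artificial-key scheme for two inputs on a small grid of "reducers"; it is not Pig's exact GFCross implementation, just the idea that every pair of records meets exactly once:

```python
import itertools
import random
from collections import defaultdict

# Simplified illustration of the artificial-key scheme behind GFCross
# for two inputs on a g x g grid of "reducers": each record of A picks
# a random row and is replicated across it, each record of B picks a
# random column and is replicated down it, so every (A, B) pair meets
# in exactly one cell.
random.seed(0)
g = 2                       # grid side, i.e. g * g reducers
A = ["a1", "a2", "a3"]
B = ["b1", "b2"]

cells = defaultdict(lambda: {"A": [], "B": []})
for rec in A:
    i = random.randrange(g)
    for j in range(g):
        cells[(i, j)]["A"].append(rec)   # replicate across row i
for rec in B:
    j = random.randrange(g)
    for i in range(g):
        cells[(i, j)]["B"].append(rec)   # replicate down column j

pairs = sorted(p for c in cells.values()
               for p in itertools.product(c["A"], c["B"]))
print(len(pairs))  # 3 * 2 = 6, the full cartesian product
```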
My conclusion from the above analysis was that even the Physical Plan is very dependent on the Map-Reduce framework and does not reflect the right level for my work. (CO)GROUP is compiled down to three new operators and CROSS down to four, while they can be mapped directly to the CoGroup and Cross Input Contracts of Stratosphere. That led me to move up one level and start working on compiling the Logical Plan into a PACT plan.
It turned out that things are much simpler up there, but a lot more coding needs to be done. Just to illustrate the simplicity, I will use the script and plan generation from the Pig paper:
And this is how the Logical Plan is transformed into a Physical and then a Map-Reduce Plan:
Now, this is how the Logical Plan could be compiled to a PACT Plan: much simpler and much cleaner! I'm quite optimistic =) And now I have to sit down and code this thing!
Until next time, happy coding!
V.
Pig's Logical Plan Optimizer
Hello from *sunny* Stockholm!
It's been almost a month since my last thesis post and as I was hoping, Spring is finally here :D
It's been a crazy, busy and productive month though, so I will be updating you on my progress by writing two posts today!
This one is about Pig's Logical Plan Optimizer. In my previous posts (here and here) I have explained how Pig creates a data-flow graph from the Pig Latin script, the Logical Plan, and then transforms this graph into a set of Map-Reduce jobs. The Logical Plan goes through the first compiler and is transformed into a Physical Plan, and the Physical Plan is then sent to the Map-Reduce compiler, which transforms it into a DAG of Map-Reduce jobs:
An intermediate and quite interesting stage, which is not visible in the above diagram, is the optimization of the Logical Plan. The initial Logical Plan is created by a one-to-one mapping of the Pig Latin statements to Logical Operators. The structure of this plan is of course totally dependent on the scripting skills of the user and can result in highly inefficient execution.
Pig performs a set of transformations on this plan before it compiles it to a Physical one. Most of them are trivial and have been long used in database systems and other high-level languages. However, I think they're still interesting to discuss in the "Pig context".
Rules, RuleSets, Patterns and Transformers
The base optimizer class is designed to accept a list of RuleSets, i.e. sets of rules. Each RuleSet contains rules that can be applied together without conflicting with each other. Pig applies each rule in a set repeatedly, until no rule is applicable any longer or a maximum number of iterations has been reached. It then moves on to the next set and never returns to a previous one.
Each rule has a pattern and an associated transformer. A pattern is essentially a sub-plan with specific node types. The optimizer will try to find this pattern inside the Logical Plan and if it exists, we have a match. When a match is found, the optimizer will then have to look more in depth into the matched pattern and decide whether the rule fulfils some additional requirements. If it does, then the rule is applied and the transformer is responsible for making the corresponding changes to the plan.
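The control flow of the optimizer can be sketched like this (the plan and rule representations are invented stand-ins for Pig's classes):

```python
# Sketch of the optimizer's control flow: apply every rule in a
# RuleSet to a fixed point (bounded by max_iterations), then move to
# the next set and never revisit earlier ones. A rule is assumed to
# expose match() (find the pattern), check() (extra requirements) and
# transform() (the transformer), mirroring the description above.
MAX_ITERATIONS = 500

def optimize(plan, rule_sets, max_iterations=MAX_ITERATIONS):
    for rules in rule_sets:
        for _ in range(max_iterations):
            changed = False
            for rule in rules:
                match = rule.match(plan)               # pattern search
                if match is not None and rule.check(match):
                    plan = rule.transform(plan, match)
                    changed = True
            if not changed:
                break                                  # fixed point
    return plan
```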
Some extra caution is needed in two places. The current pattern matching logic assumes that all the leaves in the pattern are siblings. You can read more on this issue here. This assumption creates no problems with the existing rules. However, when new rules are designed, it should be kept in mind that the pattern matching logic might need to be changed.
Another point that needs highlighting has to do with the actual Java implementation. When searching for a matching pattern, the match() method will return a list of all matched sub-plans. Each one of them is a subset of the original plan and the operators returned are the same objects as in the original plan.
Some Examples
- ColumnMapKeyPrune
This rule prunes columns and map keys that are not needed. More specifically, it removes a column if it is mentioned in the script but never used, and a map key if it is never mentioned in the script.
- FilterAboveForeach
Guess what? It pushes Filter operators above Foreach operators! However, it first checks that the field the Filter works on is present in the predecessor of the Foreach:
- MergeFilter
As you can imagine, it merges two consecutive Filter operators by adding the condition of the second Filter to the condition of the first with an AND operator:
- MergeForeach
This rule merges Foreach operators, but it's not as simple as it sounds. There are a few additional requirements that need to be met. For example, if the first Foreach operator has a Flatten in its internal plan, the rule cannot be applied. The optimizer also checks how many times the outputs of the first Foreach are used by the second Foreach. The assumption is that if an output is referred to more than once, the overhead of multiple expression calculations might outweigh the benefits of applying this rule:
There are several more optimization rules, but I hope the idea is clear from the examples I already mentioned. All the optimizations performed at this level are general-purpose transformations and decoupled from the execution engine and the Map-Reduce model. However, this is not true after the transformation to a Physical Plan. And this is why I now understand why the integration alternatives I had in mind in late February are not worth implementing.
The reason will become clear with my next post very very soon.
Until then, happy coding :)
V.
Join Types in Pig
This blog post is on joins! That trivial but extremely useful relational operation I know you're all familiar with!
Inner join, equi-join, natural join, theta-join, outer join, left-outer join, right-outer join, full-outer join, self join, semi-join...
I bet you remember the definitions and can tell the differences as easily as you remember the multiplication tables... Right! Once upon a time, I could too... just before my undergrad databases exam... hmmm...
Honestly, I've always found it hard to remember the specific details for all different types of joins available and I always need to refresh the concepts whenever I need to use a specific type. (Oh, how much I love wikipedia, hell yeah I do! :p)
Joins in Map-Reduce
No matter how common and trivial, join operations have always been a headache for Map-Reduce users. A simple Google search on "map-reduce join operation" will give you several blog posts, presentations and papers as a result. The problem originates from Map-Reduce's static Map-Shuffle-Sort-Reduce pipeline and its single-input second-order functions. The challenge is finding the most effective way to "fit" the join operation into this programming model.
There are two common strategies, and both consist of a single Map-Reduce job:
- Reducer-side join: In this strategy, the map phase serves as the preparation phase. The mapper reads records from both inputs and tags each record with a label based on its origin. It then emits records using the join key as the key. Each reducer then receives all records that share the same key, checks the origin of each record and generates the cross product. Slides 21-22 from this ETH presentation provide a very clear example.
- Mapper-side join: The alternative comes from the introduction of Hadoop's distributed cache. This facility can be used to broadcast one of the inputs to all mappers and perform the join in the map phase. However, it is quite obvious that this technique only makes sense in the case where one of the inputs is small enough to fit in the distributed cache!
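The reducer-side strategy can be simulated in a few lines of Python (the datasets are invented; a dict plays the role of the shuffle):

```python
from collections import defaultdict
from itertools import product

# Simulation of a reducer-side equi-join on two invented datasets.
users  = [(1, "ada"), (2, "bob")]               # (user_id, name)
orders = [(1, "book"), (1, "pen"), (3, "mug")]  # (user_id, item)

# Map phase: emit (join_key, (origin_tag, record)).
mapped = [(uid, ("U", name)) for uid, name in users] + \
         [(uid, ("O", item)) for uid, item in orders]

# Shuffle: group by key (what Hadoop does between map and reduce).
groups = defaultdict(list)
for key, tagged in mapped:
    groups[key].append(tagged)

# Reduce phase: per key, cross the records from the two origins.
joined = []
for key, tagged in groups.items():
    left  = [rec for tag, rec in tagged if tag == "U"]
    right = [rec for tag, rec in tagged if tag == "O"]
    joined.extend((key, l, r) for l, r in product(left, right))

print(sorted(joined))  # [(1, 'ada', 'book'), (1, 'ada', 'pen')]
```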
Joins in Pig
Fortunately, Pig users do not need to program the join operations themselves as Pig Latin offers the JOIN statement. Also, since Pig is a high-level abstraction that aims to hide low-level implementation details, they do not need to care about the join strategy... Or do they?
Pig users can use the JOIN operator in pair with the USING keyword in order to select the join execution strategy. Pig offers the following Advanced Join Techniques:
- Fragment-Replicate Join: USING 'replicated'
It is advised to use this technique when a small table that fits in memory needs to be joined with a significantly larger table. The small table will be loaded into the memory of each machine using the distributed cache, while the large table will be fragmented and distributed to the mappers. No reduce phase is required, as the join can be implemented entirely in the map phase. This type of join can only support inner and left-outer joins, as the replicated table is always the second one and therefore cannot be the outer side. Pig implements this join by creating two map-only jobs. During the first one, the distributed cache is set up and the small input is broadcast to all machines. The second one actually performs the join operation.
The user must pay attention and keep in mind that the second table in the statement is the one loaded into memory, i.e. in the statement:
joined = JOIN A BY $0, B BY $0 USING 'replicated'
B is the input that will be loaded into memory. Extra care needs to be taken for one more reason when using this type of join: Pig will not check beforehand whether the specified input fits into memory, which results in a runtime error if it doesn't!
- Merge Join: USING 'merge'
You should use this type of join when the inputs are already sorted by key. It is a variation of the well-known sort-merge algorithm, where the sort has already been performed :)
In order to execute this join, Pig first runs an initial Map-Reduce job that samples the second input and builds an index from join-key values to HDFS blocks. The second job takes the first input and uses the index to find the correct block for the key it is looking for. For each key, all records with this particular key are kept in memory and used to do the join. In other words, two pointers are maintained, one for each input. Since both inputs are sorted, only one lookup in the index is required.
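Once both inputs are sorted, the merge step itself looks like this (invented data; the block-index lookup is omitted):

```python
# Sketch of the merge phase of a sort-merge equi-join over two
# already-sorted inputs, mirroring the two-pointer walk described
# above.
left  = [(1, "a"), (2, "b"), (2, "c"), (4, "d")]   # sorted by key
right = [(2, "x"), (3, "y"), (4, "z")]             # sorted by key

out, i, j = [], 0, 0
while i < len(left) and j < len(right):
    lk, rk = left[i][0], right[j][0]
    if lk < rk:
        i += 1
    elif lk > rk:
        j += 1
    else:
        # Buffer all right-side records with this key (kept in
        # memory), then match every left record with the same key.
        matches = []
        while j < len(right) and right[j][0] == lk:
            matches.append(right[j])
            j += 1
        while i < len(left) and left[i][0] == lk:
            out.extend((lk, left[i][1], r[1]) for r in matches)
            i += 1

print(out)  # [(2, 'b', 'x'), (2, 'c', 'x'), (4, 'd', 'z')]
```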
- Skew Join: USING 'skewed'
The third and last type of join provided by Pig is the skew join. It is quite common that some keys in a dataset are a lot more popular than others, that is, most of the values correspond to a very small set of keys. Using the default algorithm in such a case would significantly overload some of the reducers in the system.
In order to overcome this problem, one can use Pig's skew join. Pig will first sample one of the inputs, searching for the popular keys whose records would not fit in memory. Records of the remaining keys are handled by a default join. However, records that belong to one of the keys identified as popular will be split among a number of reducers. The records of the other input that correspond to a split key will be replicated to each reducer that handles part of that key.
Skew is supported in one input only. If both tables are skewed, the algorithm will still work, but it will be significantly slower.
However, extra care should be taken when using this type of join! This algorithm breaks the Map-Reduce convention that all records with the same key will be processed by the same reducer! This could be dangerous or yield unexpected results if one tries to use an operation that depends on all records with the same key being in the same part file!
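The split-and-replicate idea can be sketched in Python for a single hot key (the data, the hot-key table and the split factor are invented; in practice the sampling job produces them):

```python
from collections import defaultdict
from itertools import product

# Sketch of the skew-join idea for a single hot key: records of the
# skewed input with that key are split across several reducers, and
# the matching records of the other input are replicated to each of
# those reducers.
skewed = [("hot", i) for i in range(6)] + [("cold", 0)]
other  = [("hot", "x"), ("hot", "y"), ("cold", "z")]
hot_keys = {"hot": 3}    # key -> number of reducers it is split over

reducers = defaultdict(lambda: {"L": [], "R": []})
for n, (key, val) in enumerate(skewed):
    slot = n % hot_keys.get(key, 1)              # split the hot key
    reducers[(key, slot)]["L"].append(val)
for key, val in other:
    for slot in range(hot_keys.get(key, 1)):     # replicate the match
        reducers[(key, slot)]["R"].append(val)

joined = [(key, l, r)
          for (key, _), side in sorted(reducers.items())
          for l, r in product(side["L"], side["R"])]
print(len(joined))  # 6 hot records x 2 matches + 1 cold x 1 = 13
```

Note how no reducer holds more than two records of the hot key's input, while a plain hash join would send all six to one reducer.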
Thoughts...
Pig's philosophy states that "Pigs are domestic animals", meaning that users should be able to control and modify its behaviour. This is one of the reasons why Pig does not have an optimizer to choose among the available join strategies and leaves this choice to the user. However, this choice implies that users have a deep understanding of how the different techniques work, as well as adequate information about the format and distribution of the data they want to join. If this is not the case, a wrong choice will almost surely lead to severe execution overhead.
My scepticism comes from the high-level nature that such a system is supposed to offer. What do the users of such systems know, and what should they know? In my understanding, the whole point of a high-level abstraction is to hide implementation details and low-level information about how the underlying framework works. And honestly speaking, I can't see how an optimizer would conflict with Pig's philosophy of being a "domestic animal". Maybe it could be designed so that it can be disabled.
How is all this related to my thesis? The truth is that I will probably have no time at all to look into this any further. On the other hand, it is interesting to point out that Stratosphere offers an almost natural way of expressing joins and other relational operations using its Input Contracts. The Match Contract essentially maps to an inner-join, while the PACT compiler can choose the most effective execution strategy to implement it. The CoGroup Input Contract can be used to realize outer and anti-joins, while the Cross Contract can be used to implement all kinds of arbitrary theta-joins.
I personally find these kinds of issues really intriguing and although I will probably have to "push" them into "future work", I now have something to look forward to after my thesis is done =)
I hope it will be Spring already by the next time I post!
Until then, happy coding!
V.
PS: For more info on Pig's advanced relational operations, here is da book!
Pig's Hadoop Launcher
This is a post on the functionality of the main class that launches Pig for Hadoop Map-Reduce and also a good starting point for developers wishing to contribute to the Pig project.
The class in question is the MapReduceLauncher and is found in the package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer
It extends the abstract class Launcher, which provides a simple interface to:
- reset the state of the system after launch
- launch Pig (in cluster or local mode)
- explain how the generated Pig plan will be executed in the underlying infrastructure
Other methods provided are related to gathering runtime statistics and retrieving job status information.
The most important methods of MapReduceLauncher are compile() and launchPig(). It is advised that launchers for other frameworks (i.e. other than Hadoop MR) should override these methods.
The compile method takes a Physical Plan and compiles it down to a Map-Reduce Plan. It is the point where all optimizations take place. A total of eleven different optimizations are possible in this stage, including combiner optimizations, secondary sort key optimizations, join optimizations etc. Optimizations will be the focus of the second phase of my thesis, when I will have to dig into these classes!
The launchPig method is more interesting to me at this point of my work. It receives the Physical Plan to be compiled and executed as a parameter and returns a PigStats object, which contains statistics collected during the execution.
In short, it consists of the following *simplified* steps:
- Calls the compile method and retrieves the optimized Map-Reduce Plan
- Retrieves the Execution Engine
- Creates a JobClient Object
The JobClient class provides the primary interface for the user-code to interact with Hadoop's JobTracker. It allows submitting jobs and tracking their progress, accessing logs and status information. Usually, a user creates a JobConf object with the configuration information and then uses the JobClient to submit the job and monitor its progress.
- Creates a JobControlCompiler object. The JobControlCompiler compiles the Map-Reduce Plan into a JobControl object
The JobControl object encapsulates a set of Map-Reduce jobs and their dependencies. It tracks the state of each job and has a separate thread that submits the jobs when they become ready, monitors them and updates their states. I hope the following diagrams will make this clear:
- Repeatedly calls the JobControlCompiler's compile method until all jobs in the Map-Reduce Plan are exhausted
- While there are still jobs in the plan, retrieves the JobTracker URL, launches the jobs and periodically checks their status, updating the progress and statistics information
- When all jobs in the Plan have been consumed, checks for native Map-Reduce jobs and runs them
- Finally, aggregates statistics, checks for exceptions, decides the execution outcome and logs it
Next Steps
When I did the analysis above (almost two weeks ago) I made a list of my next steps including:
- Browse through the rest of the thesis-related Pig code, i.e. org.apache.pig.backend*
- Identify the Classes and Interfaces that need to be changed
- Identify Hadoop dependencies in the Pig codebase
- Find the Stratosphere "equivalents" of JobControl, JobClient, JobConf etc.
- Find out how to run a PACT program from inside Pig
Since then, I've been browsing through the Pig code and I have also started coding (finally!). I've identified far more classes and interfaces that need to change, even for the simplest version of the system I'm building, and I am certainly amazed by the amount of dependencies I've found and need to take care of... And it seems that finding "equivalents" is not a straightforward or easy task at all!
But the challenge has already been accepted! I'll be updating soon with my solutions :-)
Until then, happy coding!
V.