First Dive into Pig Code

The Pig project code base is quite big and complex. In this post, I will focus on the back end side of the system, meaning the execution engine. The hierarchy of Pig’s back end looks roughly like this:

[Diagram: hierarchy of Pig's back end]
Zooming in on the Hadoop execution engine, we get the following diagram:

[Diagram: the Hadoop execution engine]
 
However, the engine itself also has a front end and a back end.
 
The front end takes care of all compilation and transformation from one plan to another. First, the parser transforms a Pig Latin script into a Logical Plan. Semantic checks (such as type checking) and some optimizations (such as determining which fields of the data need to be read to satisfy the script) are performed on this Logical Plan. The Logical Plan is then transformed into a PhysicalPlan. In the above hierarchy, PhysicalPlan lies under ExecutionEngine -> PhysicalLayer -> Plans. This Physical Plan contains the operators that will be applied to the data.
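To make the front-end stages concrete, here is a toy sketch in Python, not Pig's actual code: a "script" is parsed into a logical plan, a pruning pass keeps only the fields some later operator reads, and the result is lowered to physical operators. All names here (`parse`, `prune_columns`, `POLoad`, and so on) are illustrative assumptions, not Pig's real classes.

```python
# Toy sketch of Pig's front-end pipeline (illustrative names, not Pig's
# real classes): parse a script into a logical plan, prune fields the
# script never reads, then lower logical operators to physical ones.

def parse(script):
    """Turn each line into a logical-plan node: an operator plus arguments."""
    return [{"op": line.split()[0], "args": line.split()[1:]}
            for line in script.strip().splitlines()]

def prune_columns(plan):
    """Optimization pass: only load the fields some later operator reads."""
    used = {a for node in plan if node["op"] != "LOAD" for a in node["args"]}
    for node in plan:
        if node["op"] == "LOAD":
            node["args"] = [a for a in node["args"] if a in used]
    return plan

def to_physical(plan):
    """Lower each logical operator to a physical operator name."""
    lowering = {"LOAD": "POLoad", "FILTER": "POFilter", "STORE": "POStore"}
    return [lowering[node["op"]] for node in plan]

script = """
LOAD name age city
FILTER age
STORE age
"""
logical = prune_columns(parse(script))   # LOAD now reads only "age"
physical = to_physical(logical)
```

The pruning step mirrors the optimization mentioned above: the fields `name` and `city` are never referenced after the load, so they are dropped before any data is read.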
This PhysicalPlan is then passed to the MRCompiler, which lies under ExecutionEngine -> MapReduceLayer. This is the compiler that transforms the PhysicalPlan into a DAG of MapReduce operators. It uses a predecessor-first depth-first traversal of the PhysicalPlan to generate the compiled graph of operators. When compiling an operator, the goal is first to merge it into the existing MapReduce operators, in order to keep the number of generated jobs as small as possible. A new MapReduce operator is introduced only for blocking operators and splits; the two operators are then connected using a store-load combination. The output of the MRCompiler is an MROperPlan object, which corresponds to the MapReduce plan to be executed.
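A minimal sketch of that packing strategy, for a linear chain of operators only (the real MRCompiler walks a DAG, and the set of blocking operators below is an assumption for illustration):

```python
# Toy version of the MRCompiler's merging strategy for a linear chain:
# non-blocking operators are merged into the current MapReduce operator;
# a blocking operator moves work to the reduce phase, and a second
# blocking operator forces a new MapReduce operator (which the real
# compiler connects to the previous one with a store-load pair).
BLOCKING = {"GROUP", "JOIN", "ORDER"}  # assumed set, for illustration

def compile_chain(ops):
    jobs = [{"map": [], "reduce": []}]
    for op in ops:
        job = jobs[-1]
        if op in BLOCKING and job["reduce"]:
            jobs.append({"map": [], "reduce": [op]})  # new MR operator
        elif op in BLOCKING or job["reduce"]:
            job["reduce"].append(op)
        else:
            job["map"].append(op)
    return jobs

one_job = compile_chain(["LOAD", "FILTER", "GROUP", "FOREACH", "STORE"])
two_jobs = compile_chain(["LOAD", "GROUP", "FOREACH", "ORDER", "STORE"])
```

A single GROUP fits into one job (the filter runs map-side, the rest reduce-side), while a GROUP followed by an ORDER needs two jobs, which is exactly the merge-first behavior described above.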
This plan is then optimized, for example by using the Combiner where possible or by combining jobs that scan the same input data.
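The "same input" part of this optimization can be pictured with a toy merge pass (the job representation below is assumed purely for illustration):

```python
# Toy multi-query merge: jobs that scan the same input are collapsed
# into a single job that reads the data once and splits the pipeline.
from collections import defaultdict

def merge_same_input(jobs):
    by_input = defaultdict(list)
    for job in jobs:
        by_input[job["input"]].append(job["pipeline"])
    return [{"input": path, "pipelines": pipes}
            for path, pipes in by_input.items()]

jobs = [
    {"input": "/data/logs", "pipeline": ["FILTER", "STORE"]},
    {"input": "/data/logs", "pipeline": ["GROUP", "STORE"]},
    {"input": "/data/users", "pipeline": ["STORE"]},
]
merged = merge_same_input(jobs)   # three jobs become two
```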
The final set of MapReduce jobs is generated by the JobControlCompiler, which also lies under ExecutionEngine -> MapReduceLayer. It takes an MROperPlan and converts it into a JobControl object, with the relevant dependency information maintained. The JobControl object is made up of Jobs, each of which has a JobConf. The conversion is done by the method compile(), which compiles all jobs that have no dependencies, removes them from the plan, and returns a JobControl object. It must be called repeatedly with the same plan until the plan is exhausted.
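That call-until-exhausted contract can be sketched like this (a toy model, with hypothetical job names; the real compile() returns a JobControl object rather than a list):

```python
# Toy version of the JobControlCompiler contract: each call returns the
# jobs with no unfinished dependencies and removes them from the plan,
# so the caller loops with the same plan until it is exhausted.
def compile_ready(plan, deps):
    ready = [j for j in plan if not deps.get(j)]
    for job in ready:
        plan.remove(job)
        for remaining in deps.values():
            remaining.discard(job)   # job is compiled, drop it as a dependency
    return ready

plan = ["load-filter", "group", "order"]
deps = {"group": {"load-filter"}, "order": {"load-filter", "group"}}

batches = []
while plan:                          # call with the same plan until exhausted
    batches.append(compile_ready(plan, deps))
```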
The generated jobs are then submitted to Hadoop and monitored by the MapReduceLauncher.
 
In the back end, each of PigGenericMapReduce.Map, PigCombiner.Combine, and PigGenericMapReduce.Reduce uses the pipeline of physical operators constructed in the front end to load, process, and store data.
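As a toy illustration of driving such a pipeline (the operator functions are assumptions, not Pig's physical operators): each input tuple is pushed through the attached chain, and every operator may emit zero or more tuples.

```python
# Toy map-side pipeline: each physical operator is modeled as a function
# from one tuple to a list of output tuples; the map task feeds an input
# tuple in and drains whatever comes out the end of the chain.
def run_pipeline(pipeline, tup):
    batch = [tup]
    for op in pipeline:
        batch = [out for t in batch for out in op(t)]
    return batch

pipeline = [
    lambda t: [t] if t[1] > 18 else [],   # a FILTER: keep age > 18
    lambda t: [(t[0],)],                  # a FOREACH: project the name
]
adult = run_pipeline(pipeline, ("ann", 20))   # one output tuple
minor = run_pipeline(pipeline, ("bob", 10))   # filtered out, no output
```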

The goal of my project is to replace the Hadoop execution engine component with a new one, corresponding to the Nephele execution engine. It might sound easy, but it is not as trivial as it looks. Even though Pig was built with modularity in mind, aiming to be independent of the execution engine, this is not exactly the case: many parameters are Hadoop-specific, and there are many dependencies outside the Hadoop packages that need to be taken care of.

Wish me luck!
I wish you happy coding :-)
V.