Basic Pig Operators

In this post I will present some of the most common and useful Pig operators. I will explain how they operate on data and what results they produce, but also how they are internally translated into Map-Reduce jobs and executed on the Hadoop execution engine.

I should remind here how the compilation to Map-Reduce works. The compiler that transforms the Physical Plan into a DAG of Map-Reduce operators uses a predecessor depth-first traversal to generate the graph. When compiling an operator, the goal is to try to merge it into the existing Map-Reduce operator, i.e. into the current Map or Reduce phase. However, some operators, such as GROUP, require the data to be shuffled or sorted, so they cause the creation of a new Map-Reduce operator. The new operator is connected to the previous one with a store-load combination.

  • FOREACH

FOREACH takes a record as input and generates a new one by applying a set of expressions to it. It is essentially a projection operator: it selects fields from a record, applies some transformations to them and outputs a new record. FOREACH is a non-blocking operator, meaning it can be included in the current Map-Reduce operator.

Foreach
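
As a toy illustration of these semantics in Python (a stand-in on made-up data, not Pig code or Pig internals), FOREACH maps each input record to one output record built from expressions over its fields:

```python
# Sketch of FOREACH semantics on toy tuples (illustrative, not Pig's implementation).
records = [("alice", 3, 7), ("bob", 5, 2)]

# Roughly: B = FOREACH A GENERATE $0, $1 + $2;
projected = [(name, x + y) for (name, x, y) in records]
print(projected)  # [('alice', 10), ('bob', 7)]
```

Because each output record depends only on one input record, no shuffle is needed, which is exactly why FOREACH is non-blocking.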

  • FILTER

FILTER selects those records from the dataset for which a predicate is true. Predicates may contain equality expressions, regular expressions, boolean operators and user-defined functions. FILTER is also non-blocking and can be merged into the current Map or Reduce plan.

Filter
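
The same kind of toy Python sketch (illustrative data and predicate, not Pig code) shows that FILTER simply keeps the records satisfying a predicate:

```python
# Sketch of FILTER semantics on toy tuples (illustrative, not Pig's implementation).
records = [("alice", 30), ("bob", 17), ("carol", 25)]

# Roughly: B = FILTER A BY age >= 18;
adults = [r for r in records if r[1] >= 18]
print(adults)  # [('alice', 30), ('carol', 25)]
```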

  • GROUP BY

GROUP collects all records with the same key inside a bag. A bag is a Pig data structure which can be described as an unordered collection of tuples (duplicates are allowed). GROUP generates records with two fields: the corresponding key, which is assigned the alias "group", and a bag with the collected records for this key.

Groupby

We can group on multiple keys and we can also GROUP "all". GROUP ALL will use the literal "all" as a key and will generate one and only one record with all the data in it. This can be useful if we would like to apply some kind of aggregation function, e.g. COUNT, to all our records.
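
The output shape of GROUP, and of GROUP ALL, can be sketched on toy data in Python (illustrative only, not Pig code):

```python
from collections import defaultdict

# Sketch of GROUP semantics: one output record per key, holding (key, bag).
records = [("fruit", "apple"), ("veg", "leek"), ("fruit", "pear")]

bags = defaultdict(list)
for r in records:                 # roughly: B = GROUP A BY $0;
    bags[r[0]].append(r)
grouped = sorted(bags.items())
print(grouped)
# [('fruit', [('fruit', 'apple'), ('fruit', 'pear')]), ('veg', [('veg', 'leek')])]

# GROUP ... ALL: a single record keyed by the literal "all",
# handy for whole-dataset aggregates such as COUNT.
group_all = ("all", records)
print(len(group_all[1]))  # 3
```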

GROUP is a blocking operator and it compiles down to three new operators in the Physical Plan: Local Rearrange, Global Rearrange and Package. It requires repartitioning and shuffling, which will force a Reduce phase to be created in the Map-Reduce plan. If we are currently inside a Map phase, this is not a big problem. However, if we are currently inside a Reduce phase, a GROUP will cause the pipeline to go through a full Map-Shuffle-Reduce cycle.

  • ORDER BY

The ORDER BY operator orders records by one or more keys, in ascending or descending order. However, what happens behind the scenes is much more interesting than you may imagine. ORDER is not implemented as a simple Sort-Shuffle-Reduce. Instead, it forces the creation of two Map-Reduce jobs. The reason is that datasets often suffer from skew: most of the values are concentrated around a few keys, while other keys have far fewer corresponding values. This phenomenon would cause only a few of the reducers to be assigned most of the workload, slowing down the overall execution. The first Map-Reduce job that Pig creates is used to perform a fast random sampling of the keys in the dataset. This job figures out the key distribution and balances the load among the reducers of the second job. However, just like in the case of Skew Join, this technique breaks the Map-Reduce convention that all records with the same key will be processed by the same reducer.
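
The idea behind the sampling job can be sketched very loosely in Python (this is an illustrative quantile-based range partitioner on synthetic data, not Pig's actual sampler or partitioner): sample the keys, derive boundary values, and assign each record to a reducer by key range so that the sorted output is just the concatenation of the reducer outputs.

```python
import random

random.seed(0)
# Synthetic skewed key distribution (exponential: many small keys, few large ones).
keys = [random.expovariate(1.0) for _ in range(1000)]

num_reducers = 4
# Job 1 (sketch): sample the keys and pick boundary values at even quantiles.
sample = sorted(random.sample(keys, 100))
boundaries = [sample[(i + 1) * len(sample) // num_reducers - 1]
              for i in range(num_reducers - 1)]

def partition(key):
    # Route each key to the first reducer whose upper boundary covers it.
    for i, b in enumerate(boundaries):
        if key <= b:
            return i
    return num_reducers - 1

# Job 2 (sketch): range-partition the full dataset; each reducer sorts its slice.
loads = [0] * num_reducers
for k in keys:
    loads[partition(k)] += 1
print(loads)  # roughly balanced despite the skewed key distribution
```

With naive hash partitioning on such skewed keys, a single reducer would receive a disproportionate share of the records; the sampled boundaries keep the slices comparable in size.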

  • JOIN

JOIN has been extensively discussed in this post.

  • COGROUP

COGROUP is a generalization of the GROUP operator, as it can group more than one input based on a key. Naturally, it is a blocking operator and is compiled in a way similar to GROUP.
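
On toy data, COGROUP's output shape can be sketched in Python (illustrative only, not Pig code): one record per key, with one bag per input.

```python
from collections import defaultdict

# Sketch of COGROUP semantics over two inputs keyed on the first field.
a = [(1, "x"), (2, "y")]
b = [(1, "p"), (1, "q")]

bags = defaultdict(lambda: ([], []))
for rec in a:
    bags[rec[0]][0].append(rec)   # bag of A-records for this key
for rec in b:
    bags[rec[0]][1].append(rec)   # bag of B-records for this key
cogrouped = sorted((k, b1, b2) for k, (b1, b2) in bags.items())
print(cogrouped)
# [(1, [(1, 'x')], [(1, 'p'), (1, 'q')]), (2, [(2, 'y')], [])]
```

Note that a key missing from one input still produces a record, just with an empty bag for that input.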

  • UNION

UNION is an operator that concatenates two or more inputs without joining them. It does not require a separate Reduce phase to be created. An interesting point about UNION in Pig is that it does not require the input records to share the same schema. If they do, then the output will also have this schema. If the schemas are different, then the output will have no schema and different records will have different fields. Also, it does not eliminate duplicates.
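
A toy Python sketch (illustrative, not Pig code) makes these three points concrete: plain concatenation, no schema requirement, and no duplicate elimination.

```python
# Sketch of UNION semantics: concatenation, nothing more.
a = [(1, "x"), (2, "y")]
b = [(2, "y"), ("no-schema", 3.14, True)]  # different fields are allowed

union = a + b
print(union)       # the duplicate (2, 'y') is kept
print(len(union))  # 4
```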

  • CROSS

CROSS receives two or more inputs and outputs the Cartesian product of their records. This means that it matches each record from one input with every record of all other inputs. If one input has n records and another has m records, CROSS will generate an output with n*m records. The output of CROSS usually results in very large datasets, so it should be used with care. CROSS is implemented in a rather complicated way: a CROSS logical operator is in reality equivalent to four operators:

Cross

The GFCross function is an internal Pig function and its behaviour depends on the number of inputs, as well as on the number of reducers available (specified by the "parallel 10" clause in the script). It generates artificial keys and tags the records of each input in such a way that each pair of records is guaranteed to match on exactly one key, so all records of one input will match all records of the other. If you are interested in more details, you can read the corresponding part of this book.
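
The artificial-key trick can be sketched in Python (a loose reconstruction on toy data, not Pig's actual GFCross code; the grid size d and the record names are made up). With two inputs, think of a d x d grid of keys: each record of the first input is replicated across one random "row", each record of the second across one random "column", so any pair of records shares exactly one (row, column) key and therefore meets in exactly one reducer.

```python
import itertools
import random
from collections import defaultdict

random.seed(0)
d = 3  # in the real implementation, derived from the requested parallelism

left = ["L0", "L1"]
right = ["R0", "R1", "R2"]

tagged = defaultdict(list)  # artificial key -> records; stands in for the shuffle
for rec in left:
    row = random.randrange(d)
    for col in range(d):                 # replicate across one row of the grid
        tagged[(row, col)].append(("left", rec))
for rec in right:
    col = random.randrange(d)
    for row in range(d):                 # replicate across one column of the grid
        tagged[(row, col)].append(("right", rec))

# Each "reducer" (one artificial key) joins the left and right records it received.
pairs = set()
for recs in tagged.values():
    ls = [r for tag, r in recs if tag == "left"]
    rs = [r for tag, r in recs if tag == "right"]
    pairs.update(itertools.product(ls, rs))

print(sorted(pairs))  # every (left, right) pair exactly once: 2 * 3 = 6 pairs
```

The cost of the trick is the d-fold replication of every record, which is another reason CROSS should be used sparingly.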

My conclusion from the above analysis was that even the Physical Plan is heavily dependent on the Map-Reduce framework and is not the right level of abstraction for my work. (CO)GROUP is compiled down to three new operators and CROSS down to four, while they could be mapped directly to the CoGroup and Cross Input Contracts of Stratosphere. That led me to move up one level and start working on compiling the Logical Plan into a PACT plan.

It turned out that things are much simpler up there, but a lot more coding needs to be done. To illustrate the simplicity, I will use the script and plan generation from the Pig paper:

Paper_script

And this is how the Logical Plan is transformed into a Physical and then a Map-Reduce Plan:

Paper_plans

Now, this is how the Logical Plan could be compiled to a PACT Plan:

Pact_plan

Much simpler and much cleaner! I'm quite optimistic =)

And now I have to sit down and code this thing!

Until next time, happy coding!

V.