Basic Pig Operators

In this post I will present some of the most common and useful Pig operators. I will explain how they operate on data and what results they produce, but also how they are internally translated into Map-Reduce jobs and executed on the Hadoop execution engine.

Let me first recall how the compilation to Map-Reduce works. The compiler that transforms the Physical Plan into a DAG of Map-Reduce operators uses a predecessor depth-first traversal to generate the graph. When compiling an operator, the goal is to try to merge it into the existing Map-Reduce operator, i.e. into the current Map or Reduce phase. However, some operators, such as GROUP, require the data to be shuffled or sorted, so they cause the creation of a new Map-Reduce operator. The new operator is connected to the previous one with a store-load combination.

  • FOREACH

FOREACH takes a record as input and generates a new one by applying a set of expressions to it. It is essentially a projection operator: it selects fields from a record, applies some transformations to them and outputs a new record. FOREACH is a non-blocking operator, meaning it can be included inside the current Map-Reduce operator.

[figure: Foreach]
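To make the record-at-a-time semantics concrete, here is a toy Python sketch (an illustration of the semantics only, not Pig's actual implementation) of something like B = FOREACH A GENERATE name, salary * 2:

```python
# Toy model of: B = FOREACH A GENERATE name, salary * 2;
# Each input record maps independently to one output record, which is
# exactly why FOREACH fits inside the current Map or Reduce phase.
def foreach(records, expressions):
    """Apply a list of per-record expressions to build each output tuple."""
    return [tuple(expr(rec) for expr in expressions) for rec in records]

A = [("alice", 100), ("bob", 200)]
B = foreach(A, [lambda r: r[0], lambda r: r[1] * 2])
# B == [("alice", 200), ("bob", 400)]
```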

  • FILTER

FILTER selects those records from a dataset for which a predicate is true. Predicates may contain equality expressions, regular expressions, boolean operators and user-defined functions. FILTER is also non-blocking and can be merged into the current Map or Reduce plan.

[figure: Filter]
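In the same toy Python model (illustrative only), FILTER simply keeps the records for which the predicate evaluates to true:

```python
# Toy model of: C = FILTER A BY salary > 150;
# Like FOREACH, FILTER looks at one record at a time, so it is
# non-blocking and can run inside the current Map or Reduce phase.
def filter_records(records, predicate):
    return [rec for rec in records if predicate(rec)]

A = [("alice", 100), ("bob", 200)]
C = filter_records(A, lambda r: r[1] > 150)
# C == [("bob", 200)]
```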

  • GROUP BY

GROUP collects all records with the same key inside a bag. A bag is a Pig data structure that can be described as an unordered collection of tuples, possibly with duplicates. GROUP generates records with two fields: the corresponding key, which is assigned the alias "group", and a bag with the collected records for this key.

[figure: Groupby]

We can group on multiple keys and we can also GROUP "all". GROUP ALL uses the literal "all" as the key and generates one and only one record containing all the data. This is useful when we want to apply some kind of aggregation function, e.g. COUNT, to all our records.
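A small Python sketch (toy semantics only, not the actual Rearrange/Package machinery) of what GROUP and GROUP ALL produce:

```python
from collections import defaultdict

# Toy model of: G = GROUP A BY dept; each output record is (key, bag).
def group_by(records, key_fn):
    bags = defaultdict(list)
    for rec in records:
        bags[key_fn(rec)].append(rec)
    return [(key, bag) for key, bag in bags.items()]

A = [("eng", "alice"), ("eng", "bob"), ("hr", "carol")]
G = group_by(A, lambda r: r[0])
# Two records: ("eng", bag of 2 tuples) and ("hr", bag of 1 tuple);
# in Pig the first field carries the alias "group".

# GROUP ... ALL: the literal "all" becomes the single key, which is
# handy for whole-dataset aggregates such as COUNT.
G_all = group_by(A, lambda r: "all")
count = len(G_all[0][1])  # COUNT over all records == 3
```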

GROUP is a blocking operator and compiles down to three new operators in the Physical Plan: Local Rearrange, Global Rearrange and Package. It requires repartitioning and shuffling, which forces a Reduce phase to be created in the Map-Reduce plan. If we are currently inside a Map phase, this is not a big problem. However, if we are currently inside a Reduce phase, a GROUP causes the pipeline to go through another full Map-Shuffle-Reduce cycle.

  • ORDER BY

The ORDER BY operator orders records by one or more keys, in ascending or descending order. However, what happens behind the scenes is much more interesting than you may imagine. ORDER is not implemented as a simple Sort-Shuffle-Reduce. Instead, it forces the creation of two Map-Reduce jobs. The reason is that datasets often suffer from skew: most of the values are concentrated around a few keys, while other keys have far fewer corresponding values. This phenomenon causes only a few of the reducers to be assigned most of the workload, slowing down the overall execution. The first Map-Reduce job that Pig creates performs a fast random sampling of the keys in the dataset. This job figures out the key distribution and balances the load among the reducers of the second job. However, just like in the case of Skew Join, this technique breaks the Map-Reduce convention that all records with the same key will be processed by the same reducer.
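The idea of the sampling job can be sketched in Python as follows (a simplification under my own assumptions, not Pig's actual sampler): pick range boundaries from a sample of the keys, then range-partition so that the sorted reducer outputs concatenate into a globally sorted result:

```python
import random

# Job 1 (sketch): sample the keys and pick num_reducers - 1 boundaries.
def pick_boundaries(keys, num_reducers, sample_size=100):
    sample = sorted(random.sample(keys, min(sample_size, len(keys))))
    step = len(sample) / num_reducers
    return [sample[int(i * step)] for i in range(1, num_reducers)]

# Job 2 (sketch): route each record to the reducer owning its key range.
def range_partition(records, key_fn, boundaries):
    parts = [[] for _ in range(len(boundaries) + 1)]
    for rec in records:
        idx = sum(1 for b in boundaries if key_fn(rec) >= b)
        parts[idx].append(rec)
    return parts

records = [(k,) for k in [5, 1, 9, 3, 7, 2, 8, 4, 6, 0]]
bounds = pick_boundaries([r[0] for r in records], num_reducers=3)
parts = range_partition(records, lambda r: r[0], bounds)
merged = [rec for part in parts for rec in sorted(part)]
# merged is globally sorted although each "reducer" sorted only its part
```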

  • JOIN

JOIN has been extensively discussed in this post.

  • COGROUP

COGROUP is a generalization of the GROUP operator, as it can group more than one input based on a key. Of course, it is a blocking operator and is compiled in a way similar to that of GROUP.

  • UNION

UNION is an operator that concatenates two or more inputs without joining them. It does not require a separate Reduce phase to be created. An interesting point about UNION in Pig is that it does not require the input records to share the same schema. If they do, then the output will also have this schema. If the schemas are different, then the output will have no schema and different records will have different fields. Also, UNION does not eliminate duplicates.
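A toy Python sketch of UNION's semantics (illustrative only): plain concatenation, with no schema check and no duplicate elimination:

```python
# Toy model of: U = UNION A, B;
def union(*inputs):
    out = []
    for records in inputs:
        out.extend(records)           # no dedup, no schema enforcement
    return out

A = [(1, "a"), (2, "b")]
B = [(2, "b"), (3, "c", "extra")]     # a record with a different arity
U = union(A, B)
# len(U) == 4: the duplicate (2, "b") survives and each record keeps
# its own set of fields, mirroring UNION on mismatched schemas
```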

  • CROSS

CROSS receives two or more inputs and outputs the Cartesian product of their records. This means that it matches each record from one input with every record of all other inputs. If we have one input of n records and another of m records, CROSS generates an output with n*m records. The output of CROSS usually results in very large datasets, so it should be used with care. CROSS is implemented in a quite complicated way: a CROSS logical operator is in reality equivalent to four operators:

[figure: Cross]

The GFCross function is an internal Pig function and its behaviour depends on the number of inputs, as well as on the number of reducers available (specified by the "parallel 10" in the script). It generates artificial keys and tags the records of each input in a way that guarantees exactly one match of the keys, with all records of one input matching all records of the other. If you are interested in more details, you can read the corresponding part of this book.
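The key-generation trick can be sketched in Python like this (my own simplified reading of the GFCross idea, for two inputs only): with parallel = rows * cols reducers arranged as a grid, each record of the first input picks a random row and is replicated across all columns, while each record of the second input picks a random column and is replicated across all rows, so any pair of records meets in exactly one cell:

```python
import itertools
import random
from collections import defaultdict

def scatter(records, axis, rows, cols, cells):
    """Replicate each record along one axis of the rows x cols grid."""
    for rec in records:
        if axis == 0:                       # first input: fix a random row
            i = random.randrange(rows)
            for j in range(cols):
                cells[(i, j)][0].append(rec)
        else:                               # second input: fix a random column
            j = random.randrange(cols)
            for i in range(rows):
                cells[(i, j)][1].append(rec)

rows, cols = 2, 3                           # i.e. "parallel 6"
cells = defaultdict(lambda: ([], []))
A, B = [1, 2, 3], ["x", "y"]
scatter(A, 0, rows, cols, cells)
scatter(B, 1, rows, cols, cells)
pairs = [(a, b) for left, right in cells.values() for a in left for b in right]
# Every (a, b) combination is produced exactly once across all cells
```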

My conclusion from the above analysis was that even the Physical Plan is very dependent on the Map-Reduce framework and does not reflect the right level for my work to be done. (CO)GROUP is compiled down to three new operators and CROSS is compiled down to four, while they can be mapped directly to the CoGroup and Cross Input Contracts of Stratosphere. That led me to move up one level and start working on compiling the Logical Plan into a PACT plan.

It turned out that things are much simpler up there, but a lot more coding needs to be done. Just to illustrate the simplicity, I will use the script and plan generation from the Pig paper:

[figure: Paper_script]

And this is how the Logical Plan is transformed into a Physical and then a Map-Reduce Plan:

[figure: Paper_plans]

Now, this is how the Logical Plan could be compiled to a PACT Plan:

[figure: Pact_plan]

Much simpler and much cleaner! I'm quite optimistic =)

And now I have to sit down and code this thing!

Until next time, happy coding!

V.

Join Types in Pig

This blog post is on joins! That trivial but extremely useful relational operation I know you're all familiar with!

Inner join, equi-join, natural join, theta-join, outer join, left-outer join, right-outer join, full-outer join, self join, semi-join...

I bet you remember the definitions and can tell the differences as easily as you recall the multiplication tables... Right! Once upon a time, I also could... just before my undergrad databases exam... hmmm...

Honestly, I've always found it hard to remember the specific details of all the different types of joins available, and I always need to refresh the concepts whenever I need to use a specific type. (Oh, how much I love Wikipedia, hell yeah I do! :p)

 

Joins in Map-Reduce

No matter how common and trivial, join operations have always been a headache to Map-Reduce users. A simple google search on "map-reduce join operation" will give you several blog posts, presentations and papers as a result. The problem originates from Map-Reduce's Map-Shuffle-Sort-Reduce static pipeline and single input second-order functions. The challenge is finding the most effective way to "fit" the join operation into this programming model.

There are two common strategies, and both consist of a single Map-Reduce job:

  • Reducer-side join: In this strategy, the map phase serves as the preparation phase. The mapper reads records from both inputs and tags each record with a label based on its origin. It then emits the records using the join key as the key. Each reducer then receives all records that share the same key, checks the origin of each record and generates the cross product. Slides 21-22 from this ETH presentation provide a very clear example.

 

  • Mapper-side join: The alternative comes from the introduction of Hadoop's distributed cache. This facility can be used to broadcast one of the inputs to all mappers and perform the join in the map phase. However, it is quite obvious that this technique only makes sense in the case where one of the inputs is small enough to fit in the distributed cache!
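The reducer-side strategy above can be sketched in a few lines of Python (a toy single-process model of the map and reduce phases, not real Hadoop code):

```python
from collections import defaultdict

# Map phase: tag each record with its origin and key it by the join key.
def map_phase(left, right):
    for rec in left:
        yield rec[0], ("L", rec)
    for rec in right:
        yield rec[0], ("R", rec)

# Reduce phase: for each key, cross the records of the two origins.
def reduce_phase(tagged):
    groups = defaultdict(list)
    for key, value in tagged:                # stands in for shuffle/sort
        groups[key].append(value)
    joined = []
    for values in groups.values():
        lefts = [rec for tag, rec in values if tag == "L"]
        rights = [rec for tag, rec in values if tag == "R"]
        joined += [(l, r) for l in lefts for r in rights]
    return joined

L = [(1, "a"), (2, "b")]
R = [(1, "x"), (1, "y"), (3, "z")]
out = reduce_phase(map_phase(L, R))
# Key 1 produces two pairs; keys 2 and 3 have no partner and produce none
```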

 

Joins in Pig

Fortunately, Pig users do not need to program the join operations themselves as Pig Latin offers the JOIN statement. Also, since Pig is a high-level abstraction that aims to hide low-level implementation details, they do not need to care about the join strategy... Or do they?

Pig users can use the JOIN operator together with the USING keyword in order to select the join execution strategy. Pig offers the following Advanced Join Techniques:

  • Fragment-Replicate Join: USING 'replicated'

It is advised to use this technique when a small table that fits in memory needs to be joined with a significantly larger table. The small table will be loaded in the memory of each machine using the distributed cache, while the large table will be fragmented and distributed to the mappers. No Reduce phase is required, as the join can be implemented entirely in the map phase. This type of join can only support inner and left-outer joins, since the replicated input is always the right-hand side of the statement. Pig implements this join by creating two map-only jobs. During the first one, the distributed cache is set up and the small input is broadcast to all machines. The second one actually performs the join operation.

Users must keep in mind that the second table in their statement will be the one loaded into memory, i.e. in the statement:

joined = JOIN A BY $0, B BY $0 USING 'replicated'

B is the input that will be loaded into memory. Extra care needs to be taken for one more reason when using this type of join: Pig will not check beforehand whether the specified input fits into memory, which results in a runtime error in case it doesn't!
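What one mapper does can be sketched in Python (a toy model of a single map task, with the broadcast input already in memory):

```python
# Sketch of a fragment-replicate join inside one mapper: the small
# replicated input is hashed in memory, the large fragment probes it.
def fragment_replicate_join(large_fragment, small_input):
    table = {}
    for rec in small_input:                   # the "replicated" side
        table.setdefault(rec[0], []).append(rec)
    return [(a, b) for a in large_fragment for b in table.get(a[0], [])]

A = [(1, "a"), (2, "b"), (3, "c")]            # fragment of the large input
B = [(1, "x"), (3, "y")]                      # small input held in memory
out = fragment_replicate_join(A, B)
# No Reduce phase needed: every mapper joins its fragment independently
```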

  • Merge Join: USING 'merge'

You should use this type of join when the inputs are already sorted by key. This is a variation of the well-known sort-merge algorithm, where the sort is already performed :)

In order to execute this join, Pig first runs an initial Map-Reduce job that samples the second input and builds an index from join-key values to HDFS blocks. The second job takes the first input and uses the index to find the correct block for each key it is looking for. For each key, all records with this particular key are kept in memory and used to do the join. In other words, two pointers need to be maintained, one for each input. Since both inputs are sorted, only one lookup in the index is required.
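The merge step itself can be sketched with the classic two-pointer scan over the pre-sorted inputs (a generic sort-merge sketch in Python with the index lookup omitted, not Pig's code):

```python
# Merge-join over two inputs already sorted by their first field.
def merge_join(left, right):
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][0], right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # buffer all right records for this key, cross with all lefts
            j_end = j
            while j_end < len(right) and right[j_end][0] == lk:
                j_end += 1
            while i < len(left) and left[i][0] == lk:
                out += [(left[i], r) for r in right[j:j_end]]
                i += 1
            j = j_end
    return out

L = [(1, "a"), (2, "b"), (2, "c")]
R = [(2, "x"), (2, "y"), (4, "z")]
out = merge_join(L, R)
# Key 2 yields 2 x 2 = 4 pairs; each input is scanned only once
```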

  • Skew Join: USING 'skewed'

The third and last type of join provided by Pig is the skew join. It is quite common that some keys in a dataset are a lot more popular than others, that is, most of the values correspond to a very small set of keys. Using the default algorithm in such a case would significantly overload some of the reducers in the system.

In order to overcome this problem, one can use Pig's skew join. Pig will first sample one of the inputs, searching for popular keys whose records would not fit in memory. Records of the remaining keys are handled by a default join. However, records that belong to one of the keys identified as popular are split among a number of reducers. The records of the other input that correspond to the split keys are replicated to each reducer that handles part of that key.

Skew is supported in one input only. If both tables have skew, the algorithm will still work, but will be significantly slower.

However, extra care should be taken when using this type of join! This algorithm breaks the Map-Reduce convention that all records with the same key will be processed by the same reducer! This could be dangerous or yield unexpected results if one tries to use an operation that depends on all records with the same key being in the same part file!
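The partitioning idea behind the skew join can be sketched in Python as follows (a toy model under my own simplifications: the popular keys are given instead of sampled):

```python
import random
from collections import defaultdict

# Records of a popular key in the skewed input are spread over several
# reducers; matching records of the other input are replicated to each.
def skew_partition(skewed, other, popular_keys, num_splits, num_reducers):
    reducers = defaultdict(lambda: ([], []))
    for rec in skewed:
        if rec[0] in popular_keys:
            r = random.randrange(num_splits)        # split the hot key
        else:
            r = hash(rec[0]) % num_reducers         # default routing
        reducers[r][0].append(rec)
    for rec in other:
        if rec[0] in popular_keys:
            for r in range(num_splits):             # replicate the match
                reducers[r][1].append(rec)
        else:
            reducers[hash(rec[0]) % num_reducers][1].append(rec)
    return reducers

A = [(1, i) for i in range(6)] + [(2, "rare")]      # key 1 is popular
B = [(1, "x"), (2, "y")]
parts = skew_partition(A, B, popular_keys={1}, num_splits=2, num_reducers=4)
pairs = [(a, b) for left, right in parts.values()
         for a in left for b in right if a[0] == b[0]]
# All 6 + 1 matches are produced, but key 1's records no longer all
# end up in a single reducer's output
```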

 

Thoughts...

Pig's philosophy states that "Pigs are domestic animals", meaning that users should be able to control and modify its behaviour. This is one of the reasons why Pig does not have an optimizer to choose among the available join strategies and leaves this choice to the user. However, this choice implies that users have a deep understanding of how the different techniques work, as well as adequate information regarding the format and distribution of the data they want to join. If this is not the case, a wrong choice will almost surely lead to severe execution overhead.

My scepticism comes from the high-level nature that such a system is supposed to offer. What do the users of such systems know and what should they know? In my understanding, the whole point of a high-level abstraction is to hide implementation details and low-level information about how the underlying framework works. And honestly speaking, I can't see how an optimizer would come into conflict with Pig's philosophy of being a "domestic animal". Maybe it could be designed so that it is possible to disable.

How is all this related to my thesis? The truth is that I will probably have no time at all to look into this any further. On the other hand, it is interesting to point out that Stratosphere offers an almost natural way of expressing joins and other relational operations using its Input Contracts. The Match Contract essentially maps to an inner-join, while the PACT compiler can choose the most effective execution strategy to implement it. The CoGroup Input Contract can be used to realize outer and anti-joins, while the Cross Contract can be used to implement all kinds of arbitrary theta-joins. 

I personally find these kinds of issues really intriguing and although I will probably have to "push" them into "future work", I now have something to look forward to after my thesis is done =)

 

I hope it will be Spring already by the next time I post!

Until then, happy coding!

V.

 

PS: For more info on Pig's advanced relational operations, here is da book!

Pig and Stratosphere Integration Alternatives

In this post I am going to present some alternative design choices concerning the actual implementation of the project, i.e. the integration of Pig and Stratosphere systems.

The main goal is to have a working system, such that Pig Latin scripts can be executed on top of the Nephele execution engine. However, performance is an issue, and of course, we wouldn't like to end up with a system slower than the current implementation :S The very motivation of this project is to overcome the limitations of the existing system, by exploiting Stratosphere's features.

The architectures of the two systems are shown side by side in the next diagram:

[figure: Ps0]

The integration can be achieved in several ways and on different levels:

  • Translate MapReduce programs into PACT programs

This is the naive and straightforward way of solving the given problem. PACT already supports Map and Reduce Input Contracts, which can be used for a one-to-one transformation of the Map-Reduce Plan into a PACT Plan. The Logical and Physical Plans that are generated by Pig can be re-used without modification. It is obvious that this solution wouldn't provide any gains compared to the existing implementation. In fact, it should be slower, since it adds one more layer to the system architecture. However, it is the simplest approach and it will be my starting point, in order to better understand the frameworks' internals =)

[figure: Psv1]

  • Translate the Physical Plan into a PACT Plan

This is a more natural solution and corresponds to the approach that would have been taken if Pig had been designed with Nephele in mind as the execution engine, instead of Hadoop. It involves completely replacing the Map-Reduce Plan with a PACT Plan, generated directly from the Physical Plan. This way, the additional Input Contracts, such as Match, Cross and CoGroup, could be used to compile common operations, like joins. I hope and do expect this solution to be advantageous over the existing implementation. With this design, we should be able to exploit Stratosphere's advantages and reflect them as performance gains in certain classes of applications.

[figure: Psv2]

  • Translate the MapReduce Plan (or even the Physical Plan) into a Nephele Job

If you just look at the two system architectures, as shown in the above figures, you might think that the more layers you take away, the faster the resulting system would be. For example, one could argue that getting rid of both high-level programming frameworks, Map-Reduce and PACT, would speed things up. However, merging at that point would involve re-implementing a job already done, i.e. compiling down to code that can be understood by an execution engine such as Nephele (or Hadoop). A speedup in this case is quite improbable, and it would mean that there is something wrong with the PACT compiler. Well, I have no reason to suspect so, or any spare time to check this during the 3 months I have left :p

The solutions discussed here are not the only ones possible. One could think of and propose several variations at different levels. For example, in order to take full advantage of Stratosphere's flexibility, it would be reasonable to try to modify Pig at the level of the Physical Plan. Of course, there is the danger of messing up Pig's modularity and making it execution-engine dependent. Moreover, one could exploit Stratosphere's Output Contracts and implement optimization rules for cases such as grouping or joining pre-partitioned or already sorted data.

The thing I like about this project is that I constantly have more and more ideas about variations, optimizations and possible extensions. And every time I meet with my supervisor and his team, I fill my notebook with as many interesting and motivating thoughts from them all! However, I don't have all the time in the world, so I will focus on the first two alternatives for the purpose of my thesis.

Just a final conclusion and something that I always have in mind while working on this project:

When any kind of abstraction is made, and this applies to high-level languages as well, there is always an overhead you have to pay in exchange for simplicity. The underlying system, whose details the user no longer needs to know, will be designed to take several decisions that would often differ from those an experienced low-level programmer would take.

However, the abstraction only has value provided that the frustration imposed on the user by the slow-down in accomplishing their job is lower than the satisfaction they get from being able to accomplish this job in a simpler way.

 

Hoping for a valuable abstraction!

Until next time, happy coding,

V.

The PACT programming model

PACT is Stratosphere's programming model. It consists of the so-called Parallelization Contracts which push the Map-Reduce idea one step further.

I was thinking of writing a post, explaining how PACT works, but the truth is that I wouldn't do any better than the already existing documentation at the Stratosphere project website.

So, I will only provide here some useful links:

  • The Pact Programming Model: A high-level view of the programming model and in-detail presentation of the second-order functions available and the guarantees provided by the framework.
  • Building a PACT Program: A guide to PACT programming, including everything you need to know before starting writing PACT programs.
  • Example Jobs: Six example PACT programs of varying difficulty, starting from simple WordCount to more complex graph analysis algorithms.
  • The PACT Compiler: A detailed overview of how the PACT Compiler is built and how it performs the transformation of PACT programs into Nephele DAGs.

If you are already familiar with MapReduce programming, you will also find this paper very helpful. It compares the two programming models and contains a series of examples of common data analysis tasks implemented in both models.

 

Keep on happy coding,

V.

The Nephele Execution Engine

This post is an overview of Nephele, Stratosphere's execution engine. The programming model provided with Stratosphere is PACT (and will be covered in a future post). However, it is possible to submit jobs directly to the Nephele engine, in the form of Directed Acyclic Graphs (DAGs), where each vertex of the graph represents a task of the job. There are three types of vertices: Input vertices, Output vertices and Task vertices. The edges of the graph correspond to the communication channels between tasks.
 
Why choose Nephele over other engines? 
 
One big advantage of Nephele is the high degree of parametrization it offers, which can lead to several optimizations. It is possible for the user to set the degree of data parallelism per task or explicitly specify the type of communication channels between nodes. More importantly, Nephele supports dynamic resource allocation. In contrast to MapReduce and Dryad, which are designed to work in static cluster environments, Nephele is capable of allocating resources from a Cloud environment depending on the workload.

Architecture Overview

Nephele consists of several communicating components that are presented next in more detail. An overview of the Nephele architecture is shown in the following diagram.

In order to submit a job to the Nephele engine, a Client has to communicate with the Job Manager. The Job Manager is unique in the system and is responsible for scheduling the jobs it receives and coordinating their execution.

The resources required for job execution are managed by the Instance Manager. The Instance Manager allocates or deallocates virtual machines, depending on the workload of the current execution phase. The jobs are executed in parallel by instances, each of which is controlled by a Task Manager. The Task Manager communicates with the Job Manager and is assigned jobs for execution. During execution, each Task Manager sends information about changes in the execution state of the job (completed, failed, etc.). Task Managers also periodically send heartbeats to the Job Manager, which are then propagated to the Instance Manager. This way, the Instance Manager keeps track of the availability of running instances. If a Task Manager has not sent a heartbeat within the given heartbeat interval, the host is assumed to be dead. The Instance Manager then removes the respective Task Manager from the set of compute resources and calls the scheduler to take appropriate actions.
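The heartbeat-based liveness tracking can be sketched like this (names and numbers are made up for illustration, not Nephele's actual API):

```python
# A Task Manager is considered dead if its last heartbeat is older
# than the heartbeat interval.
class InstanceTracker:
    def __init__(self, heartbeat_interval):
        self.interval = heartbeat_interval
        self.last_seen = {}

    def heartbeat(self, task_manager, now):
        self.last_seen[task_manager] = now

    def dead_instances(self, now):
        return [tm for tm, seen in self.last_seen.items()
                if now - seen > self.interval]

tracker = InstanceTracker(heartbeat_interval=30)
tracker.heartbeat("tm-1", now=0)
tracker.heartbeat("tm-2", now=0)
tracker.heartbeat("tm-1", now=25)
dead = tracker.dead_instances(now=40)
# "tm-2" missed its interval and would be removed from the resource pool
```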

When the Job Manager receives a job graph from the Client, it decides how many and what types of instances need to be launched. Once all virtual machines have booted up, execution is triggered. Persistent storage, accessible from both the Job and Task Managers, is needed to store the jobs’ input and output data.
 

Nephele Jobs
 
Jobs in Nephele are defined as Directed Acyclic Graphs (DAGs). Each graph vertex represents one task and each edge indicates communication flow between tasks. Three types of vertices can be defined: Task vertex, Input vertex and Output vertex. The Input and Output vertices define how data is read or written to disk. The Task vertices are where the actual user code is executed.
 
Nephele defines a default strategy for setting up the execution of a job. However, there is a set of parameters that the user can tune in order to make execution more efficient. These parameters include the number of parallel subtasks, the number of subtasks per instance, how instances should be shared between tasks, the types of communication channels and the instance types that fulfill the hardware requirements of a specific job.
 
Nephele offers three types of communication channels that can be defined between tasks:

  • Network Channel: establishes a TCP connection between two vertices and allows pipelined processing. This means that records emitted from one task can be consumed by the following task immediately, without being persistently stored. Tasks connected with this type of channel are allowed to reside on different instances. Network channels are the default type chosen by Nephele if the user does not specify one.

  • In-Memory Channel: subtasks scheduled to run on the same instance can be connected by an In-Memory Channel. This is the most efficient type of communication and is performed using the instance’s main memory, also allowing data pipelining.

  • File Channel: tasks connected through this type of channel use the local file system to communicate. The output of the first task is written to an intermediate file, which then serves as the input of the second task.

And now what?

Pig's execution plans can also be represented as DAGs. The challenge now is to study how to convert Pig's plans into Nephele job graphs. This approach would skip Stratosphere's programming model layer. It is not clear what implications such a decision could have performance-wise. On the one hand, skipping one layer of execution could definitely lead to performance gains. On the other hand, the PACT compiler is designed to perform several optimizations when translating PACT programs into Nephele DAGs. My wish is to implement and evaluate both alternatives. Let's hope I will have enough time for that!

Until next time, happy coding!

V.

myThesisProject.init();

Hej hej from white Stockholm!
Christmas holidays are over and I’m back in town!

This will be the last semester of my MSc, during which I will be working on my thesis in collaboration with the Swedish Institute of Computer Science (SICS). I am very excited about the project and this is the first of a series of posts I intend to do, describing my progress and discoveries :-)

So what is this super-interesting project I’m going to work on?

Before getting to that, I will have to make a small introduction on two systems:
Apache Pig and Stratosphere.

Pig is a platform for analyzing big data sets. It consists of a high-level declarative language, Pig Latin, and an execution engine that “translates” Pig scripts into Map-Reduce jobs.

Stratosphere is a data-processing framework, under research by TU Berlin. It provides a programming model for writing parallel data analysis applications and an execution engine, Nephele, able to execute dataflow graphs in parallel. You can think of it as an extension/generalization of Hadoop Map-Reduce and it also shares a lot of ideas with Dryad.

Although right now it is only possible to execute Pig scripts on top of Hadoop, Pig is designed to be modular and it should be straightforward to deploy it on top of another execution engine. And this is exactly the initial idea of the project. Additionally, the current implementation appears to have some limitations that make it about 1.5 times slower than native Map-Reduce at the moment. I believe that Stratosphere's architecture has several features that could be exploited in order to improve performance.

I am currently in the phase of studying the Pig architecture and the existing Hadoop compiler implementation. (Oh the joy of endless Java code :p )

Soon, I will post here my first findings, so stay tuned!

Until then, sweet coding!

V.