The Nephele Execution Engine
This post is an overview of Nephele, Stratosphere's execution engine. The programming model provided with Stratosphere is PACT (and will be covered in a future post). However, it is possible to submit jobs directly to the Nephele engine, in the form of Directed Acyclic Graphs (DAGs), where each vertex of the graph represents a task of the job. There are three types of vertices: Input vertices, Output vertices and Task vertices. The edges of the graph correspond to the communication channels between tasks.
Why choose Nephele over other engines?
One big advantage of Nephele is the high degree of parametrization it offers, which opens the door to several optimizations. The user can set the degree of data parallelism per task or explicitly specify the type of communication channels between nodes. More importantly, Nephele supports dynamic resource allocation. In contrast to MapReduce and Dryad, which are designed to work on static cluster environments, Nephele is capable of allocating resources from a Cloud environment depending on the workload.
Architecture Overview
Nephele consists of several communicating components that are presented next in more detail. An overview of the Nephele architecture is shown in the following diagram.
Nephele Jobs
Jobs in Nephele are defined as Directed Acyclic Graphs (DAGs). Each graph vertex represents one task and each edge indicates communication flow between tasks. Three types of vertices can be defined: Task vertex, Input vertex and Output vertex. The Input and Output vertices define how data is read from or written to disk. The Task vertices are where the actual user code is executed.
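To make the job model a bit more concrete, here is a minimal Python sketch of such a graph. It is not the Nephele API (Nephele itself is written in Java); the names JobGraph, VertexKind, add_vertex, connect and topological_order are all made up for illustration. The sketch only shows the three vertex types, the edges between them, and a check that the graph really is acyclic.

```python
from collections import deque
from dataclasses import dataclass, field
from enum import Enum


class VertexKind(Enum):
    INPUT = "input"    # defines how data is read from disk
    TASK = "task"      # runs the actual user code
    OUTPUT = "output"  # defines how data is written to disk


@dataclass
class JobGraph:
    # Hypothetical container for illustration; not the Nephele JobGraph class.
    kinds: dict = field(default_factory=dict)  # vertex name -> VertexKind
    edges: list = field(default_factory=list)  # (source, target) pairs

    def add_vertex(self, name, kind):
        self.kinds[name] = kind

    def connect(self, source, target):
        if source not in self.kinds or target not in self.kinds:
            raise KeyError("both vertices must be added before connecting")
        self.edges.append((source, target))

    def topological_order(self):
        """Return vertices in dependency order, or raise if there is a cycle."""
        indegree = {name: 0 for name in self.kinds}
        successors = {name: [] for name in self.kinds}
        for source, target in self.edges:
            successors[source].append(target)
            indegree[target] += 1
        ready = deque(name for name, degree in indegree.items() if degree == 0)
        order = []
        while ready:
            current = ready.popleft()
            order.append(current)
            for following in successors[current]:
                indegree[following] -= 1
                if indegree[following] == 0:
                    ready.append(following)
        if len(order) != len(self.kinds):
            raise ValueError("job graph contains a cycle")
        return order


# A three-vertex job: read, process, write.
job = JobGraph()
job.add_vertex("read_lines", VertexKind.INPUT)
job.add_vertex("count_words", VertexKind.TASK)
job.add_vertex("write_counts", VertexKind.OUTPUT)
job.connect("read_lines", "count_words")
job.connect("count_words", "write_counts")
order = job.topological_order()
```

The topological order is simply one valid order in which the tasks depend on each other; a real engine would of course run pipelined tasks concurrently rather than one after the other.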
Nephele defines a default strategy for setting up the execution of a job. However, there is a set of parameters that the user can tune in order to make execution more efficient. These parameters include the number of parallel subtasks, the number of subtasks per instance, how instances should be shared between tasks, the types of communication channels and the instance types that fulfill the hardware requirements of a specific job.
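As a rough illustration of how these parameters fit together, the Python sketch below groups them into one settings object per task. The class and field names are hypothetical and do not come from Nephele. The instances_needed calculation is my own assumption that subtasks are packed onto instances up to the configured limit; the real scheduler may well behave differently.

```python
import math
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskSettings:
    # Hypothetical names that mirror the tunable parameters listed above.
    parallel_subtasks: int = 1                  # degree of data parallelism
    subtasks_per_instance: int = 1              # how many subtasks share one instance
    share_instances_with: Optional[str] = None  # name of another task, if any
    instance_type: Optional[str] = None         # None stands for the engine default

    def instances_needed(self):
        # Assumption: instances are filled up to the limit, so round up.
        return math.ceil(self.parallel_subtasks / self.subtasks_per_instance)


settings = TaskSettings(parallel_subtasks=10, subtasks_per_instance=4)
needed = settings.instances_needed()
```

With 10 parallel subtasks and at most 4 subtasks per instance, this packing assumption asks for 3 instances.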
Nephele offers three types of communication channels that can be defined between tasks. A Network Channel establishes a TCP connection between two vertices and allows pipelined processing. This means that records emitted from one task can be consumed by the following task immediately, without being persistently stored. Tasks connected with this type of channel are allowed to reside in different instances. Network channels are the default type of communication channel chosen by Nephele if the user does not specify a type. Subtasks scheduled to run on the same instance can be connected by an In-Memory Channel. This is the most efficient type of communication and is performed using the instance’s main memory, also allowing data pipelining. The third type of communication is through File Channels. Tasks that are connected through this type of channel use the local file system to communicate. The output of the first task is written to an intermediate file, which then serves as the input of the second task.
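The rules for the three channel types can be summed up in a few lines of Python. Again, this is only a sketch under my own naming: ChannelType, resolve_channel and is_pipelined are hypothetical helpers, not Nephele classes. It encodes what is described above: Network is the default, In-Memory requires both subtasks to be on the same instance, and only Network and In-Memory channels are pipelined.

```python
from enum import Enum


class ChannelType(Enum):
    NETWORK = "network"      # TCP connection, pipelined, may cross instances
    IN_MEMORY = "in_memory"  # main memory, pipelined, same instance only
    FILE = "file"            # intermediate file on the local file system


def resolve_channel(requested, producer_instance, consumer_instance):
    """Return the channel type to use, falling back to NETWORK if none is given."""
    if requested is None:
        return ChannelType.NETWORK
    if requested is ChannelType.IN_MEMORY and producer_instance != consumer_instance:
        raise ValueError("in-memory channels need both subtasks on one instance")
    # No placement check for FILE here: the text above does not state one.
    return requested


def is_pipelined(channel):
    """True if the consumer can read records before the producer has finished."""
    return channel in (ChannelType.NETWORK, ChannelType.IN_MEMORY)


default_channel = resolve_channel(None, "instance-1", "instance-2")
local_channel = resolve_channel(ChannelType.IN_MEMORY, "instance-1", "instance-1")
```

The instance names are plain placeholder strings; in a real deployment they would be whatever identifiers the resource manager hands out.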
And now what?
Pig's execution plans can also be represented as DAGs. The challenge now is to study how to convert Pig's plans into Nephele Job graphs. This approach would skip Stratosphere's programming model layer. It is not clear what implications such a decision could have performance-wise. On one hand, skipping one layer of execution could lead to performance gains. On the other hand, the PACT compiler is designed to perform several optimizations when translating PACT programs into Nephele DAGs, and those would be lost. My wish is to implement and evaluate both alternatives. Let's hope I will have enough time for that!
Until next time, happy coding!
V.

