The Nephele Execution Engine

This post is an overview of Nephele, Stratosphere's execution engine. The programming model provided with Stratosphere is PACT (and will be covered in a future post). However, it is possible to submit jobs directly to the Nephele engine, in the form of Directed Acyclic Graphs (DAGs), where each vertex of the graph represents a task of the job. There are three types of vertices: Input vertices, Output vertices and Task vertices. The edges of the graph correspond to the communication channels between tasks.
 
Why choose Nephele over other engines? 
 
One big advantage of Nephele is the high degree of parametrization it offers, which opens the door to several optimizations. The user can set the degree of data parallelism per task or explicitly specify the type of communication channels between nodes. More importantly, Nephele supports dynamic resource allocation. In contrast to MapReduce and Dryad, which are designed to work on static cluster environments, Nephele is capable of allocating resources from a Cloud environment depending on the workload.

Architecture Overview

Nephele consists of several communicating components that are presented next in more detail. An overview of the Nephele architecture is shown in the following diagram.

In order to submit a job to the Nephele engine, a Client has to communicate with the Job Manager. The Job Manager is unique in the system and is responsible for scheduling the jobs it receives and coordinating their execution.

The resources required for job execution are managed by the Instance Manager. The Instance Manager allocates or deallocates virtual machines, depending on the workload of the current execution phase. The jobs are executed in parallel by instances, each of which is controlled by a Task Manager. The Task Manager communicates with the Job Manager and is assigned jobs for execution. During execution, each Task Manager sends information about changes in the execution state of the job (completed, failed, etc.). Task Managers also periodically send heartbeats to the Job Manager, which are then propagated to the Instance Manager. This way, the Instance Manager keeps track of the availability of running instances. If a Task Manager has not sent a heartbeat within the given heartbeat interval, the host is assumed to be dead. The Instance Manager then removes the respective Task Manager from the set of compute resources and calls the scheduler to take appropriate action.
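To make the heartbeat mechanism more concrete, here is a minimal sketch of how such liveness tracking could work. This is not Nephele's actual code: the class, method and parameter names (InstanceManager, report_heartbeat, check_liveness, on_instance_lost) are made up for illustration, and time is passed in explicitly to keep the example deterministic.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class InstanceManager:
    """Toy model of heartbeat-based liveness tracking (all names are hypothetical)."""

    heartbeat_interval: float                 # seconds a Task Manager may stay silent
    on_instance_lost: Callable[[str], None]   # stands in for the call to the scheduler
    last_seen: Dict[str, float] = field(default_factory=dict)

    def report_heartbeat(self, task_manager_id: str, now: float) -> None:
        # Heartbeats arrive via the Job Manager; remember when each host was last heard from.
        self.last_seen[task_manager_id] = now

    def check_liveness(self, now: float) -> List[str]:
        # Any Task Manager silent for longer than the interval is assumed to be dead.
        dead = [tm for tm, seen in self.last_seen.items()
                if now - seen > self.heartbeat_interval]
        for tm in dead:
            del self.last_seen[tm]     # remove it from the set of compute resources
            self.on_instance_lost(tm)  # let the scheduler take appropriate action
        return dead


lost: List[str] = []
manager = InstanceManager(heartbeat_interval=5.0, on_instance_lost=lost.append)
manager.report_heartbeat("tm-1", now=0.0)
manager.report_heartbeat("tm-2", now=0.0)
manager.report_heartbeat("tm-1", now=4.0)  # tm-1 keeps reporting, tm-2 goes silent
dead_hosts = manager.check_liveness(now=6.0)
```

In this run only tm-2 exceeds the 5-second interval, so it is the only host that gets dropped and reported to the (stand-in) scheduler callback.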

When the Job Manager receives a job graph from the Client, it decides how many and what types of instances need to be launched. Once all virtual machines have booted up, execution is triggered. Persistent storage, accessible from both the Job and Task Managers, is needed to store the jobs’ input and output data.
 

Nephele Jobs
 
Jobs in Nephele are defined as Directed Acyclic Graphs (DAGs). Each graph vertex represents one task and each edge indicates communication flow between tasks. Three types of vertices can be defined: Task vertex, Input vertex and Output vertex. The Input and Output vertices define how data is read from or written to disk. The Task vertices are where the actual user code is executed.
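As a rough illustration of the job graph idea, the sketch below models a tiny DAG with the three vertex types and checks that it is acyclic. It does not use Nephele's real (Java) API: JobGraph, VertexType, connect and topological_order are hypothetical names, and the rules that an Input vertex has no incoming edge and an Output vertex has no outgoing edge are my own assumption about what a sensible graph looks like.

```python
from collections import defaultdict, deque
from enum import Enum
from typing import Dict, List, Tuple


class VertexType(Enum):
    INPUT = "input"
    TASK = "task"
    OUTPUT = "output"


class JobGraph:
    """Toy job graph; a stand-in for illustration, not Nephele's real classes."""

    def __init__(self) -> None:
        self.vertices: Dict[str, VertexType] = {}
        self.edges: List[Tuple[str, str]] = []

    def add_vertex(self, name: str, kind: VertexType) -> None:
        self.vertices[name] = kind

    def connect(self, source: str, target: str) -> None:
        # Assumed sanity rules: data flows out of Input vertices and into Output vertices.
        if self.vertices[source] is VertexType.OUTPUT:
            raise ValueError("an Output vertex cannot feed another vertex")
        if self.vertices[target] is VertexType.INPUT:
            raise ValueError("an Input vertex cannot receive data from another vertex")
        self.edges.append((source, target))

    def topological_order(self) -> List[str]:
        # Kahn's algorithm: repeatedly emit vertices whose predecessors are all done.
        indegree = {v: 0 for v in self.vertices}
        successors = defaultdict(list)
        for source, target in self.edges:
            successors[source].append(target)
            indegree[target] += 1
        ready = deque(v for v, degree in indegree.items() if degree == 0)
        order: List[str] = []
        while ready:
            vertex = ready.popleft()
            order.append(vertex)
            for nxt in successors[vertex]:
                indegree[nxt] -= 1
                if indegree[nxt] == 0:
                    ready.append(nxt)
        if len(order) != len(self.vertices):
            raise ValueError("job graph contains a cycle")
        return order


graph = JobGraph()
graph.add_vertex("read_lines", VertexType.INPUT)
graph.add_vertex("count_words", VertexType.TASK)
graph.add_vertex("write_counts", VertexType.OUTPUT)
graph.connect("read_lines", "count_words")
graph.connect("count_words", "write_counts")
order = graph.topological_order()
```

The topological order is just one convenient way to confirm that the graph really is a DAG; a graph with a cycle makes topological_order raise an error instead of returning an order.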
 
Nephele defines a default strategy for setting up the execution of a job. However, there is a set of parameters that the user can tune in order to make execution more efficient. These parameters include the number of parallel subtasks, the number of subtasks per instance, how instances should be shared between tasks, the types of communication channels and the instance types that fulfill the hardware requirements of a specific job.
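To see how two of these parameters interact, here is a small back-of-the-envelope sketch. It assumes that instances are not shared between tasks, in which case the number of instances a task needs is simply the number of parallel subtasks divided by the subtasks per instance, rounded up. TaskSettings and its fields are hypothetical names, and "standard" is a placeholder instance type, not a real Nephele setting.

```python
import math
from dataclasses import dataclass


@dataclass
class TaskSettings:
    """Hypothetical container for a few of the tunable parameters named above."""

    parallel_subtasks: int      # degree of data parallelism for the task
    subtasks_per_instance: int  # how many subtasks may share one instance
    instance_type: str          # placeholder label for the hardware profile

    def instances_needed(self) -> int:
        # Assumes no instance sharing between tasks: round up so every subtask has a slot.
        return math.ceil(self.parallel_subtasks / self.subtasks_per_instance)


settings = TaskSettings(parallel_subtasks=10, subtasks_per_instance=4,
                        instance_type="standard")
needed = settings.instances_needed()  # 10 subtasks, 4 per instance -> 3 instances
```

With instance sharing enabled the real number could be lower, which is exactly why this parameter is worth tuning.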
 
Nephele offers three types of communication channels that can be defined between tasks. A Network Channel establishes a TCP connection between two vertices and allows pipelined processing. This means that records emitted from one task can be consumed by the following task immediately, without being persistently stored. Tasks connected with this type of channel are allowed to reside in different instances. Network Channels are the default type of communication channel chosen by Nephele if the user does not specify a type. Subtasks scheduled to run on the same instance can be connected by an In-Memory Channel. This is the most efficient type of communication and is performed using the instance’s main memory, also allowing data pipelining. The third type of communication is through File Channels. Tasks that are connected through this type of channel use the local file system to communicate. The output of the first task is written to an intermediate file, which then serves as the input of the second task.
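The channel rules described above can be summarized in a few lines of code. This is only a sketch under my own naming (ChannelType and resolve_channel are hypothetical, not part of Nephele): it falls back to a Network Channel when nothing is specified and rejects an In-Memory Channel between subtasks on different instances. Any placement constraints on File Channels are deliberately left out, since they are not spelled out here.

```python
from enum import Enum
from typing import Optional


class ChannelType(Enum):
    NETWORK = "network"      # TCP connection, pipelined, may span instances
    IN_MEMORY = "in-memory"  # main memory, pipelined, same instance only
    FILE = "file"            # intermediate file on the local file system


def resolve_channel(requested: Optional[ChannelType],
                    source_instance: str,
                    target_instance: str) -> ChannelType:
    """Pick the channel type for an edge (hypothetical helper, not Nephele's API)."""
    if requested is None:
        # No type specified by the user: Network Channels are the default.
        return ChannelType.NETWORK
    if requested is ChannelType.IN_MEMORY and source_instance != target_instance:
        # In-Memory Channels only connect subtasks that run on the same instance.
        raise ValueError("in-memory channels require both subtasks on the same instance")
    return requested


default_channel = resolve_channel(None, "vm-1", "vm-2")
local_channel = resolve_channel(ChannelType.IN_MEMORY, "vm-1", "vm-1")
```

Asking for an In-Memory Channel between "vm-1" and "vm-2" raises an error in this sketch, mirroring the same-instance restriction described above.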

And now what?

Pig's execution plans can also be represented as DAGs. The challenge now is to study how to convert Pig's plans into Nephele job graphs. This approach would skip Stratosphere's programming model layer. It is not clear what implications such a decision could have performance-wise. On the one hand, skipping one layer of execution could lead to performance gains. On the other hand, the PACT compiler is designed to perform several optimizations when translating PACT programs into Nephele DAGs. My wish is to implement and evaluate both alternatives. Let's hope I will have enough time for that!

Until next time, happy coding!

V.