MapReduce | Advanced Features

Basic MapReduce tutorials explain the workflow, but they do not cover what actually happens inside the MapReduce framework. This article explains how data moves through the MapReduce architecture and which API calls do the actual processing. It also discusses customization techniques and function overriding for application-specific needs.

The advanced MapReduce features concern execution and lower-level details. For everyday MapReduce programming, knowing the APIs and their usage is sufficient to write applications. Understanding the internals, however, is necessary to see how a job really works and to customize it with confidence.

The following sections discuss these advanced features.

Custom Types (Data): For user-provided Mappers and Reducers, the Hadoop MapReduce framework always uses typed data. The data that passes through Mappers and Reducers is stored in Java objects.

  • Writable Interface: The Writable interface is one of the most important interfaces. Objects that can be marshaled to and from files and over the network implement this interface, and Hadoop uses it to transmit data in serialized form. Some of the classes that implement the Writable interface are listed below:
  1. Text (stores String data)
  2. LongWritable
  3. FloatWritable
  4. IntWritable
  5. BooleanWritable

A custom data type can also be created by implementing the Writable interface. Hadoop can transmit any custom data type (whatever fits your requirement) that implements Writable.

The Writable interface, shown below, has two methods: readFields and write. The first method (readFields) initializes the object's data from the binary stream ‘in’. The second method (write) serializes the object's data to the binary stream ‘out’. The most important contract is that the fields are read in the same order in which they were written.

Listing 1: Showing Writable interface

public interface Writable {
    void readFields(DataInput in) throws IOException;
    void write(DataOutput out) throws IOException;
}
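As a minimal sketch of a custom value type, the example below serializes a point and reads it back. It assumes a local stand-in interface named Writable that mirrors Hadoop's, so that it compiles without Hadoop on the classpath. PointWritable and WritableRoundTrip are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Assumption: local stand-in mirroring org.apache.hadoop.io.Writable.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

// Hypothetical custom value type holding a 2D point.
class PointWritable implements Writable {
    int x;
    int y;

    PointWritable() { }  // no-argument constructor, used before readFields()

    PointWritable(int x, int y) {
        this.x = x;
        this.y = y;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);  // the field order used here ...
        out.writeInt(y);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        x = in.readInt();  // ... must be the same order used here
        y = in.readInt();
    }
}

class WritableRoundTrip {
    // Serializes a point to bytes and reads it back into a fresh object.
    static PointWritable roundTrip(PointWritable original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            PointWritable copy = new PointWritable();
            copy.readFields(new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        PointWritable copy = roundTrip(new PointWritable(3, -7));
        System.out.println(copy.x + "," + copy.y);
    }
}
```

In a real job the class would implement Hadoop's own Writable interface instead of the stand-in; the two method bodies would stay the same.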

Custom Types (Key): The previous section covered custom data types for application-specific requirements, but only for the value part. This section covers custom key types. In Hadoop MapReduce, the Reducer processes keys in sorted order, so a custom key type must implement the WritableComparable interface. Key types should also implement hashCode(), because the default partitioner uses the key's hash code to choose a Reducer.

The WritableComparable interface is shown below. It represents a Writable that is also Comparable.

Listing 2: Showing WritableComparable interface

public interface WritableComparable<T>
    extends Writable, Comparable<T>
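A custom key might look like the sketch below. It assumes local stand-ins for Writable and WritableComparable so that it runs without Hadoop. YearMonthKey and KeySortSketch are hypothetical names. The key orders by year and then by month, and it overrides equals() and hashCode() consistently with that ordering.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Assumption: local stand-ins mirroring the Hadoop interfaces.
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface WritableComparable<T> extends Writable, Comparable<T> { }

// Hypothetical composite key made of a year and a month.
class YearMonthKey implements WritableComparable<YearMonthKey> {
    private int year;
    private int month;

    YearMonthKey() { }

    YearMonthKey(int year, int month) {
        this.year = year;
        this.month = month;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        year = in.readInt();
        month = in.readInt();
    }

    // Sort order seen by the Reducer: by year, then by month.
    @Override
    public int compareTo(YearMonthKey other) {
        int byYear = Integer.compare(year, other.year);
        return byYear != 0 ? byYear : Integer.compare(month, other.month);
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof YearMonthKey)) {
            return false;
        }
        YearMonthKey other = (YearMonthKey) o;
        return year == other.year && month == other.month;
    }

    // Equal keys must produce equal hash codes so they reach the same Reducer.
    @Override
    public int hashCode() {
        return 31 * year + month;
    }

    @Override
    public String toString() {
        return year + "-" + month;
    }
}

class KeySortSketch {
    // Sorts a few keys with compareTo() and returns their string forms.
    static List<String> sortedKeys() {
        List<YearMonthKey> keys = new ArrayList<>(Arrays.asList(
                new YearMonthKey(2014, 3),
                new YearMonthKey(2013, 11),
                new YearMonthKey(2014, 1)));
        Collections.sort(keys);
        List<String> result = new ArrayList<>();
        for (YearMonthKey key : keys) {
            result.add(key.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(sortedKeys());
    }
}
```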

How to use Custom Types: We have discussed the custom value and key types that Hadoop can process. Now we discuss how to tell Hadoop about them. The JobConf object (which defines the job) has two methods, setOutputKeyClass() and setOutputValueClass(), which set the key and value types of the job output. By default these types also apply to the Mapper output. If the Mapper emits types that differ from the Reducer's output types, JobConf's setMapOutputKeyClass() and setMapOutputValueClass() methods declare the Mapper's output types, which are the input types the Reducer expects.
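The configuration fragment below sketches these four calls. It needs the Hadoop libraries on the classpath and is not a complete job. The class name TypeConfigSketch and the choice of Text, IntWritable and LongWritable are assumptions made only for illustration.

```java
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;

// Hypothetical driver fragment: only the type declarations are shown.
public class TypeConfigSketch {
    public static JobConf configure() {
        JobConf conf = new JobConf(TypeConfigSketch.class);

        // Types emitted by the Mapper (and therefore consumed by the Reducer).
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(IntWritable.class);

        // Types emitted by the Reducer, that is, the final job output.
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(LongWritable.class);

        return conf;
    }
}
```

Here the Mapper emits IntWritable values while the Reducer emits LongWritable values, which is the case where the two setMapOutput calls are needed.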

Faster Performance: The default sorting process is somewhat slow because it first deserializes each key from the byte stream (using the readFields() method) and then calls the compareTo() method of the key class. A faster approach is to decide the ordering between keys by examining the serialized bytes directly, without deserializing them into objects. To implement this faster comparison, extend the WritableComparator class with a comparator specific to your data type. The class declaration follows.

Listing 3: Showing WritableComparator class

public class WritableComparator
    extends Object
    implements RawComparator
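The idea of comparing serialized bytes can be sketched without Hadoop. The example below assumes keys that were written with writeInt(), which produces four big-endian bytes. Its compare method has the same parameter list as RawComparator's compare(byte[], int, int, byte[], int, int). RawIntComparatorSketch is a hypothetical name, and in a real job this logic would go into a subclass of WritableComparator.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RawIntComparatorSketch {
    // Serializes an int the way DataOutput.writeInt does: 4 bytes, big-endian.
    static byte[] serialize(int value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            new DataOutputStream(bytes).writeInt(value);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Decodes a big-endian int straight from the buffer; no key object is created.
    static int readInt(byte[] b, int start) {
        return ((b[start] & 0xff) << 24)
                | ((b[start + 1] & 0xff) << 16)
                | ((b[start + 2] & 0xff) << 8)
                | (b[start + 3] & 0xff);
    }

    // b1/b2 are buffers, s1/s2 the offsets of each key, l1/l2 their lengths.
    // The lengths are unused here because an int key always occupies 4 bytes.
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    public static void main(String[] args) {
        byte[] a = serialize(-5);
        byte[] b = serialize(3);
        System.out.println(compare(a, 0, a.length, b, 0, b.length) < 0);
    }
}
```

The int is decoded before comparison because a plain byte-by-byte comparison would order negative values after positive ones.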

Custom data and key types thus allow higher-level data structures to be used in the Hadoop framework. Custom data types are among the most common requirements in practical Hadoop applications, and combining them with a byte-level comparator can noticeably speed up sorting.

Input Formats: InputFormat is one of the most important interfaces; it defines the input specification of a MapReduce job. Hadoop offers several InputFormat implementations for interpreting various types of input data. The most common, and the default, is TextInputFormat, which reads lines from a text file. Similarly, SequenceFileInputFormat reads Hadoop's binary SequenceFile format.

The fundamental task of an InputFormat is to read data from the input file. A custom InputFormat can also be implemented if your application needs one. With the default TextInputFormat, the key is the byte offset of the line and the value is the content of the line, up to the terminating ‘\n’ character. A custom implementation can use any separator, and the InputFormat parses accordingly.

The other job of an InputFormat is to split the input (the data source) into fragments that become the input to map tasks. These fragments, or splits, are encapsulated in instances of the InputSplit interface. The input data source can be anything, such as a database table, an XML file or some other file, so the split is performed according to the application requirement. The most important point is that the split operation should be fast and cheap.

After the files are split, reading from the individual splits is just as important. The RecordReader is responsible for reading data from a split. It must handle the fact that splits do not always end neatly at the end of a line. A line-oriented RecordReader reads to the end of the line even if that crosses the theoretical end of its split, and the reader of the following split skips that partial first line. This avoids losing or duplicating records that cross InputSplit boundaries.
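The boundary handling can be illustrated with a small simulation. It is a sketch modeled on how line-oriented readers typically behave, not Hadoop's actual code. SplitLineReaderSketch and its methods are hypothetical names, and the input is assumed to be newline-delimited text held in memory.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class SplitLineReaderSketch {
    // Returns the index just past the next '\n' at or after pos,
    // or data.length if there is no further newline.
    static int indexAfterNewline(byte[] data, int pos) {
        while (pos < data.length) {
            if (data[pos++] == '\n') {
                return pos;
            }
        }
        return data.length;
    }

    // Reads the lines that belong to the split [start, start + length).
    static List<String> readSplit(byte[] data, int start, int length) {
        int end = start + length;
        int pos = start;
        if (start != 0) {
            // The first line is skipped: the previous split's reader owns it.
            pos = indexAfterNewline(data, pos);
        }
        List<String> lines = new ArrayList<>();
        // A line that starts at or before 'end' is read completely,
        // even when its last bytes lie beyond the split.
        while (pos <= end && pos < data.length) {
            int next = indexAfterNewline(data, pos);
            int lineEnd = (data[next - 1] == '\n') ? next - 1 : next;
            lines.add(new String(data, pos, lineEnd - pos, StandardCharsets.UTF_8));
            pos = next;
        }
        return lines;
    }

    // Cuts the text into fixed-size splits and reads each split independently.
    static List<String> readAll(String text, int splitSize) {
        byte[] data = text.getBytes(StandardCharsets.UTF_8);
        List<String> all = new ArrayList<>();
        for (int start = 0; start < data.length; start += splitSize) {
            int length = Math.min(splitSize, data.length - start);
            all.addAll(readSplit(data, start, length));
        }
        return all;
    }

    public static void main(String[] args) {
        // A split size of 8 cuts the second line in the middle.
        System.out.println(readAll("alpha\nbeta\ngamma\n", 8));
    }
}
```

Whatever split size is chosen, each line is returned exactly once, because the reader that sees the start of a line reads it to the end and the next reader skips it.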

  • Custom InputFormat: Basic applications use one of the provided InputFormat classes directly. For custom reading, the best approach is to subclass FileInputFormat. This abstract class provides functionality for manipulating files as the application requires. For custom parsing, override the getRecordReader() method, which returns an instance of RecordReader. This RecordReader is responsible for reading and parsing.
  • Alternate Source (Data): An InputFormat describes two things: how data is presented to the Mapper, and where the data comes from. Most implementations are based on FileInputFormat, where the data source is the local file system or HDFS (Hadoop Distributed File System). Other types of data source require a custom InputFormat implementation. For example, a NoSQL database such as HBase provides TableInputFormat for reading data from database tables. The data source can therefore be anything that a custom implementation can handle.

Output Formats: The OutputFormat is responsible for the write operation. As discussed, the InputFormat and RecordReader interfaces are responsible for reading data into a MapReduce program. After the data is processed, writing to permanent storage is managed by the OutputFormat and RecordWriter interfaces. The default format is TextOutputFormat, which writes the key/value pairs as strings to the output file. Another output format is SequenceFileOutputFormat, which keeps the data in binary form. TextOutputFormat relies on the toString() methods of the keys and values, while SequenceFileOutputFormat relies on the write() method of the Writable classes; readFields() is used when the binary output is read back.

To write data in a custom format, provide a custom OutputFormat implementation. For file-based output this is usually done by extending the FileOutputFormat abstract class. The job is then pointed at the custom format by calling JobConf.setOutputFormat() with the custom class.
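The part of a custom output format that actually shapes the output is its record writer. The sketch below assumes a simplified stand-in interface, SimpleRecordWriter, which is not Hadoop's RecordWriter, and a hypothetical CsvRecordWriter that emits comma-separated lines. It shows only the write path.

```java
import java.io.IOException;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.io.Writer;

// Assumption: simplified stand-in for the role played by a RecordWriter.
interface SimpleRecordWriter<K, V> {
    void write(K key, V value) throws IOException;
    void close() throws IOException;
}

// Hypothetical writer producing one "key,value" line per record.
class CsvRecordWriter<K, V> implements SimpleRecordWriter<K, V> {
    private final Writer out;

    CsvRecordWriter(Writer out) {
        this.out = out;
    }

    @Override
    public void write(K key, V value) throws IOException {
        // Like a text-based format, this relies on toString() of key and value.
        out.write(key + "," + value + "\n");
    }

    @Override
    public void close() throws IOException {
        out.close();
    }
}

class OutputFormatSketch {
    // Writes two records to an in-memory sink and returns the produced text.
    static String render() {
        try {
            StringWriter sink = new StringWriter();
            SimpleRecordWriter<String, Integer> writer = new CsvRecordWriter<>(sink);
            writer.write("apple", 3);
            writer.write("pear", 5);
            writer.close();
            return sink.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(render());
    }
}
```

In a real job, a FileOutputFormat subclass would open the output file and return a writer of this kind from its getRecordWriter() method.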

Data Partitioning: Partitioning is the process that determines which Reducer instance receives which intermediate key/value pair. Each Mapper must determine the destination Reducer for all of its output key/value pairs. The most important point is that for any given key, regardless of which Mapper instance produced it, the destination partition is the same. For performance reasons, Mappers never communicate with each other to agree on the partition of a particular key; each computes it independently.

The Hadoop system uses the Partitioner interface to determine the destination partition for a key/value pair. The number of partitions equals the number of reduce tasks. The MapReduce framework determines the number of partitions when a job starts.

The signature of the Partitioner interface follows.

Listing 4: Showing Partitioner interface

public interface Partitioner<K2,V2>
    extends JobConfigurable
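A hash-based partitioning rule can be sketched as follows. The class HashPartitionSketch is a hypothetical, Hadoop-free illustration. Its formula masks off the sign bit of the hash code and takes the remainder by the number of partitions, which is the usual hash partitioning scheme. The keys are sample values chosen for the example.

```java
class HashPartitionSketch {
    // Maps a key to a partition in the range [0, numPartitions).
    // Masking with Integer.MAX_VALUE clears the sign bit, so a negative
    // hash code cannot produce a negative partition number.
    static int getPartition(Object key, int numPartitions) {
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    public static void main(String[] args) {
        String[] keys = {"apple", "pear", "plum", "apple"};
        for (String key : keys) {
            System.out.println(key + " -> partition " + getPartition(key, 4));
        }
    }
}
```

Because the result depends only on the key and the number of partitions, every Mapper computes the same destination for the same key without any coordination.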

Conclusion: This discussion has covered the most important advanced Hadoop MapReduce features, all of which support customization. In practical MapReduce applications, the default implementations of these APIs are rarely sufficient on their own; the custom features built on the exposed APIs have far more impact. These customizations are straightforward once the concepts are clear. We hope this article helps in understanding the advanced features and their implementation.

Praveen Hanchinal

Praveen Hanchinal is an educator, IT consultant and professional speaker on Artificial Intelligence (AI, ML, DL), Cloud, Big Data, IoT (Internet of Things) and Blockchain. He has been working on AI, Cloud, Big Data and IoT technologies for 11+ years. He is a team lead who trains people and gives talks on topics of his interest. He has trained 11000+ people, including teachers, students, industry professionals and government officials, on recent technologies.
