Compression in Hadoop

Hadoop supports several standard compression codecs: DEFLATE (.deflate), gzip (.gz), bzip2 (.bz2), LZO (.lzo), LZ4 (.lz4), and Snappy (.snappy). Of these, only bzip2 is splittable out of the box (LZO files become splittable only after they have been indexed). Splittability matters for MapReduce jobs: if the input file's compression format is not splittable, the whole file has to be read by a single map task.
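To make the cost of a non-splittable format concrete, here is a rough back-of-the-envelope sketch. It assumes one input split per block and a 128 MB block size; SplitEstimate and mapTasks are illustrative names, not Hadoop APIs, and the real FileInputFormat allows a little slack on the last split that this estimate ignores.

```java
/** Rough estimate of how many map tasks a single input file produces. */
final class SplitEstimate {
    private SplitEstimate() {}

    // Assumption: one split per block. A non-splittable file is always one split.
    static long mapTasks(long fileSizeBytes, long blockSizeBytes, boolean splittable) {
        if (fileSizeBytes <= 0 || blockSizeBytes <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        if (!splittable) {
            return 1; // the whole file goes to a single map task
        }
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes; // ceiling division
    }

    public static void main(String[] args) {
        long fileSize = 1024L * 1024 * 1024; // 1 GB input file
        long blockSize = 128L * 1024 * 1024; // 128 MB blocks (assumed)
        System.out.println("splittable (e.g. bzip2): " + mapTasks(fileSize, blockSize, true));
        System.out.println("not splittable (e.g. gzip): " + mapTasks(fileSize, blockSize, false));
    }
}
```

Under these assumptions a 1 GB bzip2 file can be processed by 8 map tasks in parallel, while the same data compressed with gzip is processed by one.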

CompressionCodec is the interface that each compression format implements. A CompressionCodec implementation encapsulates both the compression and the decompression algorithm.

Compressing and decompressing example

CompressionCodec provides two methods for handling compression and decompression on streams:

createOutputStream(OutputStream out) – wraps an output stream so that data written to it is compressed
createInputStream(InputStream in) – wraps an input stream so that data read from it is decompressed

Compressing standard input
public static void main(String[] args) throws ClassNotFoundException, IOException {
    // args[0] is the fully qualified class name of the compression codec
    Class<?> codecClass = Class.forName(args[0]);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    // Wrap standard output so that everything written to it is compressed
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish(); // completes the compressed stream without closing the underlying output stream
}

Running the jar (put the jar on the classpath first)

echo "Text" | yarn net.icircuit.hadoop.HadoopCompression.Compress org.apache.hadoop.io.compress.GzipCodec | gunzip -


You can also redirect the output to a file.

Decompressing the standard input stream

You need to pipe a compressed file into the program; it decompresses the data and writes it to standard output.

public static void main(String[] args) throws ClassNotFoundException, IOException {
    // args[0] is the fully qualified class name of the compression codec
    Class<?> codecClass = Class.forName(args[0]);
    Configuration conf = new Configuration();
    CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    // Wrap standard input so that everything read from it is decompressed
    CompressionInputStream in = codec.createInputStream(System.in);
    IOUtils.copyBytes(in, System.out, 4096, false);
    in.close();
}

Running the program

cat student_marks.gz | yarn net.icircuit.hadoop.HadoopCompression.Decompress org.apache.hadoop.io.compress.GzipCodec


Hadoop provides a codec factory, CompressionCodecFactory, which detects the compression format of an input file from its file extension and returns the appropriate CompressionCodec.

CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
CompressionCodec codec = factory.getCodec(inputPath); // inputPath is a Path object; returns null if no codec matches the extension
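As a fuller illustration of the factory, the sketch below decompresses a file whose codec is inferred from its extension and writes the result next to it with the extension stripped. FileDecompressor and decompress are hypothetical names chosen for this example; the sketch assumes the file system behind the path is reachable through the given Configuration.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

class FileDecompressor {
    /** Decompresses inputPath and returns the path of the decompressed copy. */
    static Path decompress(Path inputPath, Configuration conf) throws IOException {
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            // getCodec returns null when the extension matches no registered codec
            throw new IOException("No codec found for " + inputPath);
        }
        // Strip the codec's extension (for example ".gz") to name the output file
        String outputName = CompressionCodecFactory.removeSuffix(
                inputPath.toString(), codec.getDefaultExtension());
        Path outputPath = new Path(outputName);

        FileSystem fs = inputPath.getFileSystem(conf);
        try (InputStream in = codec.createInputStream(fs.open(inputPath));
             OutputStream out = fs.create(outputPath)) {
            IOUtils.copyBytes(in, out, 4096, false);
        }
        return outputPath;
    }
}
```

Calling decompress on a path such as student_marks.gz would, under these assumptions, produce a file named student_marks in the same directory.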

You can register additional codecs with the codec factory by listing them in the io.compression.codecs property; by default the codec factory loads only the codecs shipped with Hadoop.

If you use the hadoop or yarn commands to run your jars, they load the native compression libraries for better performance. You can disable the native libraries by setting io.native.lib.available to false.

You can use the CodecPool API if you want to reuse compressors and decompressors instead of creating new ones for every stream.
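A minimal sketch of the CodecPool pattern follows: borrow a compressor from the pool, use it for one stream, and hand it back in a finally block so it can be reused. PooledCompressor and compress are hypothetical names for this example, and the data is compressed into memory only to keep the sketch short.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;

class PooledCompressor {
    /** Compresses data with the given codec, borrowing the compressor from CodecPool. */
    static byte[] compress(CompressionCodec codec, byte[] data) throws IOException {
        Compressor compressor = CodecPool.getCompressor(codec);
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            // Pass the pooled compressor to the codec instead of letting it create a new one
            try (CompressionOutputStream out = codec.createOutputStream(buffer, compressor)) {
                out.write(data);
            } // closing the stream finishes the compressed data
            return buffer.toByteArray();
        } finally {
            // Return the compressor even if compression failed, so the pool can reuse it
            CodecPool.returnCompressor(compressor);
        }
    }
}
```

The pool pays off when an application compresses many streams, because the cost of creating compressors is spread over all of them.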

Compression with MapReduce

MapReduce should be able to start reading input from any point in a file. If you use a compression format that does not support this (one that is not splittable), you lose data locality: a single map task processes the complete file. You can use bzip2 to avoid this problem, because bzip2 is splittable, but bzip2 is slow. The best way to use fast compressors (Snappy, LZO) is with container file formats such as sequence files, Avro data files, ORC files, and Parquet files, which compress the data inside the file in separate blocks so that the file as a whole stays splittable.

You can feed compressed files directly to MapReduce programs; the framework decompresses them for you, provided the file extension is correct. If you want to compress your map or reduce output, you need to handle that yourself by setting a few properties.

Enable compression of map output

By enabling compression of map output, you can speed up jobs, because less data is written to disk and transferred over the network. Use a fast compression codec such as LZO, LZ4, or Snappy.

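Configuration conf = new Configuration();
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
// GzipCodec is shown for illustration; a faster codec such as SnappyCodec suits map output better if your cluster has it
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
Job job = Job.getInstance(conf); // replaces the deprecated new Job(conf) constructor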

If your driver program implements the Tool interface, you can pass these settings on the command line when running the job.
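For illustration, a driver along these lines would pick up such command-line settings. This is a sketch, not a complete job: CompressedJobDriver and buildJob are hypothetical names, no mapper or reducer is set (so Hadoop's identity defaults apply), and the input and output paths are taken from the first two arguments.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

class CompressedJobDriver extends Configured implements Tool {

    // getConf() already contains any -D key=value options parsed by ToolRunner
    Job buildJob(String input, String output) throws Exception {
        Job job = Job.getInstance(getConf(), "compressed map output");
        job.setJarByClass(CompressedJobDriver.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job;
    }

    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage: CompressedJobDriver [generic options] <input> <output>");
            return 2;
        }
        return buildJob(args[0], args[1]).waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new CompressedJobDriver(), args));
    }
}
```

With a driver like this, map output compression can be switched on without recompiling (your-job.jar is a placeholder name):

yarn jar your-job.jar CompressedJobDriver -D mapreduce.map.output.compress=true -D mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec input output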

Enable compression on Reducer output

mapreduce.output.fileoutputformat.compress – set to true to enable compression
mapreduce.output.fileoutputformat.compress.codec – the fully qualified codec class name (for example, org.apache.hadoop.io.compress.GzipCodec)

You can also set these parameters through FileOutputFormat:

 FileOutputFormat.setCompressOutput(job, true);
 FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

With SequenceFileOutputFormat you also need to set the type of compression (RECORD level by default):

SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
