Text to Avro Data File using MapReduce
We have already seen how to convert a text file to an Avro data file using a simple Java program. In this post we will see how to process a text file with a MapReduce job and store the result in an Avro data file. To run MapReduce jobs on Avro data files, see this blog.
Input File Format:
Our input file is a movie database. Each line contains the following fields, separated by "::":
serial number::movie name (year)::tag1|tag2
Example records:
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
4::Waiting to Exhale (1995)::Comedy|Drama
5::Father of the Bride Part II (1995)::Comedy
6::Heat (1995)::Action|Crime|Thriller
7::Sabrina (1995)::Comedy|Romance
8::Tom and Huck (1995)::Adventure|Children's
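The string rules the mapper applies to each line can be tried on their own, without Hadoop. The sketch below is a hypothetical helper (the class MovieLineParser and its Movie holder are not part of the original project). It splits a line on "::", takes the name and year from the last pair of parentheses, and splits the tags on "|".

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not part of the original project): splits one line of
 * the movie file ("id::Title (year)::tag1|tag2") into its three fields,
 * using the same string rules as the mapper.
 */
public class MovieLineParser {

    /** Simple value holder for one parsed line. */
    public static final class Movie {
        public final String name;
        public final String year;
        public final List<String> tags;

        public Movie(String name, String year, List<String> tags) {
            this.name = name;
            this.year = year;
            this.tags = tags;
        }
    }

    public static Movie parse(String line) {
        // Field 0 is the serial number (ignored), field 1 is "Title (year)", field 2 the tags.
        String[] fields = line.split("::");
        String title = fields[1];
        // Use the LAST '(' and ')' so titles that contain parentheses still work.
        int open = title.lastIndexOf('(');
        int close = title.lastIndexOf(')');
        String name = title.substring(0, open).trim();
        String year = title.substring(open + 1, close).trim();
        // '|' is a regex metacharacter, so it must be escaped for String.split.
        List<String> tags = Arrays.asList(fields[2].split("\\|"));
        return new Movie(name, year, tags);
    }

    public static void main(String[] args) {
        Movie m = parse("1::Toy Story (1995)::Animation|Children's|Comedy");
        System.out.println(m.name + " / " + m.year + " / " + m.tags);
    }
}
```

Running main prints "Toy Story / 1995 / [Animation, Children's, Comedy]". Using lastIndexOf means a title such as "Foo (Bar) (1996)" still yields the name "Foo (Bar)" and the year "1996".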
Avro Schema for the Output File:
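We can ignore the serial number and store the tags in an array. Check this blog to learn more about the supported types. The resulting schema, saved as movies.avsc, is shown below. The program loads it from the job jar with getResourceAsStream, so the file must be packaged in the jar next to the classes rather than copied to HDFS.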
{
  "name": "movies",
  "type": "record",
  "fields": [
    {"name": "movieName", "type": "string"},
    {"name": "year",      "type": "string"},
    {"name": "tags",      "type": {"type": "array", "items": "string"}}
  ]
}
We could also have stored the year as an integer ("type": "int").
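If the year field were declared as {"name": "year", "type": "int"}, the mapper would need to put an int instead of a String. The following is a minimal sketch of such a variant of the year helper. The class YearAsInt and the method parseYear are hypothetical names introduced here.

```java
/**
 * Hypothetical variant of the mapper's year helper that returns an int,
 * for use with a schema whose "year" field is declared as
 * {"name": "year", "type": "int"}.
 */
public class YearAsInt {

    public static int parseYear(String line) {
        String title = line.split("::")[1];
        String digits = title.substring(title.lastIndexOf('(') + 1,
                                        title.lastIndexOf(')')).trim();
        // Throws NumberFormatException if the parentheses do not hold a number,
        // which surfaces bad input instead of silently writing a bad value.
        return Integer.parseInt(digits);
    }

    public static void main(String[] args) {
        System.out.println(parseYear("6::Heat (1995)::Action|Crime|Thriller"));
    }
}
```

In the mapper, record.put("year", parseYear(inputRecord)) autoboxes the value to an Integer, which is the Java representation a GenericRecord uses for an Avro int field.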
The map function extracts the fields from each input line and constructs a GenericRecord, which it emits as the key with a NullWritable value. The reduce function simply writes its key to the output; no processing is done in the reducer.
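Because the whole record is the map output key and the value is a NullWritable, the shuffle groups identical records together. The reducer therefore writes each distinct record once. For example, two input lines that differ only in their serial number produce a single output record, because the serial number is not part of the record. The following Hadoop-free sketch imitates that flow. LocalFlowSimulation is a hypothetical class, and plain strings stand in for AvroKey<GenericRecord>, so its sort order only approximates Avro's record ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, Hadoop-free simulation of the job's data flow:
 * map (line -> key), shuffle (sort + group identical keys),
 * reduce (write each key once). Keys are strings instead of
 * AvroKey<GenericRecord>, so ordering only approximates Avro's.
 */
public class LocalFlowSimulation {

    /** Stand-in for the mapper: builds a key from name, year and tags (serial number dropped). */
    public static String map(String line) {
        String[] f = line.split("::");
        String title = f[1];
        String name = title.substring(0, title.lastIndexOf('(')).trim();
        String year = title.substring(title.lastIndexOf('(') + 1, title.lastIndexOf(')')).trim();
        List<String> tags = Arrays.asList(f[2].split("\\|"));
        return name + "|" + year + "|" + tags;
    }

    public static List<String> run(List<String> lines) {
        // Shuffle: sort and group identical keys; the count stands in for the Iterable of values.
        Map<String, Integer> groups = new TreeMap<>();
        for (String line : lines) {
            groups.merge(map(line), 1, Integer::sum);
        }
        // Reduce: like the identity reducer, emit each key once and ignore the values.
        return new ArrayList<>(groups.keySet());
    }

    public static void main(String[] args) {
        List<String> out = run(Arrays.asList(
                "5::Father of the Bride Part II (1995)::Comedy",
                "6::Heat (1995)::Action|Crime|Thriller",
                "5::Father of the Bride Part II (1995)::Comedy")); // duplicate line
        out.forEach(System.out::println);
    }
}
```

The three input lines produce two outputs. If duplicate lines must be kept, the key alone is not enough: the values would have to carry a count, or the job could run map-only with zero reducers.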
Program Code
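import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MRTextToAvro extends Configured implements Tool {

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MRTextToAvro(), args);
        System.out.println("Exit code " + exitCode);
        System.exit(exitCode);
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "Text To Avro");
        job.setJarByClass(getClass());

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // movies.avsc is read from the job jar, next to this class.
        Schema schema = new Schema.Parser().parse(getClass().getResourceAsStream("movies.avsc"));

        // Prefer the Avro jars shipped with the job over the ones on Hadoop's own classpath.
        job.getConfiguration().setBoolean(Job.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);

        // Map output: AvroKey<GenericRecord> keys, NullWritable values.
        AvroJob.setMapOutputKeySchema(job, schema);
        job.setMapOutputValueClass(NullWritable.class);
        AvroJob.setOutputKeySchema(job, schema);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(AvroKeyOutputFormat.class);
        job.setMapperClass(TextToAvroMapper.class);
        job.setReducerClass(TextToAvroReduce.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static class TextToAvroMapper
            extends Mapper<LongWritable, Text, AvroKey<GenericRecord>, NullWritable> {

        private Schema schema;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
            schema = new Schema.Parser().parse(getClass().getResourceAsStream("movies.avsc"));
        }

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String inputRecord = value.toString();
            GenericRecord record = new GenericData.Record(schema);
            record.put("movieName", getMovieName(inputRecord));
            record.put("year", getMovieReleaseYear(inputRecord));
            record.put("tags", getMovieTags(inputRecord));
            context.write(new AvroKey<GenericRecord>(record), NullWritable.get());
        }

        // "1::Toy Story (1995)::..." -> "Toy Story"
        public String getMovieName(String record) {
            String title = record.split("::")[1];
            return title.substring(0, title.lastIndexOf('(')).trim();
        }

        // "1::Toy Story (1995)::..." -> "1995"
        public String getMovieReleaseYear(String record) {
            String title = record.split("::")[1];
            return title.substring(title.lastIndexOf('(') + 1, title.lastIndexOf(')')).trim();
        }

        // "...::Animation|Children's|Comedy" -> [Animation, Children's, Comedy]
        // A java.util.List works with both the generic and the reflect datum writers,
        // whereas a String[] is only handled by the reflect ones.
        public List<String> getMovieTags(String record) {
            return Arrays.asList(record.split("::")[2].split("\\|"));
        }
    }

    // Identity reducer: writes each key once and ignores the NullWritable values.
    public static class TextToAvroReduce
            extends Reducer<AvroKey<GenericRecord>, NullWritable, AvroKey<GenericRecord>, NullWritable> {

        @Override
        public void reduce(AvroKey<GenericRecord> key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }
}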
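To run the job, the Avro MapReduce library (avro-mapred) has to be on the classpath. HADOOP_CLASSPATH makes it visible to the client that configures and submits the job, and -libjars (parsed by ToolRunner) ships it to the map and reduce tasks: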
export HADOOP_CLASSPATH=/path/to/targets/avro-mapred-1.7.4-hadoop2.jar
yarn jar HadoopAvro-1.0-SNAPSHOT.jar MRTextToAvro -libjars avro-mapred-1.7.4-hadoop2.jar /input/path output/path
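If the job fails with a ClassNotFoundException or NoClassDefFoundError for an Avro class, a quick check is whether that class can be loaded at all. The following is a minimal diagnostic sketch. The class AvroClasspathCheck is hypothetical and not part of the project. It only inspects the JVM it runs in, so it tells you about the client side, not about the task JVMs.

```java
/**
 * Hypothetical diagnostic (not part of the original project): reports whether
 * the Avro classes this job needs can be loaded by the current JVM.
 */
public class AvroClasspathCheck {

    public static boolean isLoadable(String className) {
        try {
            // initialize = false: only resolve the class, do not run static initializers.
            Class.forName(className, false, AvroClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] needed = {
            "org.apache.avro.Schema",                        // Avro core jar
            "org.apache.avro.mapreduce.AvroJob",             // avro-mapred jar
            "org.apache.avro.mapreduce.AvroKeyOutputFormat"  // avro-mapred jar
        };
        for (String name : needed) {
            System.out.println(name + " -> " + (isLoadable(name) ? "found" : "MISSING"));
        }
    }
}
```

Running it from the same jar and with the same HADOOP_CLASSPATH as the job shows which of the two jars the client is missing.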
The resulting Avro file
The full project is available on GitHub.
Which jar file should be added? I get an error on the import org.apache.avro.* lines in this part of the code.
Hi,
All the dependencies are declared in pom.xml; Eclipse should pull them in automatically.
export HADOOP_CLASSPATH=/path/to/targets/avro-mapred-1.7.4-hadoop2.jar
yarn jar HadoopAvro-1.0-SNAPSHOT.jar MRTextToAvro -libjars avro-mapred-1.7.4- hadoop2.jar /input/path output/path
Is hadoop2.jar here a jar file exported from Eclipse? And is export HADOOP_CLASSPATH set in the bash file? Can you explain the Hadoop command for executing this in detail (is the input file the records? Where do you place movies.avsc in HDFS?). I'm a newbie to Hadoop and I can't understand the flow.