In the previous blog we saw how to convert a text file to an Avro data file. In this blog we will see how to run a MapReduce job on an Avro data file, using the output generated by the TextToAvro job as input.
The job counts the number of films released in each year. For each record, the map function extracts the year of the film and sends it to the reducer as an AvroKey, paired with a count of 1 as an AvroValue. The reducer then sums these counts for each year.
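The data flow can be sketched in plain Java without Hadoop: a map step that emits a (year, 1) pair per record, a shuffle step that groups the pairs by year, and a reduce step that sums each group. This is only a local simulation of the job's logic; the `Movie` record and the sample titles and years are hypothetical stand-ins for the Avro records produced by TextToAvro.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of the CountMoviesByYear data flow (no Hadoop involved).
final class MovieCountSketch {

    // Hypothetical stand-in for one Avro movie record; only "year" matters here.
    record Movie(String title, String year) {}

    // Map step: emit (year, 1) for every record.
    static List<Map.Entry<String, Integer>> map(List<Movie> movies) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<>();
        for (Movie m : movies) {
            pairs.add(new SimpleEntry<>(m.year(), 1));
        }
        return pairs;
    }

    // Shuffle step: group values by key; a TreeMap keeps keys sorted,
    // much as Hadoop sorts map output keys before they reach the reducer.
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> groups = new TreeMap<>();
        for (Map.Entry<String, Integer> p : pairs) {
            groups.computeIfAbsent(p.getKey(), k -> new ArrayList<>()).add(p.getValue());
        }
        return groups;
    }

    // Reduce step: sum the 1s emitted for each year.
    static Map<String, Integer> reduce(Map<String, List<Integer>> groups) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> g : groups.entrySet()) {
            int count = 0;
            for (int v : g.getValue()) {
                count += v;
            }
            counts.put(g.getKey(), count);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Movie> movies = List.of(
            new Movie("Movie A", "1995"),
            new Movie("Movie B", "2001"),
            new Movie("Movie C", "1995"));
        System.out.println(reduce(shuffle(map(movies)))); // prints {1995=2, 2001=1}
    }
}
```

In the real job the shuffle is performed by the framework; the mapper and reducer only implement the first and last of these three steps.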
Input file schema:
Although the schema contains all the fields present in the input data, the job uses only the year field.
```json
"type": {"type": "array", "items": "string"}
```
We use TextOutputFormat to write the output, so the results are written as plain text lines rather than as an Avro file.
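With TextOutputFormat, each reducer output pair becomes one line: the key's `toString()`, a separator (a tab by default, configurable through the `mapreduce.output.textoutputformat.separator` property), then the value's `toString()`. The sketch below reproduces that line layout in plain Java for a hypothetical map of year counts; it illustrates the format only and is not Hadoop's own implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the line layout TextOutputFormat produces for (Text, IntWritable) pairs.
final class TextOutputSketch {

    // TextOutputFormat's default key/value separator.
    static final String SEPARATOR = "\t";

    // Render each (year, count) pair as "year<TAB>count", one pair per line.
    static String render(Map<String, Integer> counts) {
        StringBuilder out = new StringBuilder();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.append(e.getKey()).append(SEPARATOR).append(e.getValue()).append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Hypothetical counts; a TreeMap mirrors the sorted key order of a single reducer.
        Map<String, Integer> counts = new TreeMap<>(Map.of("1995", 2, "2001", 1));
        System.out.print(render(counts));
    }
}
```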
Program
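```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.AvroValue;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class CountMoviesByYear extends Configured implements Tool {

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new CountMoviesByYear(), args);
    System.out.println("Exit code " + exitCode);
  }

  @Override
  public int run(String[] args) throws Exception {
    Job job = Job.getInstance(getConf(), "CountMoviesByYear");
    job.setJarByClass(getClass());
    job.setMapperClass(MovieCounterMapper.class);
    job.setReducerClass(MovieCounterReducer.class);

    // movies.avsc is loaded from the classpath, relative to this class's package
    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(getClass().getResourceAsStream("movies.avsc"));

    // Schema of the Avro records read by AvroKeyInputFormat
    AvroJob.setInputKeySchema(job, schema);

    // Map output is Avro-serialized: year as a string key, count as an int value
    AvroJob.setMapOutputKeySchema(job, Schema.create(Schema.Type.STRING));
    AvroJob.setMapOutputValueSchema(job, Schema.create(Schema.Type.INT));

    // Final output uses plain Writables, written as text
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job.waitForCompletion(true) ? 0 : 1;
  }

  // CharSequence is used for the year key because Avro can deserialize strings
  // as org.apache.avro.util.Utf8 rather than java.lang.String.
  public static class MovieCounterMapper
      extends Mapper<AvroKey<GenericRecord>, NullWritable, AvroKey<CharSequence>, AvroValue<Integer>> {

    @Override
    public void map(AvroKey<GenericRecord> key, NullWritable value, Context context)
        throws IOException, InterruptedException {
      // Emit (year, 1) for every movie record; all other fields are ignored
      CharSequence year = key.datum().get("year").toString();
      context.write(new AvroKey<CharSequence>(year), new AvroValue<Integer>(1));
    }
  }

  public static class MovieCounterReducer
      extends Reducer<AvroKey<CharSequence>, AvroValue<Integer>, Text, IntWritable> {

    @Override
    public void reduce(AvroKey<CharSequence> key, Iterable<AvroValue<Integer>> values, Context context)
        throws IOException, InterruptedException {
      // Sum the 1s emitted by the mappers for this year
      int count = 0;
      for (AvroValue<Integer> value : values) {
        count += value.datum().intValue();
      }
      context.write(new Text(key.datum().toString()), new IntWritable(count));
    }
  }
}
```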
Running the Job
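HADOOP_CLASSPATH puts the avro-mapred jar on the client classpath, and -libjars (handled by ToolRunner) ships it to the map and reduce tasks. The last two arguments are the input Avro file and the output directory.
```shell
export HADOOP_CLASSPATH=/path/to/avro-mapred-1.7.4-hadoop2.jar
yarn jar HadoopAvro-1.0-SNAPSHOT.jar CountMoviesByYear -libjars avro-mapred-1.7.4-hadoop2.jar movies/part-r-00000.avro moviecount1
```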
Output of the job
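Assuming the default single reducer, the results land in moviecount1/part-r-00000 as one `year<TAB>count` line per year. A minimal plain-Java sketch for loading such a file back into a sorted map (for example after copying it locally with `hdfs dfs -get`) might look like this; the class name and the sample data are hypothetical.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.Map;
import java.util.TreeMap;

// Sketch: parse the job's text output ("year<TAB>count" lines) into a sorted map.
final class MovieCountOutputReader {

    static Map<String, Integer> parse(BufferedReader reader) throws IOException {
        Map<String, Integer> counts = new TreeMap<>();
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isEmpty()) {
                continue; // skip blank lines
            }
            // Split on the first tab, TextOutputFormat's default separator
            int tab = line.indexOf('\t');
            if (tab < 0) {
                throw new IOException("No tab separator in line: " + line);
            }
            counts.put(line.substring(0, tab), Integer.parseInt(line.substring(tab + 1).trim()));
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory string stands in for a local copy of part-r-00000
        String sample = "1995\t2\n2001\t1\n";
        System.out.println(parse(new BufferedReader(new StringReader(sample)))); // prints {1995=2, 2001=1}
    }
}
```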