Another way is to specify a custom comparator in the job configuration via the setOutputKeyComparatorClass() method on the JobConf object. An example of implementing a custom comparator is provided in Chapter 9.

You also need to provide a mapper class that performs the transformation. The sample mapper class TransformKeysToLongMapper.java does this.

The TransformKeysToLongMapper.java class file has a number of changes from the IdentityMapper class (shown earlier in Listing 2-2). First, the class declaration is no longer generic; the types have been made concrete:

    /** Transform the input Text, Text key value
     * pairs into LongWritable, Text key/value pairs.
     */
    public class TransformKeysToLongMapper
        extends MapReduceBase implements Mapper<Text, Text, LongWritable, Text>

Notice that the code actually provides the types for the key/value pairs for input and for output. The original IdentityMapper class was completely generic. In addition, the identity mapper's declaration was implements Mapper<K, V, K, V>. In TransformKeysToLongMapper, the declaration is implements Mapper<Text, Text, LongWritable, Text>.

The map() method of TransformKeysToLongMapper is substantially different from that of the IdentityMapper and introduces the use of the reporter object.

The Reporter Object

The map and reduce methods both take four parameters: the key, the value (for reduce, an iterator over the values), the output collector, and the reporter. The reporter object provides a mechanism for informing the framework of the current status of your job. The reporter object provides three methods:

• incrCounter(): Provides counters that are aggregated and reported at the end of the job.
• setStatus(): Provides a status line for this map or reduce task.
• getInputSplit(): Returns information about the input source for this task. If the input is simple files, this can provide useful information for log messages.

Each call on the reporter object or the output collector provides a heartbeat to the framework, informing it that the task is not deadlocked or otherwise unresponsive. If your map or reduce method takes substantial time, the method must make periodic calls on the reporter object methods to inform the framework that it is still working. The framework will kill tasks that have not reported in 600 seconds by default.

Listing 2-6 shows the body of the TransformKeysToLongMapper mapper that uses the reporter object.

Listing 2-6. The Reporter Object in TransformKeysToLongMapper.java

    /** Map input to the output, transforming the input {@link Text}
     * keys into {@link LongWritable} keys.
     * The values are passed through unchanged.
     *
     * Report on the status of the job.
     * @param key The input key, supplied by the framework, a {@link Text} value.
     * @param value The input value, supplied by the framework, a {@link Text} value.
     * @param output The {@link OutputCollector} that takes
     * {@link LongWritable}, {@link Text} pairs.
     * @param reporter The object that provides a way
     * to report status back to the framework.
     * @exception IOException if there is any error.
     */
    public void map(Text key, Text value,
            OutputCollector<LongWritable, Text> output, Reporter reporter)
            throws IOException {
        try {
            try {
                reporter.incrCounter( "Input", "total records", 1 );
                LongWritable newKey =
                    new LongWritable( Long.parseLong( key.toString() ) );
                reporter.incrCounter( "Input", "parsed records", 1 );
                output.collect(newKey, value);
            } catch( NumberFormatException e ) {
                /** This is a somewhat expected case and we handle it specially.
                 */
                logger.warn( "Unable to parse key as a long for key,"
                    + " value " + key + " " + value, e );
                reporter.incrCounter( "Input", "number format", 1 );
                return;
            }
        } catch( Throwable e ) {
            /** It is very important to report back if there were
             * exceptions in the mapper.
             * In particular it is very handy to report the number of exceptions.
             * If this is done, the driver can make better assumptions
             * on the success or failure of the job.
             */
            logger.error( "Unexpected exception in mapper for key,"
                + " value " + key + ", " + value, e );
            reporter.incrCounter( "Input", "Exception", 1 );
            reporter.incrCounter( "Exceptions", e.getClass().getName(), 1 );
            if (e instanceof IOException) {
                throw (IOException) e;
            }
            if (e instanceof RuntimeException) {
                throw (RuntimeException) e;
            }
            throw new IOException( "Unknown Exception", e );
        }
    }

This block of code introduces a new object, reporter, and some best practice patterns. The key piece of this is the transformation of the Text key to a LongWritable key:

    LongWritable newKey = new LongWritable(Long.parseLong(key.toString()));
    output.collect(newKey, value);

The code in Listing 2-6 is sufficient to perform the transformation, and also includes some additional code for tracking and reporting.

CODE EFFICIENCY

Creating a new key object in the mapper for every transformed record is not the most efficient pattern. Most key classes provide a set() method, which sets the current value of the key. The output.collect() method uses the current value of the key, and once the collect() method is complete, the key object or the value object is free to be reused.

If the job is configured to multithread the map method, via conf.setMapRunnerClass(MultithreadedMapRunner.class), the map method will be called by multiple threads. Extreme care must be taken in using the mapper class member variables. A ThreadLocal LongWritable object could be used to ensure thread safety. To simplify the example, a new LongWritable is constructed. In the reduce method, there are no threading issues.

Object churn is a significant performance issue in a map method, and to a lesser extent, in the reduce method. Object reuse can provide a significant performance gain.
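The reuse pattern described in the sidebar can be sketched without a Hadoop dependency. This is a minimal illustration under stated assumptions, not the book's code: MutableLongKey is a hypothetical stand-in for a key class such as LongWritable (modeling only its set()/get() pair), and ReusableKeyDemo, threadKey(), and transform() are names invented for this example. Each thread obtains its own key from a ThreadLocal, overwrites it with set(), and passes the current value to a collector that copies it immediately, mirroring the sidebar's description of output.collect().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a mutable key class such as LongWritable. */
final class MutableLongKey {
    private long value;

    void set(long value) { this.value = value; }

    long get() { return value; }
}

/** Sketch: one reusable key per thread instead of one new key per record. */
class ReusableKeyDemo {
    // Each thread lazily receives its own key instance, so concurrent
    // callers never share mutable state.
    private static final ThreadLocal<MutableLongKey> KEY =
            ThreadLocal.withInitial(MutableLongKey::new);

    /** Returns the calling thread's reusable key (exposed for illustration). */
    static MutableLongKey threadKey() {
        return KEY.get();
    }

    /**
     * Parses each text key into the reused key object and "collects" it.
     * The collector copies the current value right away, so the key
     * object can safely be overwritten on the next iteration.
     */
    static List<Long> transform(List<String> textKeys) {
        List<Long> collected = new ArrayList<>();
        MutableLongKey key = threadKey();
        for (String text : textKeys) {
            try {
                key.set(Long.parseLong(text)); // reuse: no new key object
                collected.add(key.get());      // copy out the current value
            } catch (NumberFormatException e) {
                // Unparseable keys are skipped here; Listing 2-6 also
                // logs and counts them.
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        System.out.println(transform(Arrays.asList("10", "abc", "42")));
    }
}
```

In a single-threaded mapper a plain member field would achieve the same reuse with less ceremony; the ThreadLocal only matters when several threads call the same mapper instance.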