,谁说程序员不能有文艺范?
TextInputFormat提供了对文本文件的处理方式,通过InputSplit进行分片(FileSplit),每一个分片分别new一个LineRecordReader进行读取解析,解析得到的每一行以<key, value>的形式传给Mapper的map()函数。
应用示例:随机生成100个小数并求最大值。
MapReduce自带的输入类型都是基于HDFS的,本示例不从HDFS读取数据,而是从内存中生成100个小数,然后求最大值。
自定义InputFormat
package com.lucl.hadoop.mapreduce.rand;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Random;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.InputFormat;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;/** * @author luchunli * @description 自定义InputFormat */public class RandomInputFormat extends InputFormat{ public static float [] floatValues = null; /** 自定义分片规则 **/ @Override public List getSplits(JobContext context) throws IOException, InterruptedException { // 初始化数组的长度 int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100); floatValues = new float[NumOfValues]; Random random = new Random (); for (int i = 0; i < NumOfValues; i++) { floatValues[i] = random.nextFloat(); } System.out.println("生成的随机数的值如下:"); for (float f : floatValues) { System.out.println(f); } System.out.println("===================="); // 如下代码表示指定两个map task来处理这100个小数,每个map task处理50个小数 // 初始化split分片数目,split分片的数量等于map任务的数量,但是也可以通过配置参数mapred.map.tasks来指定 // 如果该参数和splits的切片数不一致时,map task的数目如何确定,后续再通过代码分析 int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2); int begin = 0; // Math.floor是为了下取整,这里是100刚好整除,如果是99的话Math.floor的值是49.0 // 50 int length = (int)Math.floor(NumOfValues / NumSplits); // end = 49,第一个split的范围就是0~49 int end = length - 1; // 默认的FileInputFormat类的getSplits方法中是通过文件数目和blocksize进行分的, // 文件超过一个块会分成多个split,否则一个文件一个split分片 List splits = new ArrayList (); for (int i = 0; i < NumSplits - 1; i++) { // 2个splits分片,分别为0和1 RandomInputSplit split = new RandomInputSplit(begin, end); splits.add(split); // begin是上一个split切片的下一个值 begin = end + 1; // 50 // 切片的长度不变,结束位置为起始位置+分片的长度,而数组下标是从0开始的, // 因此结束位置应该是begin加长度-1 end = begin + (length - 1); // 50 + (50 -1) = 99 } RandomInputSplit split = new RandomInputSplit(begin, end); splits.add(split); /** * * 通过默认的TextInputFormat来实现的时候,如果有两个小文件,则splits=2,参见: * http://luchunli.blog.51cto.com/2368057/1676185 **/ return splits; } @Override public RecordReadercreateRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new RandomRecordReader(); }}
自定义InputSplit
package com.lucl.hadoop.mapreduce.rand;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.InputSplit;/** * @author luchunli * @description * 自定义InputSplit,参照了{@link org.apache.hadoop.mapreduce.lib.input.Filesplit} * * FileSplit是针对HDFS上文件的实现,因此其属性包括文件绝对路径(Path)、分片起始位置(start)、 * 分片长度(length)、副本信息(保存Block复本数据到的主机数组)。 * * 自定义的InputSplit是针对内存中的数组数据进行的处理,因此无需记录文件路径及副本信息,只需要记录对数组分片的起始位置、分片长度即可。 * */public class RandomInputSplit extends InputSplit implements Writable { private int start; private int end; private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class); public RandomInputSplit () {} /** * Constructs a split * * @param start * @param end * */ public RandomInputSplit (int start, int end) { this.start = start; this.end = end; int len = this.end - this.start + 1; int index = start; FloatWritable [] result = new FloatWritable[len]; for (int i = 0; i < len; i++) { float f = RandomInputFormat.floatValues[index]; FloatWritable fw = new FloatWritable(f); result[i] = fw; index++; } floatArray.set(result); // System.out.println("查看分片数据:");// for (FloatWritable fw : (FloatWritable[])floatArray.toArray()) {// System.out.println(fw.get());// }// System.out.println("====================="); } @Override public long getLength() throws IOException, InterruptedException { return this.end - this.start; } @Override public String[] getLocations() throws IOException, InterruptedException { return new String[]{"dnode1", "dnode2"}; } @Override public void readFields(DataInput in) throws IOException { this.start = in.readInt(); this.end = in.readInt(); this.floatArray.readFields(in); } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.getStart()); out.writeInt(this.getEnd()); this.floatArray.write(out); } public int getStart() { return start; } public void setStart(int start) { this.start = start; } public int getEnd() { return end; } public void setEnd(int end) { this.end = end; } public ArrayWritable getFloatArray() { return floatArray; } @Override public String toString() { return this.getStart() + "-" + this.getEnd(); }}
自定义RecordReader
package com.lucl.hadoop.mapreduce.rand;import java.io.IOException;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;/** * @author luchunli * @description 自定义RecordReader * */public class RandomRecordReader extends RecordReader{ private int start; private int end; private int index; private IntWritable key = null; private ArrayWritable value = null; private RandomInputSplit rsplit = null; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.rsplit = (RandomInputSplit)split; this.start = this.rsplit.getStart(); this.end = this.rsplit.getEnd(); this.index = this.start; } @Override public boolean nextKeyValue() throws IOException, InterruptedException { if (null == key) { key = new IntWritable(); } if (null == value) { value = new ArrayWritable(FloatWritable.class); } if (this.index <= this.end) { key.set(this.index); value = rsplit.getFloatArray(); index = end + 1; return true; } return false; } @Override public IntWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public ArrayWritable getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { if (this.index == this.end) { return 0F; } return Math.min(1.0F, (this.index - this.start) / (float)(this.end - this.start)); } @Override public void close() throws IOException { // ...... }}
实现Mapper
package com.lucl.hadoop.mapreduce.rand;import java.io.File;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Mapper;/** * @author luchunli * @description Mapper */public class RandomMapper extends Mapper{ private static final IntWritable one = new IntWritable(1); @Override protected void setup(Context context) throws IOException, InterruptedException { // 为了查看当前map是在那台机器上执行的,在该机器上创建个随机文件, // 执行完成后到DN节点对应目录下查看即可 SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss"); File file = new File("/home/hadoop", "Mapper-" + format.format(new Date())); if (!file.exists()) { file.createNewFile(); } } @Override protected void map(IntWritable key, ArrayWritable value, Context context) throws IOException, InterruptedException { FloatWritable [] floatArray = (FloatWritable[])value.toArray(); float maxValue = floatArray[0].get(); float tmp = 0; for (int i = 1; i < floatArray.length; i++) { tmp = floatArray[i].get(); if (tmp > maxValue) { maxValue = tmp; } } // 这里必须要保证多个map输出的key是一样的,否则reduce处理时会认为是不同的数据, // shuffle会分成多个组,导致每个map task算出一个最大值 context.write(one, new FloatWritable(maxValue)); }}
实现Reducer
package com.lucl.hadoop.mapreduce.rand;import java.io.File;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * @author luchunli * @description Rducer */public class RandomReducer extends Reducer{ @Override protected void setup(Context context) throws IOException, InterruptedException { SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss"); // 为了查看当前reduce是在那台机器上执行的,在该机器上创建个随机文件 File file = new File("/home/hadoop", "Reducer-" + format.format(new Date())); if (!file.exists()) { file.createNewFile(); } } @Override protected void reduce(IntWritable key, Iterable value, Context context) throws IOException, InterruptedException { Iterator it = value.iterator(); float maxValue = 0; float tmp = 0; if (it.hasNext()) { maxValue = it.next().get(); } else { context.write(new Text("The max value is : "), new FloatWritable(maxValue)); return; } while (it.hasNext()) { tmp = it.next().get(); if (tmp > maxValue) { maxValue = tmp; } } context.write(new Text("The max value is : "), new FloatWritable(maxValue)); }}
定义驱动类
package com.lucl.hadoop.mapreduce.rand;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;/** * @author luchunli * @description MapReduce自带的输入类都是基于HDFS的,如下示例代码不用从HDFS上面读取内容, * 而是在内存里面随机生成100个(0-1)float类型的小数,然后求这100个小数的最大值。 */public class RandomDriver extends Configured implements Tool { public static void main(String[] args) { try { ToolRunner.run(new RandomDriver(), args); } catch (Exception e) { e.printStackTrace(); } } @Override public int run(String[] args) throws Exception { Configuration conf = this.getConf(); conf.set("lucl.random.nums", "100"); conf.set("mapreduce.job.maps", "2"); Job job = Job.getInstance(getConf(), this.getClass().getSimpleName()); job.setJarByClass(RandomDriver.class); job.setInputFormatClass(RandomInputFormat.class); job.setMapperClass(RandomMapper.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(FloatWritable.class); job.setReducerClass(RandomReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FloatWritable.class); FileOutputFormat.setOutputPath(job, new Path(args[0])); return job.waitForCompletion(true) ? 0 : 1; } }
打包运行
[hadoop@nnode code]$ hadoop jar RandomMR.jar /20151202002715/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032生成的随机数的值如下:0.0200757380.7003490.96178760.82860180.033576370.550332550.1126459240.433125080.331843550.69609020.239120540.85234240.41338520.0282425880.90318140.393978710.382789670.58426540.45692240.40088810.22305370.928893270.201279940.095746460.231739040.43659060.115678550.0279440280.69659570.783119440.23656410.85753010.074726580.52190220.94099520.71225190.87224650.302889230.517736260.912117540.931724250.384843650.448441150.245897890.833616260.409832240.94449630.120615420.84466410.53035810.112955390.0940039160.118222180.49971490.982963440.487460370.314205350.11513960.79043370.800051150.183444020.81716190.87496990.480232540.00445050.438798670.223678350.629249160.69983150.2221480.73928840.41748650.45282370.700348260.30571490.291778330.227823670.81826110.466802950.47785210.63658230.439719140.270550550.268396740.52632450.88246490.511824850.204947830.76794030.319364070.134768720.472816880.34021110.287065270.0382034780.73518790.61654040.417611960.52292570.7284225====================15/12/02 00:28:08 INFO mapreduce.JobSubmitter: number of splits:215/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_001415/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_001415/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_001415/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false15/12/02 00:28:38 INFO mapreduce.Job: map 0% reduce 0%15/12/02 00:29:13 INFO mapreduce.Job: map 100% reduce 0%15/12/02 00:29:32 INFO mapreduce.Job: map 100% reduce 100%15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=26 FILE: Number of bytes written=323256 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=520 HDFS: Number of bytes written=31 HDFS: Number of read operations=7 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=64430 Total time spent by all reduces in occupied slots (ms)=16195 Total time spent by all map tasks (ms)=64430 Total time spent by all reduce tasks (ms)=16195 Total vcore-seconds taken by all map tasks=64430 Total vcore-seconds taken by all reduce tasks=16195 Total megabyte-seconds taken by all map tasks=65976320 Total megabyte-seconds taken by all reduce tasks=16583680 Map-Reduce Framework Map input records=2 Map output records=2 Map output bytes=16 Map output materialized bytes=32 Input split bytes=520 Combine input records=0 Combine output records=0 Reduce input groups=1 Reduce shuffle bytes=32 Reduce input records=2 Reduce output records=1 Spilled Records=4 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=356 CPU time spent (ms)=1940 Physical memory (bytes) snapshot=513851392 Virtual memory (bytes) snapshot=2541506560 Total committed heap usage (bytes)=257171456 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=0 File Output Format Counters Bytes Written=31[hadoop@nnode code]$
查看输出结果
[hadoop@nnode code]$ hdfs dfs -ls /201512020027Found 2 items-rw-r--r-- 2 hadoop hadoop 0 2015-12-02 00:29 /201512020027/_SUCCESS-rw-r--r-- 2 hadoop hadoop 31 2015-12-02 00:29 /201512020027/part-r-00000[hadoop@nnode code]$ hdfs dfs -text /201512020027/part-r-00000The max value is : 0.98296344[hadoop@nnode code]$