,谁说程序员不能有文艺范?


 

TextInputFormat提供了对文本文件的处理方式,通过InputSplit进行分片(FileSplit),每一个分片分别new一个LineRecordReader进行读取解析,解析得到的每一行以<key, value>的形式传给Mapper的map()函数。

应用示例:随机生成100个小数并求最大值。

MapReduce自带的输入类型都是基于HDFS的,本示例不从HDFS读取数据,而是从内存中生成100个小数,然后求最大值。

自定义InputFormat

package com.lucl.hadoop.mapreduce.rand;import java.io.IOException;import java.util.ArrayList;import java.util.List;import java.util.Random;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.InputFormat;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;/** * @author luchunli * @description 自定义InputFormat */public class RandomInputFormat extends InputFormat
 {    public static float [] floatValues = null;        /** 自定义分片规则 **/    @Override    public List
 getSplits(JobContext context) throws IOException,            InterruptedException {        // 初始化数组的长度        int NumOfValues = context.getConfiguration().getInt("lucl.random.nums", 100);        floatValues = new float[NumOfValues];                Random random = new Random ();        for (int i = 0; i < NumOfValues; i++) {            floatValues[i] = random.nextFloat();        }        System.out.println("生成的随机数的值如下:");        for (float f : floatValues) {            System.out.println(f);        }        System.out.println("====================");                // 如下代码表示指定两个map task来处理这100个小数,每个map task处理50个小数        // 初始化split分片数目,split分片的数量等于map任务的数量,但是也可以通过配置参数mapred.map.tasks来指定        // 如果该参数和splits的切片数不一致时,map task的数目如何确定,后续再通过代码分析        int NumSplits = context.getConfiguration().getInt("mapreduce.job.maps", 2);        int begin = 0;        // Math.floor是为了下取整,这里是100刚好整除,如果是99的话Math.floor的值是49.0        // 50        int length = (int)Math.floor(NumOfValues / NumSplits);            // end = 49,第一个split的范围就是0~49        int end = length - 1;                    // 默认的FileInputFormat类的getSplits方法中是通过文件数目和blocksize进行分的,        // 文件超过一个块会分成多个split,否则一个文件一个split分片        List
 splits = new ArrayList
();        for (int i = 0; i < NumSplits - 1; i++) {    // 2个splits分片,分别为0和1            RandomInputSplit split = new RandomInputSplit(begin, end);            splits.add(split);                        // begin是上一个split切片的下一个值            begin = end + 1;        // 50            // 切片的长度不变,结束位置为起始位置+分片的长度,而数组下标是从0开始的,            // 因此结束位置应该是begin加长度-1            end = begin + (length - 1);    // 50 + (50 -1) = 99        }        RandomInputSplit split = new RandomInputSplit(begin, end);        splits.add(split);                /**         * 
         *     通过默认的TextInputFormat来实现的时候,如果有两个小文件,则splits=2,参见:         *     http://luchunli.blog.51cto.com/2368057/1676185         * 
         */                return splits;    }    @Override    public RecordReader
 createRecordReader(InputSplit split,            TaskAttemptContext context) throws IOException, InterruptedException {        return new RandomRecordReader();    }}

自定义InputSplit

package com.lucl.hadoop.mapreduce.rand;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.InputSplit;/** * @author luchunli * @description  *     自定义InputSplit,参照了{@link org.apache.hadoop.mapreduce.lib.input.Filesplit} *  
 *     FileSplit是针对HDFS上文件的实现,因此其属性包括文件绝对路径(Path)、分片起始位置(start)、 *     分片长度(length)、副本信息(保存Block复本数据到的主机数组)。 *     
 *  自定义的InputSplit是针对内存中的数组数据进行的处理,因此无需记录文件路径及副本信息,只需要记录对数组分片的起始位置、分片长度即可。 *  */public class RandomInputSplit extends InputSplit implements Writable {    private int start;    private int end;    private ArrayWritable floatArray = new ArrayWritable(FloatWritable.class);        public RandomInputSplit () {}        /**     * Constructs a split      *      * @param start     * @param end      *     */    public RandomInputSplit (int start, int end) {        this.start = start;        this.end = end;                int len = this.end - this.start + 1;        int index = start;        FloatWritable [] result = new FloatWritable[len];        for (int i = 0; i < len; i++) {            float f = RandomInputFormat.floatValues[index];            FloatWritable fw = new FloatWritable(f);                        result[i] = fw;                        index++;        }        floatArray.set(result);        //        System.out.println("查看分片数据:");//        for (FloatWritable fw : (FloatWritable[])floatArray.toArray()) {//            System.out.println(fw.get());//        }//        System.out.println("=====================");    }    @Override    public long getLength() throws IOException, InterruptedException {        return this.end - this.start;    }    @Override    public String[] getLocations() throws IOException, InterruptedException {        return new String[]{"dnode1", "dnode2"};    }    @Override    public void readFields(DataInput in) throws IOException {        this.start = in.readInt();        this.end = in.readInt();        this.floatArray.readFields(in);            }    @Override    public void write(DataOutput out) throws IOException {        out.writeInt(this.getStart());        out.writeInt(this.getEnd());        this.floatArray.write(out);    }    public int getStart() {        return start;    }    public void setStart(int start) {        this.start = start;    }    public int getEnd() {        return end;    }    public void setEnd(int end) {        this.end = end;    }    public ArrayWritable getFloatArray() {        return floatArray;    }    @Override    public String toString() {        return this.getStart() + "-" + this.getEnd();    }}

自定义RecordReader

package com.lucl.hadoop.mapreduce.rand;import java.io.IOException;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.InputSplit;import org.apache.hadoop.mapreduce.RecordReader;import org.apache.hadoop.mapreduce.TaskAttemptContext;/** * @author luchunli * @description 自定义RecordReader * */public class RandomRecordReader extends RecordReader
 {    private int start;    private int end;    private int index;    private IntWritable key = null;    private ArrayWritable value = null;    private RandomInputSplit rsplit = null;        @Override    public void initialize(InputSplit split, TaskAttemptContext context)            throws IOException, InterruptedException {        this.rsplit = (RandomInputSplit)split;        this.start = this.rsplit.getStart();        this.end = this.rsplit.getEnd();        this.index = this.start;    }    @Override    public boolean nextKeyValue() throws IOException, InterruptedException {        if (null == key) {            key = new IntWritable();        }        if (null == value) {            value = new ArrayWritable(FloatWritable.class);        }        if (this.index <= this.end) {            key.set(this.index);            value = rsplit.getFloatArray();            index = end + 1;            return true;        }         return false;    }    @Override    public IntWritable getCurrentKey() throws IOException, InterruptedException {        return key;    }    @Override    public ArrayWritable getCurrentValue() throws IOException, InterruptedException {        return value;    }    @Override    public float getProgress() throws IOException, InterruptedException {        if (this.index == this.end) {            return 0F;        }        return Math.min(1.0F, (this.index - this.start) / (float)(this.end - this.start));    }    @Override    public void close() throws IOException {        // ......    }}

实现Mapper

package com.lucl.hadoop.mapreduce.rand;import java.io.File;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import org.apache.hadoop.io.ArrayWritable;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Mapper;/** * @author luchunli * @description Mapper */public class RandomMapper extends Mapper
 {    private static final IntWritable one = new IntWritable(1);        @Override    protected void setup(Context context) throws IOException, InterruptedException {        // 为了查看当前map是在那台机器上执行的,在该机器上创建个随机文件,        // 执行完成后到DN节点对应目录下查看即可        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");        File file = new File("/home/hadoop", "Mapper-" + format.format(new Date()));        if (!file.exists()) {            file.createNewFile();        }    }        @Override    protected void map(IntWritable key, ArrayWritable value, Context context)            throws IOException, InterruptedException {        FloatWritable [] floatArray = (FloatWritable[])value.toArray();        float maxValue = floatArray[0].get();        float tmp = 0;                for (int i = 1; i < floatArray.length; i++) {            tmp = floatArray[i].get();            if (tmp > maxValue) {                maxValue = tmp;            }        }                // 这里必须要保证多个map输出的key是一样的,否则reduce处理时会认为是不同的数据,        // shuffle会分成多个组,导致每个map task算出一个最大值        context.write(one, new FloatWritable(maxValue));    }}

实现Reducer

package com.lucl.hadoop.mapreduce.rand;import java.io.File;import java.io.IOException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;/** * @author luchunli * @description Rducer */public class RandomReducer extends Reducer
 {    @Override    protected void setup(Context context) throws IOException, InterruptedException {        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddhhmmss");        // 为了查看当前reduce是在那台机器上执行的,在该机器上创建个随机文件        File file = new File("/home/hadoop", "Reducer-" + format.format(new Date()));        if (!file.exists()) {            file.createNewFile();        }    }        @Override    protected void reduce(IntWritable key, Iterable
 value, Context context)            throws IOException, InterruptedException {        Iterator
 it = value.iterator();        float maxValue = 0;        float tmp = 0;        if (it.hasNext()) {            maxValue = it.next().get();        } else {            context.write(new Text("The max value is : "), new FloatWritable(maxValue));            return;        }                while (it.hasNext()) {            tmp = it.next().get();            if (tmp > maxValue) {                maxValue = tmp;            }        }        context.write(new Text("The max value is : "), new FloatWritable(maxValue));    }}

定义驱动类

package com.lucl.hadoop.mapreduce.rand;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.FloatWritable;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider.Text;/** * @author luchunli * @description MapReduce自带的输入类都是基于HDFS的,如下示例代码不用从HDFS上面读取内容, * 而是在内存里面随机生成100个(0-1)float类型的小数,然后求这100个小数的最大值。 */public class RandomDriver extends Configured implements Tool {    public static void main(String[] args) {        try {            ToolRunner.run(new RandomDriver(), args);        } catch (Exception e) {            e.printStackTrace();        }    }        @Override    public int run(String[] args) throws Exception {        Configuration conf = this.getConf();        conf.set("lucl.random.nums", "100");        conf.set("mapreduce.job.maps", "2");                Job job = Job.getInstance(getConf(), this.getClass().getSimpleName());                job.setJarByClass(RandomDriver.class);                job.setInputFormatClass(RandomInputFormat.class);                job.setMapperClass(RandomMapper.class);        job.setMapOutputKeyClass(IntWritable.class);        job.setMapOutputValueClass(FloatWritable.class);                job.setReducerClass(RandomReducer.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(FloatWritable.class);                FileOutputFormat.setOutputPath(job, new Path(args[0]));                return job.waitForCompletion(true) ? 0 : 1;    }    }

打包运行

[hadoop@nnode code]$ hadoop jar RandomMR.jar /20151202002715/12/02 00:28:07 INFO client.RMProxy: Connecting to ResourceManager at nnode/192.168.137.117:8032生成的随机数的值如下:0.0200757380.7003490.96178760.82860180.033576370.550332550.1126459240.433125080.331843550.69609020.239120540.85234240.41338520.0282425880.90318140.393978710.382789670.58426540.45692240.40088810.22305370.928893270.201279940.095746460.231739040.43659060.115678550.0279440280.69659570.783119440.23656410.85753010.074726580.52190220.94099520.71225190.87224650.302889230.517736260.912117540.931724250.384843650.448441150.245897890.833616260.409832240.94449630.120615420.84466410.53035810.112955390.0940039160.118222180.49971490.982963440.487460370.314205350.11513960.79043370.800051150.183444020.81716190.87496990.480232540.00445050.438798670.223678350.629249160.69983150.2221480.73928840.41748650.45282370.700348260.30571490.291778330.227823670.81826110.466802950.47785210.63658230.439719140.270550550.268396740.52632450.88246490.511824850.204947830.76794030.319364070.134768720.472816880.34021110.287065270.0382034780.73518790.61654040.417611960.52292570.7284225====================15/12/02 00:28:08 INFO mapreduce.JobSubmitter: number of splits:215/12/02 00:28:08 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1448981819300_001415/12/02 00:28:09 INFO impl.YarnClientImpl: Submitted application application_1448981819300_001415/12/02 00:28:09 INFO mapreduce.Job: The url to track the job: http://nnode:8088/proxy/application_1448981819300_0014/15/12/02 00:28:09 INFO mapreduce.Job: Running job: job_1448981819300_001415/12/02 00:28:38 INFO mapreduce.Job: Job job_1448981819300_0014 running in uber mode : false15/12/02 00:28:38 INFO mapreduce.Job:  map 0% reduce 0%15/12/02 00:29:13 INFO mapreduce.Job:  map 100% reduce 0%15/12/02 00:29:32 INFO mapreduce.Job:  map 100% reduce 100%15/12/02 00:29:32 INFO mapreduce.Job: Job job_1448981819300_0014 completed successfully15/12/02 00:29:32 INFO mapreduce.Job: Counters: 49        File System Counters                FILE: Number of bytes read=26                FILE: Number of bytes written=323256                FILE: Number of read operations=0                FILE: Number of large read operations=0                FILE: Number of write operations=0                HDFS: Number of bytes read=520                HDFS: Number of bytes written=31                HDFS: Number of read operations=7                HDFS: Number of large read operations=0                HDFS: Number of write operations=2        Job Counters                 Launched map tasks=2                Launched reduce tasks=1                Data-local map tasks=2                Total time spent by all maps in occupied slots (ms)=64430                Total time spent by all reduces in occupied slots (ms)=16195                Total time spent by all map tasks (ms)=64430                Total time spent by all reduce tasks (ms)=16195                Total vcore-seconds taken by all map tasks=64430                Total vcore-seconds taken by all reduce tasks=16195                Total megabyte-seconds taken by all map tasks=65976320                Total megabyte-seconds taken by all reduce tasks=16583680        Map-Reduce Framework                Map input records=2                Map output records=2                Map output bytes=16                Map output materialized bytes=32                Input split bytes=520                Combine input records=0                Combine output records=0                Reduce input groups=1                Reduce shuffle bytes=32                Reduce input records=2                Reduce output records=1                Spilled Records=4                Shuffled Maps =2                Failed Shuffles=0                Merged Map outputs=2                GC time elapsed (ms)=356                CPU time spent (ms)=1940                Physical memory (bytes) snapshot=513851392                Virtual memory (bytes) snapshot=2541506560                Total committed heap usage (bytes)=257171456        Shuffle Errors                BAD_ID=0                CONNECTION=0                IO_ERROR=0                WRONG_LENGTH=0                WRONG_MAP=0                WRONG_REDUCE=0        File Input Format Counters                 Bytes Read=0        File Output Format Counters                 Bytes Written=31[hadoop@nnode code]$

查看输出结果

[hadoop@nnode code]$ hdfs dfs -ls /201512020027Found 2 items-rw-r--r--   2 hadoop hadoop          0 2015-12-02 00:29 /201512020027/_SUCCESS-rw-r--r--   2 hadoop hadoop         31 2015-12-02 00:29 /201512020027/part-r-00000[hadoop@nnode code]$ hdfs dfs -text /201512020027/part-r-00000The max value is :      0.98296344[hadoop@nnode code]$