查看: 892|回复: 0

[other] 【hadoop】M/R编程入门:WordCount

[复制链接]
  • TA的每日心情
    无聊
    2016-12-1 15:20
  • 签到天数: 668 天

    连续签到: 3 天

    [LV.9]以坛为家II

    发表于 2015-9-11 01:05:13 | 显示全部楼层 |阅读模式
    本帖最后由 忘忧的记忆 于 2015-9-11 01:13 编辑

    Most importantly,MapReduce programs are inherently parallel, thus
    puttingvery large-scale data analysis into the hands of anyone with enough machines
    at their disposal.
    以上是Hadoop核心技术中一句。 MapReduce 可以处理大规模的数据分析,只要你有足够的机组。  

    WordCount处理过程分析:  
    1. 将文件拆分成splits,由于测试用的文件较小,所以每个文件为一个split,并将文件按行分割形成<key,value>对,如图所示。(hadoop来做)
    MR1.png
    2.将分割好的<key,value>对交给用户定义的map方法进行处理,生成新的<key,value>对,如图所示:
    Mr2.png

    3. 得到map方法输出的<key,value>对后,Mapper会将它们按照key值进行排序,并执行Combine过程,将key至相同value值累加,得到Mapper的最终输出结果。如图所示。(hadoop来做)
    mr3.png
    4. Reducer先对从Mapper接收的数据进行排序,再交由用户自定义的reduce方法进行处理,得到新的<key,value>对,并作为WordCount的输出结果,如图所示(Combine对 key,value做了反转)
    mr4.png

    自己的理解:其实 M/R的编程实现并不难,开发者所要做的仅仅就是实现Map(分词输出)和 Reduce(统计单词个数),对于读取文本,将Map的输出合并排序反转都是hadoop来完成。

    MyMapper:
    public classMyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    private Text word = new Text();

    public void map(LongWritable ikey, Text ivalue, Context context)
    throws IOException, InterruptedException {
    // 这里做一些改动, 最终统计输出    word:4   3   
    String line = ivalue.toString();
             //将一行 分词
    StringTokenizer tokenizer = newStringTokenizer(line);
    while (tokenizer.hasMoreTokens()) {
    String w = tokenizer.nextToken();
    word.set(w+":"+w.length());
    context.write(word, newLongWritable(1));
    }
    }
    }


    Reduce:
      
    public  class MyReducer extends Reducer<Text,  LongWritable, Text, LongWritable> {
      
      
    public void reduce(Text _key, Iterable<LongWritable> values, Context context)
      
    throws IOException, InterruptedException {
      
    // key:hello    values:{1,1,1,1,1,....}
      
    long  count = 0;
      
    for (LongWritable val : values) {
      
    count  = count +  val.get();
      
    }
      
    //返回  key: hello   value :  n
      
    context.write(_key, new  LongWritable(count));
      
    }
      
      
    }
      
    Runner:
      
      
    public  class Runner {
      
      
    public static void main(String[] args) throws Exception {
      
    Configuration conf = new  Configuration();
      
    //为程序分配一个  job
      
    Job job = Job.getInstance(conf, "JobName");
      
    job.setJarByClass(com.ddy.hadoop.mapreduce.wc.Runner.class);
      
    // 设置 M/R 的Mapper和Reduce类
      
    job.setMapperClass(MyMapper.class);
      
    job.setReducerClass(MyReducer.class);
      
             //设置Reduce 输出 key,value的类型
      
    job.setOutputKeyClass(Text.class);
      
    job.setOutputValueClass(LongWritable.class);
      
            // 读取和输出的文件路径
      
    FileInputFormat.setInputPaths(job, new Path(args[0]));
      
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
            //job完成之后 返回  
      
    if (!job.waitForCompletion(true))
      
    return;
      
    }
      
      
    }
      
      

    更多图片 小图 大图
    组图打开中,请稍候......
    您需要登录后才可以回帖 登录 | 注册

    本版积分规则

    站长推荐上一条 /1 下一条