查看: 840|回复: 0

[other] 【hadoop:M/R编程】Writable自定义类实战

[复制链接]
  • TA的每日心情
    无聊
    2016-12-1 15:20
  • 签到天数: 668 天

    连续签到: 3 天

    [LV.9]以坛为家II

    发表于 2015-9-13 00:36:42 | 显示全部楼层 |阅读模式
    本帖最后由 忘忧的记忆 于 2015-9-13 00:36 编辑



      1. 修改WordCount类中Mapper的key,改为自定义WritableCompable类型,自定义类存储每个单词的内容和单词的长度,执行M/R程序,让输出变为如下形式:
                 word                   count
      [word=Alexia, len=6]          2
      2. 修改上例中Mapper的Value, 改为自定义Writable类型,自定类型存储单词的长度和出现次数
      注意: 实现WritableCompare接口,需要重写toString,hashCode, equals方法

      hadoop 数据类型 :  
      数据类型.png




      • Writable接口
      如果要实现序列化,只需实现Writable接口即可;其接口有两个方法:
      void write(DataOutput out)
      void readFields(DataInput in)
      write方法是将对象属性序列化到DataOutput中去,readFields方法则是将DataInput中的数据反序列化到对象属性中


      • WritableComparable接口
      此接口在实现序列化的同时,还实现了比较功能。 它是Writable 和java.lang.Comparable<T>接口的组合。对于hadoop中的键来说,我们需要比较接口,因为他们将在reduce阶段进行排序比较
            而值只需要被简单的传递即可
      Hadoop带有一下预定义的类用于实现WritableComparable,包括面向所有基本数据类型的封装类
      Hadoop的键和值所采用的数据类型可以超出Hadoop自身所支持的基本类型,你可以自己定义
      只要实现了它的Writable或者WritableComparable接口
      也就是 说 hadoop 可以自定义  key 和 value   ,只是需要实现 接口而已  

      这样 题目就可以做了,   要求:  输出  格式   【word:hello, len:5】     2      
      也就是 最终 输出 的 reduce 的key 改变了而已   

      我们 自己定义一个 KeyWritable  : 实现了  WritableComparable接口   
      内有 两个 属性 word, length  用来保存单词  和 单词的长度, 其中
      1.   
      2. public   class KeyWritable implements   WritableComparable<KeyWritable>{
      3.    
      4.    
      5. private String word;
      6.    
      7. private int length;
      8.    
      9.    
      10. @Override
      11.    
      12. public String toString() {
      13.    
      14. return "[word=" + word + ",   length=" + length + "]";
      15.    
      16. }
      17.    
      18.    
      19. public KeyWritable(String word, int length) {
      20.    
      21. super();
      22.    
      23. this.word = word;
      24.    
      25. this.length = length;
      26.    
      27. }
      28.    
      29.    
      30. @Override
      31.    
      32. public void write(DataOutput out) throws IOException {
      33.    
      34. out.writeInt(length);
      35.    
      36. out.writeUTF(word);
      37.    
      38. }
      39.    
      40.    
      41. @Override
      42.    
      43. public void readFields(DataInput in) throws IOException {
      44.    
      45. in.readInt();
      46.    
      47. in.readUTF();
      48.    
      49. }
      50.    
      51.    
      52. @Override
      53.    
      54. public int compareTo(KeyWritable o) {
      55.    
      56. if(this.word.compareTo(o.getWord()) == 0){
      57.    
      58. return this.length - o.getLength();
      59.    
      60. }
      61.    
      62. return this.word.compareTo(o.getWord());
      63.    
      64. }
      65.    
      66.    
      67. get/set  hash/equals….
      68.    
      69. }
      复制代码
      对应  Map中 并不需要 修改 什么 , 我们要 修改 Reduce中 输出 的key 的值   
        

      1. public   class KeyReducer extends Reducer<Text,   LongWritable, KeyWritable, IntWritable> {
      2.    
      3.    
      4. public void reduce(Text _key, Iterable<LongWritable> values, Context context)
      5.    
      6. throws IOException, InterruptedException {
      7.    
      8. int  count = 0;
      9.    
      10. for (LongWritable val : values) {
      11.    
      12. count = (int) (count + val.get());
      13.    
      14. }
      15.    
      16. int length = _key.getLength();
      17.    
      18. KeyWritable  kw = new KeyWritable(_key.toString(),length);
      19.    
      20. //返回  key: 【word:hello,   len:5】    value :  n
      21.    
      22. context.write(kw,new IntWritable(count));
      23.    
      24. }
      25.    
      26.    
      27. }
      复制代码

      响应的 Run中job设置  Reduce 输出的key的类

        
         
      job.setOutputKeyClass(KeyWritable.class);
         
      job.setOutputValueClass(IntWritable.class);
         


      输出结果:  
      [word=(BIS),, length=6]        1
      [word=(ECCN), length=6]        1
      [word=(TSU), length=5]        1
      [word=(see, length=4]        1

      同样的,我们也可以 改写  输出的 value:  让格式 变成:  BIS   [count=1 ,length=6]
      自定义一个ValWritable,修改响应的  Reduce 输出格式 ,Job中对应的OutputValueClass即可。   




    您需要登录后才可以回帖 登录 | 注册

    本版积分规则

    站长推荐上一条 /1 下一条