Learning Hadoop: Implementing a Custom Partitioner in a MapReduce Program
Published: 2019-06-19


1: First, set up the entity class:

  write serializes each object to the output stream, and readFields deserializes the bytes back from the input stream. The class implements WritableComparable. For a Java value object used in comparisons, you generally also override toString(), hashCode(), and equals().

package com.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/***
 * @author Administrator
 * 1: write serializes each object to the output stream
 * 2: readFields deserializes the bytes back from the input stream
 * 3: implements WritableComparable
 *    For a Java value object used in comparisons, you generally also override toString(), hashCode(), equals()
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private String phoneNumber; // phone number
    private long upFlow;        // upstream traffic
    private long downFlow;      // downstream traffic
    private long sumFlow;       // total traffic

    public String getPhoneNumber() {
        return phoneNumber;
    }
    public void setPhoneNumber(String phoneNumber) {
        this.phoneNumber = phoneNumber;
    }
    public long getUpFlow() {
        return upFlow;
    }
    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }
    public long getDownFlow() {
        return downFlow;
    }
    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }
    public long getSumFlow() {
        return sumFlow;
    }
    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    // A parameterized constructor for convenient initialization
    public FlowBean(String phoneNumber, long upFlow, long downFlow) {
        this.phoneNumber = phoneNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    // During deserialization, reflection needs a no-arg constructor, so define one
    public FlowBean() {
    }

    // Override toString()
    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    // Deserialize the object's fields from the data stream.
    // Fields must be read in the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNumber = in.readUTF();
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }

    // Serialize the object's fields to the data stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Comparison method for traffic totals
    @Override
    public int compareTo(FlowBean o) {
        // Return -1 if this total is larger, 1 if smaller or equal, so results sort in descending order
        return sumFlow > o.sumFlow ? -1 : 1;
    }
}

2: Steps for the traffic partitioning job:

  2.1: Compute traffic totals from the raw traffic log and write the statistics for users from different provinces to different output files;

  2.2: Two mechanisms need to be customized:

    2.2.1: Customize the partitioning logic with a custom Partitioner

    2.2.2: Set the number of concurrent reduce tasks

package com.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/***
 * Traffic partitioning job
 * @author Administrator
 * 1: Compute traffic totals from the raw traffic log and write per-province user statistics to different files;
 * 2: Two mechanisms need to be customized:
 *    2.1: Customize the partitioning logic with a custom Partitioner
 *    2.2: Set the number of concurrent reduce tasks
 */
public class FlowSumArea {

    public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // Get one line of input
            String line = value.toString();
            // Split it into fields
            String[] fields = StringUtils.split(line, "\t");

            // Extract the fields we need
            String phoneNumber = fields[1];
            long up_flow = Long.parseLong(fields[7]);
            long down_flow = Long.parseLong(fields[8]);

            // Wrap them as a key-value pair and emit
            context.write(new Text(phoneNumber), new FlowBean(phoneNumber, up_flow, down_flow));
        }
    }

    public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {
            // Iterate and sum
            long up_flowSum = 0;
            long down_flowSum = 0;
            for (FlowBean fb : values) {
                up_flowSum += fb.getUpFlow();
                down_flowSum += fb.getDownFlow();
            }

            // Wrap the totals as a key-value pair and emit
            context.write(key, new FlowBean(key.toString(), up_flowSum, down_flowSum));
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Create the configuration
        Configuration conf = new Configuration();
        // Get a job instance
        Job job = Job.getInstance(conf);

        // Tell the job which jar contains the classes it uses
        job.setJarByClass(FlowSumArea.class);
        // The mapper and reducer classes used by this job
        job.setMapperClass(FlowSumAreaMapper.class);
        job.setReducerClass(FlowSumAreaReducer.class);

        // Register our custom partitioning logic
        job.setPartitionerClass(AreaPartitioner.class);

        // Specify the mapper's output key-value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Specify the reducer's output key-value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Set the number of concurrent reduce tasks; it should match the number of partitions
        job.setNumReduceTasks(7);

        // Specify the input path.
        // FileInputFormat is the base class of all file-based InputFormat implementations;
        // it holds the job's input files and implements the split computation for them.
        // Reading individual records is handled by subclasses such as TextInputFormat.
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // Specify the output path for the results
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submit the job to the cluster
        // job.waitForCompletion(true);
        // Exit with 0 on success, 1 otherwise
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3: Take the phone number from the key, look up the number's home province in a dictionary, and return a different group number for each province:

  3.1: Partitioner is the base class for partitioners; a custom partitioner must extend it.

  3.2: HashPartitioner is MapReduce's default partitioner. It picks the target reducer as (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks, as sketched below.
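
To make that default behavior concrete, here is a minimal sketch of a hash-based getPartition equivalent to the formula in 3.2. The class name SimpleHashPartitioner is hypothetical; this is an illustration, not the Hadoop source.

// Illustrative partitioner equivalent to the default HashPartitioner behavior
// described in 3.2 (hypothetical class, not copied from Hadoop).
import org.apache.hadoop.mapreduce.Partitioner;

public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {

    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        // Mask the sign bit so the hash is non-negative, then take the modulus
        // to map the key onto one of the numReduceTasks reducers.
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}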

package com.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE> {

    private static HashMap<String, Integer> areaMap = new HashMap<String, Integer>();

    static {
        areaMap.put("135", 0);
        areaMap.put("136", 1);
        areaMap.put("137", 2);
        areaMap.put("138", 3);
        areaMap.put("139", 4);
        areaMap.put("841", 5);
    }

    @Override
    public int getPartition(KEY key, VALUE value, int numPartitions) {
        // Take the phone number from the key, look up its home province by the
        // three-digit prefix, and return a different group number per province;
        // unknown prefixes fall into group 6.
        Integer areaCoder = areaMap.get(key.toString().substring(0, 3)) == null
                ? 6
                : areaMap.get(key.toString().substring(0, 3));

        return areaCoder;
    }
}

4: Upload the built jar to the virtual machine:

Then start the cluster with start-dfs.sh and start-yarn.sh:

Then run the job as shown below:

[root@master hadoop]# hadoop jar flowarea.jar com.areapartition.FlowSumArea /flow/data /flow/areaoutput4
17/09/25 15:36:38 INFO client.RMProxy: Connecting to ResourceManager at master/192.168.0.55:8032
17/09/25 15:36:38 WARN mapreduce.JobSubmitter: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
17/09/25 15:36:38 INFO input.FileInputFormat: Total input paths to process : 1
17/09/25 15:36:38 INFO mapreduce.JobSubmitter: number of splits:1
17/09/25 15:36:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1506324201206_0004
17/09/25 15:36:38 INFO impl.YarnClientImpl: Submitted application application_1506324201206_0004
17/09/25 15:36:38 INFO mapreduce.Job: The url to track the job: http://master:8088/proxy/application_1506324201206_0004/
17/09/25 15:36:38 INFO mapreduce.Job: Running job: job_1506324201206_0004
17/09/25 15:36:43 INFO mapreduce.Job: Job job_1506324201206_0004 running in uber mode : false
17/09/25 15:36:43 INFO mapreduce.Job:  map 0% reduce 0%
17/09/25 15:36:48 INFO mapreduce.Job:  map 100% reduce 0%
17/09/25 15:36:56 INFO mapreduce.Job:  map 100% reduce 14%
17/09/25 15:37:04 INFO mapreduce.Job:  map 100% reduce 29%
17/09/25 15:37:08 INFO mapreduce.Job:  map 100% reduce 43%
17/09/25 15:37:10 INFO mapreduce.Job:  map 100% reduce 71%
17/09/25 15:37:11 INFO mapreduce.Job:  map 100% reduce 86%
17/09/25 15:37:12 INFO mapreduce.Job:  map 100% reduce 100%
17/09/25 15:37:12 INFO mapreduce.Job: Job job_1506324201206_0004 completed successfully
17/09/25 15:37:12 INFO mapreduce.Job: Counters: 49
	File System Counters
		FILE: Number of bytes read=1158
		FILE: Number of bytes written=746635
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=2322
		HDFS: Number of bytes written=526
		HDFS: Number of read operations=24
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=14
	Job Counters
		Launched map tasks=1
		Launched reduce tasks=7
		Data-local map tasks=1
		Total time spent by all maps in occupied slots (ms)=2781
		Total time spent by all reduces in occupied slots (ms)=98540
		Total time spent by all map tasks (ms)=2781
		Total time spent by all reduce tasks (ms)=98540
		Total vcore-seconds taken by all map tasks=2781
		Total vcore-seconds taken by all reduce tasks=98540
		Total megabyte-seconds taken by all map tasks=2847744
		Total megabyte-seconds taken by all reduce tasks=100904960
	Map-Reduce Framework
		Map input records=22
		Map output records=22
		Map output bytes=1072
		Map output materialized bytes=1158
		Input split bytes=93
		Combine input records=0
		Combine output records=0
		Reduce input groups=21
		Reduce shuffle bytes=1158
		Reduce input records=22
		Reduce output records=21
		Spilled Records=44
		Shuffled Maps =7
		Failed Shuffles=0
		Merged Map outputs=7
		GC time elapsed (ms)=1751
		CPU time spent (ms)=4130
		Physical memory (bytes) snapshot=570224640
		Virtual memory (bytes) snapshot=2914865152
		Total committed heap usage (bytes)=234950656
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
	File Input Format Counters
		Bytes Read=2229
	File Output Format Counters
		Bytes Written=526
[root@master hadoop]# hadoop fs -ls /flow/
Found 10 items
drwxr-xr-x   - root supergroup          0 2017-09-25 15:25 /flow/areaoutput
drwxr-xr-x   - root supergroup          0 2017-09-25 15:34 /flow/areaoutput2
drwxr-xr-x   - root supergroup          0 2017-09-25 15:35 /flow/areaoutput3
drwxr-xr-x   - root supergroup          0 2017-09-25 15:37 /flow/areaoutput4
-rw-r--r--   1 root supergroup       2229 2017-09-20 10:00 /flow/data
drwxr-xr-x   - root supergroup          0 2017-09-20 09:35 /flow/output
drwxr-xr-x   - root supergroup          0 2017-09-20 09:47 /flow/output2
drwxr-xr-x   - root supergroup          0 2017-09-20 10:01 /flow/output3
drwxr-xr-x   - root supergroup          0 2017-09-20 10:21 /flow/output4
drwxr-xr-x   - root supergroup          0 2017-09-21 19:32 /flow/sortoutput
[root@master hadoop]# hadoop fs -ls /flow/areaoutput4
Found 8 items
-rw-r--r--   1 root supergroup          0 2017-09-25 15:37 /flow/areaoutput4/_SUCCESS
-rw-r--r--   1 root supergroup         77 2017-09-25 15:36 /flow/areaoutput4/part-r-00000
-rw-r--r--   1 root supergroup         49 2017-09-25 15:37 /flow/areaoutput4/part-r-00001
-rw-r--r--   1 root supergroup        104 2017-09-25 15:37 /flow/areaoutput4/part-r-00002
-rw-r--r--   1 root supergroup         22 2017-09-25 15:37 /flow/areaoutput4/part-r-00003
-rw-r--r--   1 root supergroup        102 2017-09-25 15:37 /flow/areaoutput4/part-r-00004
-rw-r--r--   1 root supergroup         24 2017-09-25 15:37 /flow/areaoutput4/part-r-00005
-rw-r--r--   1 root supergroup        148 2017-09-25 15:37 /flow/areaoutput4/part-r-00006
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00000
13502468823	102	7335	7437
13560436666	954	200	1154
13560439658	5892	400	6292
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00001
13602846565	12	1938	1950
13660577991	9	6960	6969
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00002
13719199419	0	200	200
13726230503	2481	24681	27162
13726238888	2481	24681	27162
13760778710	120	200	320
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00003
13826544101	0	200	200
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00004
13922314466	3008	3720	6728
13925057413	63	11058	11121
13926251106	0	200	200
13926435656	1512	200	1712
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00005
84138413	4116	1432	5548
[root@master hadoop]# hadoop fs -cat /flow/areaoutput4/part-r-00006
13480253104	180	200	380
15013685858	27	3659	3686
15920133257	20	3156	3176
15989002119	3	1938	1941
18211575961	12	1527	1539
18320173382	18	9531	9549

5: Copy the test data several times, as shown below, to verify that map tasks run in parallel (a split-size sketch follows the listing):

  5.1: The number of concurrent map tasks is determined by the number of splits: one map task is started per split.

  5.2: A split is a logical concept; it refers to a range of byte offsets within the input file.

  5.3: The split size should be tuned according to the size of the files being processed.

[root@master hadoop]# hadoop fs -mkdir /flow/data/
[root@master hadoop]# hadoop fs -put HTTP_20130313143750.dat /flow/data/
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.2
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.3
[root@master hadoop]# hadoop fs -cp /flow/data/HTTP_20130313143750.dat /flow/data/HTTP_20130313143750.dat.4
[root@master hadoop]# hadoop fs -ls /flow/data/
Found 4 items
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:36 /flow/data/HTTP_20130313143750.dat.2
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.3
-rw-r--r--   1 root supergroup       2229 2017-09-25 16:37 /flow/data/HTTP_20130313143750.dat.4
[root@master hadoop]#
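
As a rough illustration of 5.3, the minimum and maximum split sizes can also be set on the job itself. The sketch below is an assumption about how this could be wired up (class name SplitSizeExample and the 64 MB limit are illustrative, not part of the original post or a recommendation); smaller limits generally mean more splits and therefore more map tasks.

// Hypothetical sketch: bounding the split size for a file-based input so that
// the number of map tasks can be influenced; values are illustrative only.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());

        // Each input file yields at least one split; beyond that, split sizes
        // are bounded by these limits.
        FileInputFormat.setMinInputSplitSize(job, 1L);                 // lower bound, in bytes
        FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);  // upper bound: 64 MB

        // ... set mapper/reducer classes, input/output paths, etc., as in FlowSumArea
    }
}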

6: Programming with Combiners

  6.1: Each map may produce a large amount of output. A combiner merges that output on the map side first, reducing the amount of data transferred to the reducers.

  6.2: At its core a combiner merges values for the same key locally, so it behaves like a local reduce.

  6.3: Without a combiner, all aggregation is left to the reducers, which is relatively inefficient. With a combiner, each map that finishes aggregates its output locally first, which speeds things up.

  6.4: Note: the combiner's output becomes the reducer's input, and because the combiner is pluggable, adding one must never change the final result. So a combiner should only be used where the reducer's input key/value types match its output key/value types and applying it does not affect the final result, e.g. summation or taking a maximum. A sketch of wiring one into this job follows.
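
For this particular job, one hedged possibility (an assumption, not something the original post does): FlowSumAreaReducer only sums traffic, and its input and output types are both (Text, FlowBean), so it could also be registered as the combiner alongside the other job.set* calls in FlowSumArea.main:

// Hypothetical addition to FlowSumArea.main: reuse the sum-only reducer as a
// map-side combiner, since summation can safely be applied before the real reduce.
job.setCombinerClass(FlowSumAreaReducer.class);

If the reduce logic were anything other than a pure aggregation of this kind, reusing it as a combiner would not be safe, per the warning in 6.4.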

7: The shuffle mechanism:

   7.1: Each map has a circular in-memory buffer for its output, 100 MB by default (io.sort.mb). Once the buffer reaches the 0.8 spill threshold (io.sort.spill.percent), a background thread writes (spills) its contents to a new spill file under the directory given by mapred.local.dir.

   7.2: Before the spill is written to disk, the data is partitioned and sorted; if a combiner is configured, it runs on the sorted data.

   7.3: After the last record has been written, all spill files are merged into a single partitioned, sorted file.

   7.4: Reducers fetch their partitions of the map output over HTTP.

   7.5: The TaskTracker runs reduce tasks against those partition files. In the copy phase, map output is copied into the reducer's memory or onto its disk; as soon as a map task finishes, the reducers begin copying its output.

   7.6: In the sort phase the copied map outputs are merged, and then the reduce phase runs. (A configuration sketch for the buffer settings follows.)
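
As a minimal sketch of where the numbers in 7.1 live, the buffer size and spill threshold are ordinary configuration properties. The snippet below (class name ShuffleBufferConfig is hypothetical) just sets them to the defaults quoted above; the property names are the older ones used in the text, and newer Hadoop releases know them as mapreduce.task.io.sort.mb and mapreduce.map.sort.spill.percent.

// Minimal sketch: setting the map-side sort buffer size and spill threshold
// from 7.1 to their default values. Older property names are used to match the
// text; newer Hadoop versions map them to mapreduce.task.io.sort.mb and
// mapreduce.map.sort.spill.percent.
import org.apache.hadoop.conf.Configuration;

public class ShuffleBufferConfig {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("io.sort.mb", 100);                // in-memory sort buffer, in MB
        conf.setFloat("io.sort.spill.percent", 0.80f); // spill threshold as a fraction of the buffer
        // A Job built from this Configuration (Job.getInstance(conf)) would pick these up.
    }
}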

 

