The current parking-lot cash statistics report uses a daily table with cumulatively stored data, which creates redundant records: saving a data point requires first querying the matching row, adding the new amount to the running total, and then updating it, which breaks under high concurrency. Statistics also cannot be queried by a specific point in time, and the code contains large amounts of duplicated logic with poor structure. Since the statistics data involves no relational joins, it is a good fit for a non-relational database, which can also improve query performance; HBase was chosen here.
1. Read the current code, sort out the business logic, and redesign the entity class
The statistics cover transactions produced by the following operations (called income/expense types): package recharge, package renewal, package refund, account recharge, account refund, and temporary-car payment.
Payment methods include cash, account, third party, Alipay, WeChat, and contactless payment.
For temporary-car payments, the payment source is one of: booth payment, central payment, self-service payment, handheld-terminal payment (Shoufeibao), mobile H5 payment, or third-party payment.
Package refund records, account recharge/refund records, and temporary-car payment records each have their own dedicated table. To quickly produce daily, monthly, and yearly statistical reports, the same data is additionally written into a single unified table used solely for displaying statistics.
Before: each income/expense type and payment source occupied its own column, and data for the same day, parking lot, and income/expense type or payment source was accumulated onto a single record. Saving data required querying the matching row, adding the new amount, and updating it, which is unsafe under high concurrency, and adding locks would hurt performance. Under this design statistics also cannot be queried by a specific time of day; the smallest query granularity is one day. The original entity class also carried fields that were never persisted and belonged in a DTO; these needed to be split out.
After: the income/expense type, the payment source, and the payment method each occupy a single field.
The original 29 fields (22 database columns) were reduced to 10.
Integer reportType;        // payment method: 1 cash, 2 account, 3 third party, 4 Alipay, 5 WeChat, 6 contactless (EnumChargeStyleType)
Integer incomeOutcomeType; // income/expense type
String parkId;             // parking lot ID
Integer reportItem;        // payment source
Date costTime;             // payment time
String indexCode;          // system index code
BigDecimal amount;         // amount
String regionIndexCode;    // region index code
String userIndexCode;      // user indexCode
String selfPosId;          // self-service payment kiosk ID
BigDecimal reductPay;      // discount amount
There are four query scenarios.
First: group by payment method (cash, etc.) and income/expense type (package recharge, etc.), split the results into income and expense, and total the amounts per payment method, for the ring chart on the revenue statistics report.
Second: group by payment method (cash, etc.), parking garage, and income/expense type (package recharge, etc.), then split into income and expense, for the list view of the revenue statistics report.
Third: for income/expense type = temporary-car payment, group by payment method (cash, etc.) and payment source (booth payment, etc.), and display the different payment sources on a single row, for the temporary-car payment statistics view.
Fourth: for income/expense type = temporary-car payment, group by payment method (cash, etc.), parking garage, and payment source (booth payment, etc.), and display the different payment sources of the same parking lot on a single row, for the temporary-car payment list view.
HBase application scenarios:
"As a key-value database, HBase handles high-throughput online reads and writes very well, writes in particular, but it performs poorly for multi-condition queries on non-rowkey columns, data analysis, and statistics; those scenarios are better suited to MapReduce computation."
So here we use HBase to store the data and MapReduce to perform the simple statistics and calculations that the requirements call for.
HBase table structure
Table name: report
rowkey: costTime_indexCode // payment time _ system index code
Column families (how should they be divided?):
info:reportType        // payment method: 1 cash 2 account 3 third party 4 Alipay 5 WeChat 6 contactless
info:incomeOutcomeType // income/expense type
info:parkId            // parking lot ID
info:reportItem        // payment source
info:amount            // amount
info:regionIndexCode   // region index code
info:userIndexCode     // user indexCode
info:selfPosId         // self-service payment kiosk ID
info:reductPay         // discount amount
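For illustration, the row key above can be built by concatenating the millisecond timestamp with the system index code. This is a sketch with made-up values; note that because fixed-width millisecond timestamps sort lexicographically in time order, a time range maps directly onto a row-key range scan, though a monotonically increasing prefix can hot-spot a single region (salting is a common mitigation).

```java
public class RowKeySketch {
    // rowkey = costTime (epoch millis) + "_" + indexCode.
    // Fixed-width millis keep lexicographic order equal to time order,
    // so [startTime, stopTime) becomes a simple Scan start/stop row pair.
    static String rowKey(long costTimeMillis, String indexCode) {
        return costTimeMillis + "_" + indexCode;
    }

    public static void main(String[] args) {
        String a = rowKey(1593071693858L, "A001");
        String b = rowKey(1593071694491L, "B002");
        System.out.println(a);                  // 1593071693858_A001
        System.out.println(a.compareTo(b) < 0); // true: earlier time sorts first
    }
}
```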
Scenario 1: group by payment method (reportType: cash, etc.) and income/expense type (incomeOutcomeType: package recharge, etc.), split into income and expense, and total the amounts per payment method, for the ring chart on the revenue statistics report.
Map program
ReportAnalyzeMapper extends HBase's TableMapper base class. The composite key incomeOutcomeType_reportType is treated as text, and the BigDecimal amount is serialized as Text as well, so the key-value output type is <Text, Text>.
Keys are generated according to whether the income/expense type counts as income or expense: income emits in_reportType plus a uniform in_all; expense emits out_reportType plus a uniform out_all. The reducer can then derive the total income from in_all and the per-payment-method income from in_reportType (for example, total income 100 yuan: 20 cash, 40 Alipay, 40 WeChat), and likewise the total expense from out_all and the per-payment-method expense from out_reportType (for example, total expense 100 yuan: 40 cash, 30 Alipay, 30 WeChat).
/**
 * Scenario 1: group by payment method (cash, etc.) and income/expense type (package recharge, etc.),
 * split into income and expense, and total the amounts per payment method, for the ring chart.
 *
 * rowkey: costTime_indexCode
 * info:reportType         // payment method: 1 cash 2 account 3 third party 4 Alipay 5 WeChat 6 contactless
 * info:incomeOutcomeType  // income/expense type: 1 account recharge 2 package recharge 3 package renewal
 *                         //   4 free pass 5 temporary-car payment 6 account refund 7 package refund
 * info:parkId             // parking lot ID
 * info:reportItem         // payment source: 1 booth 2 manual 3 Shoufeibao 4 third party
 * info:amount             // amount
 * info:regionIndexCode    // region index code
 * info:userIndexCode      // user indexCode
 * info:selfPosId          // self-service payment kiosk ID
 * info:reductPay          // discount amount
 *
 * Output key: in/out_<payment method>, e.g.
 *   in_1  1.2
 *   out_3 5.0
 */
public class ReportAnalyzeMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        try {
            String reportType = "";
            String incomeOutcomeType = "";
            String amount = "";
            // iterate over every column in the "info" family
            for (Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()) {
                String str = new String(val.getValue());
                String columnKey = new String(val.getKey());
                if (columnKey.equals("reportType")) {
                    reportType = str;
                } else if (columnKey.equals("incomeOutcomeType")) {
                    incomeOutcomeType = str;
                } else if (columnKey.equals("amount")) {
                    amount = str;
                }
            }
            switch (incomeOutcomeType) {
                case "1":
                case "2":
                case "3":
                case "4":
                case "5":
                    // income, per payment method
                    context.write(new Text("in_" + reportType), new Text(amount));
                    // income, total
                    context.write(new Text("in_all"), new Text(amount));
                    break; // without this break the income cases would fall through into the expense branch
                case "6":
                case "7":
                    // expense, per payment method
                    context.write(new Text("out_" + reportType), new Text(amount));
                    // expense, total
                    context.write(new Text("out_all"), new Text(amount));
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Reduce program
ReportAnalyzeReduce extends the plain Reducer base class: it sums all values sharing a key and writes the key together with the accumulated total to HDFS; a second job later loads these results into the target table.
Values that share the same key are added together.
Accumulating key in_all yields total income; in_1 yields the cash portion of income; in_4 the Alipay portion, and so on. Accumulating out_all yields total expense; out_1 the cash portion of expense; out_5 the WeChat portion, and so on.
/**
 * Payment methods: 1 cash 2 account 3 third party 4 Alipay 5 WeChat 6 contactless.
 * Sums the amounts per key, e.g.
 *   in_1  3.7
 *   out_3 5.0
 *
 * Income types accumulate overall and per payment method;
 * expense types accumulate overall and per payment method.
 */
public class ReportAnalyzeReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BigDecimal sum = BigDecimal.ZERO;
        for (Text val : values) {
            // BigDecimal is immutable: the result of add() must be kept
            sum = sum.add(new BigDecimal(val.toString()));
        }
        context.write(key, new Text(sum.toString()));
    }
}
Looking at the map program, its output is not merged at all: every record emits its key with its individual amount as the value. If these records went straight into the reduce phase, a large volume of data would have to be shuffled across the network. To address this, a local reduce (combiner) can run after the map phase, greatly reducing the amount of data that enters the reduce phase.
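Because ReportAnalyzeReduce only sums, and its input and output types are both <Text, Text>, it could plausibly be registered as the combiner as well (job.setCombinerClass(ReportAnalyzeReduce.class)). The effect of such local pre-aggregation can be sketched in plain JDK code; this is a toy model of what the combiner does, not Hadoop API code, and the sample keys and amounts are made up:

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerSketch {
    // Locally pre-sum (key, amount) pairs before they would be shuffled,
    // mimicking what a combiner does on each map task's output.
    static Map<String, BigDecimal> localCombine(List<String[]> pairs) {
        Map<String, BigDecimal> out = new LinkedHashMap<>();
        for (String[] p : pairs) {
            out.merge(p[0], new BigDecimal(p[1]), BigDecimal::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> mapOutput = List.of(
                new String[]{"in_1", "1.2"},
                new String[]{"in_all", "1.2"},
                new String[]{"in_1", "2.3"},
                new String[]{"in_all", "2.3"});
        Map<String, BigDecimal> combined = localCombine(mapOutput);
        // four map records shrink to two before the shuffle
        System.out.println(combined.size() + " " + combined.get("in_1")); // 2 3.5
    }
}
```

This is safe only because summation is associative and commutative; a combiner must produce the same final result whether it runs zero, one, or many times.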
The driver program hard-codes the table name and column family and takes the HDFS output path as its remaining command-line argument; Apache Commons CLI could also be used to parse command-line arguments.
scan.setCaching(500) sets the number of rows fetched per RPC (tune it to your workload), and scan.setCacheBlocks(false) tells HBase not to put the blocks read by this scan into the block cache.
public class ReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String tableName = "report";
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/ReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        try {
            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            if (otherArgs.length != 1) {
                System.exit(1);
            }
            Scan scan = new Scan();
            scan.setCaching(500);       // rows fetched per RPC
            scan.setCacheBlocks(false); // do not pollute the block cache with this scan
            scan.setStartRow("1593071693858_".getBytes());
            scan.setStopRow("1593071694491_".getBytes());
            Job job = Job.getInstance(conf, "ReportAnalyzeJob" + tableName);
            job.setJarByClass(ReportAnalyzeJob.class);
            job.setReducerClass(ReportAnalyzeReduce.class);
            job.setNumReduceTasks(4);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan,
                    ReportAnalyzeMapper.class, Text.class, Text.class, job);
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class TableReportAnalyzeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // each input line looks like "in_1\t3.5"
            String[] part = value.toString().split("\t");
            if (part.length == 2) {
                String amount = part[1];
                String[] elements = part[0].split("_");
                if ("in".equals(elements[0]) || "out".equals(elements[0])) {
                    // key: "in"/"out", value: "<payment method or all>_<amount>"
                    context.write(new Text(elements[0]), new Text(elements[1] + "_" + amount));
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
public class TableReportAnalyzeReduce extends TableReducer<Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(Bytes.toBytes(key.toString()));
        for (Text val : values) {
            String[] elements = val.toString().split("_");
            // row key in/out, column info:<payment method or all> = amount,
            // e.g. 1:2.5 2:5.8 all:7.3; queried later by row key
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(elements[0]), Bytes.toBytes(elements[1]));
        }
        context.write(new Text(key.toString()), put);
    }
}
public class TableReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String targetTable = "report";
        System.out.println("targetTable=" + targetTable);
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/TableReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        try {
            Job job = Job.getInstance(conf, "TableReportAnalyzeJob" + targetTable);
            job.setJarByClass(TableReportAnalyzeJob.class);
            job.setMapperClass(TableReportAnalyzeMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path("hdfs://mycluster:8020/usr/output1"));
            TableMapReduceUtil.initTableReducerJob(targetTable, TableReportAnalyzeReduce.class, job);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The intermediate pairs in_all:amount, in_1:amount, in_4:amount, out_all:amount, out_1:amount, out_5:amount, etc. are reshaped into in:all_amount, in:1_amount, in:4_amount and out:all_amount, out:1_amount, out:5_amount, i.e. one HBase row per in/out with one column per payment method.
Once this job writes the data into HBase, the business layer can fetch all income statistics with row key "in" (total income plus the cash, WeChat, Alipay portions, etc.) and all expense statistics with row key "out" (total expense plus the cash, WeChat, Alipay portions, etc.).
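As an illustration of how a caller might consume the "in" row for the ring chart, here is a plain-Java sketch; the column names match the layout above, but the amounts are hypothetical, and in production the map would come from an HBase Get on row "in":

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.LinkedHashMap;
import java.util.Map;

public class RingChartSketch {
    // Given the columns of the "in" row (payment method -> amount, plus "all"),
    // compute each payment method's share of total income as a percentage.
    static Map<String, BigDecimal> shares(Map<String, String> inRow) {
        BigDecimal total = new BigDecimal(inRow.get("all"));
        Map<String, BigDecimal> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : inRow.entrySet()) {
            if (e.getKey().equals("all")) continue;
            result.put(e.getKey(), new BigDecimal(e.getValue())
                    .multiply(BigDecimal.valueOf(100))
                    .divide(total, 1, RoundingMode.HALF_UP));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> inRow = new LinkedHashMap<>();
        inRow.put("1", "20");   // cash
        inRow.put("4", "40");   // Alipay
        inRow.put("5", "40");   // WeChat
        inRow.put("all", "100");
        System.out.println(shares(inRow)); // {1=20.0, 4=40.0, 5=40.0}
    }
}
```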
Scenario 2: group by payment method (cash, etc.), parking garage, and income/expense type (package recharge, etc.), then split into income and expense, for the list view of the revenue statistics report.
Depending on whether the income/expense type is income or expense, the mapper emits in_reportType_incomeOutcomeType_parkId for income and out_reportType_incomeOutcomeType_parkId for expense.
/**
 * Scenario 2: group by payment method (cash, etc.), parking garage, and income/expense type
 * (package recharge, etc.), then split into income and expense, for the revenue report list view.
 *
 * info:reportType         // payment method: 1 cash 2 account 3 third party 4 Alipay 5 WeChat 6 contactless
 * info:incomeOutcomeType  // income/expense type
 * info:parkId             // parking lot ID
 * info:reportItem         // payment source
 * info:amount             // amount
 * info:regionIndexCode    // region index code
 * info:userIndexCode      // user indexCode
 * info:selfPosId          // self-service payment kiosk ID
 * info:reductPay          // discount amount
 *
 * Output key: in/out_reportType_incomeOutcomeType_parkId, e.g.
 *   in_1_1_111  1.2
 *   out_3_6_222 5.0
 */
public class ListReportAnalyzeMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        try {
            String reportType = "";
            String incomeOutcomeType = "";
            String parkId = "";
            String amount = "";
            // iterate over every column in the "info" family
            for (Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()) {
                String str = new String(val.getValue());
                String columnKey = new String(val.getKey());
                if (columnKey.equals("reportType")) {
                    reportType = str;
                } else if (columnKey.equals("incomeOutcomeType")) {
                    incomeOutcomeType = str;
                } else if (columnKey.equals("parkId")) {
                    parkId = str;
                } else if (columnKey.equals("amount")) {
                    amount = str;
                }
            }
            switch (incomeOutcomeType) {
                case "1":
                case "2":
                case "3":
                case "4":
                case "5":
                    // income, per payment method / type / parking lot
                    context.write(new Text("in_" + reportType + "_" + incomeOutcomeType + "_" + parkId),
                            new Text(amount));
                    break;
                case "6":
                case "7":
                    // expense, per payment method / type / parking lot
                    context.write(new Text("out_" + reportType + "_" + incomeOutcomeType + "_" + parkId),
                            new Text(amount));
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Scenario 2 reducer: sums the amounts per key and writes one row per key to the target table, e.g.
 *   in_1_1_111 1.2
 *   out_3_6_222 5.0
 */
public class ListReportAnalyzeReduce extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BigDecimal sum = BigDecimal.ZERO;
        for (Text val : values) {
            sum = sum.add(new BigDecimal(val.toString()));
        }
        // key layout: in/out_reportType_incomeOutcomeType_parkId, e.g. in_1_1_111
        String[] elements = key.toString().split("_");
        Put put = new Put(Bytes.toBytes(key.toString()));
        if (elements.length == 4) {
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parkId"), Bytes.toBytes(elements[3]));
        }
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("incomeOutcomeType"), Bytes.toBytes(elements[2]));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("amount"), Bytes.toBytes(sum.toString()));
        // rows are later fetched with a row-key range scan
        context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())), put);
    }
}
public class ListReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String tableName = "report";
        String targetTable = "report";
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/ListReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        try {
            Job job = Job.getInstance(conf, "ListReportAnalyzeJob" + tableName);
            job.setJarByClass(ListReportAnalyzeJob.class);
            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan,
                    ListReportAnalyzeMapper.class, Text.class, Text.class, job);
            TableMapReduceUtil.initTableReducerJob(targetTable, ListReportAnalyzeReduce.class, job);
            job.setNumReduceTasks(4);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Scenario 3: for income/expense type = temporary-car payment, group by payment method (cash, etc.) and payment source (booth payment, etc.), and display the different payment sources on a single row, for the temporary-car payment statistics view.
/**
 * Scenario 3: income/expense type = temporary-car payment; group by payment method (cash, etc.)
 * and payment source (booth payment, etc.), for the temporary-car payment statistics view.
 * (Columns are the same "info" family as above.)
 *
 * Output key: terminal_reportType_reportItem, e.g.
 *   terminal_1_1 1.2
 *   terminal_2_3 5.0
 */
public class OneReportAnalyzeMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        try {
            String reportType = "";
            String incomeOutcomeType = "";
            String reportItem = "";
            String amount = "";
            // iterate over every column in the "info" family
            for (Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()) {
                String str = new String(val.getValue());
                String columnKey = new String(val.getKey());
                if (columnKey.equals("reportType")) {
                    reportType = str;
                } else if (columnKey.equals("incomeOutcomeType")) {
                    incomeOutcomeType = str;
                } else if (columnKey.equals("reportItem")) {
                    reportItem = str;
                } else if (columnKey.equals("amount")) {
                    amount = str;
                }
            }
            if ("5".equals(incomeOutcomeType)) { // temporary-car payment only
                // per payment method, per payment source
                context.write(new Text("terminal_" + reportType + "_" + reportItem), new Text(amount));
                // all payment methods combined, per payment source
                context.write(new Text("terminal_all_" + reportItem), new Text(amount));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Scenario 3 reducer: sums the amounts per terminal_reportType_reportItem key.
 */
public class OneReportAnalyzeReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BigDecimal sum = BigDecimal.ZERO;
        for (Text val : values) {
            sum = sum.add(new BigDecimal(val.toString()));
        }
        context.write(key, new Text(sum.toString()));
    }
}
public class OneReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String tableName = "report";
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/OneReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.setStartRow("1593071693858_".getBytes());
        scan.setStopRow("1593071694491_".getBytes());
        try {
            Job job = Job.getInstance(conf, "OneReportAnalyzeJob" + tableName);
            job.setJarByClass(OneReportAnalyzeJob.class);
            job.setReducerClass(OneReportAnalyzeReduce.class);
            job.setNumReduceTasks(4);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan,
                    OneReportAnalyzeMapper.class, Text.class, Text.class, job);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://mycluster:8020/usr/output2"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class TableOneReportAnalyzeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // each input line looks like "terminal_1_1\t2.5"
            String[] part = value.toString().split("\t");
            if (part.length == 2) {
                String amount = part[1];
                String[] elements = part[0].split("_");
                // key: terminal_<reportType or all>, value: <reportItem>_<amount>
                context.write(new Text(elements[0] + "_" + elements[1]),
                        new Text(elements[2] + "_" + amount));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Input: key terminal_<reportType or all>, values like 1_2.5 (payment source _ amount).
 */
public class TableOneReportAnalyzeReduce extends TableReducer<Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(Bytes.toBytes(key.toString()));
        BigDecimal sum = BigDecimal.ZERO;
        for (Text val : values) {
            String[] elements = val.toString().split("_");
            // column info:<reportItem> = amount, e.g. 1:2.5 2:5.8; queried later by row key
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(elements[0]), Bytes.toBytes(elements[1]));
            sum = sum.add(new BigDecimal(elements[1]));
        }
        // store the total as a string, consistent with the other columns
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("all"), Bytes.toBytes(sum.toString()));
        context.write(new Text(key.toString()), put);
    }
}
public class TableOneReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String targetTable = "report";
        System.out.println("targetTable=" + targetTable);
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/TableOneReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        try {
            Job job = Job.getInstance(conf, "TableOneReportAnalyzeJob" + targetTable);
            job.setJarByClass(TableOneReportAnalyzeJob.class);
            job.setMapperClass(TableOneReportAnalyzeMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path("hdfs://mycluster:8020/usr/output2"));
            TableMapReduceUtil.initTableReducerJob(targetTable, TableOneReportAnalyzeReduce.class, job);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Scenario 4: for income/expense type = temporary-car payment, group by payment method (cash, etc.), parking garage, and payment source (booth payment, etc.), and display the different payment sources of the same parking lot on a single row, for the temporary-car payment list view.
/**
 * Scenario 4: income/expense type = temporary-car payment; group by payment method,
 * parking garage, and payment source, for the temporary-car payment list view.
 * (Columns are the same "info" family as above.)
 *
 * Output key: terminal_reportType_parkId_reportItem, e.g.
 *   terminal_1_111_1 1.2_0.5
 */
public class OneListReportAnalyzeMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        try {
            String reportType = "";
            String incomeOutcomeType = "";
            String parkId = "";
            String reportItem = "";
            String amount = "";
            String reductPay = "";
            // iterate over every column in the "info" family
            for (Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()) {
                String str = new String(val.getValue());
                String columnKey = new String(val.getKey());
                if (columnKey.equals("reportType")) {
                    reportType = str;
                } else if (columnKey.equals("incomeOutcomeType")) {
                    incomeOutcomeType = str;
                } else if (columnKey.equals("reportItem")) {
                    reportItem = str;
                } else if (columnKey.equals("parkId")) {
                    parkId = str;
                } else if (columnKey.equals("amount")) {
                    amount = str;
                } else if (columnKey.equals("reductPay")) { // discount column, matching the schema
                    reductPay = str;
                }
            }
            if ("5".equals(incomeOutcomeType)) { // temporary-car payment only
                // per payment method
                context.write(new Text("terminal_" + reportType + "_" + parkId + "_" + reportItem),
                        new Text(amount + "_" + reductPay));
                // all payment methods combined
                context.write(new Text("terminal_all_" + parkId + "_" + reportItem),
                        new Text(amount + "_" + reductPay));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Scenario 4 reducer: sums amount and discount per terminal_reportType_parkId_reportItem key.
 * Value layout: amount_reductPay.
 */
public class OneListReportAnalyzeReduce extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        BigDecimal sum = BigDecimal.ZERO;
        BigDecimal reductSum = BigDecimal.ZERO;
        for (Text val : values) {
            String[] vals = val.toString().split("_");
            sum = sum.add(new BigDecimal(vals[0]));
            if (vals.length == 2) { // discount may be absent
                reductSum = reductSum.add(new BigDecimal(vals[1]));
            }
        }
        context.write(key, new Text(sum.toString() + "_" + reductSum.toString()));
    }
}
public class OneListReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String tableName = "report";
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/OneListReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.setStartRow("1593071693858_".getBytes());
        scan.setStopRow("1593071694491_".getBytes());
        try {
            Job job = Job.getInstance(conf, "OneListReportAnalyzeJob" + tableName);
            job.setJarByClass(OneListReportAnalyzeJob.class);
            job.setReducerClass(OneListReportAnalyzeReduce.class);
            job.setNumReduceTasks(4);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan,
                    OneListReportAnalyzeMapper.class, Text.class, Text.class, job);
            FileOutputFormat.setOutputPath(job, new Path("hdfs://mycluster:8020/usr/output3"));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
public class TableOneListReportAnalyzeMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        try {
            // each input line looks like "terminal_1_111_1\t1.2_2.5"
            String[] part = value.toString().split("\t");
            if (part.length == 2) {
                String amountReduct = part[1];
                String[] elements = part[0].split("_");
                // key: terminal_<reportType or all>_<parkId>, value: <reportItem>_<amount>_<reductPay>
                context.write(new Text(elements[0] + "_" + elements[1] + "_" + elements[2]),
                        new Text(elements[3] + "_" + amountReduct));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/**
 * Input: key terminal_<reportType or all>_<parkId>, values like 1_1.2_2.5
 * (payment source _ amount _ discount).
 */
public class TableOneListReportAnalyzeReduce extends TableReducer<Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(Bytes.toBytes(key.toString()));
        BigDecimal sum = BigDecimal.ZERO;
        BigDecimal reductSum = BigDecimal.ZERO;
        for (Text val : values) {
            String[] elements = val.toString().split("_");
            // column info:<reportItem> = amount; rows are fetched later by row-key range
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(elements[0]), Bytes.toBytes(elements[1]));
            sum = sum.add(new BigDecimal(elements[1]));
            if (elements.length == 3) { // discount may be absent
                reductSum = reductSum.add(new BigDecimal(elements[2]));
            }
        }
        // store totals as strings, consistent with the other columns
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("all"), Bytes.toBytes(sum.toString()));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("reduce"), Bytes.toBytes(reductSum.toString()));
        context.write(new Text(key.toString()), put);
    }
}
public class TableOneListReportAnalyzeJob {
    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {
        String targetTable = "report";
        System.out.println("targetTable=" + targetTable);
        Configuration conf = HBaseConfiguration.create();
        conf.set("mapred.jar", "/Users/Desktop/TableOneListReportAnalyzeJob.jar");
        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");
        try {
            Job job = Job.getInstance(conf, "TableOneListReportAnalyzeJob" + targetTable);
            job.setJarByClass(TableOneListReportAnalyzeJob.class);
            job.setMapperClass(TableOneListReportAnalyzeMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setNumReduceTasks(1);
            FileInputFormat.addInputPath(job, new Path("hdfs://mycluster:8020/usr/output3"));
            TableMapReduceUtil.initTableReducerJob(targetTable, TableOneListReportAnalyzeReduce.class, job);
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Performance test
Generating synthetic data: create a TSV file first, then use MapReduce (importtsv) to load it into HBase.
1. Create a TSV file locally: report.tsv
1593400710000 1 1 park1 1 10.1
1593400710001 1 2 5.5
1593400710002 1 3 park1 2 6.7
1593400710002 1 5 park1 3 8.8
......
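A small generator for such a file might look like this. It is a sketch: the class name, row count, and value ranges are made up, and the column order follows the sample lines above (rowkey, reportType, incomeOutcomeType, parkId, reportItem, amount):

```java
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;

public class ReportTsvGenerator {
    // Build one tab-separated line: rowkey, reportType, incomeOutcomeType, parkId, reportItem, amount
    static String line(long ts, int reportType, int incomeOutcomeType,
                       String parkId, int reportItem, String amount) {
        return ts + "\t" + reportType + "\t" + incomeOutcomeType + "\t"
                + parkId + "\t" + reportItem + "\t" + amount;
    }

    public static void main(String[] args) throws IOException {
        Random rnd = new Random(42); // fixed seed for reproducible test data
        try (PrintWriter out = new PrintWriter("report.tsv")) {
            long base = 1593400710000L;
            for (int i = 0; i < 10000; i++) {
                out.println(line(base + i,
                        1 + rnd.nextInt(6),                     // payment method 1..6
                        1 + rnd.nextInt(7),                     // income/expense type 1..7
                        "park" + (1 + rnd.nextInt(3)),          // parking lot
                        1 + rnd.nextInt(4),                     // payment source 1..4
                        String.valueOf(rnd.nextInt(1000) / 10.0))); // amount 0.0..99.9
            }
        }
    }
}
```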
2. Create the HBase table (already created earlier in this example)
hbase(main):001:0> create 'report','info'
3. Create the input_report directory in HDFS and upload report.tsv
$ /usr/hadoop-3.2.1/bin/hdfs dfs -mkdir /input_report/
$ /usr/hadoop-3.2.1/bin/hdfs dfs -put report.tsv /input_report/
4. Run the importtsv MapReduce job to load the data into the HBase report table
$ /usr/hadoop-3.2.1/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:reportType,info:incomeOutcomeType,info:parkId,info:reportItem,info:amount \
report hdfs://mycluster:9000/input_report