博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
HBase+MapReduce案例
阅读量:4281 次
发布时间:2019-05-27

本文共 32299 字,大约阅读时间需要 107 分钟。

        目前停车场现金统计报表使用日表,数据采用累加方式,存在冗余数据,保存数据时需先查询出对应条目加上这次数据累计再进行更新,高并发下存在问题,且无法按具体时间查询,代码逻辑也存在大量重复性代码,代码结构较差;统计类型数据不存在关联查询,适合使用非关系型数据库,可提高查询性能,这里选择HBASE。

        1.阅读目前代码,梳理业务逻辑,重新设计实体类

        统计数据包括包期充值、包期续费、包期退款、账户充值、账户退款、临时车缴费(称作收支类型)操作产生的交易行为。

        收支方式包括现金、账户、第三方、支付宝、微信、无感支付。

        临时车缴费操作中,按收费来源分为岗亭缴费、中央缴费、自助缴费、手持终端缴费(收费宝)、手机H5缴费、第三方缴费。        

        车辆包期退款记录、账户充值退款记录、临时车缴费记录都有自己专门的表,为了快速地查询出统计数据的日报表、月报表、年报等报表,把这些数据另外统一记录在了一张表中,专门用于统计数据的展示。

        原先:每种收支类型和收费来源占用一个字段,同一天相同停车场和收支类型或收费来源的数据叠加在一条记录上,保存数据时需先查询出对应条目加上这次数据累计再进行更新,高并发下存在问题,使用锁的话会导致性能下降。且这样的逻辑下不能按具体时刻来查询统计数据,最小单位只能以天来进行查询。原先实体类中还有一些不插入数据库的应属dto的字段,也需进行分离。

       修改后:收支类型单独占一个字段,收费来源单独占一个字段,收支方式占一个字段。

       原先的29个字段(22个数据库字段)改为10个字段。

 

Integer reportType;        //收支方式  1现金 2账户 3第三方 4支付宝 5微信 6无感支付 EnumChargeStyleTypeInteger incomeOutcomeType; // 收支类型 String parkId;              //停车场IDInteger reportItem;        //收费来源Date costTime;              //缴费时间String indexCode;           //系统编号BigDecimal amount; //金额String regionIndexCode;String userIndexCode;        //用户IndexCodeString selfPosId;           //自助缴费机IDBigDecimal reductPay;       //优惠金额

 

        一是按收支方式(现金等)、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,分别统计不同收支方式的金额,供收费报表统计环形图展示。

        二是按收支方式(现金等)、停车库、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,供收费报表统计列表展示。

        三是收支类型为临时车缴费,按收支方式(现金等)、收费来源(岗亭缴费等)分组查询,然后将不同收费来源的数据展示在一条数据上,供临时车缴费统计展示。

        四是收支类型为临时车缴费,按收支方式(现金等)、停车库、收费来源(岗亭缴费等)分组查询,然后将同一停车场不同收费来源的数据展示在一条数据上,供临时车缴费列表展示。

 

HBase应用场景:

        “HBase作为一种kv数据库,能够很好的面对高吞吐率的在线数据读写服务,尤其是写操作,但是在非rowkey多条件查询、数据分析、统计等场景下,HBase表现的就不是很好了,这些场景下就比较适合来用MapReduce来计算。”

        所以,我们这里使用HBase存储数据,使用MapReduce对数据进行简单的统计和计算,以满足需求。

 

Hbase表结构

表名:Report

rowkey:costTime_indexCode   //缴费时间_系统编号

列族(列族怎么分?):

info:reportType;                  //收支方式  1现金 2账户 3第三方 4支付宝 5微信 6无感支付

info:incomeOutcomeType; //收支类型
info:parkId;                         //停车场ID
info:reportItem;                   //收费来源
info:amount;                      //金额
info:regionIndexCode;       //
info:userIndexCode;          //用户IndexCode
info:selfPosId;                   //自助缴费机ID
info:reductPay;                  //优惠金额

 

        一是按收支方式(reportType,现金等)、收支类型(incomeOutcomeType,包期充值等)分组查询,然后按收支类型分为收入和支出,分别统计不同收支方式的金额,供收费报表统计环形图展示。

 

Map程序

        ReportAnalyzeMapper 继承HBase的TableMapper基类,把incomeOutcomeType_reportType看做文本,金额数是BigDecimal,用Text格式保存,所以key-value输出类型是<Text,Text>。

        根据收支类型属于收入还是支出来区分,收入则生成in_reportType,同时统一生成in_all,支出则生成out_reportType,同时统一生成out_all。这样mapper就能根据in_all计算出总收入,根据in_reportType计算出收入中每种收支方式的金额(比如总收入100元,现金收入20元,支付宝收入40元,微信收入40元),根据out_all计算出总支出,根据out_reportType计算出支出中每种收支方式的金额(比如总支出100元,现金支出40元,支付宝支出30元,微信支出30元)。

/**** 按收支方式(现金等)、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,分别统计不同收支方式的金额,供收费报表统计环形图展示。* key time_indexcode* info:reportType;                  //收支方式  1现金 2账户 3第三方 4支付宝 5微信 6无感支付* info:incomeOutcomeType;           //收支类型 1账户充值 2包期充值 3包期续费 4免费放行 5临时车缴费 6账户退款 7包期退款* info:parkId;                      //停车场ID* info:reportItem;                  //1岗亭缴费 2人工缴费 3收费宝 4第三方* info:amount;                      //金额* info:regionIndexCode;       //* info:userIndexCode;          //用户IndexCode* info:selfPosId;                   //自助缴费机ID* info:reduct;                  //优惠金额** incomeOutcomeType_reportType作为key   in/out_收支方式(现金等)作为key* 1_1 1.2* 2_3 5.0*/public class ReportAnalyzeMapper extends TableMapper
{ @Override protected void map(ImmutableBytesWritable key, Result value, Mapper
.Context context) throws IOException, InterruptedException { try { String reportType = ""; String incomeOutcomeType = ""; String amount = ""; for (java.util.Map.Entry
val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){//遍历每一列                String str = new String(val.getValue());                if (str != null) {                String columnKey = new String(val.getKey());                if (columnKey.equals("reportType")) {                reportType = str;                }                if (columnKey.equals("incomeOutcomeType")) {                incomeOutcomeType = str;                }                if (columnKey.equals("amount")) {                amount = str;                }            }        } if (null != incomeOutcomeType) { switch (incomeOutcomeType) { case "1": case "2": case "3": case "4": case "5": //不同收支方式的收入 context.write(new Text("in" + "_" + reportType), new Text(amount)); //全部的收入 context.write(new Text("in" + "_" + "all"), new Text(amount)); case "6": //不同收支方式的支出 context.write(new Text("out" + "_" + reportType), new Text(amount)); //全部的支出 context.write(new Text("out" + "_" + "all"), new Text(amount)); default:break; } } } catch (Exception e) { e.printStackTrace(); } }}

Reduce程序

        TableAnalyzeReduce继承HBase的TableReducer基类,这里需要把最终聚合后的结果写到目标表中,rowkey对应source,行数对应目标表的column:count列。

        把相同的key的值加起来。

        key为in_all累计得出总收入,key为in_1累计得出总收入中现金收入的金额,key为in_4累计得出总收入中支付宝收入的金额,等等,key为out_all累计得出总支出,key为out_1累计得出支出中现金支出的金额,key为out_5累计得出支出中微信支出的金额,等等。

/** 1现金 2账户 3第三方 4支付宝 5微信 6无感支付* 按收支方式(现金等)、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,分别统计不同收支方式的金额,供收费报表统计环形图展示。* 1_1 3.7* 2_3 5.0** 收支类型属于收入的,累加* 收支类型属于支出的,累加* 收支类型属于收入的,按收支方式累加* 收支类型属于支出的,按收支方式累加**/public class ReportAnalyzeReduce extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { BigDecimal i = new BigDecimal(0); for (Text val : values) { BigDecimal d = new BigDecimal(val.toString()); i.add(d); } context.write(key, new Text(i.toString())); }}

Combiner

        分析map程序,可以看到map阶段处理后的数据是没有任何合并的,key为数据来源source,value都是1,这样的话,如果直接进入reduce阶段,要分发的数据量还是比较大的,会造成网络负担,针对这个问题,可以在map阶段后,做一下本地reduce,这样进入reduce的阶段的数据量会大大减少。

主程序

        程序接收四个参数,依次为:业务表名称、source字段列族、source字段列名、目标表名,当然也可以使用Apache Commons CLI类解析命令行参数。

        scan.setCaching(500)设置每次读取行数,根据实际情况进行配置,scan.setCacheBlocks(false)告诉HBase本次扫描的数据不要放入缓存中。

 

public class ReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String tableName = "report";        String family = "info";        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/ReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");        try {            String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();            if (otherArgs.length != 1) {                System.exit(1);            }               Scan scan = new Scan();            scan.setCaching(500);            scan.setCacheBlocks(false);            scan.setStartRow("1593071693858_".getBytes());            scan.setStopRow("1593071694491_".getBytes());                    Job job = Job.getInstance(conf, "ReportAnalyzeJob" + tableName);            job.setJarByClass(ReportAnalyzeJob.class);            job.setMapperClass(ReportAnalyzeMapper.class);            job.setReducerClass(ReportAnalyzeReduce.class);            job.setNumReduceTasks(4);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan, ReportAnalyzeMapper.class, Text.class, Text.class, job);            FileOutputFormat.setOutputPath(job, new Path(otherArgs[0]));            //FileOutputFormat.setOutputPath(job, new Path("hdfs://mycluster:8020/usr/output1"));            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}
public class TableReportAnalyzeMapper extends Mapper
{     @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { try { String itr = value.toString(); String[] part = itr.split("\t"); if (part.length == 2) { String amount = part[1].toString();         String[] elements = part[0].split("_");        if ("in".equals(elements[0])) { //收入        context.write(new Text(elements[0]), new Text(elements[1] + "_" + amount));        } else if ("out".equals(elements[0])) { //支出        context.write(new Text(elements[0]), new Text(elements[1] + "_" + amount));        } } } catch (Exception e) { e.printStackTrace(); } }}
public class TableReportAnalyzeReduce extends TableReducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { Put put = new Put(Bytes.toBytes(key.toString()));        for (Text val : values) {        String[] elements = val.toString().split("_");     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(elements[0]), Bytes.toBytes(elements[1]));    //key:in/out family:statistics  1:2.5  2:5.8 all:7.3     //查询时按key查        } context.write(new Text(key.toString()), put); }}
public class TableReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String targetTable = "report";        System.out.println("targetTable=" + targetTable);        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/TableReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");        try {            Job job = Job.getInstance(conf, "TableReportAnalyzeJob" + targetTable);            job.setJarByClass(TableReportAnalyzeJob.class);            job.setMapperClass(TableReportAnalyzeMapper.class);            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);            job.setReducerClass(TableReportAnalyzeReduce.class);            job.setNumReduceTasks(1);            FileInputFormat.addInputPath(job, new Path("hdfs://mycluster:8020/usr/output1"));            TableMapReduceUtil.initTableReducerJob(targetTable, TableReportAnalyzeReduce.class, job);            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}

in_all:amount、in_1:amount、in_4:amount,out_all:amount、out_1:amount、out_5:amount等等,转化成in:all_amount、in:1_amount、in:4_amount,out:all_amount、out:1_amount、out:5_amount等等。

mapper将数据存入hbase,业务方就可以通过key为in来获取收入相关统计值,包括总收入、现金收入、微信收入、支付宝收入等,key为out来获取支出相关统计值,包括总支出、现金支出、微信支出、支付宝支出等。

 

二、按收支方式(现金等)、停车库、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,供收费报表统计列表展示。

        根据收支类型属于收入还是支出来区分,收入则生成in_reportType_incomeOutcomeType_parkId,支出则生成out_reportType_incomeOutcomeType_parkId。

/** 按收支方式(现金等)、停车库、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,供收费报表统计列表展示** info:reportType;                  //收支方式 1现金 2账户 3第三方 4支付宝 5微信 6无感支付* info:incomeOutcomeType;           //收支类型* info:parkId;                       //停车场ID* info:reportItem;                   //收费来源* info:amount;                      //金额* info:regionIndexCode;       //* info:userIndexCode;          //用户IndexCode* info:selfPosId;                   //自助缴费机ID* info:reductPay;                  //优惠金额** incomeOutcomeType_reportType_parkId作为key* 1_1_111 1.2* 2_3_222 5.0*/public class ListReportAnalyzeMapper extends TableMapper
{     @Override protected void map(ImmutableBytesWritable key, Result value, Mapper
.Context context) throws IOException, InterruptedException { try { String reportType = ""; String incomeOutcomeType = ""; String parkId = ""; String amount = ""; for (java.util.Map.Entry
val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){//遍历每一列                String str = new String(val.getValue());                if (str != null) {                 String columnKey = new String(val.getKey());                if (columnKey.equals("reportType")) {                reportType = str;                }                 if (columnKey.equals("incomeOutcomeType")) {                incomeOutcomeType = str;                }                if (columnKey.equals("parkId")) {                 parkId = str;                }                if (columnKey.equals("amount")) {                 amount = str;                 }                }            } if (null != incomeOutcomeType) { switch (incomeOutcomeType) { case "1": case "2": case "3": case "4": case "5": //不同收支方式的收入 context.write(new Text("in" + "_" + reportType + "_" + incomeOutcomeType + "_" + parkId), new Text(amount)); break; case "6": case "7": //不同收支方式的支出 context.write(new Text("out" + "_"  + reportType  + "_" + incomeOutcomeType + "_" + parkId), new Text(amount)); break; default:break; } } } catch (Exception e) { e.printStackTrace(); } } }
/** 按收支方式(现金等)、收支类型(包期充值等)分组查询,然后按收支类型分为收入和支出,分别统计不同收支方式的金额,供收费报表统计环形图展示。* 1_1 3.7* 2_3 5.0**/public class ListReportAnalyzeReduce extends TableReducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { BigDecimal i = new BigDecimal(0); for (Text val : values) { BigDecimal d = new BigDecimal(val.toString()); i = i.add(d); } String itr = key.toString(); String amount = i.toString();        String[] elements = itr.split("_"); //in_reportType_incomeOutcomeType_parkId   amount        //in_1_1_111        Put put = new Put(Bytes.toBytes(key.toString()));        if (elements.length == 4) {        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("parkId"), Bytes.toBytes(elements[3]));        }     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("incomeOutcomeType"), Bytes.toBytes(elements[2]));    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("amount"), Bytes.toBytes(amount));     //key:in_1_1_111 family:list    //查询时按key范围查     context.write(new ImmutableBytesWritable(Bytes.toBytes(key.toString())),put); }}
public class ListReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String tableName = "report";        String family = "info";        String targetTbale = "report";        //        System.out.println("tableName=" + tableName + ", family=" + family + ", column=" + column + ", targetTbale="        //                + targetTbale);        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/ListReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");                Scan scan = new Scan();        scan.setCaching(500);        scan.setCacheBlocks(false);        try {            Job job = Job.getInstance(conf, "ListReportAnalyzeJob" + tableName);            job.setJarByClass(ListReportAnalyzeJob.class);            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan, ListReportAnalyzeMapper.class, Text.class, Text.class, job);            TableMapReduceUtil.initTableReducerJob(targetTbale, ListReportAnalyzeReduce.class, job);            job.setMapperClass(ListReportAnalyzeMapper.class);            job.setReducerClass(ListReportAnalyzeReduce.class);            job.setNumReduceTasks(4);            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}

三、收支类型为临时车缴费,按收支方式(现金等)、收费来源(岗亭缴费等)分组查询,然后将不同收费来源的数据展示在一条数据上,供临时车缴费统计展示。

 

/** 收支类型为临时车缴费,按收支方式(现金等)、收费来源(岗亭缴费等)分组查询,然后将不同收费来源的数据展示在一条数据上,供临时车缴费统计展示。* info:reportType;                  //收支方式  1现金 2账户 3第三方 4支付宝 5微信 6无感支付* info:incomeOutcomeType;           //收支类型* info:parkId;                         //停车场ID* info:reportItem;                   //收费来源* info:amount;                      //金额* info:regionIndexCode;       //* info:userIndexCode;          //用户IndexCode* info:selfPosId;                   //自助缴费机ID* info:reductPay;                  //优惠金额** reportType_reportItem作为key* 1_1 1.2* 2_3 5.0*/public class OneReportAnalyzeMapper extends TableMapper
{     @Override protected void map(ImmutableBytesWritable key, Result value, Mapper
.Context context) throws IOException, InterruptedException { try { String reportType = ""; String incomeOutcomeType = ""; String reportItem = ""; String amount = ""; for (java.util.Map.Entry
val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){//遍历每一列                String str = new String(val.getValue());                if (str != null) {                 String columnKey = new String(val.getKey());                if (columnKey.equals("reportType")) {                reportType = str;                }                if (columnKey.equals("incomeOutcomeType")) {                 incomeOutcomeType = str;                }                if (columnKey.equals("reportItem")) {                 reportItem = str;                }                 if (columnKey.equals("amount")) {                amount = str;                }                }            } if (null != incomeOutcomeType) { switch (incomeOutcomeType) { case "5": //不同收支方式的不同收费来源的岗亭缴费 context.write(new Text("terminal" + "_" + reportType + "_" + reportItem), new Text(amount)); //全部收支方式的不同收费来源的岗亭缴费 context.write(new Text("terminal" + "_" + "all" + "_" + reportItem), new Text(amount)); break; default:break; } } } catch (Exception e) { e.printStackTrace(); } }}
/** 收支类型为临时车缴费,按收支方式(现金等)、收费来源(岗亭缴费等)分组查询,然后将不同收费来源的数据展示在一条数据上,供临时车缴费统计展示。* terminal_reportType_reportItem amount*/public class OneReportAnalyzeReduce extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { BigDecimal i = new BigDecimal(0); for (Text val : values) { BigDecimal d = new BigDecimal(val.toString()); i = i.add(d); } context.write(key, new Text(i.toString())); }}
public class OneReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String tableName = "report";        String family = "info";        //        String column = args[2];        //        System.out.println("tableName=" + tableName + ", family=" + family + ", column=" + column + ", targetTbale="        //                + targetTbale);        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/OneReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");        Scan scan = new Scan();        scan.setCaching(500);        scan.setCacheBlocks(false);        scan.setStartRow("1593071693858_".getBytes());        scan.setStopRow("1593071694491_".getBytes());        try {            Job job = Job.getInstance(conf, "OneReportAnalyzeJob" + tableName);            job.setJarByClass(OneReportAnalyzeJob.class);            job.setMapperClass(OneReportAnalyzeMapper.class);            job.setReducerClass(OneReportAnalyzeReduce.class);            job.setNumReduceTasks(4);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan, OneReportAnalyzeMapper.class, Text.class, Text.class, job);            FileOutputFormat.setOutputPath(job, new Path("hdfs://mycluster:8020/usr/output2"));            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}
public class TableOneReportAnalyzeMapper extends Mapper
{     @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { try { String itr = value.toString(); String[] part = itr.split("\t"); if (part.length == 2) { String amount = part[1].toString();         String[] elements = part[0].split("_");        context.write(new Text(elements[0] + "_" + elements[1]), new Text(elements[2] + "_" + amount));         //terminal_1/all 1_2.5 } } catch (Exception e) { e.printStackTrace(); } }}
/** terminal_1/all 1_2.5*/public class TableOneReportAnalyzeReduce extends TableReducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { Put put = new Put(Bytes.toBytes(key.toString())); BigDecimal i = new BigDecimal(0);        for (Text val : values) {        String[] elements = val.toString().split("_");     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(elements[0]), Bytes.toBytes(elements[1]));     i = i.add(new BigDecimal(elements[1].toString()));     //key:terminal_1/all family:statistics  1:2.5  2:5.8     //查询时按key查        }        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("all"), Bytes.toBytes(i)); context.write(new Text(key.toString()), put); // System.out.println(key.toString() + "\t" + ); }}
public class TableOneReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String targetTbale = "report";        System.out.println("targetTable=" + targetTbale);        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/TableOneReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");        try {            Job job = Job.getInstance(conf, "TableOneReportAnalyzeJob" + targetTbale);            job.setJarByClass(TableOneReportAnalyzeJob.class);            job.setMapperClass(TableOneReportAnalyzeMapper.class);            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);            job.setReducerClass(TableOneReportAnalyzeReduce.class);            job.setNumReduceTasks(1);            FileInputFormat.addInputPath(job, new Path("hdfs://mycluster:8020/usr/output2"));            TableMapReduceUtil.initTableReducerJob(targetTbale, TableOneReportAnalyzeReduce.class, job);            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}

四、收支类型为临时车缴费,按收支方式(现金等)、停车库、收费来源(岗亭缴费等)分组查询,然后将同一停车场不同收费来源的数据展示在一条数据上,供临时车缴费列表展示。

 

/** 收支类型为临时车缴费,按收支方式(现金等)、停车库、收费来源(岗亭缴费等)分组查询,然后将同一停车场不同收费来源的数据展示在一条数据上,供临时车缴费列表展示。** info:reportType;                  //收支方式  1现金 2账户 3第三方 4支付宝 5微信 6无感支付* info:incomeOutcomeType;           //收支类型* info:parkId;                         //停车场ID* info:reportItem;                   //收费来源* info:amount;                      //金额* info:regionIndexCode;       //* info:userIndexCode;          //用户IndexCode* info:selfPosId;                   //自助缴费机ID* info:reductPay;                  //优惠金额** terminal_reportType_reportItem_parkId作为key* terminal_1_111 1.2* terminal_3_222 5.0*/public class OneListReportAnalyzeMapper extends TableMapper
{     @Override protected void map(ImmutableBytesWritable key, Result value, Mapper
.Context context) throws IOException, InterruptedException { try { String reportType = ""; String incomeOutcomeType = ""; String parkId = ""; String reportItem = ""; String amount = ""; String reduce = ""; for (java.util.Map.Entry
val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){//遍历每一列                String str = new String(val.getValue());                if (str != null) {                String columnKey = new String(val.getKey());                 if (columnKey.equals("reportType")) {                reportType = str;                }                if (columnKey.equals("incomeOutcomeType")) {                incomeOutcomeType = str;                 }                if (columnKey.equals("reportItem")) {                 reportItem = str;                }                if (columnKey.equals("parkId")) {                 parkId = str;                }                if (columnKey.equals("amount")) {                amount = str;                }                 if (columnKey.equals("reduce")) {                reduce = str;                }            }            } if (null != incomeOutcomeType) { switch (incomeOutcomeType) { case "5": //不同收支方式的收入 context.write(new Text("terminal" + "_" + reportType + "_" + parkId + "_" + reportItem), new Text(amount + "_" + reduce)); //全部 context.write(new Text("terminal" + "_" + "all" + "_" + parkId + "_" + reportItem), new Text(amount + "_" + reduce)); break; default:break; } } } catch (Exception e) { e.printStackTrace(); } }}
/** 收支类型为临时车缴费,按收支方式(现金等)、停车库、收费来源(岗亭缴费等)分组查询,然后将同一停车场不同收费来源的数据展示在一条数据上,供临时车缴费列表展示。* terminal_reportType_parkId_reportItem  amount_reduce*/public class OneListReportAnalyzeReduce extends Reducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { BigDecimal i = new BigDecimal(0); BigDecimal iReduce = new BigDecimal(0); for (Text val : values) { String[] vals = val.toString().split("_"); BigDecimal d = new BigDecimal(vals[0].toString()); i = i.add(d); if (vals.length == 2) { BigDecimal dReduce = new BigDecimal(vals[1].toString()); iReduce = iReduce.add(dReduce); } } context.write(key, new Text(i.toString() + "_" + iReduce.toString())); }}
public class OneListReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String tableName = "report";        String family = "info";        //        System.out.println("tableName=" + tableName + ", family=" + family + ", column=" + column + ", targetTbale="        //                + targetTbale);        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/OneListReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");        Scan scan = new Scan();        scan.setCaching(500);        scan.setCacheBlocks(false);        scan.setStartRow("1593071693858_".getBytes());        scan.setStopRow("1593071694491_".getBytes());        try {            Job job = Job.getInstance(conf, "OneListReportAnalyzeJob" + tableName);            job.setJarByClass(OneListReportAnalyzeJob.class);            job.setMapperClass(OneListReportAnalyzeMapper.class);            job.setReducerClass(OneListReportAnalyzeReduce.class);            job.setNumReduceTasks(4);            job.setOutputKeyClass(Text.class);            job.setOutputValueClass(Text.class);            TableMapReduceUtil.initTableMapperJob(Bytes.toBytes(tableName), scan, OneListReportAnalyzeMapper.class, Text.class, Text.class, job);            FileOutputFormat.setOutputPath(job, new Path("hdfs://mycluster:8020/usr/output3"));            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}
public class TableOneListReportAnalyzeMapper extends Mapper
{     @Override protected void map(LongWritable key, Text value, Mapper
.Context context) throws IOException, InterruptedException { try { String itr = value.toString(); String[] part = itr.split("\t"); if (part.length == 2) { String amount_reduce = part[1].toString();         String[] elements = part[0].split("_");         context.write(new Text(elements[0] + "_" + elements[1] + "_" + elements[2]), new Text(elements[3] + "_" + amount_reduce));        //terminal_1/all_111 1_1.2_2.5 } } catch (Exception e) { e.printStackTrace(); } }}
/** terminal_1/all_111 1_1.2_2.5*/public class TableOneListReportAnalyzeReduce extends TableReducer
{ @Override protected void reduce(Text key, Iterable
values, Context context) throws IOException, InterruptedException { Put put = new Put(Bytes.toBytes(key.toString())); BigDecimal i = new BigDecimal(0); BigDecimal reduce = new BigDecimal(0);        for (Text val : values) {        String[] elements = val.toString().split("_");     put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(elements[0]), Bytes.toBytes(elements[1]));     i = i.add(new BigDecimal(elements[1].toString()));     reduce = reduce.add(new BigDecimal(elements[2].toString()));     //key:terminal_1/all_111 family:list  1:2.5  2:5.8 reduce:2.5     //查询时按key范围查        }        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("all"), Bytes.toBytes(i));        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("reduce"), Bytes.toBytes(reduce)); context.write(new Text(key.toString()), put); }}
public class TableOneListReportAnalyzeJob {    public static void main(String[] args) throws ClassNotFoundException, InterruptedException {        String targetTbale = "report";        System.out.println("targetTbale=" + targetTbale);        Configuration conf = HBaseConfiguration.create();        conf.set("mapred.jar", "/Users/Desktop/TableOneListReportAnalyzeJob.jar");        conf.set("hbase.zookeeper.quorum", "node1,node2,node3");        try {            Job job = Job.getInstance(conf, "TableOneListReportAnalyzeJob" + targetTbale);            job.setJarByClass(TableOneListReportAnalyzeJob.class);            job.setMapperClass(TableOneListReportAnalyzeMapper.class);            job.setMapOutputKeyClass(Text.class);            job.setMapOutputValueClass(Text.class);            job.setReducerClass(TableOneListReportAnalyzeReduce.class);            job.setNumReduceTasks(1);            FileInputFormat.addInputPath(job, new Path("hdfs://mycluster:8020/usr/output3"));            TableMapReduceUtil.initTableReducerJob(targetTbale, TableOneListReportAnalyzeReduce.class, job);            System.exit(job.waitForCompletion(true) ? 0 : 1);        } catch (IOException e) {            e.printStackTrace();        }    }}

性能测试

制造虚拟数据:可先创建tsv格式文件,再使用MapReduce将数据导入到HBASE

1 在本地创建一个tsv格式的文件:fruit.tsv

    1593400710000    1    1    park1    1    10.1

    1593400710001    1    2                       5.5

    1593400710002    1    3    park1    2    6.7

    1593400710002    1    5    park1    3    8.8

    ......

2 创建HBase表(本例中之前已创建)

    hbase(main):001:0> create 'report','info'

3 在HDFS中创建input_report文件夹并上传report.tsv文件

    $ /usr/hadoop-3.2.1/bin/hdfs dfs -mkdir /input_report/

    $ /usr/hadoop-3.2.1/bin/hdfs dfs -put report.tsv /input_report/

4 执行MapReduce到HBase的report表中

    $ /opt/module/hadoop-2.7.2/bin/yarn jar lib/hbase-server-1.3.1.jar importtsv \

    -Dimporttsv.columns=HBASE_ROW_KEY,info:reportType,info:incomeOutcomeType report\

    hdfs://mycluster:9000/input_report

转载地址:http://ayfgi.baihongyu.com/

你可能感兴趣的文章
如何制作patch文件?
查看>>
玩玩DDNS
查看>>
svn使用笔记
查看>>
遇见的问题记录
查看>>
makefile
查看>>
数据抓包
查看>>
tcpdump的使用
查看>>
iptables与ebtables相关
查看>>
iptables基础知识详解
查看>>
什么是大公司病(太形象了)
查看>>
linux驱动程序-读书笔记(一)scull函数详解
查看>>
网络协议中的MTU和MSS
查看>>
iptables手册阅读笔记
查看>>
kill 和killall----杀死进程
查看>>
c++多线程编程
查看>>
绿盟扫描操作指导
查看>>
理解链路本地址与站点本地地址
查看>>
/proc/mtd 各个参数含义 -- linux内核
查看>>
linux nand flash常用命令
查看>>
NESSUS扫描操作指导
查看>>