flink写入hbase-程序员宅基地

技术标签: flink  hbase  

参考:  https://www.cnblogs.com/swordfall/p/10527423.html

flink 流处理写入数据到hbase. 采用的是批量写入(500条数据写入一次)。

 

HBaseWriter.java

package com.flink;

import com.flink.model.DeviceData;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 *
 * 写入HBase
 * 继承RichSinkFunction重写父类方法
 *
 * 写入hbase时500条flush一次, 批量插入, 使用的是writeBufferSize
 */
class HBaseWriter extends RichSinkFunction<DeviceData>{
    private static final Logger logger = LoggerFactory.getLogger(HBaseWriter.class);

    private static org.apache.hadoop.conf.Configuration configuration;
    private static Connection connection = null;
    private static BufferedMutator mutator;
    private static int count = 0;

    @Override
    public void open(Configuration parameters) throws Exception {
        configuration = HBaseConfiguration.create();
        configuration.set("hbase.master", "192.168.3.101:60020");
        configuration.set("hbase.zookeeper.quorum", "192.168.3.101");
        configuration.set("hbase.zookeeper.property.clientPort", "2181");
        try {
            connection = ConnectionFactory.createConnection(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("t1"));
        params.writeBufferSize(2 * 1024 * 1024);
        mutator = connection.getBufferedMutator(params);
    }

    @Override
    public void close() throws IOException {
        if (mutator != null) {
            mutator.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    @Override
    public void invoke(DeviceData values, Context context) throws Exception {
        //Date 1970-01-06 11:45:55  to 445555000
        long unixTimestamp= 0;
        try {
            String gatherTime = values.GatherTime;
            //毫秒和秒分开处理
            if (gatherTime.length() > 20) {
                long ms = Long.parseLong(gatherTime.substring(20, 23));
                Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
                unixTimestamp = date.getTime() + ms;
            } else {
                Date date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(gatherTime);
                unixTimestamp = date.getTime();
            }
        } catch (ParseException e) {
            e.printStackTrace();
        }
        String RowKey = values.MachID + String.valueOf(unixTimestamp);
        String Key = values.OperationValue;
        String Value = values.OperationData;
        System.out.println("Column Family=f1,  RowKey=" + RowKey + ", Key=" + Key + " ,Value=" + Value);
        Put put = new Put(RowKey.getBytes());
        put.addColumn("f1".getBytes(), Key.getBytes(), Value.getBytes());
        mutator.mutate(put);
        //每满500条刷新一下数据
        if (count >= 500){
            mutator.flush();
            count = 0;
        }
        count = count + 1;
    }
}

Main.java

//写入hbase
dataStream.addSink(new HBaseWriter());
DeviceData.java
package com.flink.model;

/**
 * 设备数据的数据结构
 */
class DeviceData {
    String compID;
    String machID;
    String Type;
    String gateMac;
    String operationValue;
    String operationData;
    String gatherTime;
}

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012447842/article/details/90203512

智能推荐

微信小程序云开发之创建数据库表和初始化云环境_微信云开发 自动创建表-程序员宅基地

文章浏览阅读1w次,点赞4次,收藏59次。微信小程序云开发创建数据库表1.进入小程序后点击云开发,开启云开发功能。2.开启云开发功能后会进入云开发控制台后点击左上角的数据库,后点击下面的“+”创建数据表。表名随便取,你开心就好,我的表名叫testDatabase。2.创建完表后,点击右侧的“+添加记录”会弹出一个弹窗,这个弹窗就是填写你所需要的字段。看你需要自己定义字段名,字段类型和值。3.一次只能创建一条数据,且每次创建每一条数据的字段名和类型必须一样。创建完后每一条数据都会多一个“_id”字段,这是云开发数据库自动给_微信云开发 自动创建表

使用选项卡创建CSS3 / jQuery跨浏览器下拉菜单-程序员宅基地

文章浏览阅读126次。CSS3/jQuery dropdown menu with tabs tutorial. This is our seventh CSS3 menu. Today we will make dropdown menu with parental tabs (as first level). Part of the work we pass on to the shoulders of jQuer..._./images/tabs/left_memu_1.png

Hypack 2016-2018设置使用测试与相关下载_hypack2016-程序员宅基地

文章浏览阅读2.3k次,点赞2次,收藏9次。Hypack 2016-2018使用测试与相关下载1、hypack 2016、hypack 2017、hypack2018通过现场使用测试,发现64位版本破解无效,32位破解成功。2、Hypack 6.2以前的版本,导航硬件典型配置库文件为nmea.DLL。(配置为nmea.DLL输出WGS84与本地54坐标XY出现错误)3、Hypack 2016-2018版本导航硬件配置库文件应该采用G..._hypack2016

CiteSpace使用入门教程_citespace怎么用-程序员宅基地

文章浏览阅读8.1k次,点赞4次,收藏17次。CiteSpace软件的使用及应用_citespace怎么用

这6种最佳移动自动化测试工具你知道吗?_移动应用测试工具有哪些-程序员宅基地

文章浏览阅读1k次。它还带有一个客户端库的特性,可以托管 Java、Python、C# 等中的锅炉代码,以帮助 QA 更快、更有效地开发测试脚本。testRigor 是超级可靠的,因为它为人类各自创建了一个测试套件,也就是说,它不依赖于 XPath 之类的源。它是一个基于云的移动自动化测试工具,允许用户在各种设备和操作系统版本上以连续的时间间隔快速运行测试。总而言之,它是一个很好的工具,但既不是免费的也不是开源的。ZAPTEST 的卖点是它的投资回报率计算器,它允许像您这样的企业计算这种自动化工具的投资回报率。_移动应用测试工具有哪些

分类预测 | Matlab实现CNN-LSTM-Mutilhead-Attention卷积神经网络-长短期记忆网络融合多头注意力机制多特征分类预测_nn.multiheadattention 图像分类-程序员宅基地

文章浏览阅读1.1k次,点赞21次,收藏12次。分类预测 | Matlab实现CNN-LSTM-Mutilhead-Attention卷积神经网络-长短期记忆网络融合多头注意力机制多特征分类预测_nn.multiheadattention 图像分类

随便推点

vue3的代码改成vue2的转换_vue3组件 转 vue2-程序员宅基地

文章浏览阅读8.7k次。vue3<!DOCTYPE html><html><head><meta charset="utf-8"><title>Vue的方法_侠课岛(9xkd.com)</title><script src="https://unpkg.com/vue@next"></script></head><body> <div id="hello-vue" class="_vue3组件 转 vue2

vue2.0与支付个人总结-程序员宅基地

文章浏览阅读275次。最近在使用vue写webapp,app中要求可以实现线上支付,研究了微信H5支付与支付宝H5支付。其中微信H5支付处在内测阶段,需要申请,按照格式写了邮件七个工作日也没得到回复邮件,据说微信H5支付对于单量和交易额有要求,满足要求后才有很大几率开通。支付宝H5支付相对申请较为简单,人工技术客服强大,基本能解决很多问题,不得不说这点还是阿里..._vue2.0 调用app支付

yml配置文件中map的配置-程序员宅基地

文章浏览阅读6.5k次。yml中map的配置文件

python中摄氏度的符号咋打_linux下怎么方便的输入度数符号 °-程序员宅基地

文章浏览阅读5k次。你的位置:问答吧-> Linux 入门-> 问题详情linux下怎么方便的输入度数符号 °在windows下可以用Alt 0176输入,在MAC下也有方便的方法可以输入,在linux下呢?参见:http://anonymouse.org/cgi-bin/anon-w...egree_(symbol)作者: mu..._linux 输入法 摄氏度

Java面试题之接口和抽象类的区别_java面试题 接口和抽象类的区别-程序员宅基地

文章浏览阅读706次。下面一些东西可能会让你不敢相信,这次分为jdk8和jdk9来测试的,大家也可以用不同的jdk尝试一下。先说一下基本的定义,在说一下特别的东西。有疑惑的同学一定要去自己手动的试验一下,加深印象,面试官比如说接口中的方法不能有方法体啊什么的,一定坚定告诉他是可以的!一个类可以同时实现多个接口 一个抽象类只能继承一个类(单继承)接口和接口之间支持多继承 类和类之间只能单继承。抽象类是半抽象的 接口是完全抽象的。_java面试题 接口和抽象类的区别

Machine and Deep Learning with Python-程序员宅基地

文章浏览阅读407次。Machine and Deep Learning with PythonEducationTutorials and coursesSupervised learning superstitions cheat sheetIntroduction to Deep Learning with PythonHow to implement a neural network..._pyqtgraph eeg

推荐文章

热门文章

相关标签