Flink读写Doris操作介绍

Flink Doris Connector 支持通过 Flink 对 Doris 中存储的数据进行读取、插入、修改和删除操作,并且可以将 Doris 表映射为 DataStream 或者 Table。

  • Flink操作Doris修改和删除只支持在 Unique Key 模型上

1. 准备开发环境

  • pom.xml加入依赖
<dependency>
    <groupId>org.apache.doris</groupId>
    <artifactId>flink-doris-connector-1.13_2.12</artifactId>
    <version>1.0.3</version>
</dependency>
  • 创建测试库测试表
-- 切测试库
use test_db;

-- 创建测试表flinktest
CREATE TABLE flinktest
(
    siteid INT DEFAULT '10',
    citycode SMALLINT,
    username VARCHAR(32) DEFAULT '',
    pv BIGINT SUM DEFAULT '0'
)
AGGREGATE KEY(siteid, citycode, username)
DISTRIBUTED BY HASH(siteid) BUCKETS 10
PROPERTIES("replication_num" = "1");

-- 插入样例数据
insert into flinktest values(1,1,'jim',2),(2,1,'grace',2),(3,2,'tom',2),(4,3,'bush',3),(5,3,'helen',3);

-- 查看表数据情况
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+
  • Doris 和 Flink 列类型映射关系
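| Doris Type | Flink Type           |
|------------|----------------------|
| NULL_TYPE  | NULL                 |
| BOOLEAN    | BOOLEAN              |
| TINYINT    | TINYINT              |
| SMALLINT   | SMALLINT             |
| INT        | INT                  |
| BIGINT     | BIGINT               |
| FLOAT      | FLOAT                |
| DOUBLE     | DOUBLE               |
| DATE       | DATE                 |
| DATETIME   | TIMESTAMP            |
| DECIMAL    | DECIMAL              |
| CHAR       | STRING               |
| LARGEINT   | STRING               |
| VARCHAR    | STRING               |
| DECIMALV2  | DECIMAL              |
| TIME       | DOUBLE               |
| HLL        | Unsupported datatype |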

2. Flink-DataStream读Doris

代码示例:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisStreamOptions;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Reads the Doris table {@code test_db.flinktest} through a {@link DorisSourceFunction}
 * and prints every record to the console.
 */
public class Flink_stream_read_doris {

    /**
     * Builds a single-parallelism streaming job with a Doris source and a print sink, then runs it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        DorisSourceFunction dorisSource = new DorisSourceFunction(
                new DorisStreamOptions(dorisConnectionProperties()),
                new SimpleListDeserializationSchema());
        env.addSource(dorisSource).print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Collects the FE address, credentials and target table handed to the Doris source. */
    private static Properties dorisConnectionProperties() {
        Properties connection = new Properties();
        connection.setProperty("fenodes", "hdt-dmcp-ops01:8130");
        connection.setProperty("username", "root");
        connection.setProperty("password", "123456");
        connection.setProperty("table.identifier", "test_db.flinktest");
        return connection;
    }
}
/*
Console output:
[4, 3, bush, 3]
[2, 1, grace, 2]
[1, 1, jim, 2]
[5, 3, helen, 3]
[3, 2, tom, 2]
 */

3. Flink写Doris

Flink 读写 Doris 数据主要有两种方式

  • DataStream
  • SQL

3.1 Flink-DataStream以 JSON 数据 写到Doris

代码示例:

package com.zenitera.bigdata.doris;

import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisSink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Uses Flink to write JSON data into the Doris database.
 */
public class Flink_stream_write_doris_json {

    /**
     * Emits one JSON record and sinks it into {@code test_db.flinktest}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        // Stream-load properties passed to the sink: records are JSON.
        Properties streamLoadProps = new Properties();
        streamLoadProps.setProperty("format", "json");
        streamLoadProps.setProperty("strip_outer_array", "true");

        DorisExecutionOptions executionOptions = new DorisExecutionOptions.Builder()
                .setBatchIntervalMs(2000L)
                .setEnableDelete(false)
                .setMaxRetries(3)
                .setStreamLoadProp(streamLoadProps)
                .build();

        DorisOptions dorisOptions = new DorisOptions.Builder()
                .setFenodes("hdt-dmcp-ops01:8130")
                .setUsername("root")
                .setPassword("123456")
                .setTableIdentifier("test_db.flinktest")
                .build();

        String record = "{\"siteid\":\"10\", \"citycode\": \"1001\",\"username\": \"ww\",\"pv\":\"100\"}";
        env.fromElements(record)
                .addSink(DorisSink.sink(executionOptions, dorisOptions));

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
/*
Before running: 5 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      1 |        1 | jim      |    2 |
|      5 |        3 | helen    |    3 |
|      4 |        3 | bush     |    3 |
|      3 |        2 | tom      |    2 |
|      2 |        1 | grace    |    2 |
+--------+----------+----------+------+

After running: 6 rows
select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        1 | grace    |    2 |
|      3 |        2 | tom      |    2 |
|      5 |        3 | helen    |    3 |
|      1 |        1 | jim      |    2 |
|     10 |     1001 | ww       |  100 |
|      4 |        3 | bush     |    3 |
+--------+----------+----------+------+
 */

3.2 Flink-DataStream以 RowData 数据 写Doris

代码示例:

package com.zenitera.bigdata.doris;import com.alibaba.fastjson.JSON;import com.alibaba.fastjson.JSONObject;import org.apache.doris.flink.cfg.DorisExecutionOptions;import org.apache.doris.flink.cfg.DorisOptions;import org.apache.doris.flink.cfg.DorisSink;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.table.data.GenericRowData;import org.apache.flink.table.data.StringData;import org.apache.flink.table.types.logical.*;public class Flink_stream_write_doris_rowdata {public static void main(String[] args) {Configuration conf = new Configuration();conf.setInteger("rest.port", 2000);StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);env.setParallelism(1);LogicalType[] types = {new IntType(), new SmallIntType(), new VarCharType(), new BigIntType()};String[] fields = {"siteid", "citycode", "username", "pv"};env.fromElements("{\"siteid\":\"100\", \"citycode\": \"1002\",\"username\": \"wang\",\"pv\":\"100\"}").map(json -> {JSONObject obj = JSON.parseObject(json);GenericRowData rowData = new GenericRowData(4);rowData.setField(0, obj.getIntValue("siteid"));rowData.setField(1, obj.getShortValue("citycode"));rowData.setField(2, StringData.fromString(obj.getString("username")));rowData.setField(3, obj.getLongValue("pv"));return rowData;}).addSink(DorisSink.sink(fields,types,new DorisExecutionOptions.Builder().setBatchIntervalMs(2000L).setEnableDelete(false).setMaxRetries(3).build(),new DorisOptions.Builder().setFenodes("hdt-dmcp-ops01:8130").setUsername("root").setPassword("123456").setTableIdentifier("test_db.flinktest").build()));try {env.execute();} catch (Exception e) {e.printStackTrace();}}}/*代码执行前: 6 rows select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+|2 |1 | grace|2 ||3 |2 | tom|2 ||5 |3 | helen|3 ||1 |1 | jim|2 || 10 | 1001 | ww |100 ||4 |3 
| bush |3 |+--------+----------+----------+------+代码执行后: 7 rows select * from flinktest;+--------+----------+----------+------+| siteid | citycode | username | pv |+--------+----------+----------+------+|1 |1 | jim|2 ||2 |1 | grace|2 ||3 |2 | tom|2 ||5 |3 | helen|3 || 10 | 1001 | ww |100 ||100 | 1002 | wang |100 ||4 |3 | bush |3 |+--------+----------+----------+------+ */

3.3 Flink-SQL 方式写Doris

Doris测试表:

use test_db;
truncate table flinktest;
insert into flinktest values(1,1,'aaa',1),(2,2,'bbb',2),(3,3,'ccc',3);

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      3 |        3 | ccc      |    3 |
+--------+----------+----------+------+
3 rows in set (0.01 sec)

Flink-SQL代码示例:

package com.zenitera.bigdata.doris;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;

/**
 * Writes one row into the Doris table {@code test_db.flinktest} using Flink SQL:
 * registers a Flink table backed by the Doris connector and runs an INSERT against it.
 */
public class Flink_SQL_doris {

    /**
     * Registers the Doris-backed table, submits the INSERT and waits for it to finish.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if the insert job fails or the wait is interrupted
     */
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 2000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql("create table flink_0518(" +
                " siteid int, " +
                " citycode int, " +
                " username string, " +
                " pv bigint " +
                ")with(" +
                "'connector' = 'doris', " +
                "'fenodes' = 'hdt-dmcp-ops01:8130', " +
                "'table.identifier' = 'test_db.flinktest', " +
                "'username' = 'root', " +
                "'password' = '123456' " +
                ")");

        try {
            // executeSql() only submits an INSERT job and returns immediately; await() blocks
            // until the job has finished, so main() cannot return before the row is written
            // and a failed job is reported instead of being silently dropped.
            tEnv.executeSql("insert into flink_0518(siteid, citycode, username, pv) values(4, 4, 'wangting', 4) ")
                    .await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the Doris insert job", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Doris insert job failed", e);
        }
    }

    /** Plain data holder mirroring the columns of {@code flink_0518}; not used by {@link #main}. */
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Flink_0518 {
        private Integer siteid;
        private Integer citycode;
        private String username;
        private Long pv;
    }
}

执行代码,执行完成后查看Doris对应表数据进行验证:

select * from flinktest;
+--------+----------+----------+------+
| siteid | citycode | username | pv   |
+--------+----------+----------+------+
|      3 |        3 | ccc      |    3 |
|      2 |        2 | bbb      |    2 |
|      1 |        1 | aaa      |    1 |
|      4 |        4 | wangting |    4 |
+--------+----------+----------+------+
4 rows in set (0.01 sec)

3.4 Flink-SQL 方式读Doris

package com.zenitera.bigdata.doris;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads the Doris table {@code test_db.flinktest} using Flink SQL and prints the result
 * to the console.
 */
public class Flink_SQL_doris_read {

    /** DDL registering the Flink table {@code flink_0520} on top of the Doris connector. */
    private static final String CREATE_DORIS_TABLE =
            "create table flink_0520("
                    + " siteid int, "
                    + " citycode SMALLINT, "
                    + " username string, "
                    + " pv bigint "
                    + ")with("
                    + "'connector' = 'doris', "
                    + "'fenodes' = 'hdt-dmcp-ops01:8130', "
                    + "'table.identifier' = 'test_db.flinktest', "
                    + "'username' = 'root', "
                    + "'password' = '123456' "
                    + ")";

    /**
     * Registers the Doris-backed table and prints the result of {@code select *} on it.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Configuration flinkConf = new Configuration();
        flinkConf.setInteger("rest.port", 2000);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(flinkConf);
        env.setParallelism(1);

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(CREATE_DORIS_TABLE);

        tableEnv.sqlQuery("select * from flink_0520")
                .execute()
                .print();
    }
}
/*
Console output:
+----+-------------+----------+---------------+---------+
| op |      siteid | citycode |      username |      pv |
+----+-------------+----------+---------------+---------+
| +I |           1 |        1 |           aaa |       1 |
| +I |           3 |        3 |           ccc |       3 |
| +I |           2 |        2 |           bbb |       2 |
| +I |           4 |        4 |      wangting |       4 |
+----+-------------+----------+---------------+---------+
4 rows in set
*/