I have been using StarRocks recently, so I am jotting down a few ad-hoc operational tips here so I don't forget them.

1. Creating a table

CREATE TABLE IF NOT EXISTS ODS.T_TEST
(
    pk_day date,
    pool_address string,
    code string COMMENT 'unique primary key',
    test1 string,
    test2 string,
    test3 string,
    pk_year varchar(4),
    pk_month varchar(7)
)
PRIMARY KEY(pk_day, pool_address, code)
PARTITION BY RANGE(pk_day)
(
    PARTITION p20230916 VALUES LESS THAN ("2023-09-16"),
    PARTITION p20230917 VALUES LESS THAN ("2023-09-17"),
    PARTITION p20230918 VALUES LESS THAN ("2023-09-18"),
    PARTITION p20230919 VALUES LESS THAN ("2023-09-19"),
    PARTITION p20230920 VALUES LESS THAN ("2023-09-20"),
    PARTITION p20230921 VALUES LESS THAN ("2023-09-21")
)
DISTRIBUTED BY HASH(pool_address)
PROPERTIES
(
    "dynamic_partition.enable" = "true",
    "dynamic_partition.time_unit" = "DAY",
    "dynamic_partition.start" = "-3",
    "dynamic_partition.end" = "3",
    "dynamic_partition.prefix" = "p",
    "dynamic_partition.time_zone" = "UTC",
    "dynamic_partition.buckets" = "4"
);

This creates a dynamic-partition table; the three primary key columns must be the first three columns in the table definition.
The PARTITION BY RANGE(pk_day) clause must declare a few initial partitions.
Also, the column used for date partitioning must be declared as DATE, even if the incoming value is a string such as "2023-09-20".
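
As a concrete example, a pk_day that arrives as a string such as "2023-09-20" can be cast to DATE when writing into the table. A minimal sketch against the table above; all literal values here are made up:

INSERT INTO ODS.T_TEST (pk_day, pool_address, code, test1, test2, test3, pk_year, pk_month)
VALUES (CAST('2023-09-20' AS DATE), '0xabc123', 'code001', 'v1', 'v2', 'v3', '2023', '2023-09');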

2. Routine Load

2.1 Creating the load job

The data comes from Kafka:

# Create the load job
CREATE ROUTINE LOAD ODS.TEST ON TEST
WHERE pk_day IS NOT NULL AND pk_day >= '2023-09-18'
PROPERTIES
(
    "desired_concurrent_number" = "12",
    "format" = "json",
    "jsonpaths" = "[\"$.pk_day\", \"$.pool_address\", \"$.code\", \"$.test1\", \"$.test2\", \"$.test3\", \"$.pk_year\", \"$.pk_month\"]"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "ods_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "g1"
);

# Stop the job
STOP ROUTINE LOAD FOR ODS.TEST;

If the job needs to be modified, stop it first.
If the JSON keys match the table's column names, the jsonpaths property can be omitted instead of mapping every field one by one (see the sketch after these notes).
A WHERE clause after ON can be used to filter out part of the data.
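
For reference, a stripped-down variant of the job without jsonpaths might look like the sketch below. It assumes the JSON keys are exactly the column names of ODS.TEST; the job name TEST_SIMPLE is hypothetical, and the broker and topic are the same placeholder values as above.

CREATE ROUTINE LOAD ODS.TEST_SIMPLE ON TEST
WHERE pk_day IS NOT NULL AND pk_day >= '2023-09-18'
PROPERTIES
(
    "desired_concurrent_number" = "12",
    "format" = "json"
)
FROM KAFKA
(
    "kafka_broker_list" = "localhost:9092",
    "kafka_topic" = "ods_test",
    "property.kafka_default_offsets" = "OFFSET_BEGINNING",
    "property.group.id" = "g1"
);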

2.2 Other commands

# Show running load jobs
SHOW ROUTINE LOAD;

# Show partitions
SHOW PARTITIONS FROM ODS.TEST;

# Manually add a partition (dynamic partitioning must be disabled first, then the partition can be added)
ALTER TABLE ODS.TEST SET ("dynamic_partition.enable" = "false");
ALTER TABLE ODS.TEST ADD PARTITION p20230917 VALUES LESS THAN ("2023-09-17") DISTRIBUTED BY HASH(pool_address);
ALTER TABLE ODS.TEST SET ("dynamic_partition.enable" = "true");
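
A few related commands that tend to be useful alongside the above; this sketch reuses the same hypothetical job name as earlier:

# Pause a job without removing it, then resume it (unlike STOP, a paused job can be resumed)
PAUSE ROUTINE LOAD FOR ODS.TEST;
RESUME ROUTINE LOAD FOR ODS.TEST;

# Check the status of a single job
SHOW ROUTINE LOAD FOR ODS.TEST;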