1. Scenario analysis

The requirement is to back up Elasticsearch data to HDFS. To meet it, Logstash is used to synchronize the data at day-level granularity.
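A day-level sync needs a query window that covers exactly one day. The following is a minimal Ruby sketch of how such a window could be built in epoch milliseconds. The helper name `day_range_query`, the fixed `+08:00` offset, and the half-open `gte`/`lt` bounds are assumptions for illustration; the field name `create_time` is the one used by the range query in section 3.2.

```ruby
require 'json'

# Hypothetical helper: builds an Elasticsearch range query covering one
# calendar day in a fixed UTC offset (assumed +08:00, i.e. Asia/Shanghai).
def day_range_query(year, month, day, field: 'create_time', utc_offset: '+08:00')
  day_start = Time.new(year, month, day, 0, 0, 0, utc_offset)
  day_end = day_start + 24 * 60 * 60 # Asia/Shanghai has no DST, so a day is 24h
  {
    'query' => {
      'range' => {
        field => {
          'gte' => day_start.to_i * 1000, # inclusive start, epoch milliseconds
          'lt' => day_end.to_i * 1000     # exclusive end, epoch milliseconds
        }
      }
    }
  }
end

# Print the JSON that could be pasted into the elasticsearch input's query option.
puts JSON.generate(day_range_query(2024, 1, 8))
```

The half-open interval avoids counting a record that falls exactly on midnight in two consecutive daily runs.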

2. Key difficulties

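  • Collection introduces time drift: by default, data is stored under the collection time rather than the time the data was generated.
  • The webhdfs output does not let you set a file size.
  • The @timestamp time zone problem has to be solved.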
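The time zone problem can be illustrated with a short Ruby sketch. Logstash renders `%{+YYYYMMdd}` from @timestamp in UTC, so a record created early in the morning Beijing time lands in the previous day's partition unless the timestamp is shifted. The helper names `part_day` and `shift_to_utc8` are hypothetical; the sample value is the lower bound of the range query in section 3.2.

```ruby
# Formats a Time as the partition value, mirroring the %{+YYYYMMdd} pattern.
def part_day(time)
  time.strftime('%Y%m%d')
end

# Mirrors the idea of the ruby filter: move the UTC time forward by 8 hours.
def shift_to_utc8(time)
  time + 8 * 60 * 60
end

create_time_ms = 1704668760000                 # 2024-01-08 07:06:00 in Asia/Shanghai
utc_time = Time.at(create_time_ms / 1000).utc  # 2024-01-07 23:06:00 UTC

puts part_day(utc_time)                 # partition without the shift: 20240107
puts part_day(shift_to_utc8(utc_time))  # partition with the shift:    20240108
```

The shift makes @timestamp hold a wall-clock value rather than a true UTC instant, so it is only suitable when the field is used for partitioning as it is here.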

3. Solution

3.1 Install the webhdfs plugin

./bin/logstash-plugin install logstash-output-webhdfs

3.2 Logstash configuration

input {
  elasticsearch {
    hosts => "xxxx:9200"
    index => "xxxx"
    # Custom query
    query => '{"query": {"range": {"create_time":{"gte": 1704668760000,"lte": 1704668820000}}}}'
    size => 10000
    scroll => "5m"
    slices => 1
    user => "xxx"
    password => "xxxx"
  }
}

filter {
  date {
    # Set @timestamp to the time the record was generated; Logstash time handling uses @timestamp by default
    match => ["create_time", "UNIX_MS"]
    timezone => "Asia/Shanghai"
    target => "@timestamp"
  }
  # Add a temporary field timestamp that holds @timestamp plus 8 hours
  ruby {
    code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
  }
  # Assign timestamp back to @timestamp
  ruby {
    code => "event.set('@timestamp', event.get('timestamp'))"
  }
  # Control the number of files written to HDFS by adding a bucket field.
  # Time patterns can also control the file count, but they only allow a few fixed counts.
  # Here the output is split into 3 files; the created field must hold a numeric value.
  ruby {
    code => "event.set('sync_bucket', event.get('created') % 3)"
  }
  # Remove the temporary timestamp field added above
  mutate {
    remove_field => ["timestamp"]
  }
}

output {
  webhdfs {
    # A high-availability cluster needs the standby NameNode configured
    standby_host => "xxx"
    standby_port => 9870
    host => "xxxx"
    port => 9870
    path => "/hadoop/test/part_day=%{+YYYYMMdd}/logstash-%{sync_bucket}.log"
    # Alternative: control the number of files by time; +a is the AM/PM marker
    #path => "/hadoop/dm_dw/on/ods/ods_cc_es_initLogPro_di/part_day=%{+YYYYMMdd}/logstash-%{+a}.log"
    user => "hadoop"
    compression => "gzip"
    idle_flush_time => 60
    codec => "jsonlines"
  }
}
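Because the webhdfs output cannot cap file size, the configuration bounds the number of files per day instead, by taking a numeric field modulo 3. The Ruby sketch below shows how that modulo spreads events across three file names. The helper names `sync_bucket` and `hdfs_path` and the sample values are hypothetical; the path pattern follows the `path` option above.

```ruby
# Hypothetical helper: maps a numeric field value to one of `buckets` buckets,
# as the ruby filter does with `% 3`.
def sync_bucket(value, buckets = 3)
  value % buckets
end

# Hypothetical helper: renders the output path the way the webhdfs
# `path` pattern would for a given day and bucket.
def hdfs_path(part_day, bucket)
  "/hadoop/test/part_day=#{part_day}/logstash-#{bucket}.log"
end

# Sample numeric values (assumed to be epoch milliseconds).
values = [1704668760000, 1704668760001, 1704668760002, 1704668760003]
values.each do |v|
  puts hdfs_path('20240108', sync_bucket(v))
end
```

With evenly distributed values each file receives roughly a third of the day's events, so raising or lowering the divisor is an indirect way to tune the file size.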

Official Logstash documentation on date handling: https://www.elastic.co/guide/en/logstash/current/plugins-filters-date.html#plugins-filters-date-match
A good introduction to Logstash: https://doc.yonyoucloud.com/doc/logstash-best-practice-cn/get_start/full_config.html