ELK + Filebeat + Kafka log collection (local nginx)

Deploy Zookeeper

1. Lab components

# Prepare 3 servers for the Zookeeper cluster: 20.0.0.10, 20.0.0.20, 20.0.0.30

2. Pre-installation preparation

# Disable the firewall
systemctl stop firewalld
systemctl disable firewalld
setenforce 0

# Install the JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

# Upload apache-zookeeper-3.5.7-bin.tar.gz to /opt

3. Install Zookeeper

# Run on all three servers
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper

# Edit the configuration file
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
--line 2--
tickTime=2000                   # Heartbeat interval between the Zookeeper server and its clients, in milliseconds
--line 5--
initLimit=10                    # Maximum number of heartbeats (tickTime intervals) tolerated while Leader and Follower establish the initial connection, i.e. 10*2s here
--line 8--
syncLimit=5                     # Timeout for Leader/Follower synchronization; beyond 5*2s the Leader considers the Follower dead and removes it from the server list
--line 12, modify--
dataDir=/opt/zookeeper/data     # Directory holding Zookeeper data; must be created separately
--add--
dataLogDir=/opt/zookeeper/logs  # Directory holding the logs; must be created separately
--line 15--
clientPort=2181                 # Client connection port
--append the cluster information at the end--
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288
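Putting the scattered edits together, the effective zoo.cfg on each node should end up looking roughly like this (everything else stays at the sample defaults):

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
dataLogDir=/opt/zookeeper/logs
clientPort=2181
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288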

# Create the data and log directories on every node
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs

# Create a myid file under the dataDir on each node; assign 1, 2 and 3 respectively
echo 1 > /opt/zookeeper/data/myid    # on 20.0.0.10
echo 2 > /opt/zookeeper/data/myid    # on 20.0.0.20
echo 3 > /opt/zookeeper/data/myid    # on 20.0.0.30

# Configure the Zookeeper startup script
vim /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
    echo "---------- starting zookeeper ------------"
    $ZK_HOME/bin/zkServer.sh start
;;
stop)
    echo "---------- stopping zookeeper ------------"
    $ZK_HOME/bin/zkServer.sh stop
;;
restart)
    echo "---------- restarting zookeeper ------------"
    $ZK_HOME/bin/zkServer.sh restart
;;
status)
    echo "---------- zookeeper status ------------"
    $ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

# Enable autostart on boot
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

# Start Zookeeper on each node
service zookeeper start
# Check the current status
service zookeeper status
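Once all three nodes are started, an optional sanity check with the bundled tools (paths as installed above) confirms that the ensemble formed:

# One node should report Mode: leader, the other two Mode: follower
/opt/zookeeper/bin/zkServer.sh status
# Connect to any node; "ls /" should at least list the zookeeper znode
/opt/zookeeper/bin/zkCli.sh -server 20.0.0.10:2181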

Deploy Kafka (version 3.4.1)

1. Install Kafka

cd /opt
--upload kafka_2.13-3.4.1.tgz--
tar -xf kafka_2.13-3.4.1.tgz
mv kafka_2.13-3.4.1 kafka
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
--line 24--
broker.id=1    # Globally unique broker ID; it must not repeat, so set broker.id=2 and broker.id=3 on the other machines
--line 34--
listeners=PLAINTEXT://20.0.0.10:9092    # Use 10, 20 and 30 respectively on each server; no hostname mapping needed
--line 62--
log.dirs=/var/log/kafka    # Path for Kafka runtime logs, which is also where the data is stored
--line 125--
zookeeper.connect=20.0.0.10:2181,20.0.0.20:2181,20.0.0.30:2181    # Zookeeper cluster connection addresses

# Update the global environment
vim /etc/profile
--append--
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
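For reference, only two lines end up differing between the three brokers; the rest of server.properties is identical:

# 20.0.0.10
broker.id=1
listeners=PLAINTEXT://20.0.0.10:9092

# 20.0.0.20
broker.id=2
listeners=PLAINTEXT://20.0.0.20:9092

# 20.0.0.30
broker.id=3
listeners=PLAINTEXT://20.0.0.30:9092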
# Configure the Kafka startup script
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)
    echo "---------- starting Kafka ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
    echo "---------- stopping Kafka ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
    $0 stop
    $0 start
;;
status)
    echo "---------- Kafka status ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

# Enable autostart on boot
chmod +x /etc/init.d/kafka
chkconfig --add kafka

# Start Kafka on each node
service kafka start
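Optionally, once all three brokers are running, their registration can be verified; a quick sketch assuming the Zookeeper install path used earlier:

# The Kafka port should be listening on each node
ss -lntp | grep 9092
# All three broker ids should appear under /brokers/ids in Zookeeper
/opt/zookeeper/bin/zkCli.sh -server 20.0.0.10:2181 ls /brokers/ids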

2. Command-line test

# Create a topic
kafka-topics.sh --create --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --replication-factor 2 --partitions 3 --topic test1

# List all topics on the current servers
kafka-topics.sh --list --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092

# Produce messages
kafka-console-producer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1

# Consume messages
kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1 --from-beginning

# Change the number of partitions
kafka-topics.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --alter --topic test1 --partitions 6

# Delete a topic
kafka-topics.sh --delete --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1
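To see where partitions and replicas landed after these tests, the same tool can describe the topic:

# Shows leader, replicas and ISR for each partition of test1
kafka-topics.sh --describe --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1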

Deploy Filebeat (nginx on 20.0.0.10)

1. Install Filebeat

# On 20.0.0.10
cd /opt/
--upload filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat

# Point Logstash at the pipeline directory
vim /etc/logstash/logstash.yml
--line 64--
path.config: /opt/log

systemctl restart logstash

2. Time synchronization

# On all nodes
yum -y install ntpdate
ntpdate ntp.aliyun.com
date

3. Configure Filebeat

# Grant access to the nginx log files
cd /var/log/nginx/
chmod 777 access.log error.log

# Configure Filebeat
cd /opt/filebeat
vim filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/nginx/access.log
    - /var/log/nginx/error.log
  tags: ["nginx"]
  fields:
    service_name: 20.0.0.10_nginx
    log_type: nginx
    from: 20.0.0.10

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "nginx"

--------------Elasticsearch output-------------------
(comment out this whole section)
----------------Logstash output---------------------
(comment out this whole section)

# Start Filebeat
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
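The nginx events now land in the Kafka topic "nginx", but Logstash on 20.0.0.10 still needs a pipeline file in /opt/log (the path.config directory set earlier) to push them into Elasticsearch. A minimal sketch; the file name nginx_kafka.conf and the index name nginx-%{+YYYY.MM.dd} are assumptions, adjust as needed:

vim /opt/log/nginx_kafka.conf
input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics => ["nginx"]
        codec => "json"
        auto_offset_reset => "earliest"
        decorate_events => true
    }
}
output {
    if "nginx" in [tags] {
        elasticsearch {
            hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
            index => "nginx-%{+YYYY.MM.dd}"    # hypothetical index name
        }
    }
    stdout { codec => rubydebug }
}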

Deploy Filebeat (20.0.0.81)

1. Install and configure Filebeat

# Collect the mysql and apache logs on server 20.0.0.81
cd /opt/
--upload filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
cd filebeat/
vim filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /etc/httpd/logs/access_log
    - /etc/httpd/logs/error_log
  tags: ["httpd_81"]
  fields:
    service_name: 20.0.0.81_httpd
    log_type: httpd
    from: 20.0.0.81
- type: log
  enabled: true
  paths:
    - /usr/local/mysql/data/mysql_general.log
  tags: ["mysql_81"]
  fields:
    service_name: 20.0.0.81_mysql
    log_type: mysql
    from: 20.0.0.81

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "httpdmysql"

--------------Elasticsearch output-------------------
(comment out this whole section)
----------------Logstash output---------------------
(comment out this whole section)

# Start Filebeat
nohup ./filebeat -e -c filebeat.yml > filebeat.out &
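Before wiring up Logstash, it can be worth confirming that events actually reach the topic, reusing the console consumer from the Kafka test section:

kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic httpdmysql --from-beginning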

2. Configure Logstash

# On 20.0.0.10
cd /opt/log/
vim 81_a+m.conf
input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics => "httpdmysql"
        type => "httpd+mysql_kafka"
        codec => "json"
        auto_offset_reset => "earliest"
        decorate_events => true
    }
}
output {
    if "httpd_81" in [tags] {
        elasticsearch {
            hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
            index => "httpd_81-%{+YYYY.MM.dd}"
        }
    }
    if "mysql_81" in [tags] {
        elasticsearch {
            hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
            index => "mysql_81-%{+YYYY.MM.dd}"
        }
    }
    stdout { codec => rubydebug }
}

# Run the pipeline
logstash -f 81_a+m.conf --path.data /opt/test2
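After Logstash has run for a while, the new indices should show up in Elasticsearch; a quick check against either ES node:

curl http://20.0.0.20:9200/_cat/indices?v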