
ELK Stack in Production: Custom Log Collection with Filebeat

崔亮的博客

Preface:


In scenarios with business peak periods, a large volume of logs can be produced in a short time. If we keep using Fleet to collect logs and an ingest pipeline to process them, write backlogs may occur. In that case we can fall back to the traditional Filebeat approach and introduce Kafka as a message buffer queue to keep log transport reliable and stable. Using the log demo application as an example, the following walks through the complete flow: Filebeat collection -> Kafka message buffering -> Logstash parsing and processing -> Elasticsearch storage -> Kibana visualization and analysis.

Log demo application deployment

Project links

Code repository:

Log format

The demo simulates typical back-end service logs in the following format:

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '186.196.110.240', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
2023-07-23 09:35:19.728 | WARNING  | __main__:debug_log:47 - {'access_status': 403, 'request_method': 'PUT', 'request_uri': '/public/', 'request_length': 72, 'remote_address': '158.113.125.213', 'server_name': 'cu-35.cn', 'time_start': '2023-07-23T09:35:18.948+08:00', 'time_finish': '2023-07-23T09:35:20.343+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
2023-07-23 09:35:19.793 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/public/', 'request_length': 46, 'remote_address': '153.83.121.71', 'server_name': 'cm-17.cn', 'time_start': '2023-07-23T09:35:19.318+08:00', 'time_finish': '2023-07-23T09:35:20.563+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; rv:57.0) Gecko/20100101 Firefox/57.0'}
2023-07-23 09:35:20.614 | ERROR    | __main__:debug_log:45 - {'access_status': 502, 'request_method': 'GET', 'request_uri': '/public/', 'request_length': 62, 'remote_address': '130.190.246.56', 'server_name': 'cu-34.cn', 'time_start': '2023-07-23T09:35:20.061+08:00', 'time_finish': '2023-07-23T09:35:21.541+08:00', 'http_user_agent': 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1; Trident/4.0; Hot Lingo 2.0)'}
Deployment
[root@es-master ~]# cd /opt/
[root@es-master opt]# git clone 
[root@es-master opt]# cd log_demo/
[root@es-master log_demo]# ls
Dockerfile  log.py  main.py  readme.md  requirements.txt
[root@es-master log_demo]# docker build -t log_demo:1.0 .
[root@es-master log_demo]# docker run --name log_demo -d -v /var/log/log_demo:/opt/logDemo/log --restart always log_demo:1.0
[root@es-master log]# cd /var/log/log_demo/
[root@es-master log_demo]# ll
total 44
-rw-r--r-- 1 root root  4320 Jul 19 22:33 error.log
-rw-r--r-- 1 root root 22729 Jul 19 22:33 info.log
-rw-r--r-- 1 root root  8612 Jul 19 22:33 warning.log
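Before wiring up any collectors, it is worth confirming that the container is healthy and the mounted files keep growing. A quick sanity check, assuming the container name and host path from the docker run command above:

# Check that the demo container is up and that the mounted log files are being written
docker ps --filter name=log_demo
docker logs --tail 5 log_demo
tail -n 3 /var/log/log_demo/info.log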
Filebeat deployment and configuration

Download

Installation

[root@es-master ~]# cat > /etc/yum.repos.d/elastic.repo << EOF
[elastic-8.x]
name=Elastic repository for 8.x packages
baseurl=
EOF
[root@es-master ~]# dnf -y install filebeat
[root@es-master ~]# systemctl enable filebeat
Modify the configuration

Read the files under /var/log/log_demo and output them to the Kafka cluster.

[root@es-master ~]# vim /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/log_demo/info.log
output.kafka:
  enabled: true
  hosts: ["es-hot1:9092","es-hot2:9092","es-hot3:9092"]
  topic: "log_demo"
  partition.round_robin:
    reachable_only: false
  required_acks: -1
  compression: gzip
logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
[root@es-master ~]# mkdir /var/log/filebeat
Test the configuration file

Start Filebeat in the foreground with the YAML file specified on the command line and check the log output for errors.

[root@es-master ~]# filebeat -e -c /etc/filebeat/filebeat.yml
{"log.level":"info","@timestamp":"2023-07-18T17:56:18.409+0800","log.origin":{"file.name":"instance/beat.go","file.line":779},"message":"Home path: [/usr/share/filebeat] Config path: [/etc/filebeat] Data path: [/var/lib/filebeat] Logs path: [/var/log/filebeat]","service.name":"filebeat","ecs.version":"1.6.0"}
{"log.level":"info","@timestamp":"2023-07-18T17:56:18.410+0800","log.origin":{"file.name":"instance/beat.go","file.line":787},"message":"Beat ID: 5d6d4273-b4f8-4cf4-ba39-abab53f6aea2","service.name":"filebeat","ecs.version":"1.6.0"}
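Besides running Filebeat in the foreground, its built-in test subcommands can validate the configuration syntax and the connectivity of the Kafka output. A minimal sketch using the same config path; note that the output test will only succeed once the Kafka cluster from the next section is up:

# Validate the YAML and the output connection without shipping any events
filebeat test config -c /etc/filebeat/filebeat.yml
filebeat test output -c /etc/filebeat/filebeat.yml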
Start Filebeat
[root@es-master ~]# systemctl start filebeat
Kafka deployment

This example deploys Kafka in KRaft mode, whose advantage is that it no longer depends on a ZooKeeper cluster, making deployment and maintenance easier. Three controller nodes take the place of ZooKeeper; cluster metadata is stored on the controllers, which manage the Kafka cluster directly.

Download

[root@es-hot1 ~]# wget 
Install the JDK

Install via yum:

[root@es-hot1 ~]# dnf -y install java-11-openjdk

Or install from a binary tarball:

[root@es-hot1 ~]# wget 
[root@es-hot1 ~]# tar -zxf openlogic-openjdk-17.0.7+7-linux-x64.tar.gz -C /usr/local
[root@es-hot1 ~]# cd /usr/local/openlogic-openjdk-17.0.7+7-linux-x64/
[root@es-hot1 openlogic-openjdk-17.0.7+7-linux-x64]# ls
bin  conf  demo  include  jmods  legal  lib  man  release
# Add environment variables
[root@es-hot1 ~]# vim /etc/profile
export JAVA_HOME=/usr/local/openlogic-openjdk-17.0.7+7-linux-x64
export CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin
[root@es-hot1 ~]# source /etc/profile
[root@es-hot1 ~]# java -version
openjdk version "17.0.7" 2023-04-18
OpenJDK Runtime Environment OpenLogic-OpenJDK (build 17.0.7+7-adhoc.root.jdk17u)
OpenJDK 64-Bit Server VM OpenLogic-OpenJDK (build 17.0.7+7-adhoc.root.jdk17u, mixed mode, sharing)
# Create a symlink
[root@es-hot1 ~]# ln -s /usr/local/openlogic-openjdk-17.0.7+7-linux-x64/bin/java /usr/bin/java
Extract and install Kafka
[root@es-hot1 ~]# tar -zxf kafka_2.13-3.5.0.tgz -C /usr/local
[root@es-hot1 ~]# cd /usr/local/kafka_2.13-3.5.0/
[root@es-hot1 kafka_2.13-3.5.0]# ls
LICENSE  NOTICE  bin  config  libs  licenses  site-docs
# Create the Kafka data directory
[root@es-hot1 kafka_2.13-3.5.0]# mkdir -p /data/kafka-data
Modify the configuration
[root@es-hot1 kafka_2.13-3.5.0]# vim /usr/local/kafka_2.13-3.5.0/config/kraft/server.properties
process.roles=broker,controller
# Node ID; must be different on each node
node.id=1
# Controller quorum; these nodes manage cluster state (replacing ZooKeeper)
controller.quorum.voters=1@es-hot1:9093,2@es-hot2:9093,3@es-hot3:9093
listeners=PLAINTEXT://:9092,CONTROLLER://:9093
inter.broker.listener.name=PLAINTEXT
# Advertised IP and port; each node fills in its own IP
advertised.listeners=PLAINTEXT://192.168.10.133:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Data storage location
log.dirs=/data/kafka-data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

All three hot servers must be modified; the per-node values are the following (a helper sketch for applying them is shown after the snippet).

# Node ID; must be unique per node and consistent with the controller.quorum.voters list
node.id=1
# Advertised listener; each node fills in its own IP and port
advertised.listeners=PLAINTEXT://192.168.10.133:9092
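One way to apply the two per-node values without editing each file by hand is a small sed snippet. This is only a sketch; NODE_ID and NODE_IP are placeholders that must be set to each host's own values:

# Run on each node with its own values (placeholders, not real values)
NODE_ID=1
NODE_IP=192.168.10.133
CONF=/usr/local/kafka_2.13-3.5.0/config/kraft/server.properties
sed -i "s/^node.id=.*/node.id=${NODE_ID}/" "$CONF"
sed -i "s|^advertised.listeners=.*|advertised.listeners=PLAINTEXT://${NODE_IP}:9092|" "$CONF"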
Initialize the cluster

Run the following command on one of the servers to generate a UUID:

[root@es-hot1 ~]# sh /usr/local/kafka_2.13-3.5.0/bin/kafka-storage.sh random-uuid
mO5FD8M9S0aRmVZxHZkZIA

Format the Kafka storage directory with that UUID; run the following command on all three servers:

[root@es-hot1 ~]# sh /usr/local/kafka_2.13-3.5.0/bin/kafka-storage.sh format -t mO5FD8M9S0aRmVZxHZkZIA -c /usr/local/kafka_2.13-3.5.0/config/kraft/server.properties
Formatting /data/kafka-data with metadata.version 3.5-IV2.
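If the format step succeeded, each node's data directory now contains a meta.properties file recording the shared cluster ID and that node's ID, which is a quick way to confirm all three nodes were formatted with the same UUID:

# meta.properties is written by kafka-storage.sh format; cluster.id must be identical on all three nodes
cat /data/kafka-data/meta.properties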
Start the cluster

Start Kafka on all three nodes:

[root@es-hot1 ~]# sh /usr/local/kafka_2.13-3.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.5.0/config/kraft/server.properties
Verification

Check the logs

[root@es-hot1 ~]# tail /usr/local/kafka_2.13-3.5.0/logs/kafkaServer.out
[2023-07-18 21:46:44,857] INFO [BrokerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.BrokerServer)
[2023-07-18 21:46:44,857] INFO [BrokerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-07-18 21:46:44,857] INFO [BrokerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.BrokerServer)
[2023-07-18 21:46:44,857] INFO [BrokerServer id=1] Transition from STARTING to STARTED (kafka.server.BrokerServer)
[2023-07-18 21:46:44,858] INFO Kafka version: 3.5.0 (org.apache.kafka.common.utils.AppInfoParser)
[2023-07-18 21:46:44,858] INFO Kafka commitId: c97b88d5db4de28d (org.apache.kafka.common.utils.AppInfoParser)
[2023-07-18 21:46:44,858] INFO Kafka startTimeMs: 1689688004857 (org.apache.kafka.common.utils.AppInfoParser)
[2023-07-18 21:46:44,862] INFO [KafkaRaftServer nodeId=1] Kafka Server started (kafka.server.KafkaRaftServer)

Check the process

[root@es-hot1 ~]# ps -aux | grep kafka
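Since this is a KRaft cluster, the controller quorum itself can also be inspected with the kafka-metadata-quorum.sh tool shipped with Kafka 3.5; a sketch run against any broker:

# Show the current quorum leader and voter status
sh /usr/local/kafka_2.13-3.5.0/bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --status
# Show the metadata replication progress of each voter and observer
sh /usr/local/kafka_2.13-3.5.0/bin/kafka-metadata-quorum.sh --bootstrap-server localhost:9092 describe --replication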

Create a test topic

[root@es-hot1 ~]# sh /usr/local/kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test --partitions 3 --replication-factor 3
Created topic test.

View topic details

[root@es-hot1 ~]# sh /usr/local/kafka_2.13-3.5.0/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
Topic: test     TopicId: QvDV8pJwSTC7NQwrNfjfAw PartitionCount: 3       ReplicationFactor: 3    Configs: segment.bytes=1073741824
        Topic: test     Partition: 0    Leader: 2       Replicas: 2,3,1 Isr: 2,3,1
        Topic: test     Partition: 1    Leader: 3       Replicas: 3,1,2 Isr: 3,1,2
        Topic: test     Partition: 2    Leader: 1       Replicas: 1,2,3 Isr: 1,2,3
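An optional end-to-end check is to push one message through the test topic with the console producer and read it back with the console consumer; once Filebeat is shipping, the log_demo topic can be inspected the same way:

# Produce a single test message
echo "hello kafka" | sh /usr/local/kafka_2.13-3.5.0/bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
# Read it back (exits after one message)
sh /usr/local/kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --max-messages 1
# Peek at an event Filebeat has written to the log_demo topic
sh /usr/local/kafka_2.13-3.5.0/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log_demo --from-beginning --max-messages 1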
Configure a systemd service
[root@es-hot1 ~]# cat > /usr/lib/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka server (broker)
After=network.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/usr/local/kafka_2.13-3.5.0/bin/kafka-server-start.sh -daemon /usr/local/kafka_2.13-3.5.0/config/kraft/server.properties
ExecStop=/usr/local/kafka_2.13-3.5.0/bin/kafka-server-stop.sh
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF
[root@es-hot1 ~]# systemctl daemon-reload
[root@es-hot1 ~]# systemctl restart kafka
[root@es-hot1 ~]# systemctl enable kafka
Created symlink /etc/systemd/system/multi-user.target.wants/kafka.service → /usr/lib/systemd/system/kafka.service.
[root@es-hot1 ~]# systemctl status kafka
● kafka.service - Apache Kafka server (broker)
     Loaded: loaded (/usr/lib/systemd/system/kafka.service; enabled; preset: disabled)
     Active: active (running) since Tue 2023-07-18 22:42:07 CST; 7s ago
    Process: 3067 ExecStart=/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/kraft/server.properties (code=exited, status=0/SUCCESS)
   Main PID: 3412 (java)
      Tasks: 87 (limit: 23012)
     Memory: 315.7M
        CPU: 12.183s
     CGroup: /system.slice/kafka.service
             └─3412 java -Xmx1G -Xms1G -server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -XX:Max>
Jul 18 22:42:06 es-hot2 systemd[1]: Starting Apache Kafka server (broker)...
Jul 18 22:42:07 es-hot2 systemd[1]: Started Apache Kafka server (broker).
Kafka UI installation and deployment

Project URL:

Start the container

[root@es-master ~]# docker run --name kafka-ui -d -p 8080:8080 -e DYNAMIC_CONFIG_ENABLED=true -e KAFKA_CLUSTERS_0_NAME="local" -e KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS="192.168.10.133:9092" --restart always provectuslabs/kafka-ui:master

Open the web UI to verify access and check the cluster status.

Logstash deployment

Download

Official download URL:

Installation

[root@es-warm1 ~]# wget 
[root@es-warm1 ~]# rpm -ivh logstash-8.8.2-x86_64.rpm
[root@es-warm1 ~]# systemctl enable logstash
Created symlink /etc/systemd/system/multi-user.target.wants/logstash.service → /usr/lib/systemd/system/logstash.service.
Add environment variables
[root@es-warm1 ~]# vim /etc/profile
export PATH=$PATH:/usr/share/logstash/bin
[root@es-warm1 ~]# source /etc/profile
[root@es-warm1 ~]# logstash -V
Using bundled JDK: /usr/share/logstash/jdk
logstash 8.8.2
Write the pipeline file

The Logstash filter logic works as follows:

Use a grok pattern to capture the log_timestamp and level fields plus the log payload into a content field.

Because the content field is not valid JSON, use the mutate plugin to replace single quotes with double quotes and convert the level value to lowercase.

Use the json plugin to decode the cleaned-up content string into a JSON object.

Use the geoip plugin to look up geolocation information for the IP in the remote_address field.

Finally, use the mutate plugin again to remove the content field.

Note that the geoip filter used below requires the IP geolocation database to be downloaded in advance; see the referenced documentation for details.

[root@es-warm1 ~]# cat > /etc/logstash/conf.d/log-to-es.conf << EOF
input {
    kafka {
        bootstrap_servers => "es-hot1:9092,es-hot2:9092,es-hot3:9092"
        auto_offset_reset => "latest"
        consumer_threads => 1
        decorate_events => true
        topics => ["log_demo"]
        codec => "json"
        group_id => "logstash"
    }
}
filter {
    grok {
        match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \| %{LOGLEVEL:level} %{SPACE}* \| (?<class>__main__:\w+:\d+) \- %{GREEDYDATA:content}" }
    }
    mutate {
        gsub => [
            "content", "'", '"'
        ]
        lowercase => [ "level" ]
    }
    json {
        source => "content"
    }
    geoip {
        source => "remote_address"
        database => "/etc/logstash/GeoLite2-City.mmdb"
        ecs_compatibility => disabled
    }
    mutate {
        remove_field => ["content"]
    }
}
output {
    elasticsearch {
        hosts => [""]
        data_stream => "true"
        data_stream_type => "logs"
        data_stream_dataset => "myapp"
        data_stream_namespace => "default"
        timeout => 120
        pool_max => 800
        validate_after_inactivity => 7000
        user => elastic
        password => "_21FDs+tGRRSaxg=q=4P"
        ssl => true
        cacert => "/etc/logstash/http_ca.crt"
    }
}
EOF
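Before connecting to Kafka and Elasticsearch, the filter chain alone can be sanity-checked with a throwaway stdin/stdout pipeline. This is a minimal sketch: the sample input comes from the demo log above, /tmp/filter-test.conf is an arbitrary scratch path, and geoip is left out so the test has no external dependencies:

# Write a temporary pipeline that reads one line from stdin and prints the parsed event
cat > /tmp/filter-test.conf << 'EOF'
input { stdin { } }
filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:log_timestamp} \| %{LOGLEVEL:level} %{SPACE}* \| (?<class>__main__:\w+:\d+) \- %{GREEDYDATA:content}" }
  }
  mutate {
    gsub => [ "content", "'", '"' ]
    lowercase => [ "level" ]
  }
  json { source => "content" }
  mutate { remove_field => ["content"] }
}
output { stdout { codec => rubydebug } }
EOF
# Paste one line from /var/log/log_demo/info.log and check that the parsed fields appear in the output
logstash -f /tmp/filter-test.conf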
Copy the CA certificate

Logstash needs the CA certificate to connect to Elasticsearch over TLS; copy it from the master node to the Logstash machine.

[root@es-warm1 ~]# scp es-master:/etc/elasticsearch/certs/http_ca.crt /etc/logstash/http_ca.crt
[root@es-warm1 ~]# chown logstash:logstash /etc/logstash/http_ca.crt
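A quick way to confirm the certificate and credentials before starting the pipeline is to query Elasticsearch directly from the Logstash host. The https://es-master:9200 endpoint here is an assumption based on this cluster's naming and must match the hosts value in the output section:

# Should return the cluster banner JSON; curl prompts for the elastic user's password
curl --cacert /etc/logstash/http_ca.crt -u elastic https://es-master:9200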
Test the pipeline file

Start Logstash in the foreground with the pipeline file specified on the command line and check the log output for errors.

[root@es-warm1 ~]# logstash -f /etc/logstash/conf.d/log-to-es.conf
Start Logstash
[root@es-warm1 ~]# systemctl start logstash
Viewing the data in Kibana

Data stream information

In Kibana you can see that a data stream named logs-myapp-default has been created automatically.
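The same data stream can also be confirmed from the Elasticsearch API; a sketch using the CA and credentials from the Logstash output (the https://es-master:9200 endpoint is again an assumption):

# Show the logs-myapp-default data stream and its backing indices
curl --cacert /etc/logstash/http_ca.crt -u elastic "https://es-master:9200/_data_stream/logs-myapp-default?pretty"
# Count the documents ingested so far
curl --cacert /etc/logstash/http_ca.crt -u elastic "https://es-master:9200/logs-myapp-default/_count?pretty"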

Inspecting field contents

In Discover, click the expand button on a document to see its full field details.

With that, we have completed the full custom-log flow: Filebeat collection -> Kafka message buffering -> Logstash parsing and processing -> Elasticsearch storage -> Kibana data display.

References

Filebeat log input configuration:

Filebeat Kafka output configuration:

Kafka deployment documentation:

Logstash Kafka input plugin:

Logstash grok filter plugin:

Logstash mutate filter plugin:

Logstash date filter plugin:

Logstash json filter plugin:

Logstash geoip filter plugin:

Logstash Elasticsearch output configuration:

For more original operations and development articles, visit the blog site or follow the WeChat official account 《崔亮的博客》 to get new content first.
