前言:
此刻咱们对“centos72安装配置elk”大致比较关心,同学们都需要学习一些“centos72安装配置elk”的相关文章。那么小编也在网络上收集了一些对于“centos72安装配置elk””的相关内容,希望朋友们能喜欢,姐妹们一起来了解一下吧!课程标题:<基于ELK+Kafka构建分布式日志采集系统>
1.传统日志采集存在哪些缺点
2.elk+kafka日志采集的原理
3.基于docker compose 安装elk+kafka环境
4.基于AOP+并发队列实现日志的采集
分布式日志采集产生背景
在传统项目中,如果在生产环境中,有多台不同的服务器集群,如果生产环境需要通过日志定位项目的Bug的话,需要在每台节点上使用传统的命令方式查询,这样效率非常低下。
因此我们需要集中化的管理日志,ELK则应运而生。
传统方式服务器搜索日志命令:tail -200f 日志文件名称
ELK+Kafka组成
Elk E= ElasticSeach(存储日志信息)
l Logstash(搬运工)
K Kibana连接到我们ElasticSeach图形化界面查询日志
Elk+kafka实现分布式日志采集
为什么需要将日志存储在ElasticSeach 而不是MySQL中呢
ElasticSeach 底层使用到倒排索引存储数据 ,在 搜索日志效率比mysql要高的。
elk+kafka原理
1. springboot项目会基于aop的方式拦截系统中日志
请求与响应日志信息---前置或者环绕通知;
2. 将该日志投递到我们kafka中 注意该过程一定要是异步的形式,如果是同步形式会影响到整体
接口的响应速度。
3. Logstash 数据源---kafka 订阅kafka的主题 获取日志消息内容
4. Logstash 在将日志消息内容输出到es中存放
5.开发者使用Kibana连接到ElasticSeach 查询存储日志内容。
为什么ELK需要结合Kafka
如果只整合elk 不结合kafka这样的话 每个服务器节点上都会安装Logstash做读写日志IO操作,可能性能不是很好,而且比较冗余。
ELK+Kafka环境构建docker compose构建ELK+Kafka环境
整个环境采用 docker compose 来实现构建
注意:环境 cpu 多核 内存 4GB以上
kafka环境的安装:
1.使用docker compose 安装 kafka
对docker compose 不熟悉可以查看:
docker compose 安装包
docker 相关学习文档:
2. docker compose文件
3. mkdir dockerkakfa
4.cd dockerkakfa
5.创建docker-compose.yml
version: '2'services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" restart: always kafka: image: wurstmeister/kafka:2.12-2.3.0 ports: - "9092:9092" environment: - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.75.129:9092 - KAFKA_LISTENERS=PLAINTEXT://:9092 volumes: - /var/run/docker.sock:/var/run/docker.sock restart: always kafka-manager: image: sheepkiller/kafka-manager ## 镜像:开源的web管理kafka集群的界面 environment: ZK_HOSTS: 192.168.75.129 ## 修改:宿主机IP ports: - "9001:9000" ## 暴露端口 elasticsearch: image: daocloud.io/library/elasticsearch:6.5.4 restart: always container_name: elasticsearch environment: - "ES_JAVA_OPTS=-Xms512m -Xmx512m" ports: - 9200:9200 kibana: image: daocloud.io/library/kibana:6.5.4 restart: always container_name: kibana ports: - 5601:5601 environment: - elasticsearch_url= depends_on: - elasticsearch
docker run zookeeper容器
docker run kafka容器
docker run kafka容器ElasticSeach
docker run Kibana容器
docker run Logstash容器
使用容器编排技术
6.关闭防火墙
systemctl stop firewalld
service iptables stop
7.docker-compose up 执行即可。
没有这个命令 需要先安装docker-compose
注意:构建elk+kafka环境过程中,需要非常多的依赖镜像。
如果es启动报错: 无法启动大多数内存不足的原因
建议虚拟机内存4G以上
es 启动报错:max virtual memory areas vm.max_count(65530) is too
解决步骤:
1.先切换到root用户;
2.执行命令:
sysctl -w vm.max_map_count=262144
可以查看结果:
sysctl -a|grep vm.max_map_count
会显示如下信息:
vm.max_map_count = 262144
注意:
上述方法修改之后,如果重启虚拟机将失效,所以:
一劳永逸的解决办法:
在/etc/sysctl.conf文件的最后添加一行代码:
vm.max_map_count=262144
即可永久修改。
验证elk+kafka 环境
docker ps
访问:zk 192.168.75.143:2181
访问:es
访问:kibana
安装 logstash
上传logstash-6.4.3.tar.gz到服务中tar zxvf logstash-6.4.3.tar.gzcd logstash-6.4.3bin/logstash-plugin install logstash-input-kafkabin/logstash-plugin install logstash-output-elasticsearch
注意:安装
bin/logstash-plugin install logstash-input-kafka
bin/logstash-plugin install logstash-output-elasticsearch
本机电脑需要有JDK的环境,如果没有JDK环境直接安装 logstash-input-kafka 或者logstash-output-elasticsearch会报错的
创建 在 logstash config 目录 创建 kafka.conf
input { kafka { bootstrap_servers => "192.168.75.143:9092" topics => "mayikt-log" }}filter { #Only matched data are send to output.}output { elasticsearch { action => "index" #The operation on ES hosts => "192.168.75.143:9200" #ElasticSearch host, can be array. index => "my_logs" #The index to write data to. } }
进入logstash bin 目录 执行 ./logstash -f ../config/kafka.conf
springboot项目整合elk+kafka
maven依赖
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.66</version> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>commons-lang</groupId> <artifactId>commons-lang</artifactId> <version>2.6</version> </dependency> </dependencies>aop拦截系统日志
import java.net.InetAddress;import java.net.UnknownHostException;import java.text.SimpleDateFormat;import java.util.Arrays;import java.util.Date;import javax.servlet.http.HttpServletRequest;import com.alibaba.fastjson.JSONObject;import com.mayikt.container.LogContainer;import org.aspectj.lang.JoinPoint;import org.aspectj.lang.annotation.*;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;import org.springframework.util.concurrent.ListenableFuture;import org.springframework.web.context.request.RequestContextHolder;import org.springframework.web.context.request.ServletRequestAttributes;@Aspect@Componentpublic class AopLogAspect { @Value("${server.port}") private String serverPort; // 申明一个切点 里面是 execution表达式 @Pointcut("execution(* com.mayikt.api.service.*.*(..))") private void serviceAspect() { } // @Autowired private LogContainer logContainer; // // 请求method前打印内容 @Before(value = "serviceAspect()") public void methodBefore(JoinPoint joinPoint) { ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder .getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); JSONObject jsonObject = new JSONObject(); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式 jsonObject.put("request_time", df.format(new Date())); jsonObject.put("request_url", request.getRequestURL().toString()); jsonObject.put("request_method", request.getMethod()); jsonObject.put("signature", joinPoint.getSignature()); jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs())); // IP地址信息 jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort); JSONObject requestJsonObject = new JSONObject(); requestJsonObject.put("request", jsonObject); jsonObject.put("request_time", df.format(new Date())); jsonObject.put("log_type", "info"); // 将日志信息投递到kafka中 String log = requestJsonObject.toJSONString();// ListenableFuture<SendResult<String, Object>> send = kafkaTemplate.send("mayikt-log",ctx); logContainer.addLog(log); } //// // 在方法执行完结后打印返回内容// @AfterReturning(returning = "o", pointcut = "serviceAspect()")// public void methodAfterReturing(Object o) {// ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder// .getRequestAttributes();// HttpServletRequest request = requestAttributes.getRequest();// JSONObject respJSONObject = new JSONObject();// JSONObject jsonObject = new JSONObject();// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式// jsonObject.put("response_time", df.format(new Date()));// jsonObject.put("response_content", JSONObject.toJSONString(o));// // IP地址信息// jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort);// jsonObject.put("log_type", "info");// respJSONObject.put("response", jsonObject);// // 将日志信息投递到kafka中//// kafkaTemplate.send("mayikt-log",respJSONObject.toJSONString());//// logContainer.put(respJSONObject.toJSONString());// }//// /** * 异常通知 * * @param point */ @AfterThrowing(pointcut = "serviceAspect()", throwing = "e") public void serviceAspect(JoinPoint point, Exception e) { ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder .getRequestAttributes(); HttpServletRequest request = requestAttributes.getRequest(); JSONObject jsonObject = new JSONObject(); SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");// 设置日期格式 jsonObject.put("request_time", df.format(new Date())); jsonObject.put("request_url", request.getRequestURL().toString()); jsonObject.put("request_method", request.getMethod()); jsonObject.put("signature", point.getSignature()); jsonObject.put("request_args", Arrays.toString(point.getArgs())); jsonObject.put("error", e.toString()); // IP地址信息 jsonObject.put("ip_addres", getIpAddr(request) + ":" + serverPort); jsonObject.put("log_type", "info"); JSONObject requestJsonObject = new JSONObject(); requestJsonObject.put("request", jsonObject); // 将日志信息投递到kafka中 String log = requestJsonObject.toJSONString(); logContainer.addLog(log); } // public static String getIpAddr(HttpServletRequest request) { //X-Forwarded-For(XFF)是用来识别通过HTTP代理或负载均衡方式连接到Web服务器的客户端最原始的IP地址的HTTP请求头字段。 String ipAddress = request.getHeader("x-forwarded-for"); if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getHeader("WL-Proxy-Client-IP"); } if (ipAddress == null || ipAddress.length() == 0 || "unknown".equalsIgnoreCase(ipAddress)) { ipAddress = request.getRemoteAddr(); if (ipAddress.equals("127.0.0.1") || ipAddress.equals("0:0:0:0:0:0:0:1")) { //根据网卡取本机配置的IP InetAddress inet = null; try { inet = InetAddress.getLocalHost(); } catch (UnknownHostException e) { e.printStackTrace(); } ipAddress = inet.getHostAddress(); } } //对于通过多个代理的情况,第一个IP为客户端真实IP,多个IP按照','分割 if (ipAddress != null && ipAddress.length() > 15) { //"***.***.***.***".length() = 15 if (ipAddress.indexOf(",") > 0) { ipAddress = ipAddress.substring(0, ipAddress.indexOf(",")); } } return ipAddress; }}配置文件内容
spring: application: ###服务的名称 name: mayikt-elkkafka jackson: date-format: yyyy-MM-dd HH:mm:ss kafka: bootstrap-servers: 192.168.75.143:9092 #指定kafka server的地址,集群配多个,中间,逗号隔开 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default_consumer_group #群组ID enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializerserver: port: 9000
标签: #centos72安装配置elk