龙空技术网

腾讯云CDN log拉取放入自建的ELK日志系统

Frankspr 53

前言:

现在姐妹们对“nginx获取铁通ip”可能比较着重,朋友们都想要分析一些“nginx获取铁通ip”的相关文章。那么小编也在网络上汇集了一些有关“nginx获取铁通ip””的相关知识,希望你们能喜欢,你们快快来了解一下吧!

1、下载脚本(不能实时,只能定时拉取)

#!/usr/bin/env python

# coding=utf-8

import hashlib

import requests

import hmac

import random

import time

import base64

import json

import gzip

import os

import sys

from datetime import datetime, timedelta

class Sign(object):

def __init__(self, secretId, secretKey):

self.secretId = secretId

self.secretKey = secretKey

# 生成签名串

def make(self, requestHost, requestUri, params, method='GET'):

srcStr = method.upper() + requestHost + requestUri + '?' + "&".join(k.replace("_",".") + "=" + str(params[k]) for k in sorted(params.keys()))

hashed = hmac.new(self.secretKey, srcStr, hashlib.sha1)

return base64.b64encode(hashed.digest())

class CdnHelper(object):

SecretId='ID'

SecretKey='key'

requestHost='cdn.api.qcloud.com'

requestUri='/v2/index.php'

def __init__(self, host, startDate, endDate):

self.host = host

self.startDate = startDate

self.endDate = endDate

self.params = {

'Timestamp': int(time.time()),

'Action': 'GetCdnLogList',

'SecretId': CdnHelper.SecretId,

'Nonce': random.randint(10000000,99999999),

'host': self.host,

'startDate': self.startDate,

'endDate': self.endDate

}

self.params['Signature'] = Sign(CdnHelper.SecretId, CdnHelper.SecretKey).make(CdnHelper.requestHost, CdnHelper.requestUri, self.params)

self.url = '; % (CdnHelper.requestHost, CdnHelper.requestUri)

def GetCdnLogList(self):

ret = requests.get(self.url, params=self.params)

return ret.json()

class GZipTool(object):

"""

压缩与解压gzip

"""

def __init__(self, bufSize = 1024*8):

self.bufSize = bufSize

self.fin = None

self.fout = None

def compress(self, src, dst):

self.fin = open(src, 'rb')

self.fout = gzip.open(dst, 'wb')

self.__in2out()

def decompress(self, gzFile, dst):

self.fin = gzip.open(gzFile, 'rb')

self.fout = open(dst, 'wb')

self.__in2out()

def __in2out(self,):

while True:

buf = self.fin.read(self.bufSize)

if len(buf) < 1:

break

self.fout.write(buf)

self.fin.close()

self.fout.close()

def download(link, name):

try:

r = requests.get(link)

with open(name, 'wb') as f:

f.write(r.content)

return True

except:

return False

def writelog(src, dst):

# 保存为以天命名日志

dst = dst.split('-')[0][:-2] + '-' + dst.split('-')[1]

with open(src, 'r') as f1:

with open(dst, 'a+') as f2:

for line in f1:

f2.write(line)

if __name__ == '__main__':

#startDate = "2018-03-13 12:00:00"

#endDate = "2018-03-13 12:00:00"

# 前一小时

# startDate = endDate = time.strftime('%Y-%m-%d ', time.localtime()) + str(time.localtime().tm_hour-1) + ":00:00"

tm = datetime.now() + timedelta(hours=-2)

startDate = endDate = tm.strftime("%Y-%m-%d %H:00:00")

#hosts = ['abc.demo.com'i,'test.demo.com']

hosts = [

'flash.demo.com'

]

for host in hosts:

try:

obj = CdnHelper(host, startDate,endDate)

ret = obj.GetCdnLogList()

link = ret['data']['list'][0]['link']

name = ret['data']['list'][0]['name']

gzip_name = '/data/logs/cdn/cdn_log_temp/' + name + '.gz'

local_name = '/data/logs/cdn/cdn_log_temp/' + name + '.log'

real_path = '/data/logs/cdn/' + name + '.log'

print local_name, real_path

status = download(link, gzip_name)

if status:

try:

GZipTool().decompress(gzip_name, local_name)

writelog(local_name, real_path)

# os.remove(gzip_name)

os.remove(local_name)

except:

continue

except Exception ,e:

print e

continue

2、filebeat配置(filebeat读取日志,写入kafka)

cat /etc/filebeat/filebeat.yml

filebeat:

prospectors:

-

paths:

- /data/logs/cdn/*.log

fields:

tag: cdn-log

output.kafka:

hosts: ["10.10.16.72:9092","10.10.16.73:9092","10.10.16.74:9092"]

topic: "cdn-log"

partition.round_robin:

reachable_only: false

required_acks: 1

compression: gzip

max_message_bytes: 1000000

3、logstash配置(读取kafka中的topic将数据格式化写入elasticsearch)

备注5.x之前的版本默认不支持alter过滤需要安装logstash-filter-alter插件,安装方法

我是yum 安装的,源码安装路径不一样,自己注意

cd /usr/share/logstash/

bin/logstash-plugin install logstash-filter-alter

input {

kafka {

bootstrap_servers => "nh-sy-storm3:9092,nh-sy-storm1:9092,nh-sy-storm2:9092"

topics => "cdn-log"

}

}

filter {

json {

source => "message"

}

if [fields][tag] == "cdn-log"

{

grok {

match => {

"message" =>"%{NUMBER:timestamp} %{IPORHOST:client_ip} %{IPORHOST:domain} %{NOTSPACE:request} %{NUMBER:bytes} %{NUMBER:province} %{NUMBER:isp} %{NUMBER:response} %{NOTSPACE:referrer

} %{NUMBER:response_time} %{QS:agent} %{QS:range} %{WORD:verb} %{NOTSPACE:http_version} %{WORD:cache_status}" }

}

date {

match => [ "timestamp", "yyyyMMddHHmmss"]

target => "@timestamp"

}

alter {

condrewrite => [

"province", "22", "北京",

"province", "86", "内蒙古",

"province", "146", "山西",

"province", "1069", "河北",

"province", "1077", "天津",

"province", "119", "宁夏",

"province", "152", "陕西",

"province", "1208", "甘肃",

"province", "1467", "青海",

"province", "1468", "新疆",

"province", "145", "黑龙江",

"province", "1445", "吉林",

"province", "1464", "辽宁",

"province", "2", "福建",

"province", "120", "江苏",

"province", "121", "安徽",

"province", "122", "山东",

"province", "1050", "上海",

"province", "1442", "浙江",

"province", "182", "河南",

"province", "1135", "湖北",

"province", "1465", "江西",

"province", "1466", "湖南",

"province", "118", "贵州",

"province", "153", "云南",

"province", "1051", "重庆",

"province", "1068", "四川",

"province", "1155", "西藏",

"province", "4", "广东",

"province", "173", "广西",

"province", "1441", "海南",

"province", "0", "其他",

"province", "1", "港澳台",

"province", "1", "海外",

"isp", "2", "中国电信",

"isp", "26", "中国联通",

"isp", "38", "教育网",

"isp", "43", "长城宽带",

"isp", "1046", "中国移动",

"isp", "3947", "中国铁通",

"isp", "-1", "海外运营商",

"isp", "0", "其他运营商"

]

}

}

}

output {

if [fields][tag] == "cdn-log" {

elasticsearch {

hosts => ["10.10.16.245:9200", "10.10.16.246:9200", "10.10.16.248:9200"]

index => "cdn-log-%{+YYYY-MM-dd}"

}

}

}

参考大神博文

patterns

标签: #nginx获取铁通ip