Python 脚本之将 logstash 数据按天保存在本地服务器并加密压缩
|字数总计:2.5k|阅读时长:10分钟|阅读量:
前言
最近客户方提出了要将日志另外备份一份并进行加密压缩,今天用这篇文章来总结一下。原本的架构是logstash通过udp端口将数据发送到我们的系统,现在客户想在中间将数据取出来做一份备份,并把每天的数据加密压缩。
功能分两个脚本来实现,话不多说,直接进入正题。
脚本一:接收数据并按天保存在本地
先把数据取出来,在logstash中添加output插件,发送到本地888端口,数据为json格式
1 2 3 4 5
| udp { host => "10.7.2.20" port => "888" codec => json }
|
可以在本地用nc监听888端口查看网络是否阻塞,这里就不做演示了。
之后先编写接收udp端口数据的代码,一步一步来:
1 2 3 4 5 6 7 8 9 10 11 12 13
| import socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) localaddr = ("192.168.44.112",8888) udp_socket.bind(localaddr) data,client = udp_socket.recvfrom(2048) print(data.decode("utf-8")) udp_socket.close()
|
收到数据后,发现来一条数据后就断开连接,就在前面加一个循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| import socket
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) localaddr = ("192.168.44.112",8888) udp_socket.bind(localaddr) while True: data,client = udp_socket.recvfrom(2048) print(data.decode("utf-8")) udp_socket.close()
|
这样就能一直收到发送方的数据了,然后把数据存在本地文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import socket,os
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) localaddr = ("192.168.44.112",8888) udp_socket.bind(localaddr) while True: data,client = udp_socket.recvfrom(2048) f = open("/data/test.txt","a+") f.write(data.decode("utf-8")+ "\n") f.close() udp_socket.close()
|
测试之后,完美运行,下面就是完善代码了,前面说过日志要按天来存储,并且日志还分为不同的类型,所以存放路径就分为每种类型日志下面有各自每天的日志备份,日志样例如下
1
| {"@timestamp":"2021-03-11T09:24:03.668Z","hostipv6":"fe80::c360:92ba:aede:5c36","hostipv4":"192.168.44.111","ecs":{"version":"1.0.0"},"@version":"1","message":"Mar 11 17:24:01 localhost CROND[104204]: (root) CMD (/usr/local/bin/php /data/rdiweb/public/di_admin.php Autotask >> /tmp/111.log 2>&1 &)","host":{"name":"localhost.localdomain","containerized":false,"os":{"kernel":"3.10.0-693.el7.x86_64","family":"redhat","name":"CentOS Linux","codename":"Core","version":"7 (Core)","platform":"centos"},"ip":["192.168.44.111","fe80::c360:92ba:aede:5c36"],"id":"251f075008fe4214825ef8910e29bf29","mac":["00:0c:29:64:b2:af"],"hostname":"localhost.localdomain","architecture":"x86_64"},"input":{"type":"log"},"agent":{"ephemeral_id":"25d88d8c-994c-49c9-afa3-78867740c45a","version":"7.2.0","type":"filebeat","hostname":"localhost.localdomain","id":"7936b07b-99db-458a-b7ed-3df7e348ef96"},"log":{"file":{"path":"/var/log/cron"},"offset":111834},"appname":"cron","tags":["cron","beats_input_codec_plain_applied"]}
|
可以看到从logstash收到的json日志是由tags字段来分日志类型的,比如bashhistory、cron、secure等等日志,而message为原始日志,所以我们只需要取得tags字段来分类型,并把message存在文件里即可,这里我是用正则来提取的值,因为数据的格式不规则,用json解析可能会报错,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import socket,os,re
def main(path): udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) localaddr = ("192.168.44.112",8888) udp_socket.bind(localaddr)
def getmessage(data): rule = "\"message\":\"(.*?)\",\"" message = re.findall(rule, data) return message[0]
def gettype(data): rule = "\"tags\":\[\"(.*?)\"," message = re.findall(rule, data) return message[0]
while True: data,client = udp_socket.recvfrom(2048) import time time = time.strftime('%Y-%m-%d') type = gettype(data) message = getmessage(data) f = open("%s/%s/%s.txt"%(path,type,time),"a+") f.write(data.decode("utf-8") + "\n") f.close() udp_socket.close() if __name__ == "__main__": path = "/data/data" main(path)
|
运行效果如图:
可以看到日志按照类型放在不同文件夹下,并按日期分为不同的文件,代码到这里就差不多了,但是还差一些需要完善的地方,比如来的是空数据,列表会下标越界,包括后续如果客户方要接入新的日志,找不到对应的路径也会报错,总不能每次都能手动创建日志类型的文件夹吧,话不多说,完整的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| import socket,os,re
def main(path): udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) localaddr = ("192.168.44.112",8888) udp_socket.bind(localaddr)
def getmessage(data): rule = "\"message\":\"(.*?)\",\"" message = re.findall(rule, data) return message[0]
def gettype(data): rule = "\"tags\":\[\"(.*?)\"," message = re.findall(rule, data) return message[0]
while True: data,client = udp_socket.recvfrom(2048) if data.isspace(): continue import time time = time.strftime('%Y-%m-%d') type = gettype(data) message = getmessage(data) if os.path.exists("%s/%s"%(path,type)) == False: os.makedirs("%s/%s"%(path,type)) f = open("%s/%s/%s.txt"%(path,type,time),"a+") f.write(data.decode("utf-8") + "\n") f.close() udp_socket.close() if __name__ == "__main__": path = "/data/data" main(path)
|
脚本二:将本地文件进行加密压缩
上一个脚本已经完美的把日志备份在了本地服务器,那么接下来就是将日志进行加密压缩,这个脚本相对上一个来说还是很简单的。
用python压缩文件还是很简单的,比较费事的是如何加密压缩,因为pyhon自带的zipfile库不支持加密,所以这里只能使用linux压缩文件的命令行来进行处理了.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import os from datetime import date, timedelta
def zipDir(path,outpath,password): import os status = os.popen("zip -jP %s %s %s"%(password,outpath,path)) os.wait() if __name__ == "__main__": path = "/data/secure" password = "123456" yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d") zipDir("%s/%s.txt" %(path[0],yesterday), "%s/%s.zip" %(path,yesterday), password)
|
到这里文件就完美加密压缩了,可是现在的代码只能压缩一种日志类型下的文件,不能将所有日志类型全部压缩,并且压缩后的日志文件没有删除,浪费了服务器的磁盘空间,于是又花费了一些时间将代码完善,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import os from datetime import date, timedelta
def zipDir(path,outpath,password): import os status = os.popen("zip -jP %s %s %s"%(password,outpath,path)) os.wait() os.remove(path)
if __name__ == "__main__": paths = "/data/data" password = "123456" num = True for path in os.walk(paths): if num: num = False continue yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d") zipDir("%s/%s.txt" %(path[0],yesterday), "%s/%s.zip" %(path[0],yesterday), password)
|
可以看到上面的代码遍历了目录,并且删除了压缩后的日志文件,和上一个脚本一样,哪怕后续加入了新的日志类型,脚本也不需要再改动了,运行效果如下:
之后把脚本加入到linux任务执行计划里,每天晚上2点压缩前一天的文件即可。
root用户下执行ctontab -e命令,将下面执行脚本的命令添加在文件末尾即可:
1
| 00 2 * * * /data/python3/bin/python3 /data/file2zip.py
|
/data/python3/bin/python3是python安装的路径,注意这里一定要写绝对路径,否则可能会执行失败,/data/file2zip.py是脚本的路径和文件名,00 2 * * *是指每天2:00运行脚本
同样上一个脚本也要加入到任务执行计划中,这样服务器挂掉或者重启之后脚本也能重启:
1
| */5 * * * * /data/python3/bin/python3 /data/logstash2file.py
|
这里每五分钟执行一次即可。
总结
这个需求相对来说没到很难的程度,只是要全面考虑到怎么应对不同的生产环境。