The requirement, as the title says: forward Splunk data to a third-party destination with a Python script (the destination could be a plain socket, Kafka, or other middleware; only the Kafka part is shown here). The example pulls the previous day's data:
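The script targets Python 3 and depends on two third-party packages: the Splunk SDK for Python (imported as splunklib, published on PyPI as splunk-sdk) and kafka-python. Assuming pip is available, installation would look like:

    pip install splunk-sdk kafka-python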

# -*- coding: utf-8 -*-
# Runs on Python 3; the Python 2 reload(sys)/sys.setdefaultencoding hack is not needed.
import io
import os
import sys
import logging
from logging import handlers

import splunklib.client as client
import splunklib.results as results
from kafka import KafkaProducer

# Logging helper; log files are written under /logs.
class Logger:
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL,
    }

    def __init__(self, option, level='info', when='W0', backCount=8):
        # One log file per task name, rotated weekly (W0 = Monday), keeping 8 backups.
        logfile = "/logs/{}.log".format(option)
        if not os.path.exists('/logs'):
            os.makedirs("/logs")
        self.logger = logging.getLogger(logfile)
        fmt = '%(asctime)s - %(levelname)s: %(message)s'
        format_str = logging.Formatter(fmt)
        self.logger.setLevel(self.level_relations.get(level))
        self.logger.handlers = []  # drop stale handlers on re-init
        # (add a StreamHandler here if console output is also wanted)
        file_handler = handlers.TimedRotatingFileHandler(
            filename=logfile, when=when, backupCount=backCount, encoding='utf-8')
        file_handler.setFormatter(format_str)
        self.logger.addHandler(file_handler)


# Splunk client wrapper.
# option: identifies which data set / query to run.
class ConnectPhoenix:
    def __init__(self, option):
        self.HOST = "127.0.0.1"
        self.PORT = 8089
        self.USERNAME = "admin"
        self.PASSWORD = "passwd"
        self.option = option

    def phoenixService(self):
        phoenix_service = client.connect(
            host=self.HOST,
            port=self.PORT,
            username=self.USERNAME,
            password=self.PASSWORD,
            verify=False,
            app="search")
        return phoenix_service

    # Map the task name to its SPL query.
    def get_query(self):
        if self.option == 'test1':
            return 'search index=*'

    # Run the query and return a streaming results reader.
    # period: how many days long the window is
    # delay: how many days back the window ends
    # e.g. period=1, delay=0 -> earliest_time='-1d@d', latest_time='-0d@d',
    # i.e. yesterday 00:00:00 up to (but not including) today 00:00:00.
    def get_results(self, period, delay):
        query = self.get_query()
        kwargs = {
            'earliest_time': '-%dd@d' % (int(period) + int(delay)),
            'latest_time': '-%dd@d' % int(delay),
        }
        phoenix_service = self.phoenixService()
        job = phoenix_service.jobs.export(query, **kwargs)
        query_results = results.ResultsReader(io.BufferedReader(job))
        return query_results


# ETL hook for the query results.
# log: a single query result
# option: identifies which data set the result belongs to
class FormatLog:
    def __init__(self, log, option):
        self.log = log
        self.option = option

    def format_log(self):
        if self.option == 'test1':
            # Per-task, business-specific transformations go here.
            pass
        return self.log


if __name__ == '__main__':
    # Arguments supplied on the cron command line:
    option = sys.argv[1] if len(sys.argv) > 1 else 'test1'  # task name, default test1
    period = sys.argv[2] if len(sys.argv) > 2 else 1  # window start, default: yesterday 00:00:00
    delay = sys.argv[3] if len(sys.argv) > 3 else 0  # window end, default: yesterday 23:59:59
    # OUTPUTIP = sys.argv[4] if len(sys.argv) > 4 else '192.168.66.6'  # destination address, default 192.168.66.6
    # OUTPUTPORT = int(sys.argv[5]) if len(sys.argv) > 5 else 9092  # destination port, default 9092

    log = Logger(option, level='info')
    phoenix_server = ConnectPhoenix(option)
    query_results = phoenix_server.get_results(period, delay)

    # Kafka sink
    def send_to_kafka():
        kafka_addr = '192.168.66.6:9092'  # Kafka broker address
        producer = KafkaProducer(bootstrap_servers=kafka_addr)
        count_num = 0  # forwarded-record counter
        for result in query_results:
            if isinstance(result, results.Message):
                # splunkd diagnostic messages, not events; skip them
                pass
            else:
                formatLog = FormatLog(result, option)
                logdic = formatLog.format_log()
                if logdic != "":  # skip records the ETL step blanked out
                    try:
                        # send(topic, value, partition); value must be bytes on Python 3
                        producer.send('test1', str(logdic).encode('utf-8'), partition=0)
                        count_num += 1
                    except Exception as e:
                        log.logger.error("Error: {}".format(e))

        producer.flush()
        producer.close()
        log.logger.info("forwarder process send {} msg to {}".format(count_num, kafka_addr))

    try:
        send_to_kafka()
    except Exception as e:
        log.logger.error("Error: {}".format(e))
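As the comment in the entry point notes, the arguments are meant to come from cron. Assuming the script is saved as /opt/scripts/splunk_to_kafka.py (a placeholder path), a crontab entry that forwards the previous day's data shortly after midnight might look like:

    30 0 * * * /usr/bin/python3 /opt/scripts/splunk_to_kafka.py test1 1 0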

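The opening paragraph mentions a plain socket as an alternative destination, but only the Kafka sink is implemented above. For completeness, here is a minimal sketch of a TCP socket sink that could stand in for send_to_kafka() inside the same __main__ block; the destination address, port 9000, and the newline-per-record framing are all assumptions, not part of the original script:

import socket

def send_to_socket():
    socket_addr = ('192.168.66.6', 9000)  # assumed destination host/port
    sock = socket.create_connection(socket_addr, timeout=10)
    count_num = 0
    try:
        for result in query_results:
            if isinstance(result, results.Message):
                continue  # skip splunkd diagnostics, as in the Kafka sink
            logdic = FormatLog(result, option).format_log()
            if logdic != "":
                # one record per line; newline framing is an assumption
                sock.sendall((str(logdic) + "\n").encode('utf-8'))
                count_num += 1
    finally:
        sock.close()
    log.logger.info("forwarder process send {} msg to {}:{}".format(count_num, *socket_addr))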