在 Linux 环境执行脚本,Python需要引入对应依赖:

pip install splunk-sdk

  离线环境下,可手动执行python进入 Python 解释器的交互式界面,输入以下命令:

import sys
print(sys.path)

  该命令会输出一个列表,包含了 Python 解释器在搜索模块时会查看的所有目录,如/usr/local/lib/python2.7/site-packages/。再去 Splunk 目录,将./etc/apps/splunk_instrumentation/bin/splunk_instrumentation/splunklib目录复制到/usr/local/lib/python2.7/site-packages/一份,Python 环境即有了 Splunk 的依赖。
  脚本执行命令:

python forwarder_udp.py original 1 0 day 192.9.9.9 514

  Python环境:2.7,脚本如下:

# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
from logging import handlers
import sys, io, logging, socket, os
import splunklib.client as client
import splunklib.results as results

# _create_unverified_https_context = ssl._create_unverified_context
# ssl._create_default_https_context = _create_unverified_https_context
# Python 2 only: force the process-wide default string encoding to UTF-8 so
# implicit str/unicode conversions of event text do not raise UnicodeDecodeError.
# (reload(sys) is required to re-expose setdefaultencoding, which site.py removes.)
reload(sys)
sys.setdefaultencoding('utf8')

# Application log helper; files are stored under /logs.
class Logger:
    # Map of level-name strings to logging module constants.
    level_relations = {
        'debug': logging.DEBUG,
        'info': logging.INFO,
        'warning': logging.WARNING,
        'error': logging.ERROR,
        'crit': logging.CRITICAL
    }

    def __init__(self, option, level='info', when='W0', backCount=8):
        """Build a timed-rotating file logger writing to /logs/<option>.log.

        option    -- task label; becomes the log file name
        level     -- one of the keys of level_relations
        when      -- rotation interval for TimedRotatingFileHandler ('W0' = weekly)
        backCount -- number of rotated files to keep
        """
        logfile = "/logs/{}.log".format(option)
        if not os.path.exists('/logs'):
            os.makedirs("/logs")
        self.logger = logging.getLogger(logfile)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s: %(message)s')
        self.logger.setLevel(self.level_relations.get(level))
        # Drop handlers left over from an earlier instantiation with the same
        # logger name, otherwise every record would be written multiple times.
        self.logger.handlers = []
        # stream_handler = logging.StreamHandler()
        # stream_handler.setFormatter(format_str)
        # self.logger.addHandler(stream_handler)
        rotating_handler = handlers.TimedRotatingFileHandler(
            filename=logfile, when=when, backupCount=backCount, encoding='utf-8')
        rotating_handler.setFormatter(formatter)
        self.logger.addHandler(rotating_handler)

# Splunk search client.
# option: data category identifier used to pick the SPL query
class ConnectPhoenix:
    def __init__(self, option):
        # NOTE(review): credentials are hard-coded; consider moving them to a
        # config file or environment variables.
        self.HOST = "10.10.10.1"
        self.PORT = 8089
        self.USERNAME = "admin"
        self.PASSWORD = "123456"
        self.option = option

    def phoenixService(self):
        """Connect to the Splunk management port and return a Service handle."""
        phoenix_service = client.connect(
            host=self.HOST,
            port=self.PORT,
            username=self.USERNAME,
            password=self.PASSWORD,
            verify=False,
            app="search")
        return phoenix_service

    def get_query(self):
        """Return the SPL search string for this option.

        Raises ValueError for an unknown option instead of silently returning
        None, which would break jobs.export downstream.
        """
        if self.option == 'original':
            return 'search index=ri_* | table _time,_raw'
        raise ValueError("unsupported option: {}".format(self.option))

    def get_results(self, period, delay, time_type):
        """Run an export search over the window [now - period, now - delay].

        period    -- start offset, in units of time_type, back from now
        delay     -- end offset, same units
        time_type -- 'day', 'hour' or 'minute'

        Returns a splunklib ResultsReader over the streamed export job.
        Raises ValueError for an unknown time_type (the original code left
        kwargs unbound and crashed with a NameError in that case).
        """
        query = self.get_query()
        unit_by_type = {'day': 'd', 'hour': 'h', 'minute': 'm'}
        if time_type not in unit_by_type:
            raise ValueError("unsupported time_type: {}".format(time_type))
        unit = unit_by_type[time_type]
        # Snap both bounds to the unit boundary (@d / @h / @m), matching the
        # original per-branch format strings.
        kwargs = {
            'earliest_time': '-%d%s@%s' % (int(period), unit, unit),
            'latest_time': '-%d%s@%s' % (int(delay), unit, unit),
        }
        phoenix_service = self.phoenixService()
        job = phoenix_service.jobs.export(query, **kwargs)
        return results.ResultsReader(io.BufferedReader(job))

# ETL step for a single query result row.
# log:    one result dict from the Splunk ResultsReader
# option: data category identifier
class FormatLog:
    def __init__(self, log, option):
        self.log = log
        self.option = option

    def format_log(self):
        """Return the forwardable text for this row.

        For the 'original' option this is the raw event (the _raw field).
        Unknown options return '' so the caller's `!= ''` check skips the row
        (the original code raised UnboundLocalError on logdir instead).
        """
        if self.option == 'original':
            return self.log['_raw']
        return ''

# Forward Splunk query results to a third-party endpoint over UDP.
# option:      task type / data category identifier
# period:      query window start offset
# delay:       query window end offset
# time_type:   window unit ('day', 'hour', 'minute')
# output_ip:   destination IP
# output_port: destination port
class Forwardudp:
    def __init__(self, option, period, delay, time_type, output_ip, output_port):
        self.option = option
        self.period = period
        self.delay = delay
        self.time_type = time_type
        self.output_ip = output_ip
        self.output_port = output_port

    # Forward the data
    def Forward_udp(self):
        """Query Splunk and send each formatted event as one UDP datagram."""
        log = Logger(self.option, level='info')
        phoenix_server = ConnectPhoenix(self.option)
        query_results = phoenix_server.get_results(self.period, self.delay, self.time_type)
        # Count of events actually sent.
        count_data = 0
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            for result in query_results:
                # Skip Splunk informational/diagnostic messages in the stream.
                if isinstance(result, results.Message):
                    continue
                formatLog = FormatLog(result, self.option)
                logdic = formatLog.format_log().decode('utf-8')
                if logdic != '':
                    try:
                        # Encode explicitly: the original str(logdic) raises
                        # UnicodeEncodeError under Python 2 for non-ASCII events.
                        s.sendto(logdic.encode('utf-8'), (self.output_ip, self.output_port))
                        print(logdic)
                        count_data += 1
                    except Exception as e:
                        log.logger.info("Error: {}".format(e))
        finally:
            # The original leaked the socket; release it even on errors.
            s.close()

        log.logger.info("forwarder process send {} msg to {}".format(count_data, self.output_ip))


if __name__ == '__main__':
    # get args from cron cmd; every argument is optional and positional.
    # The original guards used the wrong lengths (e.g. `len(sys.argv) > 1`
    # while indexing argv[2]), raising IndexError when only some arguments
    # were supplied.
    option = sys.argv[1] if len(sys.argv) > 1 else 'original'      # task name
    period = sys.argv[2] if len(sys.argv) > 2 else '1'             # window start: (now - period(time_type)), snapped to the unit boundary
    delay = sys.argv[3] if len(sys.argv) > 3 else '0'              # window end: (now - delay(time_type)), snapped to the unit boundary
    time_type = sys.argv[4] if len(sys.argv) > 4 else 'day'        # window unit: 'day', 'hour' or 'minute'
    output_ip = sys.argv[5] if len(sys.argv) > 5 else '192.9.9.9'  # forwarding destination IP
    output_port = int(sys.argv[6]) if len(sys.argv) > 6 else 514   # forwarding destination port

    forwardudp = Forwardudp(option, period, delay, time_type, output_ip, output_port)
    forwardudp.Forward_udp()