1. flink-conf.yaml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Jobmanager的地址,taskmanager必须要识别并能连上。
# 只有standalone的集群模式起作用,当执行bin/jobmanager.sh --host<hostname>的时候将被覆盖
# 在YARN或者Mesos的集群模式下将自动替换为jobmanager所在节点的hostname
# JobMaster节点IP地址
jobmanager.rpc.address

# JobMaster端口号,默认6123
jobmanager.rpc.port:6123

# JobManager能够使用的内存的最大值
jobmanager.heap.size: 1024m

# taskmanager 能够使用的最大内存大小
taskmanager.memory.process.size: 2048m

# taskmanager最大内存,不包含虚拟机的metaspace和其他的开销(不能和taskmanager.memory.process.size同时设置)
# taskmanager.memory.flink.size: 2048m

# 这俩不能和taskmanager.memory.flink.size或taskmanager.memory.process.size同时设置
# 指定JVM堆内存大小
# taskmanager.memory.task.heap.size: 2048m
# 指定JVM托管内存大小
# taskmanager.memory.managed.size: 512m

# 每个TaskManager提供的slot数量,每个slot上能运行一个并行的pipeline任务。
taskmanager.numberOfTaskSlots: 1

# 没有指定并行度的情况下,默认的全局并行度
parallelism.default: 1

# 默认的文件系统的数据结构和授权
# 如果是本地文件,那么格式为: 'file:///'
# 如果是HDFS文件,那么格式为:'hdfs://namenode_name:9870'
# fs.default-scheme

#==============================================================================
# 高可用配置
#==============================================================================

# 高可用模型的选择有:'NONE' 或者 'zookeeper'
# high-availability: zookeeper

# 指定master恢复所需要的元数据存储的路径。这个路径需要存储像持久化数据流图一样的大对象
# high-availability.storageDir: hdfs:///flink/ha/

# # ZooKeeper集群的地址,用于协调高可用模型的建立
# high-availability.zookeeper.quorum: localhost:2181

# 如果zookeeper安全策略开启,那么下面这个值就应该是creator,默认是oepn
# high-availability.zookeeper.client.acl: open

#==============================================================================
# 容错和检查点配置
#==============================================================================

# Flink后端会默认存储算子的状态,当checkpointing是开启状态的时候
# Flink支持的后端存储有 'jobmanager(MemoryStateBackend)', 'filesystem', 'rocksdb', 或者自定的类<class-name-of-factory>.
# state.backend: filesystem

# 自动存储检查点的数据文件和元数据的默认目录,这个目录必须能被所有的进程、节点访问
# state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints

# 手动保存状态时 savepoints 的目录(可选),用于状态存储写文件时使用
state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints

# 是否开启增量存储(类似RocksDB存储转态的情况)
# state.backend.incremental: false

# 故障恢复的策略,怎么样从失败的任务重恢复计算。配置可选: full 或者 region
# full: 表示集群中的任务发生故障的时候,所有的task重新启动
# region: 表示集群中的任务发生故障的时候,仅重启被失败失败任务所影响的下游的任务和产生数据的上游任务。
jobmanager.execution.failover-strategy: region

#==============================================================================
# REST和前端设置
#==============================================================================

# Web端运行时监视器端口
rest.port: 8081

# Web Server的地址配置,用于REST api调用
#rest.address: 0.0.0.0

# 配置REST端口绑定的范围
#rest.bind-port: 8080-8090

# 配置Server地址并绑定server
#rest.bind-address: 0.0.0.0

# 配置任务提交是否在网页端开启,默认开启
#web.submit.enable: false

#==============================================================================
# 高级配置
#==============================================================================

# 指定默认的本地临时目录。Flink在运行时会把临时数据写到本地文件系统,
# 比如 Flink 接收到的JAR、应用程序状态(当用RocksDB存储应用程序状态时),要避免目录里的数据被服务器自动清空
# 否则 job 重启时可能因找不到元数据导致恢复失败
# 注意:推荐是添加多个文件夹地址(unix系统使用冒号隔开,windows使用逗号隔开),多个文件夹地址可以是相同的,多文件地址的配置致使Flink创建多个线程来出来,提高读写效率和吞度量
# io.tmp.dirs: /tmp

# 是否在 TM 启动时预先分配 TM 管理的内存
taskmanager.memory.preallocate: false

# 类加载顺序的设定,Flink默认是'child-first',还可以选:'parent-first'
# 'child-first'这种加载机制允许用户在自己的应用中使用不同的依赖或者JAR包的版本。
# classloader.resolve-order: child-first

# 网络堆栈能使用的内存总量的配置。这些配置一般不需要调优。
# 当遇到"Insufficient number of network buffers" 错误的时候就需要调整下面的参数了。
# 默认最小是64MB, 最大值是 1GB
# TM 节点为网络缓冲区分配的内存,默认占JVM堆内存10%
# taskmanager.memory.network.fraction: 0.1
# 网络缓冲区最小值
# taskmanager.memory.network.min: 64mb
# 网络缓冲区最大值
# taskmanager.memory.network.max: 1gb

#==============================================================================
# Flink集群安全配置
#==============================================================================

# Kerberos认证,主要用在Hadoop,Zookeeper, connectors上的安全组件
# 通常开启的步骤为:
# 1. 配置本地的krb5.conf文件
# 2. 提供Kerberos的凭证(可以是一个keytab或者一个ticket cache)
# 3. 针对不同的JAAS的登录上下文,激活Kerberos的凭证
# 4. 使用JAAS/SASL配置连接

# 下面这些配置是说明Kerberos怎么提供安全凭证的。
# 当用户提供了keytab path 和 principal,keytab将用于代贴ticket cache

# 是否从 Kerberos ticket 缓存中读取
# security.kerberos.login.use-ticket-cache: true

# 包含用户凭据的 Kerberos 密钥表文件的绝对路径
# security.kerberos.login.keytab: /path/to/kerberos/keytab


# 与 keytab 关联的 Kerberos 主体名称
# security.kerberos.login.principal: flink-user

# 以逗号分隔的登录上下文列表,用于提供 Kerberos 凭据
# 例如,Client,KafkaClient 使用凭证进行 ZooKeeper 身份验证和 Kafka 身份验证# security.kerberos.login.contexts: Client,KafkaClient

#==============================================================================
# Zookeeper 安全配置
#==============================================================================

# 当zookeeper开启了安全模式,下面的配置就是必要的。

# 如果zookeeper配置了服务名称,就需要修改下面这个配置
# zookeeper.sasl.service-name: zookeeper

# 该配置必须匹配 security.kerberos.login.contexts 配置中的列表(含有一个)
# zookeeper.sasl.login-context-name: Client

#==============================================================================
# 历史任务服务器配置
#==============================================================================

# HistoryServer 通过bin/historyserver.sh (start|stop)开启和关闭

# 用于上传执行完成的job。也是HistoryServer监控的目录。
#jobmanager.archive.fs.dir: hdfs:///completed-jobs/

# 基于 Web 的 HistoryServer 的地址
#historyserver.web.address: 0.0.0.0

# 基于 Web 的 HistoryServer 的端口号
#historyserver.web.port: 8082

# 以逗号分隔的目录列表,指定监控已完成的job所在的地址
#historyserver.archive.fs.dir: hdfs:///completed-jobs/

# 刷新受监控目录的时间间隔(毫秒)
#historyserver.archive.fs.refresh-interval: 10000

2. masters

1
2
# 集群的 master 节点
localhost:8081

3. workers

1
2
# 集群中所有的工作节点
localhost

4. zoo.cfg

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 每个 tick 的毫秒数
tickTime=2000

# 初始同步阶段可以采用的 tick 数
initLimit=10

# 在发送请求和获取确认之间可以传递的 tick 数
syncLimit=5

# 存储快照的目录
# dataDir=/tmp/zookeeper

# 客户端将连接的端口
clientPort=2181

# ZooKeeper quorum peers
server.1=localhost:2888:3888
# server.2=host:peer-port:leader-port

5. 日志配置

1
2
3
4
5
6
7
8
# 在不同平台下Flink运行的日志文件
log4j-cli.properties
log4j-console.properties
log4j-yarn-session.properties
log4j.properties
logback-console.xml
logback-yarn.xml
logback.xml