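The Controller registers a Watch on the /brokers/ids node in ZooKeeper. When a Broker goes down (in this article, "goes down" covers any situation that makes Kafka consider the Broker dead, including but not limited to power loss, network unavailability, a stop-the-world pause caused by GC, and a process crash), its ephemeral znode in ZooKeeper is deleted automatically, ZooKeeper fires the Watch registered by the Controller, and the Controller can then fetch the latest list of surviving Brokers.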
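The watch mechanism described above can be sketched with a toy in-memory model. This is only an illustration under stated assumptions: `ToyZk`, `ToyController`, and the session names are hypothetical, nothing here talks to a real ZooKeeper ensemble, and real ZooKeeper watches are one-time triggers after which the client re-reads the children itself.

```python
from typing import Callable, Dict, List, Set


class ToyZk:
    """In-memory stand-in for the children of /brokers/ids (hypothetical, not a ZooKeeper client)."""

    def __init__(self) -> None:
        self._brokers: Dict[int, str] = {}  # broker id -> session that owns the ephemeral node
        self._watchers: List[Callable[[Set[int]], None]] = []

    def watch_children(self, callback: Callable[[Set[int]], None]) -> None:
        # Simplification: the callback stays registered and receives the new child list.
        self._watchers.append(callback)

    def register(self, broker_id: int, session_id: str) -> None:
        self._brokers[broker_id] = session_id
        self._fire()

    def expire_session(self, session_id: str) -> None:
        # Ephemeral nodes disappear together with the session that created them.
        dead = [b for b, s in self._brokers.items() if s == session_id]
        for broker_id in dead:
            del self._brokers[broker_id]
        if dead:
            self._fire()

    def _fire(self) -> None:
        live = set(self._brokers)
        for callback in self._watchers:
            callback(live)


class ToyController:
    """Keeps the latest list of surviving brokers, as the Controller does."""

    def __init__(self, zk: ToyZk) -> None:
        self.live_brokers: Set[int] = set()
        self.history: List[List[int]] = []
        zk.watch_children(self.on_brokers_changed)

    def on_brokers_changed(self, live: Set[int]) -> None:
        self.live_brokers = set(live)
        self.history.append(sorted(live))


def demo() -> ToyController:
    zk = ToyZk()
    controller = ToyController(zk)
    zk.register(118, "session-a")
    zk.register(121, "session-b")
    zk.expire_session("session-b")  # broker 121 is now considered dead
    return controller


if __name__ == "__main__":
    print(demo().live_brokers)
```

In this sketch a broker does not need to crash for the Controller to drop it: losing the session is enough, which is the situation the logs below show for broker 121.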
2019-01-18 22:01:00,802 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-118]: Error for partition [kafka_custflow_topic_test,5] to broker 118:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
2019-01-18 21:59:55,916 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 7329ms for sessionid 0x2677edb3ac1f8d5
2019-01-18 21:59:55,916 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 7329ms for sessionid 0x2677edb3ac1f8d5, closing socket connection and attempting reconnect
2019-01-18 21:59:56,075 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server mfsmaster/121.9.240.249:2181. Will not attempt to authenticate using SASL (unknown error)
2019-01-18 21:59:56,075 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to mfsmaster/121.9.240.249:2181, initiating session
2019-01-18 21:59:56,080 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x2677edb3ac1f8d5 has expired
2019-01-18 21:59:56,080 INFO org.I0Itec.zkclient.ZkClient: zookeeper state changed (Expired)
2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x2677edb3ac1f8d5 has expired, closing socket connection
2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=SR-CNSX-GDFS-240-251:2181,SR-CNSX-GDFS-240-252:2181,SR-CNSX-GDFS-240-253:2181,mfslogger:2181,mfsmaster:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7957dc72
2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ClientCnxn: EventThread shut down for session: 0x2677edb3ac1f8d5
2019-01-18 21:59:56,094 INFO org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)
2019-01-18 21:59:56,095 INFO kafka.server.KafkaHealthcheck$SessionExpireListener: re-registering broker info in ZK for broker 121
2019-01-18 21:59:56,095 INFO kafka.utils.ZKCheckedEphemeral: Creating /brokers/ids/121 (is it secure? false)
2019-01-18 21:59:56,112 INFO kafka.utils.ZKCheckedEphemeral: Result of znode creation is: OK
2019-01-18 21:59:56,113 INFO kafka.utils.ZkUtils: Registered broker 121 at path /brokers/ids/121 with addresses: EndPoint(SR-CNSX-GDFS-240-252,9092,ListenerName(PLAINTEXT),PLAINTEXT)
2019-01-18 21:59:56,113 INFO kafka.server.KafkaHealthcheck$SessionExpireListener: done re-registering broker
2019-01-18 21:59:56,113 INFO kafka.server.KafkaHealthcheck$SessionExpireListener: Subscribing to /brokers/topics path to watch for new topics
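The key numbers in the broker log are the silence reported by the client (7329ms) and the configured sessionTimeout (6000). A minimal sketch for pulling both out of a log excerpt and comparing them is shown here; the function names and the returned dictionary layout are hypothetical, and the two sample lines are copied from the log above.

```python
import re
from typing import Dict, Optional, Pattern

SILENCE_RE = re.compile(r"have not heard from server in (\d+)ms")
TIMEOUT_RE = re.compile(r"sessionTimeout=(\d+)")

SAMPLE_LOG = (
    "2019-01-18 21:59:55,916 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, "
    "have not heard from server in 7329ms for sessionid 0x2677edb3ac1f8d5\n"
    "2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ZooKeeper: Initiating client connection, "
    "connectString=SR-CNSX-GDFS-240-251:2181,SR-CNSX-GDFS-240-252:2181,SR-CNSX-GDFS-240-253:2181,"
    "mfslogger:2181,mfsmaster:2181/kafka sessionTimeout=6000 "
    "watcher=org.I0Itec.zkclient.ZkClient@7957dc72\n"
)


def first_int(pattern: Pattern[str], text: str) -> Optional[int]:
    """Return the first captured integer for the pattern, or None if it never matches."""
    match = pattern.search(text)
    return int(match.group(1)) if match else None


def check_session(log_text: str) -> Optional[Dict[str, object]]:
    """Compare the reported silence with the session timeout found in the same log."""
    silence_ms = first_int(SILENCE_RE, log_text)
    timeout_ms = first_int(TIMEOUT_RE, log_text)
    if silence_ms is None or timeout_ms is None:
        return None  # not enough information in this excerpt
    return {
        "silence_ms": silence_ms,
        "timeout_ms": timeout_ms,
        "exceeded": silence_ms > timeout_ms,
    }


if __name__ == "__main__":
    print(check_session(SAMPLE_LOG))
```

For this excerpt the silence is longer than the session timeout, which is consistent with the session being reported as expired when the broker reconnects.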
2. ZooKeeper log analysis
Now look at the ZooKeeper server log for the same period:
2019-01-18 21:59:55,961 WARN org.apache.zookeeper.server.NIOServerCnxn: caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x2677edb3ac1f8d5, likely client has closed socket
    at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:231)
    at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
    at java.lang.Thread.run(Thread.java:748)
2019-01-18 21:59:55,962 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /121.9.240.252:40660 which had sessionid 0x2677edb3ac1f8d5
2019-01-18 21:59:56,001 INFO org.apache.zookeeper.server.ZooKeeperServer: Expiring session 0x2677edb3ac1f8d5, timeout of 6000ms exceeded
2019-01-18 21:59:56,001 INFO org.apache.zookeeper.server.PrepRequestProcessor: Processed session termination for sessionid: 0x2677edb3ac1f8d5
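Putting the broker and ZooKeeper timestamps side by side gives a rough timeline of the incident. The sketch below only does the arithmetic; the event labels are hypothetical, the timestamps are copied from the logs above, and the two logs come from different machines, so any clock skew between them would shift the cross-machine intervals.

```python
from datetime import datetime, timedelta
from typing import Dict

TS_FORMAT = "%Y-%m-%d %H:%M:%S,%f"  # log4j-style timestamp with comma-separated milliseconds

# Timestamps copied from the logs; the keys are labels chosen for this example.
EVENTS: Dict[str, str] = {
    "client_noticed_timeout": "2019-01-18 21:59:55,916",  # broker log
    "server_expired_session": "2019-01-18 21:59:56,001",  # ZooKeeper log
    "znode_recreated": "2019-01-18 21:59:56,112",         # broker log
    "fetcher_error": "2019-01-18 22:01:00,802",           # broker log
}


def parse_ts(text: str) -> datetime:
    return datetime.strptime(text, TS_FORMAT)


def millis_between(start: str, end: str) -> int:
    """Whole milliseconds from start to end (integer division of two timedeltas)."""
    return (parse_ts(end) - parse_ts(start)) // timedelta(milliseconds=1)


def summarize(events: Dict[str, str]) -> Dict[str, int]:
    return {
        "notice_to_expiry_ms": millis_between(
            events["client_noticed_timeout"], events["server_expired_session"]
        ),
        "expiry_to_recreate_ms": millis_between(
            events["server_expired_session"], events["znode_recreated"]
        ),
        "recreate_to_fetcher_error_ms": millis_between(
            events["znode_recreated"], events["fetcher_error"]
        ),
    }


if __name__ == "__main__":
    for name, value in summarize(EVENTS).items():
        print(f"{name}: {value}")
```

Taken at face value, /brokers/ids/121 was missing for only about a tenth of a second between session expiry and re-registration, while the NotLeaderForPartitionException appears roughly a minute later.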
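The jobs running on Spark on YARN fall into two groups by log type: access-log computation jobs and traffic-log computation jobs. We want the resources of these two groups isolated as far as possible so that neither preempts the other. By default, all Spark applications running on YARN share a single queue, and that queue uses the fair scheduling model, so a sudden surge in access-log volume affects the traffic-log computation jobs. To avoid this, we set up two queues, one for the access-log computation and one for the traffic-log computation. YARN as a whole uses the Fair scheduling model, and each of the two queues also uses the Fair scheduling model internally.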
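Once two queues exist, each job has to be submitted to its own queue. A minimal sketch of building the `spark-submit` command lines follows, using the `--queue` option that `spark-submit` provides for YARN. The queue names `access` and `traffic` and the application file names are hypothetical placeholders, not values from the original setup.

```python
import shlex
from typing import List


def spark_submit_cmd(app: str, queue: str, master: str = "yarn") -> List[str]:
    """Build the argument list for submitting one application to one YARN queue."""
    return ["spark-submit", "--master", master, "--queue", queue, app]


# Hypothetical job-to-queue mapping: one queue per log type.
JOBS = {
    "access_log_job.py": "access",
    "traffic_log_job.py": "traffic",
}

if __name__ == "__main__":
    for app, queue in JOBS.items():
        # shlex.join (Python 3.8+) renders the list as a shell-safe command line.
        print(shlex.join(spark_submit_cmd(app, queue)))
```

The same routing can be expressed through the `spark.yarn.queue` configuration property instead of the command-line option.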
<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>a,b,c</value>
  <description>The queues at this level (root is the root queue).</description>
</property>
<property>
  <name>yarn.scheduler.capacity.root.a.queues</name>
  <value>a1,a2</value>
  <description>The queues at this level (root is the root queue).</description>
</property>
<property>
  <name>yarn.scheduler.capacity.root.b.queues</name>
  <value>b1,b2,b3</value>
  <description>The queues at this level (root is the root queue).</description>
</property>
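The three properties above define a queue hierarchy: each `...<path>.queues` entry lists the children of the queue at `<path>`. The sketch below expands them into full leaf-queue paths. It assumes the properties sit inside a `<configuration>` root element, as Hadoop configuration files do; the helper names are hypothetical and the descriptions are omitted for brevity.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

PREFIX = "yarn.scheduler.capacity."
SUFFIX = ".queues"

# The same three properties as above, wrapped in an assumed <configuration> root.
CONFIG = """<configuration>
  <property><name>yarn.scheduler.capacity.root.queues</name><value>a,b,c</value></property>
  <property><name>yarn.scheduler.capacity.root.a.queues</name><value>a1,a2</value></property>
  <property><name>yarn.scheduler.capacity.root.b.queues</name><value>b1,b2,b3</value></property>
</configuration>"""


def child_map(xml_text: str) -> Dict[str, List[str]]:
    """Map each parent queue path (e.g. 'root.a') to the list of its child queue names."""
    children: Dict[str, List[str]] = {}
    for prop in ET.fromstring(xml_text).iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip()
        if name.startswith(PREFIX) and name.endswith(SUFFIX):
            parent = name[len(PREFIX):-len(SUFFIX)]
            children[parent] = [q.strip() for q in value.split(",") if q.strip()]
    return children


def leaf_queues(children: Dict[str, List[str]], path: str = "root") -> List[str]:
    """Depth-first expansion; a queue with no '.queues' entry is treated as a leaf."""
    kids = children.get(path)
    if not kids:
        return [path]
    leaves: List[str] = []
    for kid in kids:
        leaves.extend(leaf_queues(children, f"{path}.{kid}"))
    return leaves


if __name__ == "__main__":
    for queue in leaf_queues(child_map(CONFIG)):
        print(queue)
```

Queue `c` has no child list of its own, so in this sketch it is a leaf directly under `root`, alongside the leaves under `a` and `b`.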