0%

优秀的 Go存储开源项目和库

A curated list of awesome Go storage projects and libraries. Inspired by awesome-go.

Contributing

Please take a quick gander at the contribution guidelines first. Thanks to all contributors; you rock!

If you see a package or project here that is no longer maintained or is not a good fit, please submit a pull request to improve this file. Thank you!

Contents

Storage Server

Storage Servers implemented in Go.

  • minio - Minio is an open source object storage server compatible with Amazon S3 APIs.
  • rclone - “rsync for cloud storage” - Google Drive, Amazon Drive, S3, Dropbox, Backblaze B2, One Drive, Swift, Hubic, Cloudfile…
  • perkeep - Perkeep is your personal storage system for life: a way of storing, syncing, sharing, modelling and backing up content.
  • s3git - Git for Cloud Storage. Distributed Version Control for Data.
  • storj - Decentralized cloud object storage that is affordable, easy to use, private, and secure.
  • rook - Open, Cloud Native, and Universal Distributed Storage.
  • longhorn Longhorn is an open source persistent block storage server delivered via containers.
Read more »

本文转发自技术世界原文链接 http://www.jasongj.com/2015/04/24/KafkaColumn2

摘要

  Kafka在0.8以前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上所有Partition都无法继续提供服务。若该Broker永远不能再恢复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一即是提供数据持久化,同时对于分布式系统来说,尤其当集群规模上升到一定程度后,一台或者多台机器宕机的可能性大大提高,对于Failover机制的需求非常高。因此,Kafka从0.8开始提供High Availability机制。本文从Data Replication和Leader Election两方面介绍了Kafka的HA机制。

Kafka为何需要High Available

为何需要Replication

  在Kafka在0.8以前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上所有的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。

  • 如果Producer使用同步模式则Producer会在尝试重新发送message.send.max.retries(默认值为3)次后抛出Exception,用户可以选择停止发送后续数据也可选择继续选择发送。而前者会造成数据的阻塞,后者会造成本应发往该Broker的数据的丢失。
  • 如果Producer使用异步模式,则Producer会尝试重新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会造成数据丢失并且用户只能通过日志发现该问题。

  由此可见,在没有Replication的情况下,一旦某机器宕机或者某个Broker停止工作则会造成整个系统的可用性降低。随着集群规模的增加,整个集群中出现该类异常的几率大大增加,因此对于生产系统而言Replication机制的引入非常重要。   

为何需要Leader Election

  (本文所述Leader Election主要指Replica之间的Leader Election)
  引入Replication之后,同一个Partition可能会有多个Replica,而这时需要在这些Replica中选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica作为Follower从Leader中复制数据。
  因为需要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必须要能继续服务并且即不能造成数据重复也不能造成数据丢失)。如果没有一个Leader,所有Replica都可同时读/写数据,那就需要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性非常难保证,大大增加了Replication实现的复杂性,同时也增加了出现异常的几率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。      

Kafka HA设计解析

如何将所有Replica均匀分布到整个集群

  为了更好的做负载均衡,Kafka尽量将所有的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提高Kafka的容错能力,也需要将同一个Partition的Replica尽量分散到不同的机器。实际上,如果所有的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的所有Replica都无法工作,也就达不到HA的效果。同时,如果某个Broker宕机了,需要保证它上面的负载可以被均匀的分配到其它幸存的所有Broker上。
  Kafka分配Replica的算法如下:

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序
  2. 将第i个Partition分配到第(i mod n)个Broker上
  3. 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上
Read more »

本文转发自技术世界原文链接 http://www.jasongj.com/2015/06/08/KafkaColumn3

本文在上篇文章 基础上,更加深入讲解了Kafka的HA机制,主要阐述了HA相关各种场景,如Broker failover,Controller failover,Topic创建/删除,Broker启动,Follower从Leader fetch数据等详细处理过程。同时介绍了Kafka提供的与Replication相关的工具,如重新分配Partition等。

摘要

  本文在上篇文章基础上,更加深入讲解了Kafka的HA机制,主要阐述了HA相关各种场景,如Broker failover,Controller failover,Topic创建/删除,Broker启动,Follower从Leader fetch数据等详细处理过程。同时介绍了Kafka提供的与Replication相关的工具,如重新分配Partition等。

Broker Failover过程

Controller对Broker failure的处理过程

  1. Controller在Zookeeper的/brokers/ids节点上注册Watch。一旦有Broker宕机(本文用宕机代表任何让Kafka认为其Broker die的情景,包括但不限于机器断电,网络不可用,GC导致的Stop The World,进程crash等),其在Zookeeper对应的Znode会自动被删除,Zookeeper会fire Controller注册的Watch,Controller即可获取最新的幸存的Broker列表。
  2. Controller决定set_p,该集合包含了宕机的所有Broker上的所有Partition。
  3. 对set_p中的每一个Partition:
      3.1 从/brokers/topics/[topic]/partitions/[partition]/state读取该Partition当前的ISR。
      3.2 决定该Partition的新Leader。如果当前ISR中有至少一个Replica还幸存,则选择其中一个作为新Leader,新的ISR则包含当前ISR中所有幸存的Replica。否则选择该Partition中任意一个幸存的Replica作为新的Leader以及ISR(该场景下可能会有潜在的数据丢失)。如果该Partition的所有Replica都宕机了,则将新的Leader设置为-1。
      3.3 将新的Leader,ISR和新的leader_epochcontroller_epoch写入/brokers/topics/[topic]/partitions/[partition]/state。注意,该操作只有Controller版本在3.1至3.3的过程中无变化时才会执行,否则跳转到3.1。
  4. 直接通过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller可以在一个RPC操作中发送多个命令从而提高效率。
      Broker failover顺序图如下所示。
    broker failover sequence diagram

  LeaderAndIsrRequest结构如下
LeaderAndIsrRequest

  LeaderAndIsrResponse结构如下
LeaderAndIsrResponse

Read more »

Kafka NotLeaderForPartitionException异常

异常分析

1.Kafka日志分析

发现Kafka日志中有比较多的org.apache.kafka.common.errors.NotLeaderForPartitionException异常信息,该异常从字面解释就是某个分区的leader找不到,具体异常信息如下:

1
2019-01-18 22:01:00,802 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcherThread-0-118]: Error for partition [kafka_custflow_topic_test,5] to broker 118:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

通常来说该异常信息是由于kafka和zk连接存在超时,接着导致Controller重新选举导致获取元数据不正确,timed out的那台Broker所持有的partition就会出现NotLeaderForPartitionException,kafka中连接zk超时日志格式信息如下:

1
2
2019-01-18 21:59:55,916 WARN org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 7329ms for sessionid 0x2677edb3ac1f8d5
2019-01-18 21:59:55,916 INFO org.apache.zookeeper.ClientCnxn: Client session timed out, have not heard from server in 7329ms for sessionid 0x2677edb3ac1f8d5, closing socket connection and attempting reconnect
1
2
3
4
5
6
7
2019-01-18 21:59:56,075 INFO org.apache.zookeeper.ClientCnxn: Opening socket connection to server mfsmaster/121.9.240.249:2181. Will not attempt to authenticate using SASL (unknown error)
2019-01-18 21:59:56,075 INFO org.apache.zookeeper.ClientCnxn: Socket connection established to mfsmaster/121.9.240.249:2181, initiating session
2019-01-18 21:59:56,080 WARN org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x2677edb3ac1f8d5 has expired
2019-01-18 21:59:56,080 INFO org.I0Itec.zkclient.ZkClient: zookeeper state changed (Expired)
2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ClientCnxn: Unable to reconnect to ZooKeeper service, session 0x2677edb3ac1f8d5 has expired, closing socket connection
2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ZooKeeper: Initiating client connection, connectString=SR-CNSX-GDFS-240-251:2181,SR-CNSX-GDFS-240-252:2181,SR-CNSX-GDFS-240-253:2181,mfslogger:2181,mfsmaster:2181/kafka sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7957dc72
2019-01-18 21:59:56,080 INFO org.apache.zookeeper.ClientCnxn: EventThread shut down for session: 0x2677edb3ac1f8d5
1
2
3
4
5
6
7
2019-01-18 21:59:56,094 INFO org.I0Itec.zkclient.ZkClient: zookeeper state changed (SyncConnected)
2019-01-18 21:59:56,095 INFO kafka.server.KafkaHealthcheck$SessionExpireListener: re-registering broker info in ZK for broker 121
2019-01-18 21:59:56,095 INFO kafka.utils.ZKCheckedEphemeral: Creating /brokers/ids/121 (is it secure? false)
2019-01-18 21:59:56,112 INFO kafka.utils.ZKCheckedEphemeral: Result of znode creation is: OK
2019-01-18 21:59:56,113 INFO kafka.utils.ZkUtils: Registered broker 121 at path /brokers/ids/121 with addresses: EndPoint(SR-CNSX-GDFS-240-252,9092,ListenerName(PLAINTEXT),PLAINTEXT)
2019-01-18 21:59:56,113 INFO kafka.server.KafkaHealthcheck$SessionExpireListener: done re-registering broker
2019-01-18 21:59:56,113 INFO kafka.server.KafkaHealthcheck$SessionExpireListener: Subscribing to /brokers/topics path to watch for new topics

2.zk日志分析

同时分析zk日志信息:

1
2
3
4
5
6
7
8
2019-01-18 21:59:55,961 WARN org.apache.zookeeper.server.NIOServerCnxn: caught end of stream exception
EndOfStreamException: Unable to read additional data from client sessionid 0x2677edb3ac1f8d5, likely client has closed socket
at org.apache.zookeeper.server.NIOServerCnxn.doIO(NIOServerCnxn.java:231)
at org.apache.zookeeper.server.NIOServerCnxnFactory.run(NIOServerCnxnFactory.java:208)
at java.lang.Thread.run(Thread.java:748)
2019-01-18 21:59:55,962 INFO org.apache.zookeeper.server.NIOServerCnxn: Closed socket connection for client /121.9.240.252:40660 which had sessionid 0x2677edb3ac1f8d5
2019-01-18 21:59:56,001 INFO org.apache.zookeeper.server.ZooKeeperServer: Expiring session 0x2677edb3ac1f8d5, timeout of 6000ms exceeded
2019-01-18 21:59:56,001 INFO org.apache.zookeeper.server.PrepRequestProcessor: Processed session termination for sessionid: 0x2677edb3ac1f8d5

综合zk和kafka日志信息,可以看出kafka和zk session超时之后,session会被zk主动关闭,之后kafka会重新连接到zk集群,基本是在1s之内kafka新的session已经建立,所以从短时间内kafka服务没问题。

3.gc日志分析

Gc 日志中基本没有Full GC

4.系统资源分析

  • 晚上9~10业务晚高峰,数据量通常比较大
  • CPU 内存正常
  • IO在晚高峰时存在突刺

解决方案

1.kafka增大session time out

当前默认值是6s,可适当加大超时时间

2.增加kafka磁盘

3.增加磁盘IO处理线程数

MySQL 读书笔记-索引

MySQ索引在存储引擎层实现不是在服务器层,不同的存储引擎的索引的实现方式不一样。本文重点关注采用B-Tree实现的索引,还有内存引擎使用的哈希索引、空间索引、全文索引不在本文描述范围内。

为何使用索引

  1. 查询时减少扫描数据量,加快查询速度
  2. 避免排序和临时表
  3. 将随机IO变为顺序IO

索引实现方式

MySQL 索引对于数据查询性能有很重要的影响,好的索引可能能让查询性能提升好几个数量级。MySQL先利用索引查找到对应值,然后根据匹配的索引记录找到对应的数据行。如果索引有多列则列的顺序很重要,因为MySQL有最左前缀匹配限制。

MySQL大部分存储引擎支持B-Tree索引,不过每个存储引擎的B-Tree数据结构可能不太一致。MyISAM采用前缀压缩技术使索引更小,InnoDB按照原始数据格式存储;MyISAM索引通过数据的物理位置引用被索引的行,InndoDB则根据主键引用被索引的行。

mysql_clustered_index

InnoDB有主键索引和非主键索引区分 ,对于主键索引来说叶子节点存放的是整行数据,主键索引又叫聚簇索引(clustered index);非主键索引的叶子节点内容是主键的值,又叫二级索引(secondary index)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
MariaDB [test]> desc tt;
+-------+-------------+------+-----+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+-------+-------------+------+-----+---------+-------+
| id | int(11) | NO | PRI | NULL | |
| k | int(11) | NO | MUL | NULL | |
| name | varchar(20) | YES | | NULL | |
+-------+-------------+------+-----+---------+-------+

// 搜索Id的B+Tree搜索树
select * from tt where id=100;

// 搜索k的B+Tree搜索树得到id的值,然后在用id值搜索id搜索树,回表
select * from tt where k=5;

索引与自增主键

索引采用B+Tree数据结构来存储,B+Tree数据结构父接口大于左节点小于右节点,所以数据表在插入数据时索引树存在维护操作,数据插入可能存在页分裂,数据删除存在可能页合并,此时会影响性能。

通常在建表规范中有描述建需要自增主键,自增主键定义NOT NULL PRIMARY AUTO_INTREMENT,采用自增主键在插入数据时采用追加操作,不涉及到数据移动,也不会触发叶子节点的分裂。

对于有业务字段做主键的场景,往往不容易保证有序插入,这样数据写成本比较高。此外还有一个考虑点业务主键的大小,假如以身份证作为业务主键,则该字段大小为20字节 ,此时每个二级索引的叶子节点占用20字节,相比用自增主键,二级索引叶子节点主要8字节。主键长度越小,普通索引的叶子节点就越小,普通索引占用的空间也就越小

索引查询

B-Tree索引使用的查询场景有:全值匹配, 最左前缀匹配,列前缀匹配,范围值匹配,精确匹配某一列并范围匹配另一列

全值匹配:被索引的每个列在where查询条件里面都有比较值

最左前缀匹配: 假设有多个索引列,where查询条件里面用到了第一列全值匹配

列前缀匹配:假设有多个索引列,where查询条件里面用到了第一列前几个字符匹配

范围值匹配:假设有多个索引列,where查询条件里面用到了第一列 between and

B-Tree索引限制:

  1. 如果不是按照索引的最左列开始查找,则无法使用索引
  2. 不能跳过索引的列
  3. 如果查询中有某个列的范围查询,则其右边所有列都不能使用索引优化查找 (like)

MySQ 5.6之后引入一个叫索引下推的功能,可以在索引遍历过程中,对索引中包含的字段先做判断,直接过滤掉不满足条件的记录,减少回表次数。

聚簇索引

聚集索引与非聚集索引的区别是,在同一颗B-Tree数据结构中是否同时保存了索引和数据行,当表有聚簇索引时它的数据行放在索引的叶子页中。一个表只能有一个聚簇索引,因为在一个表中数据的存放方式只有一种。InnoDB 主键使用的是聚簇索引,MyISAM 不管是主键索引,还是二级索引使用的都是非聚簇索引。InnoDB采用主键聚集数据,如果没有定义主键InnoDB会选择一个唯一的非空索引代替。

聚簇索引优点:

  1. 可以把相关数据存储在一起,在一定场景下可以减少IO操作
  2. 数据访问更快,因为聚簇索引把索引和数据保存在一起在同一个B-Tree中因此从聚簇索引中获取数据比非聚簇索引查找更快
  3. 使用覆盖索引扫描的查询可以直接使用叶节点中的主键值

聚餐索引缺点:

  1. 插入速度严重依赖插入顺序,按照主键顺序插入是加载数据到InnoDB表中速度最快的方式 。使用InnoDB时应该尽可能按主键顺序插入数据并尽可能使用单调增加聚簇键的值来插入新行。
  2. 更新聚簇索引的代价大
  3. 基于聚餐索引的表在插入新行或者主键更新需要移动行的时候,可能面临页分裂问题,页分裂导致表占用更多的磁盘空间
  4. 二级索引访问需要两次索引查找,而不是一次。通过二级索引获取对应的主键值,然后根据主键值去聚簇索引中查找对应数据行。这样做的好处当出现数据行移动时减少二级索引的维护工作

覆盖索引

如果一个索引包含所有查询字段的值,无需再读取数据行,则称为覆盖索引,覆盖索引可以减少搜索次数能够极大的提高查询性能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
MariaDB [test]> desc fulltest;
+-----------+----------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-----------+----------+------+-----+---------+----------------+
| node_id | int(11) | NO | PRI | NULL | auto_increment |
| prod_id | char(10) | NO | | NULL | |
| note_date | datetime | NO | | NULL | |
| note_text | text | YES | MUL | NULL | |
+-----------+----------+------+-----+---------+----------------+


MariaDB [test]> explain select node_id from fulltest \G;
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: fulltest
type: index
possible_keys: NULL
key: PRIMARY
key_len: 4
ref: NULL
rows: 2
Extra: Using index

MariaDB [test]> explain select * from fulltest where note_text='' \G;
*************************** 1. row ***************************
id: 1
select_type: SIMPLE
table: fulltest
type: ALL
possible_keys: note_text
key: NULL
key_len: NULL
ref: NULL
rows: 2
Extra: Using where
1 row in set (0.00 sec)

Spark On YARN 资源配置

YARN 调度模型

CDH YARN 界面中可以选择调度模型,有三种调度模型可供选择,分别是:Capacity Scheduler、FIFO Scheduler、Fair Scheduler,CDH YARN界面中Scheduler类配置项可以选择:

  • org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler

  • org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler

  • org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler

FIFO Scheduler:

任务调度先进先出,很好理解,且该调度模式不需要配置,不过它并不适用于共享集群。大的应用可能会占用所有集群资源,小任务有可能会一致阻塞。在共享集群中,更适合采用Capacity SchedulerFair Scheduler,这两个调度器都允许大任务和小任务在提交的同时获得一定的系统资源

Capacity Scheduler

有一个专门的队列用来运行小任务,但是为小任务专门设置一个队列会预先占用一定的集群资源,这就导致大任务的执行时间会落后于使用FIFO调度器时的时间。

Fair Scheduler

在Fair调度器中,不需要预先占用一定的系统资源,Fair调度器会为所有运行的job动态的调整系统资源。需要注意的是,小任务执行完成之后也会释放自己占用的资源,大任务又获得了全部的系统资源。最终的效果就是Fair调度器即得到了高的资源利用率又能保证小任务及时完成。

Spark on YARN跑的任务按照日志业务可划分为访问日志计算任务和流量日志计算任务,希望这两种任务资源尽量能隔离,两者之间不要资源抢占。在默认情况下spark运行在YARN上的所有Application公用一个Queue,Queue采用的调度模型是公平调度模型,这种情况下访问日志业务量突增之后会影响流量日志计算任务。为此设置两个队列,分布运行访问日志计算和流量日志计算。YARN总调度模型采用Fair调度模型,两个队列内部也采用Fair调度模型。

Capacity Scheduler

Capacity 调度器允许多个组织共享整个集群,每个组织可以获得集群的一部分计算能力。通过为每个组织分配专门的队列,然后再为每个队列分配一定的集群资源,这样整个集群就可以通过设置多个队列的方式给多个组织提供服务了。除此之外,队列内部又可以垂直划分,这样一个组织内部的多个成员就可以共享这个队列资源了,在一个队列内部,资源的调度是采用的是先进先出(FIFO)策略。

在正常的操作中,Capacity调度器不会强制释放Container,当一个队列资源不够用时,这个队列只能获得其它队列释放后的Container资源。当然,我们可以为队列设置一个最大资源使用量,以免这个队列过多的占用空闲资源,导致其它队列无法使用这些空闲资源,这就是”弹性队列”需要权衡的地方。

主要的特点:

  • 分级队列–支持队列分级,以确保在允许其他队列使用空闲资源之前,资源在组织的子队列之间共享,从而提供更多的控制和可预测性。
  • 容量保证–队列被分配了网格容量的一小部分,即一定容量的资源将由它们支配。提交给队列的所有应用程序都可以访问分配给队列的容量。管理员可以对分配给每个队列的容量配置软限制和可选硬限制。
  • 安全性–每个队列都有严格的ACl,控制哪些用户可以向单个队列提交应用程序。此外,还有安全防护措施来确保用户不能查看和/或修改来自其他用户的应用程序。此外,还支持按队列和系统管理员角色。
  • 弹性–资源可以分配给超出其容量的任何队列。当未来某个时间点运行在容量不足的队列需要这些资源时,随着这些资源上计划的任务完成,它们将被分配给运行在容量不足的队列上的应用程序(也支持抢占)。这可以确保队列可以以可预测和灵活的方式获得资源,从而防止集群中人为的资源孤岛,这有助于利用率。
  • 多租户–提供了一组全面的限制,以防止单个应用程序、用户和队列独占队列或整个集群的资源,从而确保集群不会不堪重负。
  • 基于资源的调度–支持资源密集型应用程序,其中应用程序可以选择性地指定比默认更高的资源需求,从而适应具有不同资源需求的应用程序。目前,内存是支持的资源需求。

配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<property>
<name>yarn.scheduler.capacity.root.queues</name>
<value>a,b,c</value>
<description>The queues at the this level (root is the root queue).
</description>
</property>

<property>
<name>yarn.scheduler.capacity.root.a.queues</name>
<value>a1,a2</value>
<description>The queues at the this level (root is the root queue).
</description>
</property>

<property>
<name>yarn.scheduler.capacity.root.b.queues</name>
<value>b1,b2,b3</value>
<description>The queues at the this level (root is the root queue).
</description>
</property>

Fair Scheduler

公平调度是一种将资源分配给应用程序的方法,这样所有应用程序在一段时间内平均获得相同的资源份额。Hadoop NextGen能够调度多种资源类型。默认情况下,公平调度器仅基于内存来调度公平决策。它可以被配置为使用内存和CPU进行调度,使用Ghodsi等人开发的优势资源公平概念。当有一个应用程序运行时,该应用程序使用整个集群。提交其他应用程序时,释放的资源会分配给新应用程序,这样每个应用程序最终获得的资源量大致相同。与默认的Hadoop调度器不同,Hadoop调度器形成了一个应用队列,它允许短应用在合理的时间内完成,而不会耗尽长寿命应用。这也是一种在多个用户之间共享集群的合理方式。最后,公平共享也可以与应用程序优先级一起工作——优先级被用作权重来确定每个应用程序应该获得的总资源的比例。

调度程序将应用程序进一步组织成“队列”,并在这些队列之间公平地共享资源。默认情况下,所有用户共享一个名为“default”的队列。如果某个应用程序在容器资源请求中特别列出了一个队列,该请求将被提交到该队列。也可以通过配置根据请求中包含的用户名分配队列。在每个队列中,调度策略用于在运行的应用程序之间共享资源。默认为基于内存的公平共享,但也可以配置先进先出和具有优势资源公平的多资源。队列可以按层次排列以划分资源,并配置权重以按特定比例共享集群。

除了提供公平共享之外,公平调度器还允许为队列分配有保证的最小共享,这对于确保某些用户、组或生产应用程序始终获得足够的资源非常有用。当一个队列包含应用程序时,它至少会得到它的最小份额,但是当队列不需要它的全部保证份额时,多余的份额会在其他正在运行的应用程序之间分配。这使得调度器能够保证队列的容量,同时在这些队列不包含应用程序时高效地利用资源。

默认情况下,公平调度程序允许所有应用程序运行,但也可以通过配置文件限制每个用户和每个队列运行的应用程序数量。当用户必须一次提交数百个应用程序时,这可能很有用,如果一次运行太多应用程序会导致创建太多中间数据或太多上下文切换,这通常会提高性能。限制应用程序不会导致任何后续提交的应用程序失败,只会在调度程序的队列中等待,直到用户的一些早期应用程序完成。

CDH YARN界面中可以为Fair Scheduler、Capacity Scheduler这两种调度模型分别设置配置,在界面中配置项分别是 **容量调度程序配置高级配置代码段(安全阀)**和 Fair Scheduler XML 高级配置代码段(安全阀), 可选择XML视图。

公平调度XML配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<?xml version="1.0" encoding="utf-8"?>

<allocations>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>

<queue name="logAnalysis">
<weight>60</weight>
<minResources>80000 mb, 30 vcores</minResources>
<maxResources>100000 mb, 70 vcores</maxResources>
<maxRunningApps>10</maxRunningApps>
<minSharePreemptionTimeout>100</minSharePreemptionTimeout>
<aclSubmitApps></aclSubmitApps>
<aclAdministerApps></aclAdministerApps>
</queue>

<queue name="flowAnalysis">
<weight>40</weight>
<minResources>10000 mb, 10 vcores</minResources>
<maxResources>40000 mb, 30 vcores</maxResources>
<maxRunningApps>10</maxRunningApps>
<minSharePreemptionTimeout>100</minSharePreemptionTimeout>
<aclSubmitApps></aclSubmitApps>
<aclAdministerApps></aclAdministerApps>
</queue>

<user name="root">
<maxRunningApps>10</maxRunningApps>
</user>

<userMaxAppsDefault>50</userMaxAppsDefault>
<fairSharePreemptionTimeout>200</fairSharePreemptionTimeout>
</allocations>

  • 设置两个队列,分别是logAnalysis和flowAnalysis,Spark在submit application时需要制定该任务在哪个队列中执行,不然默认情况下会运行在default队列中。Spark制定队列配置项为spark.yarn.queue

  • logAnalysis队列和flowAnalysis队列资源权重为6 : 4

  • 分布配置项设置队列占用最小资源和最大资源,包含CPU核数和内存大小,这两个配置项需要根据Spark执行时设置的executor来设置

  • 配置项设置了调度模型为Fair

  • 设置队列可同时执行的Application数量

资源抢占

YARN的yarn.scheduler.fair.preemption配置是否启用 Fair Scheduler 抢占,如果开启了资源抢占:

  • 在资源调度器中,每个队列可设置一个最小资源量和最大资源量,其中,最小资源量是资源紧缺情况下每个队列需保证的资源量,而最大资源量则是极端情况下队列也不能超过的资源使用量
  • 开启资源抢占后当某个队列资源不足时,调度器会杀死其他队列的container以释放资源,分给这个队列
  • 每个队列都有minShare、fairShare属性。这两个属性是抢占式调度的阈值。当一个队列使用的资源小于fairShare*X(defaultFairSharePreemptionThreshold)、或者小于minShare,并且持续超过一定时间(这两种情况的超时时间不同,可以设置),就会开始抢占式调度
  • 具体YARN抢占的算法参考官方文档

任务提交

Spark在submit任务时可以用spark.yarn.queue配置项制定把该任务提交到哪个YARN资源队列

理解Go Channel

CSP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,由 Tony Hoare 于 1977 年提出。简单来说,CSP 模型由并发执行的实体(线程或者进程)所组成,实体之间通过发送消息进行通信,这里发送消息时使用的就是通道,或者叫 channel。CSP 模型的关键是关注 channel,而不关注发送消息的实体。Go 语言实现了 CSP 部分理论,goroutine 对应 CSP 中并发执行的实体,channel 也就对应着 CSP 中的 channel。

Channel类型

1
2
3
chan T          // 可以接收和发送类型为 T 的数据
chan<- float64 // 只可以用来发送 float64 类型的数据
<-chan int // 只可以用来接收 int 类型的数据

<-总是优先和最左边的类型结合

1
2
3
4
chan<- chan int    // 等价 chan<- (chan int)
chan<- <-chan int // 等价 chan<- (<-chan int)
<-chan <-chan int // 等价 <-chan (<-chan int)
chan (<-chan int)

Channel创建

使用make初始化Channel,并且可以设置容量

1
2
unBufferChan := make(chan int)  // 1
bufferChan := make(chan int, N) // 2

上面的方式 1 创建的是无缓冲 channel,方式 2 创建的是缓冲 channel。如果使用 channel 之前没有 make,会出现 dead lock 错误。

1
2
3
4
5
6
7
8
9
10
11
12
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive (nil chan)]:
main.main()
/Users/knife/Work/GoWorkplace/src/test/go.go:8 +0x4a

goroutine 4 [chan send (nil chan)]:
main.main.func1(0x0)
/Users/knife/Work/GoWorkplace/src/test/go.go:6 +0x37
created by main.main
/Users/knife/Work/GoWorkplace/src/test/go.go:5 +0x3e
exit status 2

Channel发送和接收

1
2
3
4
5
6
7
ch := make(chan int, 10)

// 读操作
x <- ch

// 写操作
ch <- x

channel 分为无缓冲 channel 和有缓冲 channel。

  • 无缓冲:发送和接收动作是同时发生的。如果没有 goroutine 读取 channel (<- channel),则发送者 (channel <-) 会一直阻塞
  • 缓冲:缓冲 channel 类似一个有容量的队列。当队列满的时候发送者会阻塞;当队列空的时候接收者会阻塞。

Channel关闭

1
2
3
4
ch := make(chan int)

// 关闭
close(ch)
  • 重复关闭 channel 会导致 panic
  • 向关闭的 channel 发送数据会 panic
  • 从关闭的 channel 读数据不会 panic,读出 channel 中已有的数据之后再读就是 channel 类似的默认值,比如 chan int 类型的 channel 关闭之后读取到的值为 0
1
2
3
4
5
6
7
8
9
ch := make(chan int, 10)
...
close(ch)

// ok-idiom
val, ok := <-ch
if ok == false {
// channel closed
}

Channel Range

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func consumer(ch chan int) {
...
for x := range ch {
fmt.Println(x)
...
}
}

func producer(ch chan int) {
...
for _, v := range values {
ch <- v
}
}

Channel Select

select会一致阻塞直到有case满足条件,select通常和for循环一起用。for + select + time.After可以实现超时,time.After返回一个类型为<-chan Time的单向的channel

1
2
3
4
5
6
7
8
9
10
11
12
for {
select {
case a <- ch1:
break
case b <- ch2:
break
case <- time.After(2 * time.Second)
break
default:
break
}
}

参考

  1. Go Concurrency Patterns: Pipelines and cancellation
  2. Go Channel 详解
  3. 深入理解Go Channel

理解spark闭包

什么叫闭包: 跨作用域访问函数变量。又指的一个拥有许多变量和绑定了这些变量的环境的表达式(通常是一个函数),因而这些变量也是该表达式的一部分。

Spark闭包的问题引出:
在spark中实现统计List(1,2,3)的和。如果使用下面的代码,程序打印的结果不是6,而是0。这个和我们编写单机程序的认识有很大不同。为什么呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
object Test {
def main(args:Array[String]):Unit = {
val conf = new SparkConf().setAppName("test");
val sc = new SparkContext(conf)

val rdd = sc.parallelize(List(1,2,3))
var counter = 0
//warn: don't do this
rdd.foreach(x => counter += x)
println("Counter value: "+counter)

sc.stop()
}
}1234567891011121314

问题分析:
counter是在foreach函数外部定义的,也就是在driver程序中定义,而foreach函数是属于rdd对象的,rdd函数的执行位置是各个worker节点(或者说worker进程),main函数是在driver节点上(或者说driver进程上)执行的,所以当counter变量在driver中定义,被在rdd中使用的时候,出现了变量的“跨域”问题,也就是闭包问题。

问题解释:
对于上面程序中的counter变量,由于在main函数和在rdd对象的foreach函数是属于不同“闭包”的,所以,传进foreach中的counter是一个副本,初始值都为0。foreach中叠加的是counter的副本,不管副本如何变化,都不会影响到main函数中的counter,所以最终打印出来的counter为0.

当用户提交了一个用scala语言写的Spark程序,Spark框架会调用哪些组件呢?首先,这个Spark程序就是一个“Application”,程序里面的mian函数就是“Driver Program”, 前面已经讲到它的作用,只是,dirver程序的可能运行在客户端,也有可有可能运行在spark集群中,这取决于spark作业提交时参数的选定,比如,yarn-client和yarn-cluster就是分别运行在客户端和spark集群中。在driver程序中会有RDD对象的相关代码操作,比如下面代码的newRDD.map()

1
2
3
4
5
6
7
8
9
10
11
class Test{
def main(args: Array[String]) {
val sc = new SparkContext(new SparkConf())
val newRDD = sc.textFile("")

newRDD.map(data => {
//do something
println(data.toString)
})
}
}

涉及到RDD的代码,比如上面RDD的map操作,它们是在Worker节点上面运行的,所以spark会透明地帮用户把这些涉及到RDD操作的代码传给相应的worker节点。如果在RDD map函数中调用了在函数外部定义的对象,因为这些对象需要通过网络从driver所在节点传给其他的worker节点,所以要求这些类是可序列化的,比如在Java或者scala中实现Serializable类,除了java这种序列化机制,还可以选择其他方式,使得序列化工作更加高效。worker节点接收到程序之后,在spark资源管理器的指挥下运行RDD程序。不同worker节点之间的运行操作是并行的。

在worker节点上所运行的RDD中代码的变量是保存在worker节点上面的,在spark编程中,很多时候用户需要在driver程序中进行相关数据操作之后把该数据传给RDD对象的方法以做进一步处理,这时候,spark框架会自动帮用户把这些数据通过网络传给相应的worker节点。除了这种以变量的形式定义传输数据到worker节点之外,spark还另外提供了两种机制,分别是broadcast和accumulator。相比于变量的方式,在一定场景下使用broadcast比较有优势,因为所广播的数据在每一个worker节点上面只存一个副本,而在spark算子中使用到的外部变量会在每一个用到它的task中保存一个副本,即使这些task在同一个节点上面。所以当数据量比较大的时候,建议使用广播而不是外部变量。

理解闭包

​ Spark中理解起来比较困难的一点是当代码在集群上运行时变量和方法的生命周期和作用域(scope)。当作用于RDD上的操作修改了超出它们作用域范围的变量时,会引起一些混淆。为了说明这个问题,使用下面的例子。该例中使用foreach(),对counter(计数器)进行增加,相同的问题也会发生在其他操作中。

例子

​ 下面的例子在以本地模式运行(–master = local[n]) 和将它部署到集群中 (例如通过 spark-submit 提交到 YARN)对比发现会产生不同的结果。

1
2
3
4
5
var counter =  0 
var rdd = sc.parallelize(data)
// 错误,请不要这样做!!
rdd.foreach(x => counter += x)
println( "Counter value: " + counter)

本地模式 vs. 集群模式

​ 这里主要的挑战是上面代码的行为是有歧义的。以本地模式运行在单个JVM上,上面的代码会将RDD中的值进行累加,并且将它存储到counter中。这是因为RDD和变量counter在driver节点的相同内存空间中。
然而,以集群模式运行时,会更加复杂,上面的代码的结果也许不会如我们预期的那样。当执行一个作业(job)时,Spark会将RDD分成多个任务(task)–每一个任务都会由一个executor来执行。在执行之前,Spark会计算闭包(closure)。闭包是对executors可见的那部分变量和方法,executors会用闭包来执行RDD上的计算(在这个例子中,闭包是foreach())。这个闭包是被序列化的,并且发送给每个executor。在本地模式中,只有一个executor,所以共享相同的闭包。然而,在集群模式中,就不是这样了。executors会运行在各自的worker节点中,每个executor都有闭包的一个复本。
发送给每个executor的闭包中的变量其实也是复本。每个foreach函数中引用的counter不再是driver节点上的counter。当然,在driver节点的内存中仍然存在这一个counter,但是这个counter对于executors来说是不可见的。executors只能看到自己的闭包中的复本。这样,counter最后的值仍旧是0,因为所有在counter的操作只引用了序列化闭包中的值。
为了在这样的场景中,确保这些行为正确,应该使用累加变量(Accumulator)。在集群中跨节点工作时,Spark中的累加变量提供了一种安全的机制来更新变量。所以可变的全局状态应该使用累加变量来定义。

所以上面的例子可以这样写:

1
2
3
4
5
// counter现在是累加变量
var counter = sc.accumulator( 0)
var rdd = sc.parallelize(data)
rdd.foreach(x => counter += x)
println( "Counter value: " + counter)

Kafka 集群扩容

kafka 集群在新增机器后是不会把历史已经创建的topic数据信息迁移到新加入集群的机器,只有新创建的topic才会分布到新机器。若想把历史topic数据均匀分布到加入机器后的新集群,需要人为操作,好在kafka提供了相应的工具可简单地完成数据迁移工作。

迁移数据的过程是手动启动的,但是执行过程是完全自动化的。在kafka后台内部中,kafka将添加新的服务器,并作为正在迁移分区的follower,来完全复制该分区现有的数据。当新服务器完全复制该分区的内容并加入同步副本,成为现有副本之一后,就将现有的副本分区上的数据删除。

分区重新分配工具可以用于跨broker迁移分区,理想的分区分配将确保所有的broker数据负载和分区大小。分区分配工具没有自动研究kafka集群的数据分布和迁移分区达到负载分布的能力,因此,管理员要弄清楚哪些topic或分区应该迁移。

数据清理

在数据迁移的过程中涉及大量的数据复制,对于数据存储量大的topic,如果历史数据不是必须的,可以适当地删除数据,针对某些topic设置retetion时间,操作成功后无需重启kafka,命令如下:

1
./kafka-configs.sh --zookeeper zookeeper:2181/kafka --entity-type topics --entity-name input_kafka_nginxLog_topic --alter --add-config retention.ms=86400000 

数据分区

分区分配工具的3种模式:

  • generate: 这个选项命令,是生成分配规则json文件的,生成“候选人”重新分配到指定的topic的所有parition都移动到新的broker。此选项,仅提供了一个方便的方式来生成特定的topic和目标broker列表的分区重新分配 “计划”。该命令选项会在shell终端输出JSON格式的重新分区后的数据。在使用该选项时,broker选择需要注意加上新机器ID

  • execute: 这个选项命令,是执行你用–generate 生成的分配规则json文件的,(用–reassignment-json-file 选项),可以是自定义的分配计划,也可以是由管理员或通过–generate选项生成的。

  • verify: 这个选项命令,是验证执行–execute重新分配后,列出所有分区的状态,状态可以是成功完成,失败或正在进行中的。

generate

  1. 确认需要重新分区的topic名称,并以JSON格式写到文件中,示例:
1
2
3
4
{
"topics": [{"topic": "input_kafka_nginxLog_topic"}],
"version": 1
}
  1. 生成新的分区列表配置, 示例:
1
kafka-reassign-partitions --zookeeper zookeeper:2181/kafka/kafka --topics-to-move-json-file ./topic.json --broker-list 117,118,119,120,121 --generate

该命令会在shell界面上以JSON格式输出重新分区后的配置,需要手动保存topic_reassgin.json文件中,后续execute会用到

execute

该命令执行重新分区操作,根据重新分区配置,会更改历史数据,异步操作。该操作存在大量磁盘和网络IO,如果kafka队列中该topic存在大量的数据,执行时间很长

1
kafka-reassign-partitions --zookeeper zookeeper:2181/kafka --reassignment-json-file ~/after.json --execute

verify

该命令可以确认第二部操作是否操作完成,如果所有分区数据都为done则表示重分区成功

1
)kafka-reassign-partitions --zookeeper zookeeper:2181/kafka --reassignment-json-file ~/after.json --verify

基于Zookeeper的分布式锁

原文地址

实现分布式锁目前有三种流行方案,分别为基于数据库、Redis、Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开。我们来看下使用Zookeeper如何实现分布式锁。

什么是Zookeeper?

Zookeeper(业界简称zk)是一种提供配置管理、分布式协同以及命名的中心化服务,这些提供的功能都是分布式系统中非常底层且必不可少的基本功能,但是如果自己实现这些功能而且要达到高吞吐、低延迟同时还要保持一致性和可用性,实际上非常困难。因此zookeeper提供了这些功能,开发者在zookeeper之上构建自己的各种分布式系统。

虽然zookeeper的实现比较复杂,但是它提供的模型抽象却是非常简单的。Zookeeper提供一个多层级的节点命名空间(节点称为znode),每个节点都用一个以斜杠(/)分隔的路径表示,而且每个节点都有父节点(根节点除外),非常类似于文件系统。例如,/foo/doo这个表示一个znode,它的父节点为/foo,父父节点为/,而/为根节点没有父节点。与文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据而目录节点不行。Zookeeper为了保证高吞吐和低延迟,在内存中维护了这个树状的目录结构,这种特性使得Zookeeper不能用于存放大量的数据,每个节点的存放数据上限为1M。

而为了保证高可用,zookeeper需要以集群形态来部署,这样只要集群中大部分机器是可用的(能够容忍一定的机器故障),那么zookeeper本身仍然是可用的。客户端在使用zookeeper时,需要知道集群机器列表,通过与集群中的某一台机器建立TCP连接来使用服务,客户端使用这个TCP链接来发送请求、获取结果、获取监听事件以及发送心跳包。如果这个连接异常断开了,客户端可以连接到另外的机器上。

Read more »