0%

Kafka高性能架构之道

本文转发自技术世界原文链接

本文从宏观架构层面和微观实现层面分析了Kafka如何实现高性能。包含Kafka如何利用Partition实现并行处理和提供水平扩展能力,如何通过ISR实现可用性和数据一致性的动态平衡,如何使用NIO和Linux的sendfile实现零拷贝以及如何通过顺序读写和数据压缩实现磁盘的高效利用。

摘要

宏观架构层面

利用Partition实现并行处理

Partition提供并行处理的能力

Kafka是一个Pub-Sub的消息系统,无论是发布还是订阅,都须指定Topic。如《Kafka设计解析(一)- Kafka背景及架构介绍》一文所述,Topic只是一个逻辑的概念。每个Topic都包含一个或多个Partition,不同Partition可位于不同节点。同时Partition在物理上对应一个本地文件夹,每个Partition包含一个或多个Segment,每个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,可以把一个Partition当作一个非常长的数组,可通过这个“数组”的索引(offset)去访问其数据。

一方面,由于不同Partition可位于不同机器,因此可以充分利用集群优势,实现机器间的并行处理。另一方面,由于Partition在物理上对应一个文件夹,即使多个Partition位于同一个节点,也可通过配置让同一节点上的不同Partition置于不同的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优势。

利用多磁盘的具体方法是,将不同磁盘mount到不同目录,然后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将所有Partition尽可能均匀分配到不同目录也即不同目录(也即不同disk)上。

注:虽然物理上最小单位是Segment,但Kafka并不提供同一Partition内不同Segment间的并行处理。因为对于写而言,每次只会写Partition内的一个Segment,而对于读而言,也只会顺序读取同一Partition内的不同Segment。

Read more »

ClickHouse 数据压缩与解压

ClickHouse 是一款真正面向列的DBMS,就是一款列式数据库,所以ClickHouse非常适合作为OLAP的数据查询引擎。通常列式数据库具有非常好的数据压缩效果,因为每列数据的数据类型一致,保存时会作为一个数组数据挨着保存,这样压缩算法具有非常好的压缩效果,在OLAP查询场景下可以有效的提高整个系统的吞吐量。

ClickHouse目前支持的数据压缩算法是lz4和zstd,其中zstd是实验性,默认情况下ClickHouse采用的是lz4压缩算法。压缩的主要配置项示例如下,通常情况下不会去更改这几个配置项,因为默认配置就可以让数据压缩效率非常高。

1
2
3
4
5
6
7
<compression incl="clickhouse_compression">
<case>
<min_part_size>10000000000</min_part_size>
<min_part_size_ratio>0.01</min_part_size_ratio>
<method>zstd</method>
</case>
</compression>

关于压缩算法的测试,见这篇文章。简而言之,LZ4在速度上会更快,但是压缩率较低,ZSTD正好相反。尽管ZSTD比LZ4慢,但是相比传统的压缩方式Zlib,无论是在压缩效率还是速度上,都可以作为Zlib的替代品。

压缩比

官方有提供星型模块基准测试的案例,clickhouse-in-a-general-analytical-workload-based-on-star-schema-benchmark 该基准测试案例lineorder数据表字段基本都是整形,该表lineorder原始数据有150亿条记录,原始数据总大小为1.7TB,导入到ClickHouse后lineorder数据表占用464GB,压缩比达到了3.7倍数。

目前我们的ClickHouse数据库中存储一些nginx原始日志信息raw_cdn_nginx_log_all,nginx原始访问日志以及维度扩展之后的日志数据每行大概会有100左右个字段信息,我们截取其中40多个有助于OLAP查询高频字段,数据表字段信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
(
timeStamp DateTime,
date Date,
province String,
isp String,
upstream_addr String,
hostname String,
machineIP String,
country String,
scheme String,
upstream_local_port String,
channel String,
node String,
cacheGroup String,
city String,
view String,
status String,
customer String,
nodeisp String,
app String,
http_host String,
upstream_status String,
deviceID String,
remote_addr String,
request_id String,
serviceGroupId String,
request String,
http_referer String,
serverType String,
conn_state String,
upstream_keepalive String,
http_user_agent String,
body_bytes_sent Int64,
client_rtt Float32,
ssl_handshake_time Float32,
response_time Float32,
first_byte_time Float32,
upstream_response_time String,
bytes_sent Int64,
download_time Float32,
half_rtt_time Float32,
request_time Float32
)

ClickHouse集群有4台机器,2019-06-14这一天集群的数据记录条数是2.4亿,那时候还不算业务爆发期,这2.4亿条的数据在clickhouse集群中总占用大小在12G左右,每台机器占用空间大小为3G左右。

sd

每行原始数据信息大概如下,每条JSON格式的数据记录大概是1000字节左右,2.4亿条JSON格式的输入数据大小为240G左右。如果是以CSV格式存储,则单条记录大小大概是380字节,2.4亿条CSV格式的输入数据大小为93G。

1
{"timeStamp":"2019-06-14 02:37:31","date":"2019-06-14","province":"GZ","isp":"CM","upstream_addr":"","hostname":"SR-CNCM-GZKWE-38-23","machineIP":"xxx","country":"CN","scheme":"https","upstream_local_port":"","channel":"xxx","node":"IDC-CNCM-GZKWE-Dnion","cacheGroup":"SG-CNCM-GZKWE-cacheOpt-01","city":"KWE","view":"CN_CM_XN_GZ","status":"200","customer":"meitu","nodeisp":"CM","app":"APP-WEB","http_host":"api.meipai.com","upstream_status":"200","deviceID":"9233d8bbfe37eed97679b6f768858d06","remote_addr":"223.104.96.19","request_id":"31f80ef11dc338493cf25c5334dcbfc8","serviceGroupId":"1152","request":"","http_referer":"","serverType":"0","conn_state":"","upstream_keepalive":"1","http_user_agent":"","body_bytes_sent":"8069","client_rtt":0.029,"ssl_handshake_time":0,"response_time":0.259,"first_byte_time":-0,"upstream_response_time":"0.258","bytes_sent":"8364","download_time":-0,"half_rtt_time":2685.709,"request_time":0.259}

总结:2.4亿条原始nginx日志,原始JSON格式数据大小240G,CSV格式的数据大小为93G,存储到ClickHouse后占用磁盘大小12G左右。不管以哪种方式存储,ClickHouse具体非常好的数据压缩比。目前线上是从kafka消费JSON格式数据入库到ClickHouse。如果数据表字段少一点或者是数据都存储在一台机器上,ClickHouse压缩比会更高。ClickHouse数据压缩这篇文章有测试过1亿条数据记录ES存储磁盘占用33GB,ClickHouse磁盘占用1.4GB。

ClickHouse解压缩

当我们在查询ClickHouse数据库数据时,如果采用perf工具对ClickHouse进程采样,会发现LZ_decompress_fast方法占用的CPU时间最多。

sd

ClickHouse数据以压缩的形式存储在本地磁盘中,当数据查询时ClickHouse为了减少CPU使用资源会尽量少做一些事情。在许多情况下,所有潜在的耗时计算都已经得到了很好的优化,而且用户编写了一个经过深思熟虑的查询,那么剩下要做的就是执行解压缩。

那么为什么LZ4解压缩成为一个瓶颈呢?LZ4看起来是一种非常轻的算法:数据解压缩速率通常是每个处理器内核1到3 GB/s,具体取决于数据。这比典型的磁盘子系统快得多。此外,我们使用所有可用的中央处理器内核,解压缩在所有物理内核之间线性扩展。

首先,如果数据压缩率很高,则磁盘上数据占用空间就很小,在读取数据时磁盘IO会比较低,但是如果待解压的数据量很大则会影响到CPU使用率。在LZ4的情况下,解压缩数据所需的工作量几乎与解压缩数据本身的量成正比;其次,如果数据被缓存,你可能根本不需要从磁盘读取数据。可以依赖页面缓存或使用自己的缓存。缓存在面向列的数据库中更有效,因为只有经常使用的列保留在缓存中。这就是为什么LZ4经常成为CPU负载的瓶颈。

在官方的这篇博客中How to speed up LZ4 decompression in ClickHouse,作者反馈有人希望ClickHoouse不要以数据压缩的方式存储数据,因为反馈者认为数据查询时是因为数据解压拖慢了整个查询进度,并且这个人还在github上提了一个PR,最后维护者觉得Ok. If you are not going to use this compression method, it's not worth to implement.

如果可以使用缓存,为何ClickHouse不把解压后的数据存在缓存中呢,这样可以减少很多数据解压的场景,ClickHouse也提供了cache配置项the cache of decompressed blocks。在博客中作者认为这种方式对内存是极大的浪费,并且只有在查询数据量很小的场景下是有用的。我个人也觉得如果缓存住解压后的数据,ClickHouse进程肯定会经常发生OOM。ClickHouse高效的数据压缩设计其实是一个很好的设计方案,首先可以减小磁盘的数据占用;其次在shard的replica副本个数超过1时,replica之间的数据同步也可以更高效。

目前在生产环境中,ClickHouse简单数据查询P99的时间还是在秒级返回,只有在复杂的数据查询场景下查询时间会增加到几秒,例如多个表join,其实在这个场景下更多的做法应该是优化SQL查询语句,尽量避免大量表join查询。

How to speed up LZ4 decompression in ClickHouse这篇官方博客中作者还提到了LZ4是如何工作的以及数据解压缩的优化手段。

压缩算法概览

压缩的理论(它与算法信息论密切相关)以及率有损理论,这个领域的研究工作主要是由美国学者克劳德·香农(Claude Elwood Shannon)奠定的,他在二十世纪四十年代末期及五十年代早期发表了这方面的基础性的论文。

Lempel-Ziv(LZ)压缩方法是最流行的无损存储算法之一。DEFLATE是LZ的一个变体,它针对解压速度与压缩率进行了优化,虽然它的压缩速度可能非常缓慢,PKZIPgzip以及PNG都在使用DEFLATE。LZW(Lempel-Ziv-Welch)是Unisys专利,直到2003年6月专利到期限,这种方法用于GIF图像。另外值得一提的是LZR (LZ-Renau) 方法,它是Zip方法的基础。LZ方法使用基于表格的压缩模型,其中表格中的条目用重复的数据串替换。对于大多数的LZ方法来说,这个表格是从最初的输入数据动态生成的。这个表格经常采用霍夫曼编码维护(例如SHRI、LZX)。 当前一个性能良好基于LZ的编码机制是LZX,它用于微软公司的CAB格式。

压缩算法分为两个层面:

  1. 熵编码:根据消息中每个符号出现的概率,然后通过某种映射用更短的符号替代原来的符号,核心在于提高符号的信息熵,哈夫曼编码最为典型。
  2. 字典编码:提取信息中的重复部分作为字典,然后通过字典和某种映射替代这些重复的部分,核心在于替代重复,LZ77和LZ78算法最为典型。

gzip

Gzip是若干种文件压缩程序的简称,通常指GNU计划的实现,gzip的基础是DEFLATE,DEFLATE是LZ77哈夫曼编码的一个组合体。Gzip编码格式在RFC 1952中定义。

Gzip亚搜文件格式如下为:

1
| ID1 | ID2 | CM | FLG | MTIME(4字节) | XFL | OS | ---> more

在Centos操作系统中空Gzip文件文件大小为26字节,用二进制查看工具查看文件内容:

1
2
3
0000000 8b1f 0808 c8fb 5d60 0300 6568 6c6c 006f
0000020 0003 0000 0000 0000 0000
0000032
  • 其中 ID1 和 ID2 分别是 0x1f 和 0x8b,用来标识文件格式是 gzip
  • CM 标识 加密算法,目前 0-7是保留字,8 指的是 deflate 算法
  • FLG标志位
  • MTIME 指的是源文件最近一次修改时间,存的是 Unix 时间戳
  • XFL defalte 算法中 2 表示使用压缩率最高的算法,4 表示使用压缩速度最快的算法
  • OS 标识压缩程序运行的文件系统,以处理 EOF 等的问题
  • more 后面是根据 FLG 的开启情况决定的,可能会有 循环冗余校验码、源文件长度、附加信息等多种其他信息
Read more »

ClickHouse 运营总结

概述

ClickHouse 是俄罗斯 Yandex 公司所开源的一款用于大数据实时分析的列式数据库管理系统,采用 C++ 编写,对于百亿级数据的查询聚合能达到秒级返回。

ClickHouse 的主要优点有:

  1. 为了高效的使用CPU,数据不仅仅按列存储,同时还按向量进行处理;
  2. 数据压缩空间大,减少io;处理单查询高吞吐量每台服务器每秒最多数十亿行;
  3. 索引非B树结构,不需要满足最左原则;只要过滤条件在索引列中包含即可;即使在使用的数据不在索引中,由于各种并行处理机制ClickHouse全表扫描的速度也很快;
  4. 写入速度非常快,50-200M/s,对于大量的数据更新非常适用。

而为了达到“快”的效果,ClickHouse 付出了如下的代价:

  1. 不支持事务,不支持真正的删除/更新;
  2. 不支持高并发,官方建议 QPS 为100,可以通过修改配置文件增加连接数,但是在服务器足够好的情况下;
  3. SQL 满足日常使用80%以上的语法,join 写法比较特殊;最新版已支持类似 SQL 的 join,但性能不好;
  4. 尽量做1000条以上批量的写入,避免逐行 insert 或小批量的 insert,update,delete 操作,因为 ClickHouse 底层会不断的做异步的数据合并,会影响查询性能,这个在做实时数据写入的时候要尽量避开;
  5. ClickHouse 快是因为采用了并行处理机制,即使一个查询,也会用服务器一半的cpu去执行,所以 ClickHouse 不能支持高并发的使用场景,默认单查询使用cpu核数为服务器核数的一半,安装时会自动识别服务器核数,可以通过配置文件修改该参数。
Read more »

原文地址

Spark 作为一个基于内存的分布式计算引擎,其内存管理模块在整个系统中扮演着非常重要的角色。理解 Spark 内存管理的基本原理,有助于更好地开发 Spark 应用程序和进行性能调优。本文旨在梳理出 Spark 内存管理的脉络,抛砖引玉,引出读者对这个话题的深入探讨。本文中阐述的原理基于 Spark 2.1 版本,阅读本文需要读者有一定的 Spark 和 Java 基础,了解 RDD、Shuffle、JVM 等相关概念。

在执行 Spark 的应用程序时,Spark 集群会启动 Driver 和 Executor 两种 JVM 进程,前者为主控进程,负责创建 Spark 上下文,提交 Spark 作业(Job),并将作业转化为计算任务(Task),在各个 Executor 进程间协调任务的调度,后者负责在工作节点上执行具体的计算任务,并将结果返回给 Driver,同时为需要持久化的 RDD 提供存储功能[1]。由于 Driver 的内存管理相对来说较为简单,本文主要对 Executor 的内存管理进行分析,下文中的 Spark 内存均特指 Executor 的内存。

1. 堆内和堆外内存规划

作为一个 JVM 进程,Executor 的内存管理建立在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用。

图 1 . 堆内和堆外内存示意图

img

1.1 堆内内存

堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory 或 spark.executor.memory 参数配置。Executor 内运行的并发任务共享 JVM 堆内内存,这些任务在缓存 RDD 数据和广播(Broadcast)数据时占用的内存被规划为存储(Storage)内存,而这些任务在执行 Shuffle 时占用的内存被规划为执行(Execution)内存,剩余的部分不做特殊规划,那些 Spark 内部的对象实例,或者用户定义的 Spark 应用程序中的对象实例,均占用剩余的空间。不同的管理模式下,这三部分占用的空间大小各不相同(下面第 2 小节会进行介绍)。

Spark 对堆内内存的管理是一种逻辑上的”规划式”的管理,因为对象实例占用内存的申请和释放都由 JVM 完成,Spark 只能在申请后和释放前记录这些内存,我们来看其具体流程:

  • 申请内存
  1. Spark 在代码中 new 一个对象实例
  2. JVM 从堆内内存分配空间,创建对象并返回对象引用
  3. Spark 保存该对象的引用,记录该对象占用的内存
  • 释放内存
  1. Spark 记录该对象释放的内存,删除该对象的引用
  2. 等待 JVM 的垃圾回收机制释放该对象占用的堆内内存

我们知道,JVM 的对象可以以序列化的方式存储,序列化的过程是将对象转换为二进制字节流,本质上可以理解为将非连续空间的链式存储转化为连续空间或块存储,在访问时则需要进行序列化的逆过程——反序列化,将字节流转化为对象,序列化的方式可以节省存储空间,但增加了存储和读取时候的计算开销。

对于 Spark 中序列化的对象,由于是字节流的形式,其占用的内存大小可直接计算,而对于非序列化的对象,其占用的内存是通过周期性地采样近似估算而得,即并不是每次新增的数据项都会计算一次占用的内存大小,这种方法降低了时间开销但是有可能误差较大,导致某一时刻的实际内存有可能远远超出预期[2]。此外,在被 Spark 标记为释放的对象实例,很有可能在实际上并没有被 JVM 回收,导致实际可用的内存小于 Spark 记录的可用内存。所以 Spark 并不能准确记录实际可用的堆内内存,从而也就无法完全避免内存溢出(OOM, Out of Memory)的异常。

虽然不能精准控制堆内内存的申请和释放,但 Spark 通过对存储内存和执行内存各自独立的规划管理,可以决定是否要在存储内存里缓存新的 RDD,以及是否为新的任务分配执行内存,在一定程度上可以提升内存的利用率,减少异常的出现。

1.2 堆外内存

为了进一步优化内存的使用以及提高 Shuffle 时排序的效率,Spark 引入了堆外(Off-heap)内存,使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据。利用 JDK Unsafe API(从 Spark 2.0 开始,在管理堆外的存储内存时不再基于 Tachyon,而是与堆外的执行内存一样,基于 JDK Unsafe API 实现[3]),Spark 可以直接操作系统堆外内存,减少了不必要的内存开销,以及频繁的 GC 扫描和回收,提升了处理性能。堆外内存可以被精确地申请和释放,而且序列化的数据占用的空间可以被精确计算,所以相比堆内内存来说降低了管理的难度,也降低了误差。

在默认情况下堆外内存并不启用,可通过配置 spark.memory.offHeap.enabled 参数启用,并由 spark.memory.offHeap.size 参数设定堆外空间的大小。除了没有 other 空间,堆外内存与堆内内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

1.3 内存管理接口

Spark 为存储内存和执行内存的管理提供了统一的接口——MemoryManager,同一个 Executor 内的任务都调用这个接口的方法来申请或释放内存:

清单 1 . 内存管理接口的主要方法

1
//申请存储内存``def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean``//申请展开内存``def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean``//申请执行内存``def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long``//释放存储内存``def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit``//释放执行内存``def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit``//释放展开内存``def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit

我们看到,在调用这些方法时都需要指定其内存模式(MemoryMode),这个参数决定了是在堆内还是堆外完成这次操作。

MemoryManager 的具体实现上,Spark 1.6 之后默认为统一管理(Unified Memory Manager)方式,1.6 之前采用的静态管理(Static Memory Manager)方式仍被保留,可通过配置 spark.memory.useLegacyMode 参数启用。两种方式的区别在于对空间分配的方式,下面的第 2 小节会分别对这两种方式进行介绍。

Read more »

ClickHouse zk依赖优化

ClickHouse 集群严重依赖zookeeper,clickhouse集群服务会在zk上存储大量的信息,例如数据库表metadata、blocksr、replicas等信息,znode节点和数据量/数据表呈线性相关。

ClickHouse把zk当做三种服务的结合,协调服务,日志服务、数据表catalog service。同时clickhouse在启动时还会对存在本地表的schema信息和存在zk上的schema信息做校验,如果两者存在差异则启动异常。

当前我们没有能力针对clickhouse做源码定制化,只能通过一些优化手段,让zk不至于成为整个系统的瓶颈。

第一原则是clickhouse依赖的zk不要和其他组件公用,其次是zk的各种参数调优。线上我们有个ClickHouse集群依赖的zk和其他组件公用,并且该zk还数据还存在机械盘上,严重影响到了ClickHouse集群的性能。ClickHouse从老的公用zookeeper中迁移到新的zookeeper的一些流程:

  1. 新部署一个zookeeper组件,把zk组件的dataDir和dataLogDir存放固盘,如果条件允许的话这两个目录最好不要存放在同一个固盘;

  2. 从老Zookeeper中获取最新的snapshot,并且传输到新zookeeper中myid最大的三台机器上(假设zk节点为5);

  3. 增大新zookeeper的syncLimit和initLimit配置项,避免zookeeper在leader选举时同步snapshot超时,导致leader选举失败,因为clickhouse在zookeeper上创建的节点很多,并且snapshot文件挺大;

  4. 新zookeeper部署完成后,检测clickhouse在zk上的znode节点是否和老节点一致;

  5. 更改clickhouse配置文件中zk相关的设置,涉及到的主要配置文件:

    /etc/clickhouse-server/metrika.xml

    /etc/clickhouse-server-wingman/metrika.xml

    /etc/clickhouse-server/config-preprocessed.xml

  6. 停止入库到clickhouse的程序,例如logkit、zabbix to clickhouse程序

  7. 停止clickhouse各个分片服务

    service clickhouse-server stop

    service clickhouse-server-wingman stop

其次是zookeeper参数调优,可以参考官方文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# http://hadoop.apache.org/zookeeper/docs/current/zookeeperAdmin.html

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=30000
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=10

maxClientCnxns=2000

maxSessionTimeout=60000000
# the directory where the snapshot is stored.
dataDir=/opt/zookeeper/{{ cluster['name'] }}/data
# Place the dataLogDir to a separate physical disc for better performance
dataLogDir=/opt/zookeeper/{{ cluster['name'] }}/logs

autopurge.snapRetainCount=10
autopurge.purgeInterval=1


# To avoid seeks ZooKeeper allocates space in the transaction log file in
# blocks of preAllocSize kilobytes. The default block size is 64M. One reason
# for changing the size of the blocks is to reduce the block size if snapshots
# are taken more often. (Also, see snapCount).
preAllocSize=131072

# Clients can submit requests faster than ZooKeeper can process them,
# especially if there are a lot of clients. To prevent ZooKeeper from running
# out of memory due to queued requests, ZooKeeper will throttle clients so that
# there is no more than globalOutstandingLimit outstanding requests in the
# system. The default limit is 1,000.ZooKeeper logs transactions to a
# transaction log. After snapCount transactions are written to a log file a
# snapshot is started and a new transaction log file is started. The default
# snapCount is 10,000.
snapCount=3000000

# If this option is defined, requests will be will logged to a trace file named
# traceFile.year.month.day.
#traceFile=

# Leader accepts client connections. Default value is "yes". The leader machine
# coordinates updates. For higher update throughput at thes slight expense of
# read throughput the leader can be configured to not accept clients and focus
# on coordination.
leaderServes=yes

standaloneEnabled=false
dynamicConfigFile=/etc/zookeeper-{{ cluster['name'] }}/conf/zoo.cfg.dynamic

Java虚拟机参数调优:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
NAME=zookeeper-{{ cluster['name'] }}
ZOOCFGDIR=/etc/$NAME/conf

# TODO this is really ugly
# How to find out, which jars are needed?
# seems, that log4j requires the log4j.properties file to be in the classpath
CLASSPATH="$ZOOCFGDIR:/usr/build/classes:/usr/build/lib/*.jar:/usr/share/zookeeper/zookeeper-3.5.1-metrika.jar:/usr/share/zookeeper/slf4j-log4j12-1.7.5.jar:/usr/share/zookeeper/slf4j-api-1.7.5.jar:/usr/share/zookeeper/servlet-api-2.5-20081211.jar:/usr/share/zookeeper/netty-3.7.0.Final.jar:/usr/share/zookeeper/log4j-1.2.16.jar:/usr/share/zookeeper/jline-2.11.jar:/usr/share/zookeeper/jetty-util-6.1.26.jar:/usr/share/zookeeper/jetty-6.1.26.jar:/usr/share/zookeeper/javacc.jar:/usr/share/zookeeper/jackson-mapper-asl-1.9.11.jar:/usr/share/zookeeper/jackson-core-asl-1.9.11.jar:/usr/share/zookeeper/commons-cli-1.2.jar:/usr/src/java/lib/*.jar:/usr/etc/zookeeper"

ZOOCFG="$ZOOCFGDIR/zoo.cfg"
ZOO_LOG_DIR=/var/log/$NAME
USER=zookeeper
GROUP=zookeeper
PIDDIR=/var/run/$NAME
PIDFILE=$PIDDIR/$NAME.pid
SCRIPTNAME=/etc/init.d/$NAME
JAVA=/usr/bin/java
ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
ZOO_LOG4J_PROP="INFO,ROLLINGFILE"
JMXLOCALONLY=false
JAVA_OPTS="-Xms{{ cluster.get('xms','128M') }} \
-Xmx{{ cluster.get('xmx','1G') }} \
-Xloggc:/var/log/$NAME/zookeeper-gc.log \
-XX:+UseGCLogFileRotation \
-XX:NumberOfGCLogFiles=16 \
-XX:GCLogFileSize=16M \
-verbose:gc \
-XX:+PrintGCTimeStamps \
-XX:+PrintGCDateStamps \
-XX:+PrintGCDetails
-XX:+PrintTenuringDistribution \
-XX:+PrintGCApplicationStoppedTime \
-XX:+PrintGCApplicationConcurrentTime \
-XX:+PrintSafepointStatistics \
-XX:+UseParNewGC \
-XX:+UseConcMarkSweepGC \
-XX:+CMSParallelRemarkEnabled"

YARN Container 内存控制

YARN 2.6 版本中对于Container内存控制策略比较单一,程序中有一个监控线程不停地检测各个Container的内存使用情况,只要物理内存或者虚拟内存在超过阈值之后,就会kill该container。有两个配置项与此相关,分别是yarn.nodemanager.pmem-check-enabledyarn.nodemanager.vmem-check-enabled,在默认情况下,这两个配置项都为true。

YARN 3.2版本中提供更为精细的三种内存控制策略,主要分为三种:

  1. 监控线程定时轮询各个container的内存占用情况,如果超过限制则kill container
  2. 使用linux cgroup内核的OOM killer机制,严格控制container内存
  3. 弹性内存控制策略,只有当整个系统内存超过限制后才会kill container

第1种内存控制策略比较好理解,就是开一个监控线程不停地监控container的使用情况,遇到阈值超过控制才会kill container,这种监控方式是在应用程序级别进行检测,有一定的延迟性。第2种和第3种内存控制策略使用了linux内核的OOM killer机制,当整个系统内存不足时,内核会选出score分数最高的进程,然后kill,区别在于,前者是严格控制,后者是弹性控制,严格控制指的是只要container内存超过阈值就Kill,后者是只要container的内存使用没有超过系统可使用的内存,则不会被kill。

弹性内存控制策略配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
<property>
<name>yarn.nodemanager.container-executor.class</name>
<value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.runtime.linux.allowed-runtimes</name>
<value>default</value>
</property>
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.vmem-pmem-ratio</name>
<value>3.5</value>
</property>
<property>
<name>yarn.nodemanager.resource.memory.enforced</name>
<value>false</value>
</property>
<property>
<name>yarn.nodemanager.elastic-memory-control.enabled</name>
<value>true</value>
</property>

具体配置项参考:官方文档

ClickHouse采用分片 + 副本实现高可用集群,下面以4台服务器为例子,讲述设计一个高可用的分布式集群需要注意的几个地方。

建立可横向扩展的数据服务集群,分片技术通常是必须的,MongoDB提供了原生的数据库分片技术,减小MySQL分库分表带来的麻烦。ClickHouse同样支持分片技术,假设4台机器,每台机器安装一个CH的示例,则表示有4个分片,每个分片的副本设置为1,对于这种集群设置不存在高可用可言,因为如果有一台机器挂了则整个ClickHouse集群不可用。

只有分片 + 副本才可实现高可用集群,可根据集群机器资源情况设置单分片副本数量,本次采用4台机器服务集群搭建,4个分片,每个分片两个副本。需要在每台机器上开启两个ClickHouse示例,注意不同示例需要设置不同的数据目录以及绑定不同的端口。

Read more »

ClickHouse系统架构概述

ClickHouse独特功能

真正的列式数据库管理系统

在一个真正的列式数据库管理系统中,除了数据本身外不应该存在其他额外的数据。这意味着为了避免在值旁边存储它们的长度“number”,你必须支持固定长度数值类型。例如,10亿个UInt8类型的数据在未压缩的情况下大约消耗1GB左右的空间,如果不是这样的话,这将对CPU的使用产生强烈影响。即使是在未压缩的情况下,紧凑的存储数据也是非常重要的,因为解压缩的速度主要取决于未压缩数据的大小。

这是非常值得注意的,因为在一些其他系统中也可以将不同的列分别进行存储,但由于对其他场景进行的优化,使其无法有效的处理分析查询。例如: HBase,BigTable,Cassandra,HyperTable。在这些系统中,你可以得到每秒数十万的吞吐能力,但是无法得到每秒几亿行的吞吐能力。

需要说明的是,ClickHouse不单单是一个数据库, 它是一个数据库管理系统。因为它允许在运行时创建表和数据库、加载数据和运行查询,而无需重新配置或重启服务。

Read more »

Raft 一致性算法论文译文

Raft论文翻译转载自Raft 一致性算法论文译文

摘要

Raft 是一种用来管理日志复制的一致性算法。它和 Paxos 的性能和功能是一样的,但是它和 Paxos 的结构不一样;这使得 Raft 更容易理解并且更易于建立实际的系统。为了提高理解性,Raft 将一致性算法分为了几个部分,例如领导选取(leader selection),日志复制(log replication)和安全性(safety),同时它使用了更强的一致性来减少了必须需要考虑的状态。从用户学习的结果来看,Raft 比 Paxos 更容易学会。Raft 还包括了一种新的机制来使得动态改变集群成员,它使用重叠大多数(overlapping majorities)来保证安全。

1 引言

一致性算法允许一组机器像一个整体一样工作,即使其中的一些机器出了错误也能正常工作。正因为此,他们扮演着建立大规模可靠的软件系统的关键角色。在过去的十年中 Paxos 一直都主导着有关一致性算法的讨论:大多数一致性算法的实现都基于它或者受它影响,并且 Paxos 也成为了教学生关于一致性知识的主要工具。

不幸的是,尽管在降低它的复杂性方面做了许多努力,Paxos 依旧很难理解。并且,Paxos 需要经过复杂的修改才能应用于实际中。这些导致了系统构构建者和学生都十分头疼。

在被 Paxos 折磨之后,我们开始寻找一种在系统构建和教学上更好的新的一致性算法。我们的首要目标是让它易于理解:我们能不能定义一种面向实际系统的一致性算法并且比 Paxos 更容易学习呢?并且,我们希望这种算法能凭直觉就能明白,这对于一个系统构建者来说是十分必要的。对于一个算法,不仅仅是让它工作起来很重要,知道它是如何工作的更重要。

我们工作的结果是一种新的一致性算法,叫做 Raft。在设计 Raft 的过程中我们应用了许多专门的技巧来提升理解性,包括算法分解(分为领导选取(leader selection),日志复制(log replication)和安全性(safety))和减少状态(state space reduction)(相对于 Paxos,Raft 减少了非确定性的程度和服务器互相不一致的方式)。在两所学校的43个学生的研究中发现,Raft 比 Paxos 要更容易理解:在学习了两种算法之后,其中的33个学生回答 Raft 的问题要比回答 Paxos 的问题要好。

Raft 算法和现在一些已经有的算法在一些地方很相似(主要是 Oki 和 Liskov 的 Viewstamped Replication。但是 Raft 有几个新的特性:

  • 强领导者(Strong Leader):Raft 使用一种比其他算法更强的领导形式。例如,日志条目只从领导者发送向其他服务器。这样就简化了对日志复制的管理,使得 Raft 更易于理解。
  • 领导选取(Leader Selection):Raft 使用随机定时器来选取领导者。这种方式仅仅是在所有算法都需要实现的心跳机制上增加了一点变化,它使得在解决冲突时更简单和快速。
  • 成员变化(Membership Change):Raft 为了调整集群中成员关系使用了新的联合一致性(joint consensus)的方法,这种方法中大多数不同配置的机器在转换关系的时候会交迭(overlap)。这使得在配置改变的时候,集群能够继续操作。

我们认为,Raft 在教学方面和实际实现方面比 Paxos 和其他算法更出众。它比其他算法更简单、更容易理解;它能满足一个实际系统的需求;它拥有许多开源的实现并且被许多公司所使用;它的安全特性已经被证明;并且它的效率和其他算法相比也具有竞争力。

这篇论文剩下的部分会讲如下内容:复制状态机(replicated state machine)问题(第2节),讨论 Paxos 的优缺点(第3节),讨论我们用的为了达到提升理解性的方法(第4节),陈述 Raft 一致性算法(第5~8节),评价 Raft 算法(第9节),对相关工作的讨论(第10节)。

Read more »