2.软文推荐
3.软文推荐
目录: 1、2019数据架构选型必读:1月数据库产品技术解析 2、Kafka架构及基本原理简析 3、python怎么安装acl包 4、腾讯云Logstash实战4-使用Logstash消费kafka数据并写入到Elasticsearch 5、腾讯云CKafka压测踩坑记录 6、什么是kafka 2019数据架构选型必读:1月数据库产品技术解析本期目录
DB-Engines数据库排行榜
新闻快讯
一、RDBMS家族
二、NoSQL家族
三、NewSQL家族
四、时间序列
五、大数据生态圈
六、国产数据库概览
七、云数据库
八、推出dbaplus Newsletter的想法
九、感谢名单
为方便阅读、重点呈现,本期Newsletter(2019年1月)将对各个板块的内容进行精简。需要阅读全文的同学可点击文末 【阅读原文】 或登录
进行下载。
DB-Engines数据库排行榜
以下取自2019年1月的数据,具体信息可以参考,数据仅供参考。
DB-Engines排名的数据依据5个不同的因素:
新闻快讯
1、2018年9月24日,微软公布了SQL Server2019预览版,SQL Server 2019将结合Spark创建统一数据平台。
2、2018年10月5日,ElasticSearch在美国纽约证券交易所上市。
3、亚马逊放弃甲骨文数据库软件,导致最大仓库之一在黄金时段宕机。受此消息影响,亚马逊盘前股价小幅跳水,跌超2%。
4、2018年10月31日,Percona发布了Percona Server 8.0 RC版本,发布对MongoDB 4.0的支持,发布对XtraBackup测试第二个版本。
5、2018年10月31日,Gartner陆续发布了2018年的数据库系列报告,包括《数据库魔力象限》、《数据库核心能力》以及《数据库推荐报告》。
今年的总上榜数据库产品达到了5家,分别来自:阿里云,华为,巨杉数据库,腾讯云,星环 科技 。其中阿里云和巨杉数据库已经连续两年入选。
6、2018年11月初,Neo4j宣布完成E轮8000万美元融资。11月15日,Neo4j宣布企业版彻底闭源:
7、2019年1月8日,阿里巴巴以1.033亿美元(9000万欧元)的价格收购了Apache Flink商业公司DataArtisans。
8、2019年1月11日早间消息,亚马逊宣布推出云数据库软件,亚马逊和MongoDB将会直接竞争。
RDBMS家族
Oracle 发布18.3版本
2018年7月,Oracle Database 18.3通用版开始提供下载。我们可以将Oracle Database 18c视为采用之前发布模式的Oracle Database 12c第2版的第一个补丁集。未来,客户将不再需要等待多年才能用上最新版Oracle数据库,而是每年都可以期待新数据库特性和增强。Database 19c将于2019年Q1率先在Oracle cloud上发布云版本。
Oracle Database 18c及19c部分关键功能:
1、性能
2、多租户,大量功能增强及改进,大幅节省成本和提高敏捷性
3、高可用
4、数据仓库和大数据
MySQL发布8.0.13版本
1、账户管理
经过配置,修改密码时,必须带上原密码。在之前的版本,用户登录之后,就可以修改自己的密码。这种方式存在一定安全风险。比如用户登录上数据库后,中途离开一段时间,那么非法用户可能会修改密码。由参数password_require_current控制。
2、配置
Innodb表必须有主键。在用户没有指定主键时,系统会生成一个默认的主键。但是在主从复制的场景下,默认的主键,会对丛库应用速度带来致命的影响。如果设置sql_require_primary_key,那么数据库会强制用户在创建表、修改表时,加上主键。
3、字段默认值
BLOB、TEXT、GEOMETRY和JSON字段可以指定默认值了。
4、优化器
1)Skip Scan
非前缀索引也可以用了。
之前的版本,任何没有带上f1字段的查询,都没法使用索引。在新的版本中,它可以忽略前面的字段,让这个查询使用到索引。其实现原理就是把(f1 = 1 AND f2 40) 和(f1 = 2 AND f2 40)的查询结果合并。
2)函数索引
之前版本只能基于某个列或者多个列加索引,但是不允许在上面做计算,如今这个限制消除了。
5、SQL语法
GROUP BY ASC和GROUP BY DESC语法已经被废弃,要想达到类似的效果,请使用GROUP BY ORDER BY ASC和GROUP BY ORDER BY DESC。
6、功能变化
1)设置用户变量,请使用SET语句
如下类型语句将要被废弃SELECT @var, @var:=@var+1。
2)新增innodb_fsync_threshold
该变量是控制文件刷新到磁盘的速率,防止磁盘在短时间内饱和。
3)新增会话级临时表空间
在以往的版本中,当执行SQL时,产生的临时表都在全局表空间ibtmp1中,及时执行结束,临时表被释放,空间不会被回收。新版本中,会为session从临时表空间池中分配一个临时表空间,当连接断开时,临时表空间的磁盘空间被回收。
4)在线切换Group Replication的状态
5)新增了group_replication_member_expel_timeout
之前,如果某个节点被怀疑有问题,在5秒检测期结束之后,那么就直接被驱逐出这个集群。即使该节点恢复正常时,也不会再被加入集群。那么,瞬时的故障,会把某些节点驱逐出集群。
group_replication_member_expel_timeout让管理员能更好的依据自身的场景,做出最合适的配置(建议配置时间小于一个小时)。
MariaDB 10.3版本功能展示
1、MariaDB 10.3支持update多表ORDER BY and LIMIT
1)update连表更新,limit语句
update t1 join t2 on t1.id=t2.id set t1.name='hechunyang' limit 3;
MySQL 8.0直接报错
MariaDB 10.3更新成功
2)update连表更新,ORDER BY and LIMIT语句
update t1 join t2 on t1.id=t2.id set t1.name='HEchunyang' order by t1.id DESC limit 3;
MySQL 8.0直接报错
MariaDB 10.3更新成功
参考:
2、MariaDB10.3增补AliSQL补丁——安全执行Online DDL
Online DDL从名字上看很容易误导新手,以为不论什么情况,修改表结构都不会锁表,理想很丰满,现实很骨感,注意这个坑!
有以下两种情况执行DDL操作会锁表的,Waiting for table metadata lock(元数据表锁):
针对第二种情况,MariaDB10.3增补AliSQL补丁-DDL FAST FAIL,让其DDL操作快速失败。
例:
如果线上有某个慢SQL对该表进行操作,可以使用WAIT n(以秒为单位设置等待)或NOWAIT在语句中显式设置锁等待超时,在这种情况下,如果无法获取锁,语句将立即失败。 WAIT 0相当于NOWAIT。
参考:
3、MariaDB Window Functions窗口函数分组取TOP N记录
窗口函数在MariaDB10.2版本里实现,其简化了复杂SQL的撰写,提高了可读性。
参考:
Percona Server发布8.0 GA版本
2018年12月21日,Percona发布了Percona Server 8.0 GA版本。
在支持MySQL8.0社区的基础版上,Percona Server for MySQL 8.0版本中带来了许多新功能:
1、安全性和合规性
2、性能和可扩展性
3、可观察性和可用性
Percona Server for MySQL 8.0中将要被废用功能:
Percona Server for MySQL 8.0中删除的功能:
RocksDB发布V5.17.2版本
2018年10月24日,RocksDB发布V5.17.2版本。
RocksDB是Facebook在LevelDB基础上用C++写的高效内嵌式K/V存储引擎。相比LevelDB,RocksDB提供了Column-Family,TTL,Transaction,Merge等方面的支持。目前MyRocks,TiKV等底层的存储都是基于RocksDB来构建。
PostgreSQL发布11版本
2018年10月18日,PostgreSQL 11发布。
1、PostgreSQL 11的重大增强
2、PostgreSQL 插件动态
1)分布式插件citus发布 8.1
citus是PostgreSQL的一款sharding插件,目前国内苏宁、铁总、探探有较大量使用案例。
2)地理信息插件postgis发布2.5.1
PostGIS是专业的时空数据库插件,在测绘、航天、气象、地震、国土资源、地图等时空专业领域应用广泛。同时在互联网行业也得到了对GIS有性能、功能深度要求的客户青睐,比如共享出行、外卖等客户。
3)时序插件timescale发布1.1.1
timescale是PostgreSQL的一款时序数据库插件,在IoT行业中有非常好的应用。github star数目前有5000多,是一个非常火爆的插件。
4)流计算插件 pipelinedb 正式插件化
Pipelinedb是PostgreSQL的一款流计算插件,使用这个创建可以对高速写入的数据进行实时根据定义的聚合规则进行聚合(支持概率计算),实时根据定义的规则触发事件(支持事件处理函数的自定义)。可用于IoT,监控,FEED实时计算等场景。
3、PostgreSQL衍生开源产品动态
1)agensgraph发布 2.0.0版本
agensgraph是兼容PostgreSQL、opencypher的专业图数据库,适合图式关系的管理。
2)gpdb发布5.15
gpdb是兼容PostgreSQL的mpp数据库,适合OLAP场景。近两年,gpdb一直在追赶PostgreSQL的社区版本,预计很快会追上10的PostgreSQL,在TP方面的性能也会得到显著提升。
3)antdb发布3.2
antdb是以Postgres-XC为基础开发的一款PostgreSQL sharding数据库,亚信主导开发,开源,目前主要服务于亚信自有客户。
4)迁移工具MTK发布52版本
MTK是EDB提供的可以将Oracle、PostgreSQL、MySQL、MSSQL、Sybase数据库迁移到PostgreSQL, PPAS的产品,迁移速度可以达到100万行/s以上。
DB2发布 11.1.4.4版本
DB2最新发布Mod Pack 4 and Fix Pack 4,包含以下几方面的改动及增强:
1、性能
2、高可用
3、管理视图
4、应用开发方面
5、联邦功能
6、pureScale
NoSQL家族
Redis发布5.0.3版本
MongoDB升级更新MongoDB Mobile和MongoDB Stitch
2018年11月21日,MongoDB升级更新MongoDB Mobile和MongoDB Stitch,助力开发人员提升工作效率。
MongoDB 公司日前发布了多项新产品功能,旨在更好地帮助开发人员在世界各地管理数据。通过利用存储在移动设备和后台数据库的数据之间的实时、自动的同步特性,MongoDB Mobile通用版本助力开发人员构建更快捷、反应更迅速的应用程序。此前,这只能通过在移动应用内部安装一个可供选择或限定功能的数据库来实现。
MongoDB Mobile在为客户提供随处运行的自由度方面更进了一步。用户在iOS和安卓终端设备上可拥有MongoDB所有功能,将网络边界扩展到其物联网资产范畴。应用系统还可以使用MongoDB Stitch的软件开发包访问移动客户端或后台数据,帮助开发人员通过他们希望的任意方式查询移动终端数据和物联网数据,包括本地读写、本地JSON存储、索引和聚合。通过Stitch移动同步功能(现可提供beta版),用户可以自动对保存在本地的数据以及后台数据库的数据进行同步。
本期新秀:Cassandra发布3.11.3版本
2018年8月11日,Cassandra发布正式版3.11.3。
Apache Cassandra是一款开源分布式NoSQL数据库系统,使用了基于Google BigTable的数据模型,与面向行(row)的传统关系型数据库或键值存储key-value数据库不同,Cassandra使用的是宽列存储模型(Wide Column Stores)。与BigTable和其模仿者HBase不同,数据并不存储在分布式文件系统如GFS或HDFS中,而是直接存于本地。
Cassandra的系统架构与Amazon DynamoDB类似,是基于一致性哈希的完全P2P架构,每行数据通过哈希来决定应该存在哪个或哪些节点中。集群没有master的概念,所有节点都是同样的角色,彻底避免了整个系统的单点问题导致的不稳定性,集群间的状态同步通过Gossip协议来进行P2P的通信。
3.11.3版本的一些bug fix和改进:
NewSQL家族
TiDB 发布2.1.2版本
2018 年 12 月 22 日,TiDB 发布 2.1.2 版,TiDB-Ansible 相应发布 2.1.2 版本。该版本在 2.1.1 版的基础上,对系统兼容性、稳定性做出了改进。
TiDB 是一款定位于在线事务处理/在线分析处理( HTAP: Hybrid Transactional/Analytical Processing)的融合型数据库产品。除了底层的 RocksDB 存储引擎之外,分布式SQL层、分布式KV存储引擎(TiKV)完全自主设计和研发。
TiDB 完全开源,兼容MySQL协议和语法,可以简单理解为一个可以无限水平扩展的MySQL,并且提供分布式事务、跨节点 JOIN、吞吐和存储容量水平扩展、故障自恢复、高可用等优异的特性;对业务没有任何侵入性,简化开发,利于维护和平滑迁移。
TiDB:
PD:
TiKV:
Tools:
1)TiDB-Lightning
2)TiDB-Binlog
EsgynDB发布R2.5版本
2018年12月22日,EsgynDB R2.5版本正式发布。
作为企业级产品,EsgynDB 2.5向前迈进了一大步,它拥有以下功能和改进:
CockroachDB发布2.1版本
2018年10月30日,CockroachDB正式发布2.1版本,其新增特性如下:
新增企业级特性:
新增SQL特性:
新增内核特性:
Admin UI增强:
时间序列
本期新秀:TimescaleDB发布1.0版本
10月底,TimescaleDB 1.0宣布正式推出,官方表示该版本已可用于生产环境,支持完整SQL和扩展。
TimescaleDB是基于PostgreSQL数据库开发的一款时序数据库,以插件化的形式打包提供,随着PostgreSQL的版本升级而升级,不会因为另立分支带来麻烦。
TimescaleDB架构:
数据自动按时间和空间分片(chunk)
更新亮点:
大数据生态圈
Hadoop发布2.9.2版本
2018年11月中旬,Hadoop在2.9分支上发布了新的2.9.2版本,该版本进行了204个大大小小的变更,主要变更如下:
Greenplum 发布5.15版本
Greenplum最新的5.15版本中发布了流式数据加载工具。
该版本中的Greenplum Streem Server组件已经集成了Kafka流式加载功能,并通过了Confluent官方的集成认证,其支持的主要功能如下:
国产数据库概览
K-DB发布数据库一体机版
2018年11月7日,K-DB发布了数据库一体机版。该版本更新情况如下:
OceanBase迁移服务发布1.0版本
1月4日,OceanBase 正式发布OMS迁移服务1.0版本。
以下内容包含 OceanBase 迁移服务的重要特性和功能:
SequoiaDB发布3.0.1新版本
1、架构
1)完整计算存储分离架构,兼容MySQL协议、语法
计算存储分离体系以松耦合的方式将计算与存储层分别部署,通过标准接口或插件对各个模块和组件进行无缝替换,在计算层与存储层均可实现自由的弹性伸缩。
SequoiaDB巨杉数据库“计算-存储分离”架构详细示意
用户可以根据自身业务特征选择面向交易的SQL解析器(例如MySQL或PGSQL)或面向统计分析的执行引擎(例如SparkSQL)。众所周知,使用不同的SQL优化与执行方式,数据库的访问性能可能会存在上千上万倍的差距。计算存储分离的核心思想便是在数据存储层面进行一体化存储,在计算层面则利用每种执行引擎的特点针对不同业务场景进行选择和优化,用户可以在存储层进行逻辑与物理的隔离,将面向高频交易的前端业务与面向高吞吐量的统计分析使用不同的硬件进行存储,确保在多类型数据访问时互不干扰,以真正达到生产环境可用的多租户与HTAP能力。
2、其他更新信息
1)接口变更:
2)主要特性:
云数据库
本期新秀:腾讯发布数据库CynosDB,开启公测
1、News
1)腾讯云数据库MySQL2018年重大更新:
2)腾讯云数据库MongoDB2018年重大更新:
3)腾讯云数据库Redis/CKV+2018年重大更新:
4)腾讯云数据库CTSDB2018年重大更新:
2、Redis 4.0集群版商业化上线
2018年10月,腾讯云数据库Redis 4.0集群版完成邀测、公测、商业化三个迭代,在广州、上海、北京正式全量商业化上线。
产品特性:
使用场景:
官网文档:
3、腾讯自研数据库CynosDB发布,开启公测
2018年11月22日,腾讯云召开新一代自研数据库CynosDB发布会,业界第一款全面兼容市面上两大最主流的开源数据库MySQL和PostgreSQL的高性能企业级分布式云数据库。
本期新秀:京东云DRDS发布1.0版本
12月24日,京东云分布式关系型数据库DRDS正式发布1.0版本。
DRDS是京东云精心自研的数据库中间件产品,获得了2018年 ”可信云技术创新奖”。DRDS可实现海量数据下的自动分库分表,具有高性能,分布式,弹性升级,兼容MySQL等优点,适用于高并发、大规模数据的在线交易, 历史 数据查询,自动数据分片等业务场景,历经多次618,双十一的考验,已经在京东集团内大规模使用。
京东云DRDS产品有以下主要特性
1)自动分库分表
通过简单的定义即可自动实现分库分表,将数据实际存放在多个MySQL实例的数据库中,但呈现给应用程序的依旧是一张表,对业务透明,应用程序几乎无需改动,实现了对数据库存储和处理能力的水平扩展。
2)分布式架构
基于分布式架构的集群方案,多个对等节点同时对外提供服务,不但可有效规避服务的单点故障,而且更加容易扩展。
3)超强性能
具有极高的处理能力,双节点即可支持数万QPS,满足用户超大规模处理能力的需求。
4)兼容MySQL
兼容绝大部分MySQL语法,包括MySQL语法、数据类型、索引、常用函数、排序、关联等DDL,DML语句,使用成本低。
参考链接:
RadonDB发布1.0.3版本
2018年12月26日,MyNewSQL领域的RadonDB云数据库发布1.0.3版本。
推出dbaplus Newsletter的想法
dbaplus Newsletter旨在向广大技术爱好者提供数据库行业的最新技术发展趋势,为社区的技术发展提供一个统一的发声平台。为此,我们策划了RDBMS、NoSQL、NewSQL、时间序列、大数据生态圈、国产数据库、云数据库等几个版块。
我们不以商业宣传为目的,不接受任何商业广告宣传,严格审查信息源的可信度和准确性,力争为大家提供一个纯净的技术学习环境,欢迎大家监督指正。
至于Newsletter发布的周期,目前计划是每三个月左右会做一次跟进, 下期计划时间是2019年4月14日~4月25日, 如果有相关的信息提供请发送至邮箱:newsletter@dbaplus.cn
感谢名单
最后要感谢那些提供宝贵信息和建议的专家朋友,排名不分先后。
往期回顾:
↓↓别忘了点这里下载 2019年1月 完整版Newsletter 哦~
Kafka架构及基本原理简析Kafka是一个由Scala和Java编写的企业级的消息发布和订阅系统,最早是由Linkedin公司开发,最终开源到Apache软件基金会的项目。Kafka是一个分布式的,支持分区的,多副本的和多订阅者的高吞吐量的消息系统,被广泛应用在应用解耦、异步处理、限流削峰和消息驱动等场景。本文将针对Kafka的架构和相关组件进行简单的介绍。在介绍Kafka的架构之前,我们先了解一下Kafk的核心概念。
在详细介绍Kafka的架构和基本组件之前,需要先了解一下Kafka的一些核心概念。
Producer: 消息的生产者,负责往Kafka集群中发送消息;
Consumer: 消息的消费者,主动从Kafka集群中拉取消息。
Consumer Group: 每个Consumer属于一个特定的Consumer Group,新建Consumer的时候需要指定对应的Consumer Group ID。
Broker: Kafka集群中的服务实例,也称之为节点,每个Kafka集群包含一个或者多个Broker(一个Broker就是一个服务器或节点)。
Message: 通过Kafka集群进行传递的对象实体,存储需要传送的信息。
Topic: 消息的类别,主要用于对消息进行逻辑上的区分,每条发送到Kafka集群的消息都需要有一个指定的Topic,消费者根据Topic对指定的消息进行消费。
Partition: 消息的分区,Partition是一个物理上的概念,相当于一个文件夹,Kafka会为每个topic的每个分区创建一个文件夹,一个Topic的消息会存储在一个或者多个Partition中。
Segment: 一个partition当中存在多个segment文件段(分段存储),每个Segment分为两部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查询.log 文件当中数据的偏移量位置;
.log文件: 存放Message的数据文件,在Kafka中把数据文件就叫做日志文件。一个分区下面默认有n多个.log文件(分段存储)。一个.log文件大默认1G,消息会不断追加在.log文件中,当.log文件的大小超过1G的时候,会自动新建一个新的.log文件。
.index文件: 存放.log文件的索引数据,每个.index文件有一个对应同名的.log文件。
后面我们会对上面的一些核心概念进行更深入的介绍。在介绍完Kafka的核心概念之后,我们来看一下Kafka的对外提供的基本功能,组件及架构设计。
如上图所示,Kafka主要包含四个主要的API组件:
1. Producer API
应用程序通过Producer API向Kafka集群发送一个或多个Topic的消息。
2. Consumer API
应用程序通过Consumer API,向Kafka集群订阅一个或多个Topic的消息,并处理这些Topic下接收到的消息。
3. Streams API
应用程序通过使用Streams API充当流处理器(Stream Processor),从一个或者多个Topic获取输入流,并生产一个输出流到一个或者多个Topic,能够有效地将输入流进行转变后变成输出流输出到Kafka集群。
4. Connect API
允许应用程序通过Connect API构建和运行可重用的生产者或者消费者,能够把kafka主题连接到现有的应用程序或数据系统。Connect实际上就做了两件事情:使用Source Connector从数据源(如:DB)中读取数据写入到Topic中,然后再通过Sink Connector读取Topic中的数据输出到另一端(如:DB),以实现消息数据在外部存储和Kafka集群之间的传输。
接下来我们将从Kafka的架构出发,重点介绍Kafka的主要组件及实现原理。Kafka支持消息持久化,消费端是通过主动拉取消息进行消息消费的,订阅状态和订阅关系由客户端负责维护,消息消费完后不会立刻删除,会保留历史消息,一般默认保留7天,因此可以通过在支持多订阅者时,消息无需复制多分,只需要存储一份就可以。下面将详细介绍每个组件的实现原理。
1. Producer
Producer是Kafka中的消息生产者,主要用于生产带有特定Topic的消息,生产者生产的消息通过Topic进行归类,保存在Kafka 集群的Broker上,具体的是保存在指定的partition 的目录下,以Segment的方式(.log文件和.index文件)进行存储。
2. Consumer
Consumer是Kafka中的消费者,主要用于消费指定Topic的消息,Consumer是通过主动拉取的方式从Kafka集群中消费消息,消费者一定属于某一个特定的消费组。
3. Topic
Kafka中的消息是根据Topic进行分类的,Topic是支持多订阅的,一个Topic可以有多个不同的订阅消息的消费者。Kafka集群Topic的数量没有限制,同一个Topic的数据会被划分在同一个目录下,一个Topic可以包含1至多个分区,所有分区的消息加在一起就是一个Topic的所有消息。
4. Partition
在Kafka中,为了提升消息的消费速度,可以为每个Topic分配多个Partition,这也是就之前我们说到的,Kafka是支持多分区的。默认情况下,一个Topic的消息只存放在一个分区中。Topic的所有分区的消息合并起来,就是一个Topic下的所有消息。每个分区都有一个从0开始的编号,每个分区内的数据都是有序的,但是不同分区直接的数据是不能保证有序的,因为不同的分区需要不同的Consumer去消费,每个Partition只能分配一个Consumer,但是一个Consumer可以同时一个Topic的多个Partition。
5. Consumer Group
Kafka中的每一个Consumer都归属于一个特定的Consumer Group,如果不指定,那么所有的Consumer都属于同一个默认的Consumer Group。Consumer Group由一个或多个Consumer组成,同一个Consumer Group中的Consumer对同一条消息只消费一次。每个Consumer Group都有一个唯一的ID,即Group ID,也称之为Group Name。Consumer Group内的所有Consumer协调在一起订阅一个Topic的所有Partition,且每个Partition只能由一个Consuemr Group中的一个Consumer进行消费,但是可以由不同的Consumer Group中的一个Consumer进行消费。如下图所示:
在层级关系上来说Consumer Group好比是跟Topic对应的,而Consumer就对应于Topic下的Partition。Consumer Group中的Consumer数量和Topic下的Partition数量共同决定了消息消费的并发量,且Partition数量决定了最终并发量,因为一个Partition只能由一个Consumer进行消费。当一个Consumer Group中Consumer数量超过订阅的Topic下的Partition数量时,Kafka会为每个Partition分配一个Consumer,多出来的Consumer会处于空闲状态。当Consumer Group中Consumer数量少于当前定于的Topic中的Partition数量是,单个Consumer将承担多个Partition的消费工作。如上图所示,Consumer Group B中的每个Consumer需要消费两个Partition中的数据,而Consumer Group C中会多出来一个空闲的Consumer4。总结下来就是:同一个Topic下的Partition数量越多,同一时间可以有越多的Consumer进行消费,消费的速度就会越快,吞吐量就越高。同时,Consumer Group中的Consumer数量需要控制为小于等于Partition数量,且最好是整数倍:如1,2,4等。
6. Segment
考虑到消息消费的性能,Kafka中的消息在每个Partition中是以分段的形式进行存储的,即每1G消息新建一个Segment,每个Segment包含两个文件:.log文件和.index文件。之前我们已经说过,.log文件就是Kafka实际存储Producer生产的消息,而.index文件采用稀疏索引的方式存储.log文件中对应消息的逻辑编号和物理偏移地址(offset),以便于加快数据的查询速度。.log文件和.index文件是一一对应,成对出现的。下图展示了.log文件和.index文件在Partition中的存在方式。
Kafka里面每一条消息都有自己的逻辑offset(相对偏移量)以及存在物理磁盘上面实际的物理地址便宜量Position,也就是说在Kafka中一条消息有两个位置:offset(相对偏移量)和position(磁盘物理偏移地址)。在kafka的设计中,将消息的offset作为了Segment文件名的一部分。Segment文件命名规则为:Partition全局的第一个Segment从0开始,后续每个segment文件名为上一个Partition的最大offset(Message的offset,非实际物理地偏移地址,实际物理地址需映射到.log中,后面会详细介绍在.log文件中查询消息的原理)。数值最大为64位long大小,由20位数字表示,前置用0填充。
上图展示了.index文件和.log文件直接的映射关系,通过上图,我们可以简单介绍一下Kafka在Segment中查找Message的过程:
1. 根据需要消费的下一个消息的offset,这里假设是7,使用二分查找在Partition中查找到文件名小于(一定要小于,因为文件名编号等于当前offset的文件里存的都是大于当前offset的消息)当前offset的最大编号的.index文件,这里自然是查找到了00000000000000000000.index。
2. 在.index文件中,使用二分查找,找到offset小于或者等于指定offset(这里假设是7)的最大的offset,这里查到的是6,然后获取到index文件中offset为6指向的Position(物理偏移地址)为258。
3. 在.log文件中,从磁盘位置258开始顺序扫描,直到找到offset为7的Message。
至此,我们就简单介绍完了Segment的基本组件.index文件和.log文件的存储和查询原理。但是我们会发现一个问题:.index文件中的offset并不是按顺序连续存储的,为什么Kafka要将索引文件设计成这种不连续的样子?这种不连续的索引设计方式称之为稀疏索引,Kafka中采用了稀疏索引的方式读取索引,kafka每当.log中写入了4k大小的数据,就往.index里以追加的写入一条索引记录。使用稀疏索引主要有以下原因:
(1) 索引稀疏存储,可以大幅降低.index文件占用存储空间大小。
(2) 稀疏索引文件较小,可以全部读取到内存中,可以避免读取索引的时候进行频繁的IO磁盘操作,以便通过索引快速地定位到.log文件中的Message。
7. Message
Message是实际发送和订阅的信息是实际载体,Producer发送到Kafka集群中的每条消息,都被Kafka包装成了一个Message对象,之后再存储在磁盘中,而不是直接存储的。Message在磁盘中的物理结构如下所示。
其中 key 和 value 存储的是实际的Message内容,长度不固定,而其他都是对Message内容的统计和描述,长度固定。因此在查找实际Message过程中,磁盘指针会根据Message的 offset 和 message length 计算移动位数,以加速Message的查找过程。之所以可以这样加速,因为Kafka的.log文件都是顺序写的,往磁盘上写数据时,就是追加数据,没有随机写的操作。
8.Partition Replicas
最后我们简单聊一下Kafka中的Partition Replicas(分区副本)机制,0.8版本以前的Kafka是没有副本机制的。创建Topic时,可以为Topic指定分区,也可以指定副本个数。kafka 中的分区副本如下图所示:
Kafka通过副本因子(replication-factor)控制消息副本保存在几个Broker(服务器)上,一般情况下副本数等于Broker的个数,且同一个副本因子不能放在同一个Broker中。副本因子是以分区为单位且区分角色;主副本称之为Leader(任何时刻只有一个),从副本称之为 Follower(可以有多个),处于同步状态的副本叫做in-sync-replicas(ISR)。Leader负责读写数据,Follower不负责对外提供数据读写,只从Leader同步数据,消费者和生产者都是从leader读写数据,不与follower交互,因此Kafka并不是读写分离的。同时使用Leader进行读写的好处是,降低了数据同步带来的数据读取延迟,因为Follower只能从Leader同步完数据之后才能对外提供读取服务。
如果一个分区有三个副本因子,就算其中一个挂掉,那么只会剩下的两个中,选择一个leader,如下图所示。但不会在其他的broker中,另启动一个副本(因为在另一台启动的话,必然存在数据拷贝和传输,会长时间占用网络IO,Kafka是一个高吞吐量的消息系统,这个情况不允许发生)。如果指定分区的所有副本都挂了,Consumer如果发送数据到指定分区的话,将写入不成功。Consumer发送到指定Partition的消息,会首先写入到Leader Partition中,写完后还需要把消息写入到ISR列表里面的其它分区副本中,写完之后这个消息才能提交offset。
到这里,差不多把Kafka的架构和基本原理简单介绍完了。Kafka为了实现高吞吐量和容错,还引入了很多优秀的设计思路,如零拷贝,高并发网络设计,顺序存储,以后有时间再说。
python怎么安装acl包搜索本产品内容
查询
文档中心 消息队列 CKafka SDK 文档 Python SDK 公网 SASL_SSL 方式接入
公网 SASL_SSL 方式接入
最近更新时间:2021-12-28 09:54:00
操作场景
前提条件
操作步骤
步骤1:准备工作
步骤2:生产消息
步骤3:消费消息
操作场景
该任务以 Python 客户端为例,指导您使用公网 SASL_SSL 方式接入消息队列 CKafka 并收发消息。
前提条件
安装 Python
安装 pip
配置 ACL 策略
下载 Demo
下载 SASL_SSL 证书
操作步骤
步骤1:准备工作
创建接入点。
在 实例列表 页面,单击目标实例 ID,进入实例详情页。
在 基本信息 接入方式 中,单击添加路由策略,在打开窗口中选择:路由类型:公网域名接入,接入方式:SASL_SSL。
创建角色。
在用户管理页面新建角色,设置密码。
创建 Topic。
在控制台 topic 管理页面新建 Topic(参见 创建 Topic)。
添加 Python 依赖库。
执行以下命令安装:
pip install kafka-python
步骤2:生产消息
修改生产消息程序 producer.py 中配置参数。
producer = KafkaProducer(
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1, 1),
#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = "PLAIN",
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
message = "Hello World! Hello Ckafka!"
msg = json.dumps(message).encode()
producer.send('topic_name', value = msg)
print("produce message " + message + " success.")
producer.close()
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在 CKafka 控制台 的实例详情页面的基本信息获取,用户在用户管理创建用户时设置。
sasl_plain_password
用户密码,在 CKafka 控制台实例详情页面的用户管理创建用户时设置。
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。
CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
编译并运行 producer.py。
查看运行结果。
在 CKafka 控制台 的 topic管理页面,选择对应的 Topic , 单击更多 消息查询,查看刚刚发送的消息。
步骤3:消费消息
修改消费消息程序 consumer.py 中配置参数。
consumer = KafkaConsumer(
'topic_name',
group_id = "group_id",
bootstrap_servers = ['xx.xx.xx.xx:port'],
api_version = (1,1),
#
# SASL_SSL 公网接入
#
security_protocol = "SASL_SSL",
sasl_mechanism = 'PLAIN',
sasl_plain_username = "instanceId#username",
sasl_plain_password = "password",
ssl_cafile = "CARoot.pem",
ssl_check_hostname = False,
)
for message in consumer:
print ("Topic:[%s] Partition:[%d] Offset:[%d] Value:[%s]" %
(message.topic, message.partition, message.offset, message.value))
参数
描述
bootstrap_servers
接入网络,在控制台的实例详情页面接入方式模块的网络列复制。
group_id
消费者的组 ID,根据业务需求自定义。
sasl_plain_username
用户名,格式为 实例 ID + # + 用户名。实例 ID 在CKafka 控制台的实例详情页面的基本信息获取,用户在用户管理创建用户时设置。
sasl_plain_password
用户名密码,在 CKafka 控制台实例详情页面的用户管理创建用户时设置
topic_name
Topic 名称,您可以在控制台上 topic管理页面复制。
CARoot.pem
采用 SASL_SSL 方式接入时,所需的证书路径。
编译并运行 consumer.py。
查看运行结果。
在 CKafka 控制台 的 Consumer Group 页面,选择对应的消费组名称,在主题名称输入 Topic 名称,单击查询详情,查看消费详情。
上一篇: 公网 SASL_PLAINTEXT 方式接入下一篇: VPC 网络接入
文档内容是否对您有帮助?
有帮助没帮助
如果遇到产品相关问题,您可咨询 在线客服 寻求帮助。
消息队列 CKafka 相关文档
API 概览
创建主题
签名方法
获取实例列表
删除主题白名单
增加主题白名单
错误返回结果
产品概述
获取主题属性
点击搜索腾讯云文档
取消
清除查询
腾讯云Logstash实战4-使用Logstash消费kafka数据并写入到ElasticsearchLogstash的一个典型应用场景,就是消费kafka中的数据并且写入到Elasticsearch, 使用 腾讯云Logstash 产品,可以通过简单的配置快速地完成这一过程。
在“管道管理”页面,点击“新建管道”按钮,创建一个管道:
进入管道配置页面,点击“引用模板”按钮,同时引用“input-kafka”和“output-elasticsearch”两个模板:
在管道配置中,分别针对“input-kafka”和“output-elasticsearch”进行配置,一些关键的配置参数说明如下:
查看更多参数,可以参考 input-kafka
查看更多参数,可以参考 output-elasticsearch
在配置完管道后,点击“保存并部署”创建一个管道并自动部署:
在控制台查看Logstash的运行日志,如果没有ERROR级别的日志,则说明管道运行正常:
进入到output-elasticsearch中定义的输出端的ES集群对应的kibana页面,在Dev tools工具栏里查看索引是否存在,以及索引的文档数量是否正确:
腾讯云CKafka压测踩坑记录由于最近项目要上腾讯云,不得不对腾讯云CKafka进行压测,评估kafka的处理性能是否满足项目需求。(项目期望Kafka能够处理上千万级别的MQ)
一、 明确测试目的
本次性能测试在UAT环境下腾讯云服务器上CKafka处理MQ消息能力进行压力测试。测试包括对Kafka写入MQ消息和消费MQ消息进行压力测试,根据100w、500w和1000w级别的消息处理结果,评估Kafka的处理性能极限值。
二、 Kafka测试前期准备
2.1 Kafka的性能测试主要测试kafka的吞吐量,kafka吞性能为生产者在向kafka传入消息时的写入量,kafka的吐性能为消费者在kafka集群中消费的能力,也就是读取量。
2.2 Borker相关
Kafka的borker是kafka集群的缓存代理,消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。具体可参考kafka官方文档。
2.3 Cousumer相关
Consumer为kafka的消费者,同一个topic消费者越多越快,但是需要注意的是,消费者的数量不能超过topic的分区数量,因为每个topic的每个分区只能被一个消费者消费,多出来的消费者会无信息可消费。导致资源浪费。具体可参考腾讯云Ckafka指南。
三、Kafka常用参数配置
3.1 生产端常用参数配置如下:
消费者参数配置如下:
Broker 配置参数说明如下:
四、场景设计
4.1 Kafka写入消息压力测试
4.2 Kafka消费压测测试
五、测试方法
5.1 在服务器上使用Kafka自带的测试脚本,分别模拟100w、500w和1000w的消息写入请求,查看Kafka处理不同数量级的消息数时的处理能力,包括每秒生成消息数、吞吐量、消息延迟时间。Kafka消息吸入创建的topic命名为test-2,使用命令发起消费该topic的请求,查看Kafka消费不同数量级别的消息时的处理能力。
5.2 压测命令(脚本执行目录:bin/)
zookeeper脚本:./kafka-consumer-perf-test.sh --zookeeper IP:port --topic forbid_resources_topic --fetch-size 1048576 --messages 10000 --threads 1
写入脚本命令的参数解析:
消费脚本参数解析:
六、测试结果
写入100W结果:
消费100W结果:
注意:这里的坑就来了实际消费数量与脚本设置的消费数量不一致,在这里的这个问题查了很多资料发现两个问题,一会下面慢慢解释,先来看每个字段的意思。
我们先来看看消费的每个数据字段的含义,如下图:
上图我们可以看出,data.consumed.in.nMsg(总消费消息数)与脚本中messages设置的值不一致;设置消费100W,实际消费121431条消息。
坑就在这里,由于是买的腾讯云的PASS服务,很多东西都没办法获取权限查看,只能一步步和客服沟通,挨个排查。
腾讯售后客服也给发了案例和教程,发现教程里也是实际消费和设置消费数不一致;,此处省略一万字。。。
最终在无意之间更改了Topic分区数之后再次运行脚本发现问题消失了,测试环境的Topic分区设置为1,后续增加分区数发现能实际消费和设置消费消息数一致。最后经过多次测试最终Topic分区数设置为3。这次之后发现忽略了腾讯云提供的压测指南中的底部有几句话;
在多次和腾讯售后客服沟通和交流,后续也有和腾讯相关后端开发沟通,发现竟然连开发都解释不清楚出现这个问题的原因,只要深入了解,就会含糊解释说:“我们卖了这么多产品,Kafka肯定不会有问题的”,其内部也没有关于Kafka相关的压测分析案例。可能是我寡闻,在此记录,也是分享出此次压测的踩坑经历。
什么是kafkaKafka最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大特性就是可以实时处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低时延的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务水平和最关键指标之一。
基本工作流程如上图所示,其中:
我们看上面的架构图中,producer就是生产者,是数据的入口。注意看图中的红色箭头,Producer在写入数据的时候 永远的找leader ,不会直接将数据写入follower!那leader怎么找呢?写入的流程又是什么样的呢?我们看下图:
发送的流程就在图中已经说明了,就不单独在文字列出来了!需要注意的一点是,消息写入leader后,follower是主动的去leader进行同步的!producer采用push模式将数据发布到broker,每条消息追加到分区中,顺序写入磁盘,所以保证 同一分区 内的数据是有序的!写入示意图如下:
上面说到数据会写入到不同的分区,那kafka为什么要做分区呢?相信大家应该也能猜到,分区的主要目的是:
熟悉负载均衡的朋友应该知道,当我们向某个服务器发送请求的时候,服务端可能会对请求做一个负载,将流量分发到不同的服务器,那在kafka中,如果某个topic有多个partition,producer又怎么知道该将数据发往哪个partition呢?kafka中有几个原则:
保证消息不丢失是一个消息队列中间件的基本保证,那producer在向kafka写入消息的时候,怎么保证消息不丢失呢?其实上面的写入流程图中有描述出来,那就是通过ACK应答机制!在生产者向队列写入数据的时候可以设置参数来确定是否确认kafka接收到数据,这个参数可设置的值为 0 、 1 、 all 。
最后要注意的是,如果往不存在的topic写数据,能不能写入成功呢?kafka会自动创建topic,分区和副本的数量根据默认配置都是1。
Producer将数据写入kafka后,集群就需要对数据进行保存了!kafka将数据保存在磁盘,可能在我们的一般的认知里,写入磁盘是比较耗时的操作,不适合这种高并发的组件。Kafka初始会单独开辟一块磁盘空间,顺序写入数据(效率比随机写入高)。
前面说过了每个topic都可以分为一个或多个partition,如果你觉得topic比较抽象,那partition就是比较具体的东西了!Partition在服务器上的表现形式就是一个一个的文件夹,每个partition的文件夹下面会有多组segment文件,每组segment文件又包含.index文件、.log文件、.timeindex文件(早期版本中没有)三个文件, log文件就实际是存储message的地方,而index和timeindex文件为索引文件,用于检索消息。
上面说到log文件就实际是存储message的地方,我们在producer往kafka写入的也是一条一条的message,那存储在log中的message是什么样子的呢?消息主要包含消息体、消息大小、offset、压缩类型……等等!我们重点需要知道的是下面三个:
无论消息是否被消费,kafka都会保存所有的消息。那对于旧数据有什么删除策略呢?
需要注意的是,kafka读取特定消息的时间复杂度是O(1),所以这里删除过期的文件并不会提高kafka的性能!
消息存储在log文件后,消费者就可以进行消费了。在讲消息队列通信的两种模式的时候讲到过点对点模式和发布订阅模式。Kafka采用的是点对点的模式,消费者主动的去kafka集群拉取消息,与producer相同的是,消费者在拉取消息的时候也是 找leader 去拉取。
多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组id!同一个消费组者的消费者可以消费同一topic下不同分区的数据,但是不会组内多个消费者消费同一分区的数据!!!如下图:
图示是消费者组内的消费者小于partition数量的情况,所以会出现某个消费者消费多个partition数据的情况,消费的速度也就不及只处理一个partition的消费者的处理速度!如果是消费者组的消费者多于partition的数量,那会不会出现多个消费者消费同一个partition的数据呢?上面已经提到过不会出现这种情况!多出来的消费者不消费任何partition的数据。所以在实际的应用中,建议 消费者组的consumer的数量与partition的数量一致 !
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的本身特性.且无论任何OS下,对文件系统本身的优化是非常艰难的.文件缓存/直接内存映射等是常用的手段.因为kafka是对日志文件进行append操作,因此磁盘检索的开支是较小的;同时为了减少磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到一定阀值时,再flush到磁盘,这样减少了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提升.
除磁盘IO之外,我们还需要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并没有提供太多高超的技巧;对于producer端,可以将消息buffer起来,当消息的条数达到一定阀值时,批量发送给broker;对于consumer端也是一样,批量fetch多条消息.不过消息量的大小可以通过配置文件来指定.对于kafka broker端,似乎有个sendfile系统调用可以潜在的提升网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域即可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).
其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,因此启用消息压缩机制是一个良好的策略;压缩需要消耗少量的CPU资源,不过对于kafka而言,网络IO更应该需要考虑.可以将任何在网络上传输的消息都经过压缩.kafka支持gzip/snappy等多种压缩方式
kafka集群中的任何一个broker,都可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接;消息由producer直接通过socket发送到broker,中间不会经过任何"路由层".
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢整体的网络延迟,批量延迟发送事实上提升了网络效率;不过这也有一定的隐患,比如当producer失效时,那些尚未发送的消息将会丢失。
其他JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker需要太多额外的工作.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是相当轻量级的.当消息被consumer接收之后,consumer可以在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.由此可见,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不仅提高了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不同,kafka中的消息是批量(通常以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.
Kafka提供3种消息传输一致性语义:最多1次,最少1次,恰好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的情况;
最多1次:可能会出现数据丢失情况;
恰好1次:并不是指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的情况。
at most once: 消费者fetch消息,然后保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中consumer进程失效(crash),导致部分消息未能继续处理.那么此后可能其他consumer会接管,但是因为offset已经提前保存,那么新的consumer将不能fetch到offset之前的消息(尽管它们尚没有被处理),这就是"at most once".
at least once: 消费者fetch消息,然后处理消息,然后保存offset.如果消息处理成功之后,但是在保存offset阶段zookeeper异常或者consumer失效,导致保存offset操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息,这就是"at least once".
"Kafka Cluster"到消费者的场景中可以采取以下方案来得到“恰好1次”的一致性语义:
最少1次+消费者的输出中额外增加已处理消息最大编号:由于已处理消息最大编号的存在,不会出现重复处理消息的情况。
kafka中,replication策略是基于partition,而不是topic;kafka将每个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(可以没有);备份的个数可以通过broker配置文件来设定。leader处理所有的read-write请求,follower需要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪所有的follower状态,如果follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当所有的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具有良好的网络环境.即使只有一个replicas实例存活,仍然可以保证消息的正常发送和接收,只要zookeeper集群存活即可.
选择follower时需要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,如果一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,需要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.
每个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每个日志都有一个offset来唯一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
获取消息时,需要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,可以找到此消息所在segment文件,然后根据segment的最小offset取差值,得到它在file中的相对位置,直接读取输出即可.
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作(比如consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册自己的节点信息(临时znode),同时当broker和zookeeper断开连接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册自己持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每个consumer客户端被创建时,会向zookeeper注册自己的信息;此作用主要是为了"负载均衡".一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.
Consumer id Registry: 每个consumer都有一个唯一的ID(host:uuid,可以通过配置文件指定,也可以由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每个consumer目前所消费的partition中最大的offset.此znode为持久节点,可以看出offset跟group_id有关,以表明当group中一个消费者失效,其他consumer可以继续消费.
Partition Owner registry: 用来标记partition正在被哪个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的操作:
A) 首先进行"Consumer id Registry";
B) 然后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其他consumer的"leave"和"join";只要此znode path下节点列表变更,都会触发此group下consumer的负载均衡.(比如一个consumer失效,那么其他consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活情况;如果broker列表变更,将会触发所有的groups下的consumer重新balance.
总结:
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
如果leaders永远不会down的话我们就不需要followers了!一旦leader down掉了,需要在followers中选择一个新的leader.但是followers本身有可能延时太久或者crash,所以必须选择高质量的follower作为leader.必须保证,一旦一个消息被提交了,但是leader down掉了,新选出的leader必须可以提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据所有副本节点的状况动态的选择最适合的作为leader.Kafka并不是使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。因此这个集合中的任何一个节点随时都可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就可以允许在f个节点down掉的情况下不会丢失消息并正常提供服。ISR的成员是动态的,如果一个节点被淘汰了,当它重新达到“同步中”的状态时,他可以重新加入ISR.这种leader的选择方式是非常快速的,适合kafka的应用场景。
一个邪恶的想法:如果所有节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦所有节点都down了,这个就不能保证了。
实际应用中,当所有的副本都down掉时,必须及时作出反应。可以有以下两种选择:
这是一个在可用性和连续性之间的权衡。如果等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。如果等待ISR意外的节点恢复,这个节点的数据就会被作为线上数据,有可能和真实的数据有所出入,因为有些数据它可能还没同步到。Kafka目前选择了第二种策略,在未来的版本中将使这个策略的选择可配置,可以根据场景灵活的选择。
这种窘境不只Kafka会遇到,几乎所有的分布式数据系统都会遇到。
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡这样每个几点都会担任一定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点作为“controller”,当发现有节点down掉的时候它负责在游泳分区的所有节点中选择新的leader,这使得Kafka可以批量的高效的管理所有分区节点的主从关系。如果controller down掉了,活着的节点中的一个会备切换为新的controller.
对于某个分区来说,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会完全复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采取的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。一般情况下,一个分区有一个“正分区”和零到多个“备份分区”。可以配置“正分区+备份分区”的总数量,关于这个配置,不同主题可以有不同的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通信。
Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topic配置副本的数量。Kafka会自动在每个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
创建副本的单位是topic的分区,每个分区都有一个leader和零或多个followers.所有的读写操作都由leader处理,一般分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。所有的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在自己的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪所有“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时太久,leader就会把它移除。至于延时多久算是“太久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被所有的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担心一旦leader down掉了消息会丢失。Producer也可以选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。

立即
返回
1
目录:1、阿里云服务器域名配置2、阿里云服务器二级域名配置3、如何配置阿里云服务器?怎么弄出和普通电脑一样的操作界面?怎么配置...