blog
原创

【工作笔记】——记一次日志收集中的相关内容

Kafka

Kafka是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据,一般用来作日志的处理

消息队列的优点

解耦

异步

削峰

Kafka消费模式

一对一

消息生产者发布消息到Queue中,通知消费者从队列中拉取信息进行消费。消息被消费后则删除,Queue支持多个消费者,但对于一条消息而言,只能有一个消费者进行消费,即一条信息只能被一个消费者消费

一对多

一对多方式又称为发布/订阅(Pub/Sub)模式,利用Topic存储消息,消息生产者将消息发布到Topic后,同时有多个消费者订阅此Topic,消费者可以从中消费信息,发布到Topic中的消息会被多个消费者消费,消费者消费信息后,信息不会被删除,Kafka会默认保存一段时间后删除

Kafka的基础架构

image-20211019145043160

Producer:消息生产者,即向Kafka中发布消息的角色

Consumer:消息消费者,即从Kafka中拉取消息消费的角色

Consumer Group:消费者组,消费者组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息。多个消费者组之间互不影响,所有的消费者都属于某个消费者组。某个分区中的消息只能够被一个消费者组中的一个消费者所消费

Broker:一台Kafka服务器被称为一个Broker,一个Kafka集群中由多个Broker组成,一个Broker可以容纳多个Topic

Topic:主题,每条发布到Kafka中的消息都有一个主题,这个主题即为Topic,类似于数据库中的表名

Partition:分区,为了实现扩展性,一个非常大的Topic可以分布在多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序队列

Replica:副本,当一个Kafka集群中的某个节点发生故障,为保证节点上的Partition数据不丢失,Kafka可以正常工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本、一个Leader和多个Follower

Leader:每个分区有多个副本,有且仅有一个Leader,Leader是当前负责数据读写的partition

Follower:Follower跟随Leader,实时从Leader中同步数据,保持和Leader的数据同步,当Leader发生故障时,会从Follower中选举出一个新的Leader

Offset:kafka的存储文件都是按照offset.kafka命名,便于查找队列中的消息

Clickhouse Kafka引擎 将Kafka数据同步至Clickhouse

clickhouse的kafka引擎提供了clickhouse与kafka的双向同步支持,clickhouse的kafka引擎表将作为一个消费者,当kafka中的对应Topic有消息进入时,获取该消息,将其进行消费,并通过物化视图同步插入到MergeTree表中

同步步骤

1. Kafka创建Topic

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 1 --replication-factor 1 

2. 创建Kafka引擎表

CREATE TABLE queue (
	name String,
    type String
) 
ENGINE = Kafka 
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'test',
         kafka_group_name = 'group1',
         kafka_format = 'JSONEachRow',
         kafka_row_delimeter = '\n'

必要参数:

  • kafka_broker_list – 以逗号分隔的 brokers 列表
  • kafka_topic_list – topic 列表
  • kafka_group_name – Kafka 消费组名称 (group1),如果不希望消息在集群中重复,请在每个分片中使用相同的组名
  • kafka_format – 消息体格式,使用与 SQL 部分的 FORMAT 函数相同表示方法,例如 JSONEachRow,参考官网Format格式文档

可选参数:

  • kafka_row_delimiter - 每个消息体(记录)之间的分隔符
  • kafka_schema – 如果解析格式需要一个 schema 时,此参数必填
  • kafka_num_consumers – 单个表的消费者数量。默认值是:1,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者

3. 创建结构表

该表是Kafka数据同步的目标表

CREATE TABLE target (
	name String,
    type String
) 
ENGINE = MergeTree ORDER BY type

4. 创建物化视图

物化视图会在后台收集数据,持续不断地从Kafka收集数据并通过SELECT将数据转换为所需要的格式存放至目标表

CREATE MATERIALIZED VIEW consumet TO target
AS 
SELECT * FROM queue WHERE type = '1'

5. 测试

通过Kafka生产者向Kafka中生产数据

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> { "name":"name1", "type": "1" }
> { "name":"name2", "type": "1" }
> { "name":"name3", "type": "1" }
> { "name":"name4", "type": "2" }
> { "name":"name5", "type": "3" }

查询目标表target

select * from target;

SELECT *
FROM target

Query id: 68380c1f-25dd-4d3e-a467-4019e43b46a3

┌──name─┬─type─┐
│ name1 │ 1    │
│ name2 │ 1    │
│ name3 │ 1    │
└───────┴──────┘

3 rows in set. Elapsed: 0.001 sec. 

若需要停止接收Topic数据或更改转换逻辑需要停用物化视图,更改完毕后再开启物化视图

# 停用物化视图
DETACH TABLE consumer;
# 启用物化视图
ATTACH TABLE consumer;

脏数据处理

当Topic中的数据存在不符合Kafka引擎表格式的数据,Kafka引擎表会一直尝试读取该消息并报错,此时需要重置Kafka的offset

1. Detach Kafka引擎表

DETACH TABLE queue;

若不停用Kafka引擎表,再后续操作中重置Kafka offset时会报错

Error: Assignments can only be reset if the group ‘group1’ is inactive, but the current state is Stable

2. 重置offset

bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --group group1 --topic test --reset-offsets --to-offset 2 --execute

3. Attach Kafka引擎表

ATTACH TABLE queue;

JSON处理

当Kafka的JSON数据中的某个对象存在子对象时

{ "object":{"name":"o1", "type": "obj"}, "message":"this is a message" }

无法使用JSONEachRow的format将Kafka数据同步至Clickhouse

一种解决方式是使Kafka引擎表接收整个JSON,然后在物化视图中编写相关转换逻辑

修改Kafka引擎表

CREATE TABLE queue (	json String,) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',         kafka_topic_list = 'test',         kafka_group_name = 'group1',         kafka_format = 'JSONAsString',         kafka_row_delimeter = '\n'

JSONAsString会将整个JSON解释为一个值,使用此格式的表只能存在一个字段,且该字段类型为String [官方描述]

修改物化视图转换逻辑

CREATE MATERIALIZED VIEW consumet TO targetAS SELECT 	JSONExtractRaw(json, 'object') as object,	JSONExtractString(json, 'message') as messageFROM queue 
相关JSON操作函数
JSONExtractString

JSONExtractString(json[, indices_or_keys]…)

解析JSON并提取字符串。此函数类似于visitParamExtractString函数。

如果该值不存在或类型错误,则返回空字符串。

该值未转义。如果unescaping失败,则返回一个空字符串。

该方法可以从JSON中提取对应值并转换为字符串

示例:

select JSONExtractString('{"a": "hello", "b": [-100, 200.0, 300]}', 'a') = 'hello'select JSONExtractString('{"abc":"\\n\\u0000"}', 'abc') = '\n\0'select JSONExtractString('{"abc":"\\u263a"}', 'abc') = '☺'select JSONExtractString('{"abc":"\\u263"}', 'abc') = ''select JSONExtractString('{"abc":"hello}', 'abc') = ''
JSONExtractRaw

该方法将JSON中的对应值以未转义的字符串的形式返回

如:

select JSONExtractRaw('{ "object":{"name":"o1", "type": "obj"}, "message":"this is a message" }', 'object') = {"name":"o1","type":"obj"}
select JSONExtractRaw('{ "object":{"name":"o1", "type": "obj"}, "message":"this is a message" }', 'message') = "this is a message"
其他JSON函数

使用gohangout对Kafka数据进行清洗并导入clickhouse

gohangout配置

inputs:  
  - Kafka:      
      topic:        
	    test: 1      
	  codec: json      
	  consumer_settings:        
	    bootstrap.servers: "local:9092"        
	    group.id: test
filters:  
  - Convert:      
      fields:        
	    object:          
		  to: string        
		message:         
		  to: string
outputs:  
  - Clickhouse:      
      table: 'database.tabelname'      
	  conn_max_life_time: 1800      
	  hosts:      
	  - 'tcp://localhost:9000'      
	  fields: ['object', 'message']      
	  bulk_actions: 1000      
	  flush_interval: 10      
	  concurrent: 1

当Kafka中存在非友好格式的数据时,Kafka引擎表不会跳过该数据并持续报错,需要手动重置offset

gohangout会报出错误或尝试将数据以支持的格式插入到某字段中,并继续消费后续消息

Clickhouse
Kafka
工作笔记
  • 作者:Melonico
  • 发表时间:2021-10-20 16:03
  • 更新时间:2021-10-20 16:14

评论

暂无评论,快来发表第一个评论吧!
留言
TOP