【工作笔记】——记一次日志收集中的相关内容
Kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据,一般用来作日志的处理
消息队列的优点
解耦
异步
削峰
Kafka消费模式
一对一
消息生产者发布消息到Queue中,通知消费者从队列中拉取信息进行消费。消息被消费后则删除,Queue支持多个消费者,但对于一条消息而言,只能有一个消费者进行消费,即一条信息只能被一个消费者消费
一对多
一对多方式又称为发布/订阅(Pub/Sub)
模式,利用Topic
存储消息,消息生产者将消息发布到Topic后,同时有多个消费者订阅此Topic,消费者可以从中消费信息,发布到Topic中的消息会被多个消费者消费,消费者消费信息后,信息不会被删除,Kafka会默认保存一段时间后删除
Kafka的基础架构
Producer:消息生产者,即向Kafka中发布消息的角色
Consumer:消息消费者,即从Kafka中拉取消息消费的角色
Consumer Group:消费者组,消费者组中存在多个消费者,消费者消费Broker中当前Topic的不同分区中的消息。多个消费者组之间互不影响,所有的消费者都属于某个消费者组。某个分区中的消息只能够被一个消费者组中的一个消费者所消费
Broker:一台Kafka服务器被称为一个Broker,一个Kafka集群中由多个Broker组成,一个Broker可以容纳多个Topic
Topic:主题,每条发布到Kafka中的消息都有一个主题,这个主题即为Topic,类似于数据库中的表名
Partition:分区,为了实现扩展性,一个非常大的Topic可以分布在多个Broker上,一个Topic可以分为多个Partition,每个Partition是一个有序队列
Replica:副本,当一个Kafka集群中的某个节点发生故障,为保证节点上的Partition数据不丢失,Kafka可以正常工作,Kafka提供了副本机制,一个Topic的每个分区有若干个副本、一个Leader和多个Follower
Leader:每个分区有多个副本,有且仅有一个Leader,Leader是当前负责数据读写的partition
Follower:Follower跟随Leader,实时从Leader中同步数据,保持和Leader的数据同步,当Leader发生故障时,会从Follower中选举出一个新的Leader
Offset:kafka的存储文件都是按照offset.kafka
命名,便于查找队列中的消息
Clickhouse Kafka引擎 将Kafka数据同步至Clickhouse
clickhouse的kafka引擎提供了clickhouse与kafka的双向同步支持,clickhouse的kafka引擎表将作为一个消费者,当kafka中的对应Topic有消息进入时,获取该消息,将其进行消费,并通过物化视图同步插入到MergeTree表中
同步步骤
1. Kafka创建Topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --topic test --partitions 1 --replication-factor 1
2. 创建Kafka引擎表
CREATE TABLE queue (
name String,
type String
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'test',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimeter = '\n'
必要参数:
kafka_broker_list
– 以逗号分隔的 brokers 列表kafka_topic_list
– topic 列表kafka_group_name
– Kafka 消费组名称 (group1
),如果不希望消息在集群中重复,请在每个分片中使用相同的组名kafka_format
– 消息体格式,使用与 SQL 部分的FORMAT
函数相同表示方法,例如JSONEachRow
,参考官网Format格式文档
可选参数:
kafka_row_delimiter
- 每个消息体(记录)之间的分隔符kafka_schema
– 如果解析格式需要一个 schema 时,此参数必填kafka_num_consumers
– 单个表的消费者数量。默认值是:1
,如果一个消费者的吞吐量不足,则指定更多的消费者。消费者的总数不应该超过 topic 中分区的数量,因为每个分区只能分配一个消费者
3. 创建结构表
该表是Kafka数据同步的目标表
CREATE TABLE target (
name String,
type String
)
ENGINE = MergeTree ORDER BY type
4. 创建物化视图
物化视图会在后台收集数据,持续不断地从Kafka收集数据并通过SELECT将数据转换为所需要的格式存放至目标表
CREATE MATERIALIZED VIEW consumet TO target
AS
SELECT * FROM queue WHERE type = '1'
5. 测试
通过Kafka生产者向Kafka中生产数据
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test
> { "name":"name1", "type": "1" }
> { "name":"name2", "type": "1" }
> { "name":"name3", "type": "1" }
> { "name":"name4", "type": "2" }
> { "name":"name5", "type": "3" }
查询目标表target
select * from target;
SELECT *
FROM target
Query id: 68380c1f-25dd-4d3e-a467-4019e43b46a3
┌──name─┬─type─┐
│ name1 │ 1 │
│ name2 │ 1 │
│ name3 │ 1 │
└───────┴──────┘
3 rows in set. Elapsed: 0.001 sec.
若需要停止接收Topic数据或更改转换逻辑需要停用物化视图,更改完毕后再开启物化视图
# 停用物化视图
DETACH TABLE consumer;
# 启用物化视图
ATTACH TABLE consumer;
脏数据处理
当Topic中的数据存在不符合Kafka引擎表格式的数据,Kafka引擎表会一直尝试读取该消息并报错,此时需要重置Kafka的offset
1. Detach Kafka引擎表
DETACH TABLE queue;
若不停用Kafka引擎表,再后续操作中重置Kafka offset时会报错
Error: Assignments can only be reset if the group ‘group1’ is inactive, but the current state is Stable
2. 重置offset
bin/kafka-consumer-group.sh --bootstrap-server localhost:9092 --group group1 --topic test --reset-offsets --to-offset 2 --execute
3. Attach Kafka引擎表
ATTACH TABLE queue;
JSON处理
当Kafka的JSON数据中的某个对象存在子对象时
{ "object":{"name":"o1", "type": "obj"}, "message":"this is a message" }
无法使用JSONEachRow
的format将Kafka数据同步至Clickhouse
一种解决方式是使Kafka引擎表接收整个JSON,然后在物化视图中编写相关转换逻辑
修改Kafka引擎表
CREATE TABLE queue ( json String,) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092', kafka_topic_list = 'test', kafka_group_name = 'group1', kafka_format = 'JSONAsString', kafka_row_delimeter = '\n'
JSONAsString
会将整个JSON解释为一个值,使用此格式的表只能存在一个字段,且该字段类型为String
[官方描述]
修改物化视图转换逻辑
CREATE MATERIALIZED VIEW consumet TO targetAS SELECT JSONExtractRaw(json, 'object') as object, JSONExtractString(json, 'message') as messageFROM queue
相关JSON操作函数
JSONExtractString
JSONExtractString(json[, indices_or_keys]…)
解析JSON并提取字符串。此函数类似于
visitParamExtractString
函数。如果该值不存在或类型错误,则返回空字符串。
该值未转义。如果unescaping失败,则返回一个空字符串。
该方法可以从JSON中提取对应值并转换为字符串
示例:
select JSONExtractString('{"a": "hello", "b": [-100, 200.0, 300]}', 'a') = 'hello'select JSONExtractString('{"abc":"\\n\\u0000"}', 'abc') = '\n\0'select JSONExtractString('{"abc":"\\u263a"}', 'abc') = '☺'select JSONExtractString('{"abc":"\\u263"}', 'abc') = ''select JSONExtractString('{"abc":"hello}', 'abc') = ''
JSONExtractRaw
该方法将JSON中的对应值以未转义的字符串的形式返回
如:
select JSONExtractRaw('{ "object":{"name":"o1", "type": "obj"}, "message":"this is a message" }', 'object') = {"name":"o1","type":"obj"}
select JSONExtractRaw('{ "object":{"name":"o1", "type": "obj"}, "message":"this is a message" }', 'message') = "this is a message"
其他JSON函数
使用gohangout对Kafka数据进行清洗并导入clickhouse
gohangout配置
inputs:
- Kafka:
topic:
test: 1
codec: json
consumer_settings:
bootstrap.servers: "local:9092"
group.id: test
filters:
- Convert:
fields:
object:
to: string
message:
to: string
outputs:
- Clickhouse:
table: 'database.tabelname'
conn_max_life_time: 1800
hosts:
- 'tcp://localhost:9000'
fields: ['object', 'message']
bulk_actions: 1000
flush_interval: 10
concurrent: 1
当Kafka中存在非友好格式的数据时,Kafka引擎表不会跳过该数据并持续报错,需要手动重置offset
gohangout会报出错误或尝试将数据以支持的格式插入到某字段中,并继续消费后续消息
评论