程序员笔记 (七十三)使用Kafka Consumer
外向笑小鸭子
2024-01-12 14:19:34
51623
赞同 0
反对 0
程序员笔记 (七十三)使用Kafka Consumer
我们已经知道了如何发送数据到Kafka,既然有数据发送,那么肯定就有数据消费,消费者也是Kafka整个体系中不可缺少的一环
Kafka Consumer采用的是主动拉取broker数据进行消费的。一般消息中间件存在推送(server推送数据给consumer)和拉取(consumer主动取服务器取数据)两种方式,这两种方式各有优劣。
如果是选择推送的方式最大的阻碍就是服务器不清楚consumer的消费速度,如果consumer中执行的操作又是比较耗时的,那么consumer可能会不堪重负,甚至会导致系统挂掉。
而采用拉取的方式则可以解决这种情况,consumer根据自己的状态来拉取数据,可以对服务器的数据进行延迟处理。但是这种方式也有一个劣势就是服务器没有数据的时候可能会一直轮询,不过还好Kafka在poll()有参数允许消费者请求在“长轮询”中阻塞,等待数据到达(并且可选地等待直到给定数量的字节可用以确保传输大小)。
上面代码中消费者必须的属性有4个,这里着重说一下group.id这个属性,kafka Consumer和Producer不一样,Consummer中有一个Consumer group(消费组),由它来决定同一个Consumer group中的消费者具体拉取哪个partition的数据,所以这里必须指定group.id属性。
bootstrap.servers 连接Kafka集群的地址,多个地址以逗号分隔
key.deserializer 消息中key反序列化类,需要和Producer中key序列化类相对应
value.deserializer 消息中value的反序列化类,需要和Producer中Value序列化类相对应
如果您发现该资源为电子书等存在侵权的资源或对该资源描述不正确等,可点击“私信”按钮向作者进行反馈;如作者无回复可进行平台仲裁,我们会在第一时间进行处理!