Kafka不消费消息的解决方法

时间:16-08-17 栏目:Java开发, Kafka 作者:kyle 评论:0 点击: 19,185 次

 

Kafka是大数据里面不可缺少的一个消息队列服务。

 

安装前需要安装好Zookeeper做负载均衡,然后装好Kafka即可使用,建议在3台以上的服务器安装,服务器数量必须为奇数。以便zookeeper做调度。

 

Kafka传说中非常非常强。一秒可以处理几十万消息。我用来做日志实时计算,供后续Storm、Hive、Spark等进行交换处理数据。

 

离线的日志部份存到了Hadoop集群里面。

 

最近在用Java做测试的时候,发现Kafka不消费消息,先用生产者,Producer生产了1000条消息,再用消费者Consumer进行消息处理。发现消费者没有任何反映。

 

生产者配置如下:

/**
 * Created by Kyle on 16/7/9.
 */
public class KafkaUtil {

    private static KafkaConsumer<String, String> consumer;
    private static KafkaProducer<String, String> producer;

    /**
     * 创建生产者实例
     * <p>
     * 配置参考:http://kafka.apache.org/documentation.html#producerconfigs
     *
     * @return
     */
    public static KafkaProducer<String,String> createProducer() {

        if (producer == null) {

            Properties props = new Properties();

            //Kafka集群连接串,可以由多个host:port组成
            props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");

            //broker消息确认的模式,有三种:默认1
//        0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
//        1:由Leader确认,Leader接收到消息后会立即返回确认信息
//        all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
            props.put("acks", "all");

            //发送失败时Producer端的重试次数,默认为0
            props.put("retries", 0);

            //当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节
            props.put("batch.size", 16384);

            //发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
            props.put("linger.ms", 1);

            //消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)
            props.put("buffer.memory", 33554432);

            //消息key/value的序列器Class,根据key和value的类型决定
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // 分区接口类
            //props.put("partitioner.class","");

            //K代表每条消息的key类型,V代表消息类型。消息的key用于决定此条消息由哪一个partition接收,所以我们需要保证每条消息的key是不同的。
            producer = new KafkaProducer<String, String>(props);
        }
        return producer;
    }


    /**
     * 创建消息费实例
     * <p>
     * 配置参考:http://kafka.apache.org/documentation.html#newconsumerconfigs
     *
     * @return
     */
    public static KafkaConsumer<String, String> createConsumer() {

        if (consumer == null) {
            Properties props = new Properties();

            ////Kafka集群连接串,可以由多个host:port组成
            props.put("bootstrap.servers", "localhost:9091,localhost:9092,localhost:9093");

            //Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
            props.put("group.id", "test");

            //是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
            props.put("enable.auto.commit", "false");

            //自动提交offset的间隔毫秒数,默认5000。
            //本 例中采用的是自动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自动提交的间隔内发生故障(比如整个JVM进程死掉),那么有一部分消息是会被 重复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代 码中用consumer.commitSync()来手动提交。
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            consumer = new KafkaConsumer<String, String>(props);
        }
        return consumer;
    }


}

 

生产者的详细代码如下:用于向Kafka存储详细消费。

        Integer messageNo = 1;
        while (true) {
            // 准备发送的消息
            String messageStr = new String("Message_" + messageNo);

            // 发送json的信息
            JSONObject json = new JSONObject();

            for (Integer i = 0; i < 10; i++) {
                json.put("test[" + messageNo + "]" + i, "testValue[" + messageNo + "]" + i);
            }

            messageStr = json.toJSONString();

            //System.out.println("Send:" + messageStr);

            //一、普通发送,发送消息到Kafka里面
            //producer.send(new ProducerRecord<String, String>(topic, messageStr));

            //二、带回调的发送,完成发送后会触发回调逻辑,在回调方法的 metadata对象中,我们能够获取到已发送消息的offset和落在的分区等信息。注意,如果acks配置为0,依然会触发回调逻辑,只是拿不到 offset和消息落地的分区信息。

            // 1、准备要发送的数据,生产消息内容
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Message:" + messageNo.toString(), messageStr);

            // 2、发送数据,成功之后回调函数
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null)
                        e.printStackTrace();
                    System.out.println("message send to partition " + metadata.partition() + ", offset: " + metadata.offset());
                }
            });

            messageNo++;
            try {
                sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            if (messageNo > 1000) break;//满1000条就退出,只测试1000条数据
        }

        producer.close();//关闭生产者

 

中间使用了FastJson处理消息内容。

 

生产者没什么问题,都正产发送了消息到Kafka里面,Kafka-Manager里面的详细 的记录

1

 

消费者不消费原因在哪儿呢?回过头去挨个查了不少资料。终于找到问题所在:

 

消费者的代码是这样的:

        // 方法一:系统去调度,自动处理消息
        consumer.subscribe(Arrays.asList(this.topic));
        while (true) {

            //poll方法即是从Broker拉取消息,在poll之前首先要用subscribe方法订阅一个Topic
            //如 果Topic有多个partition,KafkaConsumer会在多个partition间以轮询方式实现负载均衡。如果启动了多个 Consumer线程,Kafka也能够通过zookeeper实现多个Consumer间的调度,保证同一组下的Consumer不会重复消费消息。注 意,Consumer数量不能超过partition数,超出部分的Consumer无法拉取到任何数据。
            ConsumerRecords<String, String> records = consumer.poll(100);//拉取超时毫秒数,如果没有新的消息可供拉取,consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集
            for (ConsumerRecord<String, String> record : records) {

                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key:" + record.key() + ", message: " + record.value());
            }

            //提交已经拉取出来的offset,如果是手动模式下面,必须拉取之后提交,否则以后会拉取重复消息
            consumer.commitSync();

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //break;

        }

 

代码本身没有问题,问题出在消费者的配置上。

 

需要加上一个:

//此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),consumer应该从哪个offset开始消费.latest表示接受接收最大的offset(即最新消息),earliest表示最小offset,即从topic的开始位置消费所有消息.最好设为earliest,这样新的分组,能从最开始进行处理

           props.put("auto.offset.reset", "earliest");

 

原由是Kafka新的消费者,默认情况下会从最后一条消费进行消费,就是开始消费的时候,会从新增加的消息开始处理,即从我开始添加的1000条以后,才会开始处理。

 

所以必须要设置auto.offset.reset设置新加入的消费者,从头条开始处理消费。当然有些情况,可能需要从最新的开始处理。

 

设置好之后,可以正常消费了。

 

消息队列是什么东西,有什么用处呢。按我自己的理解,就是只关注消息的存储。以往我要发个短信验证码,后台接到请求之后,会进行发送,如果发送失败的话,会一直卡等结果。使用消息队列之后,后台接到发短信的请求,只需要把客户的手机号发到消息队列里面,另一边由另外的消费者负责处理消息。分割开生产消息和消费消息的线程,不阻塞用户操作界面。

 

后续会陆续重新发送Kafka相关的安装配置和优化的个人心得。

 

另,Kafka是Twitter开源的消息队列。可以百度一下相关资料,安装简单,功能强大。神器之一。

成都SEO小五嚎2句: 本文是(成都SEO小五)辛苦弄出来的,转载成都SEO小五原创的请保留链接: Kafka不消费消息的解决方法,3Q

Kafka不消费消息的解决方法:等您坐沙发呢!

来给哥评论评论


------====== 小五公告 ======------
成都SEO小五,专注成都搜索引擎优化。
小五善长站内外优化,C#、PHP开发,中英文SEO,Google中英文和百度优化技术。欢迎群内交流。伸手党请绕路,求资源的请绕开,求问题解答的请进群内交流。开放了一个QQ交流群:160750032。加入验证时请标注任何SEO相交字眼。友情链接直接Q我,收录正常,内容大部份原创、SEO或者程序开发、网络营销、线上推广等相关行业即可。

常用工具

赞助广告

来看过哥的人