处理 kafka 因数据量大时消息丢失

背景:

中车电机二期项目时常出现 kafka 未消费的情况,查询消息历史显示已经发送,已经处理,但实际上没有进行处理,日志里面显示这条数据的 mesId 为 null

分析

原因初步分析是由数据量过大导致的,因为 kafka 不消费的情况主要集中在早上,大批量数据下达的时候

解决方法

这个问题是波总提供的方法解决的,波总提供了一个有道云文档,里面分析了南通项目遇到相同问题解决方法。
image.png

这里我在网上查询相关的资料,发现 kafka0.8 和之后版本的区别,以及 metadata.broker.list 和 bootstrap.servers 的区别

这是区别分析地址 https://blog.csdn.net/pony_maggie/article/details/95862515

这是波总有道云地址http://note.youdao.com/noteshare?id=00a66e8963d7ec1b972abfa59653f8e4

最后分享

波总提供的 MestarMsgProducer.java 是用了 Lambda 表达式
image.png

我提供一个 jdk1.8 以下的一种写法 写了一个内部类 实现这个 callback 接口

image.png

 public static void send(String topic, String msgId, String messageStr)

    {
        // 0.8版本的Producer在面对大量数据的写入时,会导致producer端使用的直接内存无法释放,最终导致应用被操作系统中断掉。
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, messageStr);
        LOG.debug("KafkaProducer send msgId: {}", msgId);

        
        final String msgIdNew = msgId;
        final String topicNew = record.topic();
        
        getInstance().send(record, (new Callback() {
			
			@Override
			public void onCompletion(RecordMetadata metadata, Exception e) {

	            if (null != metadata)
	            {
	            	 LOG.debug("send to topic:" + topicNew +" msgId:" + msgIdNew + " at partition:" + metadata.partition() + " offset:" + metadata.offset() + " spend:" + 0 + "ms" );
	            }
	            if (e != null)
	            {
	            	 LOG.debug("KafkaProducer send msgId: {} Error: {}", msgIdNew, e.getMessage());
	            }
				
			}
		}));
        
        
    }```