处理 kafka 因数据量大时消息丢失
背景:
中车电机二期项目时常出现 kafka 未消费的情况,查询消息历史显示已经发送,已经处理,但实际上没有进行处理,日志里面显示这条数据的 mesId 为 null
分析
原因初步分析是由数据量过大导致的,因为 kafka 不消费的情况主要集中在早上,大批量数据下达的时候
解决方法
这个问题是波总提供的方法解决的,波总提供了一个有道云文档,里面分析了南通项目遇到相同问题解决方法。
这里我在网上查询相关的资料,发现 kafka0.8 和之后版本的区别,以及 metadata.broker.list 和 bootstrap.servers 的区别
这是区别分析地址 https://blog.csdn.net/pony_maggie/article/details/95862515
这是波总有道云地址http://note.youdao.com/noteshare?id=00a66e8963d7ec1b972abfa59653f8e4
最后分享
波总提供的 MestarMsgProducer.java 是用了 Lambda 表达式
我提供一个 jdk1.8 以下的一种写法 写了一个内部类 实现这个 callback 接口
public static void send(String topic, String msgId, String messageStr)
{
// 0.8版本的Producer在面对大量数据的写入时,会导致producer端使用的直接内存无法释放,最终导致应用被操作系统中断掉。
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, messageStr);
LOG.debug("KafkaProducer send msgId: {}", msgId);
final String msgIdNew = msgId;
final String topicNew = record.topic();
getInstance().send(record, (new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
if (null != metadata)
{
LOG.debug("send to topic:" + topicNew +" msgId:" + msgIdNew + " at partition:" + metadata.partition() + " offset:" + metadata.offset() + " spend:" + 0 + "ms" );
}
if (e != null)
{
LOG.debug("KafkaProducer send msgId: {} Error: {}", msgIdNew, e.getMessage());
}
}
}));
}```