spring boot与kafka集成的简单实例
发布时间 - 2026-01-11 03:10:56 点击率:次本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下:

引入相关依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency>
从依赖项的引入即可看出,当前spring boot(1.4.2)还不支持完全以配置项的配置来实现与kafka的无缝集成。也就意味着必须通过java config的方式进行手工配置。
定义kafka基础配置
与redisTemplate及jdbcTemplate等类似。spring同样提供了org.springframework.kafka.core.KafkaTemplate作为kafka相关api操作的入口。
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
KafkaTemplate依赖于ProducerFactory,而创建ProducerFactory时则通过一个Map指定kafka相关配置参数。通过KafkaTemplate对象即可实现消息发送。
kafkaTemplate.send("test-topic", "hello");
or
kafkaTemplate.send("test-topic", "key-1", "hello");
监听消息配置
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
实现消息监听的最终目标是得到监听器对象。该监听器对象自行实现。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.Optional;
public class Listener {
@KafkaListener(topics = {"test-topic"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("listen1 " + message);
}
}
}
只需用@KafkaListener指定哪个方法处理消息即可。同时指定该方法用于监听kafka中哪些topic。
注意事项
定义监听消息配置时,GROUP_ID_CONFIG配置项的值用于指定消费者组的名称,如果同组中存在多个监听器对象则只有一个监听器对象能收到消息。
@KafkaListener中topics属性用于指定kafka topic名称,topic名称由消息生产者指定,也就是由kafkaTemplate在发送消息时指定。
KEY_DESERIALIZER_CLASS_CONFIG与VALUE_DESERIALIZER_CLASS_CONFIG指定key和value的编码、解码策略。kafka用key值确定value存放在哪个分区中。
后记
时间是解决问题的有效手段之一。
在spring boot 1.5版本中即可实现spring boot与kafka Auto-configuration
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
# spring
# boot
# kafka集成
# springboot集成kafka
# SpringBoot集成Kafka的步骤
# Spring boot集成Kafka消息中间件代码实例
# Springboot集成Kafka实现producer和consumer的示例代码
# spring boot 与kafka集成的示例代码
# Spring Boot集成Kafka的示例代码
# Spring boot集成Kafka+Storm的示例代码
# springboot 1.5.2 集成kafka的简单例子
# Spring Boot 集成 Kafkad的实现示例
# 也就
# 放在
# 多个
# 是由
# 还不
# 给大家
# 只有一个
# 解决问题
# 来实现
# 大家多多
# 区中
# 组中
# 时则
# 最终目标
# 发送消息
# 只需用
# 依赖于
# clients
# producer
# apache
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
js实现点击每个li节点,都弹出其文本值及修改
奇安信“盘古石”团队突破 iOS 26.1 提权
Android仿QQ列表左滑删除操作
专业企业网站设计制作公司,如何理解商贸企业的统一配送和分销网络建设?
胶州企业网站制作公司,青岛石头网络科技有限公司怎么样?
深圳网站制作培训,深圳哪些招聘网站比较好?
网易LOFTER官网链接 老福特网页版登录地址
悟空识字如何进行跟读录音_悟空识字开启麦克风权限与录音
如何用AI帮你把自己的生活经历写成一个有趣的故事?
哪家制作企业网站好,开办像阿里巴巴那样的网络公司和网站要怎么做?
Python自然语言搜索引擎项目教程_倒排索引查询优化案例
香港服务器网站搭建教程-电商部署、配置优化与安全稳定指南
Laravel如何使用集合(Collections)进行数据处理_Laravel Collection常用方法与技巧
Laravel API路由如何设计_Laravel构建RESTful API的路由最佳实践
制作公司内部网站有哪些,内网如何建网站?
如何快速生成可下载的建站源码工具?
如何快速生成专业多端适配建站电话?
Laravel怎么生成二维码图片_Laravel集成Simple-QrCode扩展包与参数设置【实战】
北京网站制作费用多少,建立一个公司网站的费用.有哪些部分,分别要多少钱?
进行网站优化必须要坚持的四大原则
百度浏览器ai对话怎么关 百度浏览器ai聊天窗口隐藏
Laravel怎么实现前端Toast弹窗提示_Laravel Session闪存数据Flash传递给前端【方法】
怎么制作网站设计模板图片,有电商商品详情页面的免费模板素材网站推荐吗?
大连 网站制作,大连天途有线官网?
深圳网站制作平台,深圳市做网站好的公司有哪些?
动图在线制作网站有哪些,滑动动图图集怎么做?
微信推文制作网站有哪些,怎么做微信推文,急?
Laravel如何使用Laravel Vite编译前端_Laravel10以上版本前端静态资源管理【教程】
音响网站制作视频教程,隆霸音响官方网站?
C++时间戳转换成日期时间的步骤和示例代码
Laravel模型关联查询教程_Laravel Eloquent一对多关联写法
如何有效防御Web建站篡改攻击?
Laravel如何实现用户密码重置功能?(完整流程代码)
如何快速搭建高效WAP手机网站?
如何在云指建站中生成FTP站点?
Laravel怎么多语言本地化设置_Laravel语言包翻译与Locale动态切换【手册】
Laravel全局作用域是什么_Laravel Eloquent Global Scopes应用指南
javascript中数组(Array)对象和字符串(String)对象的常用方法总结
如何在 Python 中将列表项按字母顺序编号(a.、b.、c. …)
大同网页,大同瑞慈医院官网?
微信小程序 HTTPS报错整理常见问题及解决方案
香港服务器部署网站为何提示未备案?
jquery插件bootstrapValidator表单验证详解
Laravel如何实现数据导出到CSV文件_Laravel原生流式输出大数据量CSV【方案】
rsync同步时出现rsync: failed to set times on “xxxx”: Operation not permitted
怎么用AI帮你为初创公司进行市场定位分析?
Laravel队列由Redis驱动怎么配置_Laravel Redis队列使用教程
大学网站设计制作软件有哪些,如何将网站制作成自己app?
Laravel如何配置和使用缓存?(Redis代码示例)
Laravel如何与Docker(Sail)协同开发?(环境搭建教程)

