spring boot与kafka集成的简单实例

发布时间 - 2026-01-11 03:10:56    点击率:

本文介绍了spring boot与kafka集成的简单实例,分享给大家,具体如下:

引入相关依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.1.1.RELEASE</version>
</dependency>

从依赖项的引入即可看出,当前spring boot(1.4.2)还不支持完全以配置项的配置来实现与kafka的无缝集成。也就意味着必须通过java config的方式进行手工配置。

定义kafka基础配置

与redisTemplate及jdbcTemplate等类似。spring同样提供了org.springframework.kafka.core.KafkaTemplate作为kafka相关api操作的入口。

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
    props.put(ProducerConfig.RETRIES_CONFIG, 0);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 40960);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return props;
  }

  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }

  @Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<String, String>(producerFactory());
  }
}

KafkaTemplate依赖于ProducerFactory,而创建ProducerFactory时则通过一个Map指定kafka相关配置参数。通过KafkaTemplate对象即可实现消息发送。

kafkaTemplate.send("test-topic", "hello");
or
kafkaTemplate.send("test-topic", "key-1", "hello");

监听消息配置

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(3);
    factory.getContainerProperties().setPollTimeout(3000);
    return factory;
  }

  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }


  public Map<String, Object> consumerConfigs() {
    Map<String, Object> propsMap = new HashMap<>();
    propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.179.200:9092");
    propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return propsMap;
  }

  @Bean
  public Listener listener() {
    return new Listener();
  }
}

实现消息监听的最终目标是得到监听器对象。该监听器对象自行实现。

import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.springframework.kafka.annotation.KafkaListener;

  import java.util.Optional;

  public class Listener {

  @KafkaListener(topics = {"test-topic"})
  public void listen(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
      Object message = kafkaMessage.get();
      System.out.println("listen1 " + message);
    }
  }
}

只需用@KafkaListener指定哪个方法处理消息即可。同时指定该方法用于监听kafka中哪些topic。

注意事项

定义监听消息配置时,GROUP_ID_CONFIG配置项的值用于指定消费者组的名称,如果同组中存在多个监听器对象则只有一个监听器对象能收到消息。

@KafkaListener中topics属性用于指定kafka topic名称,topic名称由消息生产者指定,也就是由kafkaTemplate在发送消息时指定。

KEY_DESERIALIZER_CLASS_CONFIG与VALUE_DESERIALIZER_CLASS_CONFIG指定key和value的编码、解码策略。kafka用key值确定value存放在哪个分区中。

后记

时间是解决问题的有效手段之一。

在spring boot 1.5版本中即可实现spring boot与kafka Auto-configuration

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。


# spring  # boot  # kafka集成  # springboot集成kafka  # SpringBoot集成Kafka的步骤  # Spring boot集成Kafka消息中间件代码实例  # Springboot集成Kafka实现producer和consumer的示例代码  # spring boot 与kafka集成的示例代码  # Spring Boot集成Kafka的示例代码  # Spring boot集成Kafka+Storm的示例代码  # springboot 1.5.2 集成kafka的简单例子  # Spring Boot 集成 Kafkad的实现示例  # 也就  # 放在  # 多个  # 是由  # 还不  # 给大家  # 只有一个  # 解决问题  # 来实现  # 大家多多  # 区中  # 组中  # 时则  # 最终目标  # 发送消息  # 只需用  # 依赖于  # clients  # producer  # apache 


相关栏目: 【 网站优化151355 】 【 网络推广146373 】 【 网络技术251813 】 【 AI营销90571


相关推荐: js实现点击每个li节点,都弹出其文本值及修改  奇安信“盘古石”团队突破 iOS 26.1 提权  Android仿QQ列表左滑删除操作  专业企业网站设计制作公司,如何理解商贸企业的统一配送和分销网络建设?  胶州企业网站制作公司,青岛石头网络科技有限公司怎么样?  深圳网站制作培训,深圳哪些招聘网站比较好?  网易LOFTER官网链接 老福特网页版登录地址  悟空识字如何进行跟读录音_悟空识字开启麦克风权限与录音  如何用AI帮你把自己的生活经历写成一个有趣的故事?  哪家制作企业网站好,开办像阿里巴巴那样的网络公司和网站要怎么做?  Python自然语言搜索引擎项目教程_倒排索引查询优化案例  香港服务器网站搭建教程-电商部署、配置优化与安全稳定指南  Laravel如何使用集合(Collections)进行数据处理_Laravel Collection常用方法与技巧  Laravel API路由如何设计_Laravel构建RESTful API的路由最佳实践  制作公司内部网站有哪些,内网如何建网站?  如何快速生成可下载的建站源码工具?  如何快速生成专业多端适配建站电话?  Laravel怎么生成二维码图片_Laravel集成Simple-QrCode扩展包与参数设置【实战】  北京网站制作费用多少,建立一个公司网站的费用.有哪些部分,分别要多少钱?  进行网站优化必须要坚持的四大原则  百度浏览器ai对话怎么关 百度浏览器ai聊天窗口隐藏  Laravel怎么实现前端Toast弹窗提示_Laravel Session闪存数据Flash传递给前端【方法】  怎么制作网站设计模板图片,有电商商品详情页面的免费模板素材网站推荐吗?  大连 网站制作,大连天途有线官网?  深圳网站制作平台,深圳市做网站好的公司有哪些?  动图在线制作网站有哪些,滑动动图图集怎么做?  微信推文制作网站有哪些,怎么做微信推文,急?  Laravel如何使用Laravel Vite编译前端_Laravel10以上版本前端静态资源管理【教程】  音响网站制作视频教程,隆霸音响官方网站?  C++时间戳转换成日期时间的步骤和示例代码  Laravel模型关联查询教程_Laravel Eloquent一对多关联写法  如何有效防御Web建站篡改攻击?  Laravel如何实现用户密码重置功能?(完整流程代码)  如何快速搭建高效WAP手机网站?  如何在云指建站中生成FTP站点?  Laravel怎么多语言本地化设置_Laravel语言包翻译与Locale动态切换【手册】  Laravel全局作用域是什么_Laravel Eloquent Global Scopes应用指南  javascript中数组(Array)对象和字符串(String)对象的常用方法总结  如何在 Python 中将列表项按字母顺序编号(a.、b.、c. …)  大同网页,大同瑞慈医院官网?  微信小程序 HTTPS报错整理常见问题及解决方案  香港服务器部署网站为何提示未备案?  jquery插件bootstrapValidator表单验证详解  Laravel如何实现数据导出到CSV文件_Laravel原生流式输出大数据量CSV【方案】  rsync同步时出现rsync: failed to set times on “xxxx”: Operation not permitted  怎么用AI帮你为初创公司进行市场定位分析?  Laravel队列由Redis驱动怎么配置_Laravel Redis队列使用教程  大学网站设计制作软件有哪些,如何将网站制作成自己app?  Laravel如何配置和使用缓存?(Redis代码示例)  Laravel如何与Docker(Sail)协同开发?(环境搭建教程)