Background
Most Java projects at work are built on Spring or Spring Boot, whose main advantage is how easily other technologies and middleware can be integrated. This article walks through the source code to show how message consumption works when Spring Boot is integrated with Kafka.
Example
pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: consumer-group
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0 # if greater than 0, the client resends any record whose delivery failed
      batch-size: 16384 # records sent to the same partition are batched into fewer requests, which helps both client and broker performance; this is the default batch size in bytes (16384 is the default)
      buffer-memory: 33554432 # total bytes the producer may use to buffer records waiting to be sent (33554432 is the default)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer class
KafkaConsumerListener.java
/**
 * Custom bean
 **/
@Component
public class KafkaConsumerListener {

    // A single annotation is all it takes to consume messages
    @KafkaListener(topics = "app-test")
    public void receive(ConsumerRecord<?, ?> record) {
        // handle
    }
}
As you can see, a working KafkaConsumer can be implemented with very little code.
How it works
The main method of a Spring Boot project is usually just SpringApplication.run(xx.class, args). If you have read the source code, you know that the core of the startup process happens inside the ApplicationContext, mainly in the refresh method:
@Override
public void refresh() throws BeansException, IllegalStateException {
    synchronized (this.startupShutdownMonitor) {
        // Prepare this context for refreshing.
        prepareRefresh();
        // Tell the subclass to refresh the internal bean factory.
        ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
        // Prepare the bean factory for use in this context.
        prepareBeanFactory(beanFactory);
        try {
            // Allows post-processing of the bean factory in context subclasses.
            postProcessBeanFactory(beanFactory);
            // Invoke factory processors registered as beans in the context.
            invokeBeanFactoryPostProcessors(beanFactory);
            // Register bean processors that intercept bean creation.
            registerBeanPostProcessors(beanFactory);
            // Initialize message source for this context.
            initMessageSource();
            // Initialize event multicaster for this context.
            initApplicationEventMulticaster();
            // Initialize other special beans in specific context subclasses.
            onRefresh();
            // Check for listener beans and register them.
            registerListeners();
            // Instantiate all remaining (non-lazy-init) singletons.
            finishBeanFactoryInitialization(beanFactory);
            // Last step: publish corresponding event.
            finishRefresh();
        }
    }
}
Bean creation and initialization happen in the finishBeanFactoryInitialization step. Spring applies processing both before and after each bean's initialization (much like interceptors), as the source shows:
/**
 * Initialize the given bean instance, applying factory callbacks
 * as well as init methods and bean post processors.
 * <p>Called from {@link #createBean} for traditionally defined beans,
 * and from {@link #initializeBean} for existing bean instances.
 * @see #applyBeanPostProcessorsBeforeInitialization
 * @see #invokeInitMethods
 * @see #applyBeanPostProcessorsAfterInitialization
 */
protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {
    ...
    // Pre-initialization processing
    wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
    // Initialization
    invokeInitMethods(beanName, wrappedBean, mbd);
    // Post-initialization processing
    wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
    return wrappedBean;
}
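The before/init/after ordering can be pictured with a small stand-alone sketch. All names here (InitFlowDemo, PostProcessor, and so on) are illustrative stand-ins, not Spring's actual types:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of initializeBean: post-processors wrap the bean's own init method.
public class InitFlowDemo {

    public interface PostProcessor {
        Object before(Object bean, String name);
        Object after(Object bean, String name);
    }

    public static final List<String> trace = new ArrayList<>();

    public static Object initializeBean(String name, Object bean, List<PostProcessor> pps) {
        Object wrapped = bean;
        for (PostProcessor pp : pps) wrapped = pp.before(wrapped, name); // pre-initialization processing
        trace.add("init:" + name);                                       // the bean's own init method
        for (PostProcessor pp : pps) wrapped = pp.after(wrapped, name);  // post-initialization processing
        return wrapped;
    }

    public static void main(String[] args) {
        PostProcessor logging = new PostProcessor() {
            public Object before(Object b, String n) { trace.add("before:" + n); return b; }
            public Object after(Object b, String n)  { trace.add("after:" + n);  return b; }
        };
        initializeBean("kafkaConsumerListener", new Object(), List.of(logging));
        System.out.println(trace); // [before:kafkaConsumerListener, init:kafkaConsumerListener, after:kafkaConsumerListener]
    }
}
```

The "after" hook is exactly where spring-kafka plugs in, as the next section shows.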
If any bean method is annotated with @KafkaListener, it is handled in KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization:
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    ...
    // Find the methods annotated with @KafkaListener
    Map<Method, Set<KafkaListener>> annotatedMethods = xxx;
    for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
        Method method = entry.getKey();
        for (KafkaListener listener : entry.getValue()) {
            // Handle each annotated method
            processKafkaListener(listener, method, bean, beanName);
        }
    }
}

protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, Object adminTarget, String beanName) {
    ...
    // Register the endpoint (records the consumer information)
    this.registrar.registerEndpoint(endpoint, factory);
}
At this point all Kafka consumer information has been recorded (the entry point being the parsing of @KafkaListener).
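The annotation scan itself is plain reflection. Here is a minimal sketch of the idea, using a made-up MyListener annotation rather than spring-kafka's real one:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

// Sketch of how a post-processor can find listener methods by reflection.
public class AnnotationScanDemo {

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface MyListener { String topic(); }

    public static class OrderHandler {
        @MyListener(topic = "app-test")
        public void receive(String record) { }

        public void helper() { }  // not annotated, so ignored by the scan
    }

    // Collect topic -> method for every annotated method on the bean class,
    // roughly what happens before each endpoint is registered.
    public static Map<String, Method> scan(Object bean) {
        Map<String, Method> endpoints = new HashMap<>();
        for (Method m : bean.getClass().getDeclaredMethods()) {
            MyListener l = m.getAnnotation(MyListener.class);
            if (l != null) endpoints.put(l.topic(), m);
        }
        return endpoints;
    }

    public static void main(String[] args) {
        System.out.println(scan(new OrderHandler()).keySet()); // [app-test]
    }
}
```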
After all beans have been created and initialized, each bean's afterPropertiesSet method is called:
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}
This step creates a KafkaMessageListenerContainer for each endpoint (KafkaListenerEndpointRegistry#registerListenerContainer) and records all the listener containers:
/**
 * Create a message listener container for the given {@link KafkaListenerEndpoint}.
 * <p>This create the necessary infrastructure to honor that endpoint
 * with regards to its configuration.
 */
@SuppressWarnings("unchecked")
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
    // KafkaListenerContainerFactory#createListenerContainer(endpoint)
    MessageListenerContainer container = createListenerContainer(endpoint, factory);
    this.listenerContainers.put(id, container);
}
After bean initialization (including post-processing) is complete, the finishRefresh step starts the beans that implement Lifecycle:
/**
 * Start the specified bean as part of the given set of Lifecycle beans,
 * making sure that any beans that it depends on are started first.
 */
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = lifecycleBeans.remove(beanName);
    bean.start();
}
KafkaListenerEndpointRegistry implements Lifecycle, so its start method gets called:
@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }
    this.running = true;
}
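The registry-driven startup can be modeled with a tiny self-contained sketch. The interfaces and class names below are simplified stand-ins for the Spring ones, kept only to show the autoStartup check:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy registry mirroring the start() loop above: every registered container
// with autoStartup=true is started; the others are left alone.
public class RegistryStartDemo {

    public interface ListenerContainer {
        boolean isAutoStartup();
        void start();
        boolean isRunning();
    }

    public static class SimpleContainer implements ListenerContainer {
        private final boolean autoStartup;
        private boolean running;
        public SimpleContainer(boolean autoStartup) { this.autoStartup = autoStartup; }
        public boolean isAutoStartup() { return autoStartup; }
        public void start() { running = true; }  // the real container submits a poll task here
        public boolean isRunning() { return running; }
    }

    public static class Registry {
        public final Map<String, ListenerContainer> containers = new LinkedHashMap<>();
        public void register(String id, ListenerContainer c) { containers.put(id, c); }
        public void start() {
            for (ListenerContainer c : containers.values()) {
                if (c.isAutoStartup()) c.start();
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        registry.register("a", new SimpleContainer(true));
        registry.register("b", new SimpleContainer(false));
        registry.start();
        System.out.println(registry.containers.get("a").isRunning()); // true
        System.out.println(registry.containers.get("b").isRunning()); // false
    }
}
```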
As the source shows, each KafkaMessageListenerContainer is then started in turn:
// Start the KafkaConsumer
@Override
protected void doStart() {
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    // Mark the container as running; the poll loop checks this flag
    setRunning(true);
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
}
The KafkaConsumer thread then runs and keeps consuming data:
@Override
public void run() {
    // running = true, so start pulling data
    while (isRunning()) {
        // Pull data
        pollAndInvoke();
    }
}

Summary
Spring Boot makes Kafka integration easy: both the producer and the consumer are wrapped for us, so simple configuration and a little code are all it takes to get running. To understand what happens underneath, though, we have to read the source. This article did exactly that, tracing how the Kafka consumer is started while the application boots.
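The poll loop at the end of the startup chain can be condensed into one last toy model. Everything here is an illustrative stand-in (an in-memory queue plays the broker, a Consumer<String> plays the @KafkaListener method), not spring-kafka's real implementation:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy model of the consumer run loop: once started, the container drains the
// "broker" queue and hands each record to the listener method.
public class PollLoopDemo {

    public static class ListenerContainer {
        private final Deque<String> broker;       // stand-in for the Kafka topic
        private final Consumer<String> listener;  // stand-in for the annotated method
        private volatile boolean running;
        public final List<String> handled = new ArrayList<>();

        public ListenerContainer(Deque<String> broker, Consumer<String> listener) {
            this.broker = broker;
            this.listener = listener;
        }

        public void start() {           // doStart: setRunning(true), then run the consumer task
            running = true;
            while (running) {           // the isRunning() check of the real run() loop
                String record = broker.poll();   // pollAndInvoke
                if (record == null) {            // toy shutdown condition: queue drained
                    running = false;
                } else {
                    listener.accept(record);
                    handled.add(record);
                }
            }
        }
    }

    public static void main(String[] args) {
        Deque<String> broker = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        ListenerContainer container = new ListenerContainer(broker, r -> System.out.println("received " + r));
        container.start();
        System.out.println(container.handled); // [m1, m2, m3]
    }
}
```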