Spring Kafka is an extension of the Spring Integration framework, provided by the Spring team, that integrates Kafka into applications built on the Spring framework.
- 2016-04-07: Spring Kafka 1.0.0.M1 released
- 2017-10-11: Spring Framework 4.3.12 released
Spring Kafka Configuration in Detail
The Outbound Channel Adapter sends messages to a Kafka cluster. The messages are read from a Spring Integration channel. The current version requires you to specify the topic and the message key:
```java
final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
channel.send(MessageBuilder.withPayload(payload)
        .setHeader("messageKey", "key")
        .setHeader("topic", "test")
        .build());
```
The channel is configured like this:
```xml
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-producer-context-ref="kafkaProducerContext"
                                    auto-startup="false"
                                    channel="inputToKafka">
    <int:poller fixed-delay="1000" time-unit="MILLISECONDS"
                receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
```
A key attribute above is kafka-producer-context-ref, which references the context used to configure the producers:
```xml
<int-kafka:producer-context id="kafkaProducerContext">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          key-class-type="java.lang.String"
                                          value-class-type="java.lang.String"
                                          topic="test1"
                                          value-encoder="kafkaEncoder"
                                          key-encoder="kafkaEncoder"
                                          compression-codec="default"/>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="test2"
                                          compression-codec="default"
                                          async="true"/>
        <int-kafka:producer-configuration broker-list="localhost:9092"
                                          topic="regextopic.*"
                                          compression-codec="default"/>
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
```
As you can see, Spring abstracts much of the configuration found in native Kafka code into Spring beans, created through ordinary Spring configuration. Each producer-configuration is ultimately turned into one Kafka producer, and each topic maps to one producer. The example above creates three producers: one for topic test1, one for topic test2, and one for topics matching the regex pattern regextopic.*. Each producer can be configured with the following attributes:
- broker-list: Comma-separated list of brokers this producer connects to
- topic: Topic name, or a Java regex pattern for topic names
- compression-codec: Compression method to use. Default is no compression. Supported codecs are gzip and snappy; anything else results in no compression
- value-encoder: Serializer used for encoding messages
- key-encoder: Serializer used for encoding the partition key
- key-class-type: Type of the key class; ignored if no key-encoder is provided
- value-class-type: Type of the value class; ignored if no value-encoder is provided
- partitioner: Custom implementation of Kafka's Partitioner interface
- async: true/false, default is false. Setting this to true makes the Kafka producer use the async producer
- batch-num-messages: Number of messages to send in one batch when using async mode
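As a sketch of what the partitioner attribute expects: Kafka 0.8's kafka.producer.Partitioner interface declares a single int partition(Object key, int numPartitions) method. The class below is a hypothetical example of typical key-hash partitioning; the implements clause is commented out so the sketch compiles without the Kafka jars on the classpath.

```java
// Hypothetical custom partitioner sketch. Kafka 0.8's kafka.producer.Partitioner
// interface declares: int partition(Object key, int numPartitions).
// The implements clause is commented out so this compiles without the Kafka jars.
public class KeyHashPartitioner /* implements kafka.producer.Partitioner */ {
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0; // no key: fall back to partition 0
        }
        // mask off the sign bit so the modulo result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```

Declared as a Spring bean, it could then be referenced from the partitioner attribute of a producer-configuration.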
value-encoder and key-encoder can reference other Spring beans; partitioner can likewise be a Spring bean that implements Kafka's Partitioner interface. Here is an encoder example:
```xml
<bean id="kafkaEncoder"
      class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
    <constructor-arg value="com.company.AvroGeneratedSpecificRecord"/>
</bean>
```
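For a simpler case, a string encoder can be sketched as follows. This is a hypothetical class: Kafka 0.8's kafka.serializer.Encoder&lt;T&gt; interface declares a single byte[] toBytes(T value) method, and the implements clause is commented out so the sketch compiles without the Kafka jars.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical encoder sketch. Kafka 0.8's kafka.serializer.Encoder<T>
// interface declares: byte[] toBytes(T value). The implements clause is
// commented out so this compiles without the Kafka jars on the classpath.
public class Utf8StringEncoder /* implements kafka.serializer.Encoder<String> */ {
    public byte[] toBytes(String value) {
        // encode the payload string as UTF-8 bytes
        return value == null ? new byte[0] : value.getBytes(StandardCharsets.UTF_8);
    }
}
```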
If no encoder is configured, Kafka's default encoder is used, which treats the data as a byte array. If both the key and the message are strings, Kafka provides a StringEncoder. It takes a VerifiableProperties object as a constructor argument; spring-integration-kafka provides a wrapper that accepts a plain Properties object instead. So you can configure properties such as:
```xml
<bean id="producerProperties"
      class="org.springframework.beans.factory.config.PropertiesFactoryBean">
    <property name="properties">
        <props>
            <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
            <prop key="message.send.max.retries">5</prop>
            <prop key="send.buffer.bytes">5242880</prop>
        </props>
    </property>
</bean>

<int-kafka:producer-context id="kafkaProducerContext"
                            producer-properties="producerProperties">
    <int-kafka:producer-configurations>
        <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
        <int-kafka:producer-configuration ... > ... </int-kafka:producer-configuration>
        ...
    </int-kafka:producer-configurations>
</int-kafka:producer-context>
```
The Inbound Channel Adapter consumes messages, which are then placed into a channel. Kafka offers two consumer APIs: the High Level Consumer and the Simple Consumer. For a client that needs no special control and simply processes the current stream of messages, the High Level Consumer API is more direct and easier to use. But it provides no offset management: if you want to read earlier messages, or re-read messages that have already been consumed, it cannot do so, and you need the Simple Consumer API instead. The Spring Integration Kafka inbound channel adapter currently supports only the High Level Consumer.
```xml
<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
                                   kafka-consumer-context-ref="consumerContext"
                                   auto-startup="false"
                                   channel="inputFromKafka">
    <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="5"/>
</int-kafka:inbound-channel-adapter>
```
A kafka-consumer-context-ref must be defined; it is used to create the consumers.
```xml
<int-kafka:consumer-context id="consumerContext"
                            consumer-timeout="4000"
                            zookeeper-connect="zookeeperConnect">
    <int-kafka:consumer-configurations>
        <int-kafka:consumer-configuration group-id="default"
                                          value-decoder="valueDecoder"
                                          key-decoder="valueDecoder"
                                          max-messages="5000">
            <int-kafka:topic id="test1" streams="4"/>
            <int-kafka:topic id="test2" streams="4"/>
        </int-kafka:consumer-configuration>
        <int-kafka:consumer-configuration group-id="default3"
                                          value-decoder="kafkaSpecificDecoder"
                                          key-decoder="kafkaReflectionDecoder"
                                          max-messages="10">
            <int-kafka:topic-filter pattern="regextopic.*" streams="4" exclude="false"/>
        </int-kafka:consumer-configuration>
    </int-kafka:consumer-configurations>
</int-kafka:consumer-context>
```
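The messages the inbound adapter delivers to the channel arrive as a nested map payload: topic name mapped to a map of partition number to the list of decoded messages. A small sketch of that shape using plain JDK collections (the topic name, partition number, and message values here are illustrative only):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerPayloadShape {
    // Builds the payload shape the inbound channel adapter delivers:
    // topic name -> (partition number -> decoded messages from that partition).
    public static Map<String, Map<Integer, List<Object>>> samplePayload() {
        Map<Integer, List<Object>> partitions = new HashMap<Integer, List<Object>>();
        partitions.put(0, Arrays.<Object>asList("hello", "world"));
        Map<String, Map<Integer, List<Object>>> payload =
                new HashMap<String, Map<Integer, List<Object>>>();
        payload.put("test1", partitions);
        return payload;
    }
}
```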
A zookeeper connection also needs to be configured:
```xml
<int-kafka:zookeeper-connect id="zookeeperConnect"
                             zk-connect="localhost:2181"
                             zk-connection-timeout="6000"
                             zk-session-timeout="6000"
                             zk-sync-time="2000"/>
```