KafkaTemplate发送KafKa消息

最近Kafka消息构造数据的自动化比较多,小总结一发

版本如下

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.0</version>
</dependency>

具体的思路就是调用spring-kafka里KafkaTemplate类的sendXXXX方法,具体可以查看该类

首先构造一个创建template的方法

package com.lihuia.qa.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* Copyright (C), lihuia.com
* FileName: KafkaBrokers
* Author: lihui
* Date: 2019/02/02
*/

public class KafkaBrokers {

private String kafkaZkServer;

public KafkaBrokers(String kafkaZkServer) {
this.kafkaZkServer = kafkaZkServer;
}

public KafkaTemplate<String, String> createTemplate() {

Map<String, Object> props = new HashMap<String, Object>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaZkServer);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

ProducerFactory<String, String> pf =
new DefaultKafkaProducerFactory(props);
KafkaTemplate<String, String> template = new KafkaTemplate<String, String>(pf);
return template;
}
}

在该方法里,主要是配置发送kafka消息的各种参数,但kafkaZkServer是由不同的测试环境来决定的,因此可以将它作为参数在类实例化的时候传进去,那么在你切换不同的Profile的时候,根据不同的spring配置读取该Server的值

比如,在applicationContext.xml里添加配置

<bean id="kafkaBrokers" class="com.lihui.qa.kafka.KafkaBrokers">
<constructor-arg index="0" value="${kafka.zkServer}"></constructor-arg>
</bean>

这里的${kafka.zkServer}就可以从不同的properties配置里获取

那么在实例化的时候,通过@Resource注解即可完成

比如举一个小例子

package demo;

import com.alibaba.fastjson.JSONObject;
import com.lihuia.qa.kafka.KafkaBrokers;
import com.lihuia.qa.kafka.KafkaMessageParam;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import javax.annotation.Resource;

/**
* Copyright (C), 2018-2019
* FileName: SendKafka
* Author: lihui
* Date: 2019/2/2
*/


@ContextConfiguration(locations = {
"classpath*:applicationContext.xml"
})
public class SendKafka {

@Resource
private KafkaBrokers kafkaBrokers;
private long currentTime;

@BeforeClass
public void beforeClass() {
currentTime = System.currentTimeMillis();
}

@Test(description = "发送kafka消息")
public void sendKafkaTest() {

String kafkaKey = "myKey";
String kafkaMessage = "kafkaMessage";

KafkaTemplate<String, String> template = kafkaBrokers.createTemplate();
template.setDefaultTopic("LIHUI_TEST_TOPIC");
template.sendDefault(kafkaKey,
JSONObject.toJSONString(KafkaMessageParam.builder()
.kafkaKey(kafkaKey)
.kafkaMessage(kafkaMessage)
.timestamp(currentTime)
.build()));
template.flush();
logger.info("Sending KafKa Message => LIHUI_TEST_TOPIC");
}
}

加载Spring的xml文件后,通过注解实例化完成kafkaTemplate的配置,最终调用sendDefault将消息发送出去

发表回复