Spring-Kafka(四)—— KafkaTemplate发送消息及结果回调

在前几章中,我们使用KafkaTemplate.send(String data)这个方法发送消息到Kafka中,显然这个方法并不能满足我们系统的需求,那我们需要查看一下KafkaTemplate所实现的接口,看看还提供了什么方法。当我们发送消息到Kafka后,我们又怎么去确认消息是否发送成功呢?这就涉及到KafkaTemplate的发送回调方法了。接下来我们开始正式讲解。

查看发送接口

首先我们Ctrl+鼠标左键进入KafkaTemplate的源代码中查看一下,可以看到有关发送的接口如下。这里的参数还是比较简单的,值得一提的事,方法中有个Long类型的时间戳(timestamp)参数,这是Kafka0.10版本提供的新功能,主要用来使用时间索引进行查询数据以及日志切分清除策略。还有一个ProducerRecord参数,这个类其实就是整合了topic、partition、data等数据的消费实体类。

稍微提一下这些参数都是什么意思吧: topic:这里填写的是Topic的名字 partition:这里填写的是分区的id,其实也是就第几个分区,id从0开始。表示指定发送到该分区中 timestamp:时间戳,一般默认当前时间戳 key:消息的键 data:消息的数据 ProducerRecord:消息对应的封装类,包含上述字段 Message<?>:Spring自带的Message封装类,包含消息及消息头

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ListenableFuture<SendResult<K, V>> sendDefault(V data);

ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, V data);

ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

ListenableFuture<SendResult<K, V>> send(Message<?> message);

使用sendDefault发送消息

首先在KafkaConfiguration编写一个带有默认Topic参数的KafkaTemplate,同时为另外一个KafkaTemplate加上@Primary注解,@Primary注解的意思是在拥有多个同类型的Bean时优先使用该Bean,到时候方便我们使用@Autowired注解自动注入。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
    //这个是我们之前编写的KafkaTemplate代码,加入@Primary注解
   @Bean
   @Primary
   public KafkaTemplate<Integer, String> kafkaTemplate() {
       KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
       return template;
  }

   @Bean("defaultKafkaTemplate")
   public KafkaTemplate<Integer, String> defaultKafkaTemplate() {
       KafkaTemplate template = new KafkaTemplate<Integer, String>(producerFactory());
       template.setDefaultTopic("topic.quick.default");
       return template;
  }

接着编写测试方法,可以看到我们这里调用的是sendDefault方法,而且并没有在方法参数上添加topicName,这是因为我们在声明defaultKafkaTemplate这个Bean的时候添加了这行代码 template.setDefaultTopic(“topic.quick.default”),只要调用sendDefault方法,kafkaTemplate会自动把消息发送到名为"topic.quick.default"的Topic中。

1
2
3
4
5
6
7
    @Resource
   private KafkaTemplate defaultKafkaTemplate;

   @Test
   public void testDefaultKafkaTemplate() {
       defaultKafkaTemplate.sendDefault("I`m send msg to default topic");
  }

img

测试结果

这里也顺便测试一下其他几个吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
    @Test
   public void testTemplateSend() {
       //发送带有时间戳的消息
       kafkaTemplate.send("topic.quick.demo", 0, System.currentTimeMillis(), 0, "send message with timestamp");

       //使用ProducerRecord发送消息
       ProducerRecord record = new ProducerRecord("topic.quick.demo", "use ProducerRecord to send message");
       kafkaTemplate.send(record);

       //使用Message发送消息
       Map map = new HashMap();
       map.put(KafkaHeaders.TOPIC, "topic.quick.demo");
       map.put(KafkaHeaders.PARTITION_ID, 0);
       map.put(KafkaHeaders.MESSAGE_KEY, 0);
       GenericMessage message = new GenericMessage("use Message to send message",new MessageHeaders(map));
       kafkaTemplate.send(message);
  }

消息结果回调

一般来说我们都会去获取KafkaTemplate发送消息的结果去判断消息是否发送成功,如果消息发送失败,则会重新发送或者执行对应的业务逻辑。所以这里我们去实现这个功能。

KafkaSendResultHandler

第一步还是编写一个消息结果回调类KafkaSendResultHandler。当我们使用KafkaTemplate发送消息成功的时候回调用OnSuccess方法,发送失败则会调用onError方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class KafkaSendResultHandler implements ProducerListener {

   private static final Logger log = LoggerFactory.getLogger(KafkaSendResultHandler.class);

   @Override
   public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
       log.info("Message send success : " + producerRecord.toString());
  }

   @Override
   public void onError(ProducerRecord producerRecord, Exception exception) {
       log.info("Message send error : " + producerRecord.toString());
  }
}

接下来就使用KafkaSendResultHandler实现消息发送结果回调,这里为什么又要休眠,稍后进行讲解

1
2
3
4
5
6
7
8
9
    @Autowired
   private KafkaSendResultHandler producerListener;

   @Test
   public void testProducerListen() throws InterruptedException {
       kafkaTemplate.setProducerListener(producerListener);
       kafkaTemplate.send("topic.quick.demo", "test producer listen");
       Thread.sleep(1000);
  }

运行测试方法,我们可以看到控制台输出的日志如下

1
2018-09-08 15:51:39.975  INFO 10268 --- [ad | producer-1] c.v.k.handler.KafkaSendResultHandler     : Message send success : ProducerRecord(topic=topic.quick.demo, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=test producer listen, timestamp=null)

KafkaTemplate异步发送消息

上文提及了发送消息的时候需要休眠一下,否则发送时间较长的时候会导致进程提前关闭导致无法调用回调时间。主要是因为KafkaTemplate发送消息是采取异步方式发送的,我们可以看下KafkaTemplate的源代码

这是我们刚才调用的发送消息方法,可以看到KafkaTemplate会使用ProducerRecord把我们传递进来的参数再一次封装,最后调用doSend方法发送消息到Kafka中

send(String topic, V data)
1
2
3
4
    public ListenableFuture<SendResult<K, V>> send(String topic, V data) {
       ProducerRecord<K, V> producerRecord = new ProducerRecord(topic, data);
       return this.doSend(producerRecord);
  }
ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord)

doSend方法先是检测是否开启事务,紧接着使用SettableListenableFuture发送消息,然后判断是否启动自动冲洗数据到Kafka中,我们再接着看看SettableListenableFuture实现了什么接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
    protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) {
       if (this.transactional) {
           Assert.state(this.inTransaction(), "No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record");
      }

       final Producer<K, V> producer = this.getTheProducer();
       if (this.logger.isTraceEnabled()) {
           this.logger.trace("Sending: " + producerRecord);
      }

       final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture();
       producer.send(producerRecord, new Callback() {
           public void onCompletion(RecordMetadata metadata, Exception exception) {
               try {
                   if (exception == null) {
                       future.set(new SendResult(producerRecord, metadata));
                       if (KafkaTemplate.this.producerListener != null) {
                           KafkaTemplate.this.producerListener.onSuccess(producerRecord, metadata);
                      }

                       if (KafkaTemplate.this.logger.isTraceEnabled()) {
                           KafkaTemplate.this.logger.trace("Sent ok: " + producerRecord + ", metadata: " + metadata);
                      }
                  } else {
                       future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception));
                       if (KafkaTemplate.this.producerListener != null) {
                           KafkaTemplate.this.producerListener.onError(producerRecord, exception);
                      }

                       if (KafkaTemplate.this.logger.isDebugEnabled()) {
                           KafkaTemplate.this.logger.debug("Failed to send: " + producerRecord, exception);
                      }
                  }
              } finally {
                   if (!KafkaTemplate.this.transactional) {
                       KafkaTemplate.this.closeProducer(producer, false);
                  }

              }

          }
      });
       if (this.autoFlush) {
           this.flush();
      }

       if (this.logger.isTraceEnabled()) {
           this.logger.trace("Sent: " + producerRecord);
      }

       return future;
  }

可以看到SettableListenableFuture实现了ListenableFuture接口,ListenableFuture则实现了Future接口,Future是Java自带的实现异步编程的接口,支持返回值的异步,而我们使用Thread或者Runnable都是不带返回值的。

1
2
public class SettableListenableFuture<T> implements ListenableFuture<T>
public interface ListenableFuture<T> extends Future<T>

KafkaTemplate同步发送消息

KafkaTemplate异步发送消息大大的提升了生产者的并发能力,但某些场景下我们并不需要异步发送消息,这个时候我们可以采取同步发送方式,实现也是非常简单的,我们只需要在send方法后面调用get方法即可。Future模式中,我们采取异步执行事件,等到需要返回值得时候我们再调用get方法获取future的返回值

1
2
3
4
    @Test
   public void testSyncSend() throws ExecutionException, InterruptedException {
       kafkaTemplate.send("topic.quick.demo", "test sync send message").get();
  }

get方法还有一个比较有意思的重载方法,get(long timeout, TimeUnit unit),当send方法耗时大于get方法所设定的参数时会抛出一个超时异常,但需要注意,这里仅抛出异常,消息还是会发送成功的。这里的测试方法设置send耗时必须小于 一微秒(那必须得失败呀,嘿嘿嘿),运行后我们可以看到抛出的异常,但也发现消息能发送成功并被监听器接收了。那这功能有什么作用呢,如果还没有接触过SQL慢查询可以去了解一下,使用该方法作为SQL慢查询记录的条件。

1
2
3
4
    @Test
   public void testTimeOut() throws ExecutionException, InterruptedException, TimeoutException {
       kafkaTemplate.send("topic.quick.demo", "test send message timeout").get(1,TimeUnit.MICROSECONDS);
  }
1
2
3
2018-09-08 16:36:09.110  INFO 7724 --- [     demo-0-C-1] com.viu.kafka.listen.DemoListener        : demo receive : test send message timeout

java.util.concurrent.TimeoutException

更多文章请关注该 Spring-Kafka史上最强入门教程 专题