Thoughts on "kafka Topic not present in metadata after 200 ms" (Part 2)

Preface

Following up on the previous post, "kafka Topic not present in metadata after 200 ms 引发的思考(上)", let's find out whether a retry scheme is actually workable.

Trying Our Own Retry Scheme

package com.onepiece.kafka.listener;



import com.onepiece.cache.service.CacheService;
import com.onepiece.cache.service.impl.CacheServiceImpl;
import com.onepiece.kafka.constant.Constant;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;


import java.util.regex.Pattern;


/**
 * @author wanghaifeng
 */
@Component
@Slf4j
public class RetryProducerListener implements ProducerListener, ApplicationContextAware {
    private static final String TOPIC_NOT_PRESENT_PATTERN = "Topic .* not present in metadata after .* ms.";
    private static final Pattern REGEX = Pattern.compile(TOPIC_NOT_PRESENT_PATTERN);
    private KafkaTemplate kafkaTemplate;
    private static CacheService cacheService;

    public RetryProducerListener(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }


    /**
     * Match the exception type and error message precisely, and use the cache to limit the retry count.
     *
     * @param producerRecord the failed record
     * @param exception      the exception thrown
     */
    @SneakyThrows
    @Override
    public void onError(ProducerRecord producerRecord, Exception exception) {
        if (exception instanceof TimeoutException && REGEX.matcher(exception.getMessage()).matches()) {
            Header header = producerRecord.headers().lastHeader(Constant.MESSAGE_ID);
            // guard: only retry records that carry the message id header, otherwise header.value() would throw a NullPointerException
            if (!ObjectUtils.isEmpty(header)) {
                String messageId = new String(header.value(), "UTF-8");
                String retryKey = "producer_fail_retry?" + messageId;

                Integer failRetryTimes = cacheService.get(retryKey, Integer.class);
                if (ObjectUtils.isEmpty(failRetryTimes) || failRetryTimes < 3) {
                    cacheService.incr(retryKey);
                    // No sleep here to wait for the network to recover: fetching metadata already has
                    // its own wait time, which is controlled by max.block.ms.
                    kafkaTemplate.send(producerRecord);
                    return;
                }
            }
        }
        log.error("onError error:", exception);
    }


    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        CacheServiceImpl cacheServiceImpl = (CacheServiceImpl) applicationContext.getBean("cacheServiceImpl");
        cacheService = cacheServiceImpl;
    }
}
@Bean
@Primary
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory producerFactory) {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory);
    kafkaTemplate.setProducerListener(new RetryProducerListener(kafkaTemplate));
    return kafkaTemplate;
}

The retry logic itself is described in the code comments. One special trick is worth noting:

  • RetryProducerListener and KafkaTemplate depend on each other, so RetryProducerListener cannot simply be declared as a bean. Instead it receives the KafkaTemplate through its constructor, while its other dependency is obtained via ApplicationContextAware plus a static field.

Testing Whether It Works

What the experiment needs:

spring:
  kafka:
    producer:
      #additional producer parameters, see https://kafka.apache.org/documentation/#producerconfigs
      properties:
        #timeout for updating metadata (max.block.ms)
        max:
          block:
            ms: 1
      batch-size: 1
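As an aside, the nested max/block/ms keys above are flattened into the single dotted producer property max.block.ms; with Spring Boot's relaxed binding the same setting can presumably also be written as one bracketed key (a sketch, not taken from the original project):

spring:
  kafka:
    producer:
      properties:
        "[max.block.ms]": 1
      batch-size: 1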
kafkaTemplate.send(producerRecord);
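For completeness, a minimal way to build the producerRecord used above might look like this (a sketch; "test-topic" is a placeholder, and the message id header is set by hand here because RetryProducerListener#onError builds its retry-counter key from it; later in the article a ProducerInterceptor adds it automatically):

// hypothetical test sender, assuming the article's Constant class and hutool's IdUtil
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test-topic", "hello kafka");
producerRecord.headers().add(Constant.MESSAGE_ID, IdUtil.objectId().getBytes(StandardCharsets.UTF_8));
kafkaTemplate.send(producerRecord);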

Send a message and set a breakpoint in RetryProducerListener#onError to observe what happens:
image.png

Findings

The listener only fires twice:
image.png
The third send goes out, but the listener is never invoked; sending further new messages does not trigger it either, unless the service is restarted. The first guess was that the kafka client applies some kind of throttling. Walking up the call stack from the breakpoint in the listener leads to:
image.png
It turns out that is not the case:
At line 980 the metadata has already been obtained and returned, so the send actually succeeded. max.block.ms controls the wait at line 999, and metadata.fetch() at line 970 is not a real-time request but a read from the cache:

package org.apache.kafka.clients;

public class Metadata implements Closeable {
    public synchronized Cluster fetch() {
        return cache.cluster();
    }
}

image.png
Setting a breakpoint at line 120 shows that Kafka's asynchronous Sender thread keeps refreshing the MetadataCache.
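To make the relationship concrete, here is a tiny runnable model of that mechanism (an illustration only, not Kafka source code): the sending thread blocks for at most maxWaitMs, the analogue of max.block.ms, while a background thread, the analogue of the Sender/KafkaThread, refreshes the cached version and wakes it up.

import java.util.concurrent.TimeoutException;

/**
 * Illustrative model only (NOT Kafka source code): a caller blocks up to maxWaitMs
 * (think max.block.ms) until a background refresher thread (think the Sender/KafkaThread)
 * bumps the cached metadata version.
 */
public class MetadataCacheModel {
    private int version = 0;

    /** Called by the background refresher thread, like Metadata#update after a MetadataResponse. */
    public synchronized void update() {
        version++;
        notifyAll();   // wake up any sender blocked in awaitUpdate
    }

    /** Called by the sending thread; the analogue of waiting for fresh metadata under max.block.ms. */
    public synchronized void awaitUpdate(int lastSeenVersion, long maxWaitMs)
            throws InterruptedException, TimeoutException {
        long deadline = System.currentTimeMillis() + maxWaitMs;
        while (version <= lastSeenVersion) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                // this is the path that surfaces as "Topic ... not present in metadata after ... ms."
                throw new TimeoutException("metadata not refreshed within " + maxWaitMs + " ms");
            }
            wait(remaining);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MetadataCacheModel cache = new MetadataCacheModel();
        new Thread(() -> {
            try { Thread.sleep(100); } catch (InterruptedException ignored) { }
            cache.update();   // background refresh, like the Sender thread updating the MetadataCache
        }).start();
        try {
            cache.awaitUpdate(0, 1);   // maxWaitMs = 1, like max.block.ms = 1 in the experiment
        } catch (TimeoutException e) {
            System.out.println("timed out waiting for metadata: " + e.getMessage());
        }
        cache.awaitUpdate(0, 5_000);   // a later wait succeeds once the refresher has run
        System.out.println("metadata refreshed by the background thread");
    }
}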

A Closer Look at How the Sender Thread Refreshes the MetadataCache

First, let's see which thread kicks off the Sender:
image.png
The first send constructs the KafkaProducer, and its constructor starts the KafkaThread that runs the Sender.

Normally you would not put this much logic in a constructor: a constructor's main job is to initialize object state, and if initialization throws, the object cannot be created at all. Yet here a whole thread is started inside it, as the example below illustrates.
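As a minimal illustration of that risk (not Kafka code), the worker thread below may observe the object before the constructor has finished initializing it:

/**
 * Minimal illustration: a thread started inside a constructor may see the object
 * before the constructor has finished initializing it ("this" escapes early).
 */
public class ConstructorThreadExample {
    private String name;

    public ConstructorThreadExample(String name) {
        // the worker may run before the assignment below, and then prints name = null
        new Thread(() -> System.out.println("worker sees name = " + this.name)).start();
        this.name = name;
    }

    public static void main(String[] args) {
        new ConstructorThreadExample("producer-1");
    }
}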

Sequence Diagram

kafka刷MetadataCache.drawio.png

  1. Sending creates the KafkaProducer object, and its constructor starts the KafkaThread.
  2. Sender#run is invoked to pull the metadata:
    Sender#runOnce
void runOnce() {
if (transactionManager != null) {
try {
transactionManager.resetProducerIdIfNeeded();
if (!transactionManager.isTransactional()) {
// this is an idempotent producer, so make sure we have a producer id
maybeWaitForProducerId();
} else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
transactionManager.transitionToFatalError(
new KafkaException("The client hasn't received acknowledgment for " +
"some previously sent messages and can no longer retry them. It isn't safe to continue."));
} else if (maybeSendAndPollTransactionalRequest()) {
return;
}
// do not continue sending if the transaction manager is in a failed state or if there
// is no producer id (for the idempotent case).
if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
} else if (transactionManager.hasAbortableError()) {
accumulator.abortUndrainedBatches(transactionManager.lastError());
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request: {}", e);
transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
//发送消息
long pollTimeout = sendProducerData(currentTimeMs);
client.poll(pollTimeout, currentTimeMs);
}
void runOnce() {
    if (transactionManager != null) {
        try {
            transactionManager.resetProducerIdIfNeeded();


            if (!transactionManager.isTransactional()) {
                // this is an idempotent producer, so make sure we have a producer id
                maybeWaitForProducerId();
            } else if (transactionManager.hasUnresolvedSequences() && !transactionManager.hasFatalError()) {
                transactionManager.transitionToFatalError(
                    new KafkaException("The client hasn't received acknowledgment for " +
                        "some previously sent messages and can no longer retry them. It isn't safe to continue."));
            } else if (maybeSendAndPollTransactionalRequest()) {
                return;
            }

            // do not continue sending if the transaction manager is in a failed state or if there
            // is no producer id (for the idempotent case).
            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                RuntimeException lastError = transactionManager.lastError();
                if (lastError != null)
                    maybeAbortBatches(lastError);
                client.poll(retryBackoffMs, time.milliseconds());
                return;
            } else if (transactionManager.hasAbortableError()) {
                accumulator.abortUndrainedBatches(transactionManager.lastError());
            }
        } catch (AuthenticationException e) {
            // This is already logged as error, but propagated here to perform any clean ups.
            log.trace("Authentication exception while processing transactional request: {}", e);
            transactionManager.authenticationFailed(e);
        }
    }



    long currentTimeMs = time.milliseconds();
    //send the accumulated messages
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
  3. NetworkClient#poll is invoked to send requests and handle responses:
@Override
public List<ClientResponse> poll(long timeout, long now) {
    //make sure the client is active
    ensureActive();
    //handle aborted sends first
    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }


    long metadataTimeout = metadataUpdater.maybeUpdate(now);
    try {
        //listen for and process responses sent back by the Kafka broker
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }


    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();
    //complete sends that did not expect a response
    handleCompletedSends(responses, updatedNow);
    //handle completed receives
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);


    return responses;
}
  4. NetworkClient#handleCompletedReceives processes the completed receives:
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
    for (NetworkReceive receive : this.selector.completedReceives()) {
        String source = receive.source();
        InFlightRequest req = inFlightRequests.completeNext(source);
        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header,
            throttleTimeSensor, now);
        if (log.isTraceEnabled()) {
            log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
                req.header.apiKey(), req.header.correlationId(), responseStruct);
        }
        // If the received response includes a throttle delay, throttle the connection.
        AbstractResponse body = AbstractResponse.
                parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion());
        maybeThrottle(body, req.header.apiVersion(), req.destination, now);
        if (req.isInternalRequest && body instanceof MetadataResponse)
          //if the body is a MetadataResponse, update the metadata
           metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
        else if (req.isInternalRequest && body instanceof ApiVersionsResponse)
            handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) body);
        else
            responses.add(req.completed(body, now));
    }
}
  5. Metadata#update applies the update:
public synchronized void update(int requestVersion, MetadataResponse response, long now) {
    Objects.requireNonNull(response, "Metadata response cannot be null");
    if (isClosed())
        throw new IllegalStateException("Update requested after metadata close");


    if (requestVersion == this.requestVersion)
        this.needUpdate = false;
    else
        requestUpdate();


    this.lastRefreshMs = now;
    this.lastSuccessfulRefreshMs = now;
    this.updateVersion += 1;


    String previousClusterId = cache.cluster().clusterResource().clusterId();
    //rebuild the metadata cache from the response
    this.cache = handleMetadataResponse(response, topic -> retainTopic(topic.topic(), topic.isInternal(), now));


    Cluster cluster = cache.cluster();
    maybeSetMetadataError(cluster);

    this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, now));

    String newClusterId = cache.cluster().clusterResource().clusterId();
    if (!Objects.equals(previousClusterId, newClusterId)) {
        log.info("Cluster ID: {}", newClusterId);
    }
    clusterResourceListeners.onUpdate(cache.cluster().clusterResource());


    log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
}

Idempotency Control

Because we have added retries, duplicate sends can occur (for example, the message reaches the broker but the producer never sees the ack), so idempotency needs to be controlled.

We are using spring kafka 2.3 (with kafka client 2.3). The official documentation shows that idempotence can be enabled through configuration (enable.idempotence), but it comes with quite a few constraints:

When set to 'true', the producer will ensure that exactly one copy of each message is 
written in the stream. If 'false', producer retries due to broker failures, etc., may 
write duplicates of the retried message in the stream. Note that enabling idempotence 
requires max.in.flight.requests.per.connection to be less than or equal to 5, retries 
to be greater than 0 and acks must be 'all'. If these values are not explicitly set by 
the user, suitable values will be chosen. If incompatible values are set, a 
ConfigException will be thrown.

The main concern is the performance impact, and implementing idempotency ourselves is not complicated, so let's give it a try.
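For reference, had we gone with the built-in switch instead, the Spring Boot configuration would look roughly like this (a sketch following the constraints quoted above, not taken from the project):

spring:
  kafka:
    producer:
      acks: all          # idempotence requires acks=all
      retries: 3         # must be greater than 0
      properties:
        enable:
          idempotence: true
        max:
          in:
            flight:
              requests:
                per:
                  connection: 5   # must be <= 5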

Thinking It Through

Version 1

Implement a ConsumerInterceptor that computes a hash of the record value and uses that hash as the "already consumed" marker to enforce idempotency. However:

  • Could there be business cases where different messages legitimately carry identical content?
    Possibly, so this is rejected; instead, consider putting a messageId into the message headers when sending.
  • What if consumption fails? A ConsumerInterceptor cannot observe the consumption outcome. Use a Spring aspect around @KafkaListener instead?
    Looking at the existing @KafkaListener usages, some take a String message and some take a ConsumerRecord as the parameter, so the messageId is hard to get at there; this option is dropped. So how should consumption failures be handled, and do they really need to be?
    They don't: offsets are committed asynchronously by default and the consumption result is not tracked, so no special handling is added here. If it ever becomes necessary, other mechanisms can be used.

Version 2

Implement a ProducerInterceptor that puts a messageId into the headers, and a ConsumerInterceptor that uses that messageId for idempotency control. Then:

  • Where should the idempotency marker be stored?
    Within a consumer group a partition is consumed by only one consumer, so a local cache is enough.
  • How long does the marker need to be kept?
    We do not over-engineer for extreme cases; if the business is critical and absolutely must not consume duplicates, it can add its own control at the business layer. The TTL is set to 20s.
  • Is a messageId alone enough?
    No. A partition is consumed by only one consumer within a group, but multiple consumer groups are allowed, so the idempotency key is messageId+groupId.

Idempotency Control Implementation

OnepieceKafkaIdempotentProducerInterceptor

package com.onepiece.kafka.interceptor;




import cn.hutool.core.util.IdUtil;
import com.onepiece.kafka.constant.Constant;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.springframework.util.ObjectUtils;


import java.util.Map;

/**
 * Producer interceptor supporting idempotency control
 *
 * @author wanghaifeng
 */
public class OnepieceKafkaIdempotentProducerInterceptor implements ProducerInterceptor<String, Object> {
    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> record) {
        // read with the same constant that is used when writing, so the header key stays consistent
        Header messageIdHeader = record.headers().lastHeader(Constant.MESSAGE_ID);


        if (ObjectUtils.isEmpty(messageIdHeader)) {
            record.headers().add(Constant.MESSAGE_ID, IdUtil.objectId().getBytes());
        }


        return record;
    }


    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

    }



    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }

}

Add it to the configuration:
image.png
As you can see it is placed at the front of the chain, so the ConsumerInterceptor that sits behind it can already read and print the messageId.
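The exact configuration is in the screenshot above; via Spring Boot it would presumably look roughly like this — interceptor.classes is the standard Kafka producer setting and takes an ordered, comma-separated list of class names:

spring:
  kafka:
    producer:
      properties:
        interceptor:
          classes: com.onepiece.kafka.interceptor.OnepieceKafkaIdempotentProducerInterceptor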

OnepieceKafkaIdempotentConsumerInterceptor

package com.onepiece.kafka.interceptor;




import com.onepiece.cache.service.LocalCacheService;
import com.onepiece.cache.service.impl.LocalCacheServiceImpl;
import com.onepiece.kafka.constant.Constant;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.kafka.support.KafkaUtils;
import org.springframework.stereotype.Component;
import org.springframework.util.ObjectUtils;


import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;


/**
 * Consumer interceptor enforcing idempotency control
 *
 * @author wanghaifeng
 */
@Slf4j
@Component
public class OnepieceKafkaIdempotentConsumerInterceptor implements ConsumerInterceptor<String, Object>, ApplicationContextAware {
    private static LocalCacheService localCacheService;

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
    }

    @Override
    public void close() {
    }


    @Override
    public void configure(Map<String, ?> configs) {
    }


    @SneakyThrows
    @Override
    public ConsumerRecords onConsume(ConsumerRecords<String, Object> records) {
        Map<TopicPartition, List<ConsumerRecord<String, Object>>> newRecords = new HashMap<>(records.count() * 2);

        Iterator<ConsumerRecord<String, Object>> iterator = records.iterator();
        while (iterator.hasNext()) {
            ConsumerRecord<String, Object> record = iterator.next();
            TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
            Header header = record.headers().lastHeader(Constant.MESSAGE_ID);
            if (ObjectUtils.isEmpty(header)) {
                // no message id, let the record through untouched
                newRecords.computeIfAbsent(topicPartition, tp -> new ArrayList<>()).add(record);
                continue;
            }
            String key = KafkaUtils.getConsumerGroupId() + "-" + new String(header.value(), "UTF-8");
            Object cacheObj = localCacheService.get(Constant.KAFKA_IDEMPOTENT_CACHE_INSTANCE_NAME, key, Constant.KAFKA_IDEMPOTENT_CACHE_SECONDS, Constant.KAFKA_IDEMPOTENT_CACHE_MAX_SIZE);
            if (!ObjectUtils.isEmpty(cacheObj)) {
                log.info("record:{}, groupId:{} already processed, not adding to newRecords", record, KafkaUtils.getConsumerGroupId());
            } else {
                // append instead of put, so several records of the same partition in one poll are all kept
                newRecords.computeIfAbsent(topicPartition, tp -> new ArrayList<>()).add(record);
                localCacheService.set(Constant.KAFKA_IDEMPOTENT_CACHE_INSTANCE_NAME, key, "1", Constant.KAFKA_IDEMPOTENT_CACHE_SECONDS, Constant.KAFKA_IDEMPOTENT_CACHE_MAX_SIZE);
            }
        }

        return new ConsumerRecords(newRecords);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        LocalCacheServiceImpl localCacheServiceImpl = (LocalCacheServiceImpl) applicationContext.getBean("localCacheServiceImpl");
        localCacheService = localCacheServiceImpl;
    }
}
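On the consumer side the interceptor is registered the same way — a sketch of what the configuration presumably looks like, using the standard consumer interceptor.classes setting:

spring:
  kafka:
    consumer:
      properties:
        interceptor:
          classes: com.onepiece.kafka.interceptor.OnepieceKafkaIdempotentConsumerInterceptor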
