【一文系列】一篇文章说明指数退避算法以及使用Java语言的实现
“一文系列”目标是仅通过一篇文章来解决一个类别问题,简洁高效,快速获取知识,提升技能。
前言
在网络通信中,数据传输的成功和效率是至关重要的。然而,由于各种原因,如网络拥塞、服务器负载等,数据传输可能会面临一些挑战,其中之一就是传输失败。当数据传输失败时,为了确保数据的可靠性和完整性,最容易想到的解决方案就是使用重试机制但是,简单地使用固定时间间隔进行重试可能会导致进一步的冲突和拥塞,降低整体的通信效率。在这种情况下,指数退避算法
(Exponential Backoff Algorithm)应运而生,本文将深入探讨指数退避算法的原理、步骤、应用场景,并提供使用Java语言实现指数退避算法的示例代码。通过本文的阐述,读者将能够全面理解指数退避算法的重要性及其在网络通信中的应用场景。
让我们开始吧!!!
一、指数退避算法原理
指数退避算法最初用于以太网中,用于解决在共享媒介上进行的数据传输过程中可能出现的冲突和拥塞问题。该算法的核心思想是在发生通信错误时,通过等待一段时间后进行重试,以避免连续的冲突。重试的时间间隔随着重试次数的增加呈指数级增长,从而使得网络资源能够更加合理地分配。
二、指数退避算法步骤
1.描述
指数退避算法实现步骤相对简单,主要是一些五个步骤:
(1)初始化退避参数
:设置初始重试次数为0,初始等待时间为最小等待时间。
(2)发送数据
:尝试发送数据,并等待响应。
(3)处理响应
:若响应正常,则完成数据传输;若响应错误,则进行下一步。
(4)更新退避参数
:将重试次数加1,计算下一次的等待时间,这个等待时间随着重试次数增加呈指数级增长。在实际应用中,为了防止间隔等待时间过大地延长,通常会对增长指数进行最大值限制。
(5)等待并重试
:等待当前计算得到的等待时间,然后重复步骤(2)~(4),直到达到最大重试次数或者数据成功传输。
三、常见应用场景
这种重试机制主要应用在服务可用性和一致性要求都较高的系统中,常见的常见如下:
1. 网络抖动
指数退避算法广泛应用于各种网络通信场景,如网络请求、消息队列等。在网络发生抖动时,它能够有效降低网络拥塞带来的通信失败率,并提高整体系统的可靠性和性能。
2. 流量限制
在日常开发时会免不了调用一些第三方API接口,为了防止被恶意地刷接口,这些API基本都使用了限流机制或其他限制手段,在近期开发体验较深的是在调用目前火热的ChatGPT模型API时,经常会因为调用频率过快而被限制。在这个时候指数退避算法就有了用武之地,由于重试的间隔时间会呈现指数级增长,通常会给被调用方有足够的喘息时间,从而提升了重试时的成功率。
3. 容器化服务客户端开发
对于诸如docker、kubernetes等这个容器化服务客户端的开发中,由于容器资源会在集群节点中进行再分配,所以无法保证每一次调用都能保证成功(例如获取集群信息等),这种短暂的服务不可用对应客户端而言同样是需要进行重试的,并且采用指数退避这种方案往往会带来重试时更大的成功率。
四、指数退避算法的实现(Java版本)
在这部分将根据第二部分中给出的算法步骤并使用Java语言进行实现,为了便于理解,本文给出了模拟http请求时使用指数退避算法的示例。
1. 功能设计
(1)生成随机响应码
:用来模拟http请求的响应结果,使用[1, 100]范围内的随机数作为响应结果来够满足响应结果的随机性。
(2)成功响应的判定
:为了方便观察测试结果,设定响应码位于[70, 100]区间范围内属于成功,其他情况为失败,即每一次响应有30%的成功率。
(3)重试计数与退避指数生成
:由于退避指数与重试次数相关,在需要重试时需要使用重试计数器增加当前重试次数;为了防止退避指数过大带来重试间隔时间的过度延长,约定退避指数上限,退避指数生成器综合重试次数与最大退避指数按照以下规则生成退避指数:
(4)重试间隔时间计算
:定义初始等待时间initialInterval
作为最小等待时间,综合退避指数按照一下公式计算重试间隔时间:
(5)等待并重试
:当需要重试时根据重试间隔时间进行等待,并触发需要重试的逻辑。
2. 代码实现
响应构造器
public class ResponseBuilder {
public static final Integer MAX_CODE = 100;
private static final Integer SUCCESS_THRESHOLD = (int) (MAX_CODE * 0.7);
private static final Random RANDOM = new Random();
private ResponseBuilder() {}
public static ResponseBuilder getInstance() {
return ResponseBuilderHolder.INSTANCE;
}
public SimpleResponse getResponse() {
// 返回码为1~100中的随机值
return new SimpleResponse(RANDOM.nextInt(MAX_CODE) + 1);
}
private static class ResponseBuilderHolder {
private static final ResponseBuilder INSTANCE = new ResponseBuilder();
}
/**
* 响应对象
*/
public static class SimpleResponse {
/**
* 响应码
*/
private final int code;
private SimpleResponse(int code) {
this.code = code;
}
/**
* 判断响应是否成功,30%的成功率
*
* @return 判断响应是否成功
*/
public boolean isSuccess() {
return code >= SUCCESS_THRESHOLD;
}
public int getCode() {
return code;
}
}
}
重试计数,退避指数生成与重试间隔时间计算
@Immutable
@Slf4j
public class ExponentialBackoffIntervalCalculator {
/**
* 初始时间间隔(单位:毫秒)
*/
private final long initialInterval;
/**
* 给定的最大重试的指数量级,防止间隔时间过大
*/
private final int maxRetryIntervalExponent;
/**
* 最大重试次数
*/
private final int maxRetries;
/**
* 当前重试的次数,使用原子变量保证计数器的线程安全
*/
private final AtomicInteger currentRetryAttempt = new AtomicInteger(0);
public ExponentialBackoffIntervalCalculator(long initialInterval, int maxRetryIntervalExponent, int maxRetries) {
this.initialInterval = initialInterval;
this.maxRetryIntervalExponent = maxRetryIntervalExponent;
this.maxRetries = maxRetries;
}
/**
* 获取下一次重试时的时间间隔(单位:毫秒)
*
* @return 下一次重试时的时间间隔
*/
public long nextRetryInterval() {
// 根据重试次数来约定指数量级,最大指数量级不会超过最大上限
int exponentOfTwo = currentRetryAttempt.incrementAndGet();
if (exponentOfTwo > this.maxRetryIntervalExponent) {
exponentOfTwo = this.maxRetryIntervalExponent;
}
// 计算下一次重试的时间间隔,指数递增
long nextRetryInterval = this.initialInterval * (1L << exponentOfTwo);
log.info("[nextRetryInterval]: NextRetryInterval={}ms", nextRetryInterval);
return nextRetryInterval;
}
/**
* 获取当前的重试次数
*
* @return 当前的重试次数
*/
public int getCurrentRetryAttempt() {
return currentRetryAttempt.get();
}
/**
* 是否需要重试
*
* @return 是否需要重试
*/
public boolean shouldRetry() {
return currentRetryAttempt.get() < maxRetries || maxRetries < 0;
}
}
可重试的客户端
@Slf4j
public class RetryableClient {
private static final ResponseBuilder RESPONSE_BUILDER = ResponseBuilder.getInstance();
/**
* 初始时间间隔(单位:毫秒)
*/
private final long initialInterval;
/**
* 给定的最大重试的指数量级,防止间隔时间过大
*/
private final int maxRetryIntervalExponent;
/**
* 最大重试次数
*/
private final int maxRetries;
public RetryableClient(long initialInterval, int maxRetryIntervalExponent, int maxRetries) {
this.initialInterval = initialInterval;
this.maxRetryIntervalExponent = maxRetryIntervalExponent;
this.maxRetries = maxRetries;
}
/**
* 发送同步请求
*
* @return 请求响应
*/
public ResponseBuilder.SimpleResponse send() {
ExponentialBackoffIntervalCalculator calculator =
new ExponentialBackoffIntervalCalculator(initialInterval, maxRetryIntervalExponent, maxRetries);
return doSend(calculator);
}
private ResponseBuilder.SimpleResponse doSend(ExponentialBackoffIntervalCalculator calculator) {
ResponseBuilder.SimpleResponse response = RESPONSE_BUILDER.getResponse();
if (calculator.shouldRetry()) {
// 获取下一次重试时的时间间隔
long nextRetryInterval = calculator.nextRetryInterval();
if (response.isSuccess()) {
return response;
}
log.info("[doSend]: Need to retry, code={}, currentRetryAttempt={}, nextRetryInterval={}ms",
response.getCode(), calculator.getCurrentRetryAttempt(), nextRetryInterval);
// 睡眠间隔时间后进行重试
try {
TimeUnit.MILLISECONDS.sleep(nextRetryInterval);
return doSend(calculator);
} catch (InterruptedException e) {
// 恢复中断
Thread.currentThread().interrupt();
}
}
return response;
}
}
3. 测试
RetryableClient中send()方法的测试
@Slf4j
public class RetryableClientTest {
private final int maxRetryAttempt = 6;
@Test
void send() {
RetryableClient client = new RetryableClient(300, 4, maxRetryAttempt);
ResponseBuilder.SimpleResponse response = client.send();
// 以下操作被阻塞直到send()方法返回
log.info("[send]: Response code={}, success={}", response.getCode(), response.isSuccess());
}
}
测试结果
4. 优化
(1)优化点 上述实现虽然能够实现指数退避重试功能,但是在正式开发中却有很大的优化空间。参考开源项目fabric8中对于kubernetes client的实现,并结合上述功能总结出了以下优化点与优化方案:
- 上述实现方式属于同步阻塞的,当重试操作时间较长时会严重影响系统的效率。考虑改进为使用
异步回调
的方式。 - 上述实现中使用线程sleep的方式实现延迟等待,这种方式适用于简单的延迟等待,且没有复杂的任务调度和管理需求。往往在正式系统中都需要更多的任务调度和管理功能。考虑改进为使用
ScheduledThreadPoolExecutor
去实现延迟等待效果。
(2)实现
使用异步回调来支持发送异步请求
@Slf4j
public class RetryableClient {
private static final ResponseBuilder RESPONSE_BUILDER = ResponseBuilder.getInstance();
private final CachedSingleThreadScheduler SHARED_SCHEDULER = new CachedSingleThreadScheduler();
/**
* 初始时间间隔(单位:毫秒)
*/
private final long initialInterval;
/**
* 给定的最大重试的指数量级,防止间隔时间过大
*/
private final int maxRetryIntervalExponent;
/**
* 最大重试次数
*/
private final int maxRetries;
public RetryableClient(long initialInterval, int maxRetryIntervalExponent, int maxRetries) {
this.initialInterval = initialInterval;
this.maxRetryIntervalExponent = maxRetryIntervalExponent;
this.maxRetries = maxRetries;
}
/**
* 发送异步请求
*
* @return 请求响应的CompletableFuture包装对象
*/
public CompletableFuture<ResponseBuilder.SimpleResponse> sendAsync() {
ExponentialBackoffIntervalCalculator calculator =
new ExponentialBackoffIntervalCalculator(initialInterval, maxRetryIntervalExponent, maxRetries);
// 创建一个future去接响应
CompletableFuture<ResponseBuilder.SimpleResponse> responseFuture = new CompletableFuture<>();
return doSendAsync(responseFuture, calculator);
}
private CompletableFuture<ResponseBuilder.SimpleResponse> doSendAsync(CompletableFuture<ResponseBuilder.SimpleResponse> responseFuture,
ExponentialBackoffIntervalCalculator calculator) {
CompletableFuture
.supplyAsync(RESPONSE_BUILDER::getResponse)
.whenComplete((response, throwable) -> {
if (Objects.nonNull(throwable)) {
responseFuture.completeExceptionally(throwable);
return;
}
if (calculator.shouldRetry()) {
// 获取下一次重试时的时间间隔
long nextRetryInterval = calculator.nextRetryInterval();
if (!response.isSuccess()) {
log.info("[doSendAsync]: Need to retry, code={}, currentRetryAttempt={}, nextRetryInterval={}ms",
response.getCode(), calculator.getCurrentRetryAttempt(), nextRetryInterval);
// 使用定时调度器去重试
CompletableFuture<Void> scheduleCompletedFuture = new CompletableFuture<>();
SHARED_SCHEDULER.schedule(Runnable::run, () -> {
doSendAsync(responseFuture, calculator);
scheduleCompletedFuture.complete(null);
}, nextRetryInterval, TimeUnit.MILLISECONDS);
return;
}
}
// 去接响应体
responseFuture.complete(response);
});
return responseFuture;
}
}
使用ScheduledThreadPoolExecutor的定时调度器
@Slf4j
public class CachedSingleThreadScheduler {
private static final String SCHEDULER_THREAD_PREFIX = "cached_single_thread_scheduler-";
/**
* 默认的执行器关闭检测周期频率(10s)
*/
private static final long DEFAULT_CHECK_SHUTDOWN_TTL_MILLS = TimeUnit.SECONDS.toMillis(10);
private ScheduledThreadPoolExecutor scheduledThreadPoolExecutor;
private final long checkShutdownTtlMillis;
public CachedSingleThreadScheduler() {
this.checkShutdownTtlMillis = DEFAULT_CHECK_SHUTDOWN_TTL_MILLS;
}
public CachedSingleThreadScheduler(long checkShutdownTtlMillis) {
this.checkShutdownTtlMillis = checkShutdownTtlMillis;
}
/**
* 延迟执行任务,复合操作需要方法上锁
*/
public synchronized void schedule(Executor executor, Runnable command, long delay, TimeUnit unit) {
if (Objects.isNull(scheduledThreadPoolExecutor)) {
// 使用单线程调度执行器
scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
final ThreadFactory threadFactory = Executors.defaultThreadFactory();
@Override
public Thread newThread(@Nonnull Runnable r) {
Thread thread = threadFactory.newThread(r);
// 设置为守护线程
thread.setDaemon(true);
thread.setName(SCHEDULER_THREAD_PREFIX + thread.getName());
return thread;
}
});
scheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true);
// 周期性进行执行器关闭检查
scheduledThreadPoolExecutor.scheduleWithFixedDelay(this::shutdownExecutorIfNecessary,
this.checkShutdownTtlMillis, this.checkShutdownTtlMillis, TimeUnit.MILLISECONDS);
}
// 开始执行任务
doSchedule(executor, command, delay, unit);
}
/**
* 在满足执行器关闭条件时关闭执行器,复合操作需要方法上锁
*/
private synchronized void shutdownExecutorIfNecessary() {
if (Objects.nonNull(scheduledThreadPoolExecutor) && scheduledThreadPoolExecutor.getQueue().isEmpty()) {
scheduledThreadPoolExecutor.shutdownNow();
scheduledThreadPoolExecutor = null;
log.info("[shutdownExecutorIfNecessary]: Shut down executor");
}
}
private void doSchedule(Executor executor, Runnable command, long delay, TimeUnit unit) {
// 判断是否执行成功的标记
CompletableFuture<Void> cancelScheduleFuture = new CompletableFuture<>();
ScheduledFuture<?> scheduledFuture = scheduledThreadPoolExecutor.schedule(() -> {
try {
log.info("[doSchedule]: Start schedule");
executor.execute(command);
log.info("[doSchedule]: Finish schedule");
cancelScheduleFuture.complete(null);
} catch (Throwable ex) {
cancelScheduleFuture.completeExceptionally(ex);
}
}, delay, unit);
// 执行成功后需要取消此次执行任务的执行
cancelScheduleFuture.whenComplete((v, ex) -> scheduledFuture.cancel(true));
}
}
(3)测试
RetryableClient中sendAsync()方法的测试
@Slf4j
public class RetryableClientTest {
private final int maxRetryAttempt = 6;
@Test
void sendAsync() throws ExecutionException, InterruptedException {
RetryableClient client = new RetryableClient(300, 4, maxRetryAttempt);
CompletableFuture<ResponseBuilder.SimpleResponse> responseCompletableFuture = client.sendAsync();
// 以下操作不会被阻塞
log.info("[sendAsync] Doing something at {}", Thread.currentThread().getName());
ResponseBuilder.SimpleResponse resp = responseCompletableFuture.get();
log.info("[sendAsync]: Response code={}, success={}", resp.getCode(), resp.isSuccess());
}
}
测试结果
总结
一文总结
指数退避算法是一种在网络通信中常用的拥塞控制算法,能够有效降低通信失败率。本文通过介绍指数退避算法原理、描述步骤、实际应用场景以及Java语言的实现示例,希望能帮助读者理解和应用指数退避算法,并在实际工作中提高网络通信的可靠性和性能。 最后希望笔者的文章能给大家带来帮助,内容方面如有不足之处也希望大家多多给出意见和建议,让我们共同进步!!!
示例代码地址:gitee