滑动窗口源码
核心代码类-SlidingWindow
package com.jd.platform.hotkey.worker.tool;
import cn.hutool.core.date.SystemClock;
import org.checkerframework.checker.units.qual.C;
import java.util.Arrays;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicLong;
/**
* 滑动窗口。该窗口同样的key都是单线程计算。
*
* @author wuweifeng wrote on 2019-12-04.
*/
public class SlidingWindow {
/**
* 循环队列,就是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicLong[] timeSlices;
/**
* 队列的总长度
*/
private int timeSliceSize;
/**
* 每个时间片的时长,以毫秒为单位
*/
private int timeMillisPerSlice;
/**
* 共有多少个时间片(即窗口长度)
*/
private int windowSize;
/**
* 在一个完整窗口期内允许通过的最大阈值
*/
private int threshold;
/**
* 该滑窗的起始创建时间,也就是第一个数据
*/
private long beginTimestamp;
/**
* 最后一个数据的时间戳
*/
private long lastAddTimestamp;
public static void main(String[] args) throws InterruptedException {
//1秒一个时间片,窗口共5个
SlidingWindow window = new SlidingWindow(2, 40);
//循环屏障
CyclicBarrier cyclicBarrier = new CyclicBarrier(10);
CountDownLatch latch=new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
//步调统一
cyclicBarrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
boolean hot = window.addCount(2);
System.out.println(hot);
window.print();
latch.countDown();
}
}).start();
}
latch.await();
for (int i = 0; i < 100; i++) {
System.out.println(window.addCount(2));
window.print();
System.out.println("--------------------------");
try {
Thread.sleep(102);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void print() {
Arrays.asList(timeSlices).stream().forEach(e->System.out.print(e+" "));
System.out.println();
}
public SlidingWindow(int duration, int threshold) {
//超过10分钟的按10分钟
if (duration > 600) {
duration = 600;
}
//要求5秒内探测出来的,
if (duration <= 5) {
this.windowSize = 5;
this.timeMillisPerSlice = duration * 200;
} else {
this.windowSize = 10;
this.timeMillisPerSlice = duration * 100;
}
this.threshold = threshold;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
public SlidingWindow(int timeMillisPerSlice, int windowSize, int threshold) {
this.timeMillisPerSlice = timeMillisPerSlice;
this.windowSize = windowSize;
this.threshold = threshold;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
/**
* 初始化
*/
private void reset() {
beginTimestamp = SystemClock.now();
//窗口个数
AtomicLong[] localTimeSlices = new AtomicLong[timeSliceSize];
for (int i = 0; i < timeSliceSize; i++) {
localTimeSlices[i] = new AtomicLong(0);
}
timeSlices = localTimeSlices;
}
/**
* 计算当前所在的时间片的位置
*/
private int locationIndex() {
long now = SystemClock.now();
//如果当前的key已经超出一整个时间窗口了,那么就直接初始化就行了,不用去计算了
if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
reset();
}
int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
if (index < 0) {
return 0;
}
return index;
}
/**
* 增加count个数量
*/
public synchronized boolean addCount(long count) {
//当前自己所在的位置,是哪个小时间窗(时间片)
int index = locationIndex();
// System.out.println("index:" + index);
//然后清空自己前面windowSize到2*windowSize之间的数据格的数据
//譬如1秒分4个窗口,那么数组共计8个窗口
//当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和
//后面的一半需要清理的, 前面的一般,是要统计的, 一半是一个时间窗的大小
clearFromIndex(index);
int sum = 0;
// 在当前时间片里继续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时间片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
lastAddTimestamp = SystemClock.now();
return sum >= threshold;
}
private void clearFromIndex(int index) {
for (int i = 1; i <= windowSize; i++) {
int j = index + i;
if (j >= windowSize * 2) {
j -= windowSize * 2;
}
timeSlices[j].set(0);
}
}
}
核心字段
/**
* 循环队列,就是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicLong[] timeSlices;
/**
* 队列的总长度
*/
private int timeSliceSize;
/**
* 每个时间片的时长,以毫秒为单位
*/
private int timeMillisPerSlice;
/**
* 共有多少个时间片(即窗口长度)
*/
private int windowSize;
/**
* 在一个完整窗口期内允许通过的最大阈值
*/
private int threshold;
/**
* 该滑窗的起始创建时间,也就是第一个数据
*/
private long beginTimestamp;
/**
* 最后一个数据的时间戳
*/
private long lastAddTimestamp;
SlidingWindow:构造函数
- 10 是说采集的一个数组下标 的时间是 10*100/1000=1s 也就是除以10 的秒数
- 小于=5的话 就默认翻倍 除以10 也就是 5 代表着1s 而不是0.5s
- 1秒一个时间片,窗口共5个 大于1s的时间片窗口是10个
SlidingWindow window = new SlidingWindow(10, 40 ); - 窗口 我们设置为10个时间片
public SlidingWindow(int duration, int threshold) {
//超过10分钟的按10分钟
if (duration > 600) {
duration = 600;
}
//要求5秒内探测出来的,
if (duration <= 5) {
this.windowSize = 5;
this.timeMillisPerSlice = duration * 200;
} else {
this.windowSize = 10;
this.timeMillisPerSlice = duration * 100;
}
this.threshold = threshold;
// 保证存储在至少两个window
this.timeSliceSize = windowSize * 2;
reset();
}
我们构造滑动窗口的时候先设置一个时间片的时长,和阈值 一般会统计最近十个时间片的统计数量 和阈值 比较 看是不是热度key
每个可以占用的统计时长为
timeMillisPerSlice * windowSize
locationIndex:计算当前所在的时间片的位置
计算当前所在的时间片的位置源码
/**
* 计算当前所在的时间片的位置
*/
private int locationIndex() {
long now = SystemClock.now();
//如果当前的key已经超出一整个时间窗口了,那么就直接初始化就行了,不用去计算了
if (now - lastAddTimestamp > timeMillisPerSlice * windowSize) {
reset();
}
int index = (int) (((now - beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
if (index < 0) {
return 0;
}
return index;
}
这个计算公式 意思就是 从开始 到现在 过了多少个时间片, 因为使用的是 长度为timeSliceSize 的环形buffer 形式,所以再跟队列长度取模下 得到的结果就是 当前所在的时间片的位置
int index = (int) (((now – beginTimestamp) / timeMillisPerSlice) % timeSliceSize);
有两个时间戳 beginTimestamp 和 lastAddTimestamp
/**
* 该滑窗的起始创建时间,也就是第一个数据 reset 的时候更新
*/
private long beginTimestamp;
/**
* 最后一个数据的时间戳 addcount的时候更新
*/
private long lastAddTimestamp;
addCount:增加count个数量
/**
* 增加count个数量
*/
public synchronized boolean addCount(long count) {
//当前自己所在的位置,是哪个小时间窗(时间片)
int index = locationIndex();
// System.out.println("index:" + index);
//然后清空自己前面windowSize到2*windowSize之间的数据格的数据
//譬如1秒分4个窗口,那么数组共计8个窗口
//当前index为5时,就清空6、7、8、1。然后把2、3、4、5的加起来就是该窗口内的总和
//后面的一半需要清理的, 前面的一般,是要统计的, 一半是一个时间窗的大小
clearFromIndex(index);
int sum = 0;
// 在当前时间片里继续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时间片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
lastAddTimestamp = SystemClock.now();
return sum >= threshold;
}
滑动窗口存在的高并发问题
hotkey 热度计算的高并发问题
作者在进行热度计算的时候,为了提升 计算效率 消费队列使用了多线程(cpu核数)进行消费
@Bean
public Consumer consumer() {
int nowCount = CpuNum.workerCount();
//将实际值赋给static变量
if (threadCount != 0) {
nowCount = threadCount;
} else {
if (nowCount >= 8) {
nowCount = nowCount / 2;
}
}
List<KeyConsumer> consumerList = new ArrayList<>();
for (int i = 0; i < nowCount; i++) {
KeyConsumer keyConsumer = new KeyConsumer();
keyConsumer.setKeyListener(iKeyListener);
consumerList.add(keyConsumer);
threadPoolExecutor.submit(keyConsumer::beginConsume);
}
return new Consumer(consumerList);
}
public void beginConsume() {
while (true) {
try {
HotKeyModel model = QUEUE.take();
if (model.isRemove()) {
iKeyListener.removeKey(model, KeyEventOriginal.CLIENT);
} else {
iKeyListener.newKey(model, KeyEventOriginal.CLIENT);
}
//处理完毕,将数量加1
totalDealCount.increment();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void newKey(HotKeyModel hotKeyModel, KeyEventOriginal original) {
//cache里的key
String key = buildKey(hotKeyModel);
//判断是不是刚热不久
Object o = hotCache.getIfPresent(key);
if (o != null) {
return;
}
SlidingWindow slidingWindow = checkWindow(hotKeyModel, key);
//看看hot没
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
。。。。。。。
}
}
/**
* 增加count个数量
*/
public synchronized boolean addCount(long count) {
//当前自己所在的位置,是哪个小时间窗(时间片)
int index = locationIndex();
clearFromIndex(index);
int sum = 0;
// 在当前时间片里继续+1
sum += timeSlices[index].addAndGet(count);
//加上前面几个时间片
for (int i = 1; i < windowSize; i++) {
sum += timeSlices[(index - i + timeSliceSize) % timeSliceSize].get();
}
lastAddTimestamp = SystemClock.now();
return sum >= threshold;
}
每个线程获取到HotKeyModel以后 都在进行 iKeyListener.newKey(model, KeyEventOriginal.CLIENT); 热度计算。最终都会调用
boolean hot = slidingWindow.addCount(hotKeyModel.getCount());
进而调用
sum += timeSlices[index].addAndGet(count);
如果多个线程 对同一个key进行热度计算,也就会获取同一个 滑动窗口,但是
sum += timeSlices[index].addAndGet(count);
是并发不安全的
好在 热度计算这种结果数据结果正确性要求不严格,可以一定程度接受,
就像caffiene缓存 进行内存淘汰的时候,对缓存key 的调用次数计算使用W-TinyLFU 算法,也是不精确计算,一般这种热度计算或者点击次数计算,不要求 绝对精度的业务场景,要优先考虑的时候 内存占用 计算复杂度等
解决方案
可以参考 caffiene缓存 条状环形buff 替换掉目前的环形buff 循环队列,
/**
* 循环队列,就是装多个窗口用,该数量是windowSize的2倍
*/
private AtomicLong[] timeSlices;
这样每个线程的计算节点都会被分散到不同的索引位置 ,同时这样也增加的设计和计算的复杂度。
© 版权声明
文章版权归作者所有,未经允许请勿转载,侵权请联系 admin@trc20.tw 删除。
THE END