SkyWalking源码– Agent 数据缓存

本文基于 SkyWalking-Java-agent 8.15.0 版本

image.png

image.png

QueueBuffer

实现

QueueBuffer 队列缓存,是数据缓存的最顶层接口,共有 ArrayBlockingQueueBufferBuffer 两个实现类。

  • ArrayBlockingQueueBuffer
    • 基于 ArrayBlockingQueue 数组阻塞队列,实现 queue 进行数据缓存
    • BufferStrategy 判断队列满时的存放策略。基于阻塞队列的机制,这里只能使用 BLOCKING 阻塞策略。
  • Buffer
    • 基于 Object[] 对象数组,实现 环形队列 进行数据缓存
    • 通过 AtomicRangeInteger 进行 环形队列 的索引控制
    • BufferStrategy 判断队列满时的存放策略

BufferStrategy

BufferStrategy 共有两种策略:

  • 第一种 BLOCKING 阻塞,等待队列有空位置
  • 第二种 IF_POSSIBLE 能放就放

AtomicRangeInteger

AtomicRangeInteger extends Number,基于 AtomicIntegerArray 进行的封装。

AtomicIntegerArray 是 JDK 提供的一个可以原子化操作数组的封装。其中

  • AtomicIntegerArray.base :获取一个 int 类型数组对象的 对象头的字节长度
  • int scale = unsafe.arrayIndexScale(int[].class); 计算指定数据类型的数组中每个元素所占用的内存空间
  • Integer.numberOfLeadingZeros(scale); 计算传入的数字在二进制表示下,从左开始有多少个连续的 0
  • private static long byteOffset(int i) 传入任何一个数组下标,只需要左移 shift 位并加上对象头的偏移长度,就可以得到当前这个下标所对应的元素在这个数组对象的内存空间的偏移量

AtomicIntegerArray 在不同版本 JDK 里有不同的实现

  • JDK8 基于 Unsafe 实现
  • JDK8+ 基于 VarHandle 实现,VarHandle 是对 Unsafe 操作的封装

Channels

概念

  • 管理多个队列缓存。多个队列缓存采用 QueueBuffer<T>[] 数组的方式进行统一管理。
  • 使用 BufferStrategy 缓存策略决定选用哪种队列缓存结构。策略选用 BLOCKING 时,选用 ArrayBlockingQueueBuffer 结构,否则选用 Buffer
  • 使用 IDataPartitioner<T> 数据分区器来选择数组中相应的缓存进行数据存放

IDataPartitioner

IDataPartitioner 数据分区器一共有两个实现类,分别是 ProducerThreadPartitioner、SimpleRollingPartitioner

  • ProducerThreadPartitioner 采用 (int) Thread.currentThread().getId() % total 算法,最大重试次数 1 次
  • SimpleRollingPartitioner 采用 Math.abs(i++ % total) 算法,最大重试次数 3 次

image.png

数据消费

  • ConsumerThread
    • IConsumer<T> 消费
    • List<DataSource> 数据缓存。DataSource 是对 QueueBuffer<T> 的再封装,本质上还是 QueueBuffer
  • MultipleChannelsConsumer
    • 单个消费线程,多个channels
  • ConsumeDriver
    • 一堆消费者线程(ConsumerThread[])拿着一堆缓存数据(Channels<T>),按 allocateBuffer2Thread() 的策略进行分配消费
private void allocateBuffer2Thread() {  
    // buffer 的数量  
    int channelSize = this.channels.getChannelSize();  
    /**  
    * 因为 channels 里面有很多个 buffer,同时这里也有很多个消费者线程  
    * 这一步的操作就是将这些 buffer 分配给不同的消费者线程去消费  
    *  
    * if consumerThreads.length < channelSize  
    * each consumer will process several channels.  
    *  
    * if consumerThreads.length == channelSize  
    * each consumer will process one channel.  
    *  
    * if consumerThreads.length > channelSize  
    * there will be some threads do nothing.  
    */  
    for (int channelIndex = 0; channelIndex < channelSize; channelIndex++) {  
        int consumerIndex = channelIndex % consumerThreads.length;  
        consumerThreads[consumerIndex].addDataSource(channels.getBuffer(channelIndex));  
    }  
}
  • BulkConsumePool
    • List<MultipleChannelsConsumer> 多个 MultipleChannelsConsumer

© 版权声明
THE END
喜欢就支持一下吧
点赞0

Warning: mysqli_query(): (HY000/3): Error writing file '/tmp/MY66a5Ob' (Errcode: 28 - No space left on device) in /www/wwwroot/583.cn/wp-includes/class-wpdb.php on line 2345
admin的头像-五八三
评论 抢沙发
头像
欢迎您留下宝贵的见解!
提交
头像

昵称

图形验证码
取消
昵称代码图片