上一篇1. xxl-job源码分析-从demo开始介绍了xxl-job
中的xxl-job-executor-sample-springboot
,本篇继续深入,介绍其中的重点内容XxlJobExecutor
。只要是要集成xxl-job
的都需要创建一个XxlJobExecutor
,也就是xxl-job
的一个执行器。这次让我们一探究竟,看他是如何同中心服务进行连接执行的。
XxlJobExecutor
参数
首先看到的是这个类的参数,也就是可以给XxlJobExecutor
赋值配置的属性。
// 调度中心配置的地址
private String adminAddresses;
// 访问控制的token
private String accessToken;
// 本地应用的名字
private String appname;
// 本地的地址,也就是注册地址,如果设置了值就忽略ip+端口的地址
private String address;
// 本地的ip
private String ip;
// 本地的端口
private int port;
// 日志路径
private String logPath;
// 保留日志的天数
private int logRetentionDays;
这些属性都是见名知意的,具体的作用会在后续介绍。
在XxlJobExecutor
类中,作者很贴心的帮忙把这个重要的类分解成了几个部分,用分割线分割开。下面我们跟着作者的思路,一一进行剖析讲解。
start + stop
// ---------------------- start + stop ----------------------
public void start() throws Exception {
// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// 根据中心调度平台,初始化调用的客户端对象
initAdminBizList(adminAddresses, accessToken);
// 启动日志文件清理线程,用来清理日志文件
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 启动回调线程
TriggerCallbackThread.getInstance().start();
// 初始化客户端服务
initEmbedServer(address, ip, port, appname, accessToken);
}
public void destroy(){
// 停止客户端服务
stopEmbedServer();
// 销毁处理的线程
if (jobThreadRepository.size() > 0) {
for (Map.Entry<Integer, JobThread> item: jobThreadRepository.entrySet()) {
JobThread oldJobThread = removeJobThread(item.getKey(), "web container destroy and kill the job.");
// wait for job thread push result to callback queue
if (oldJobThread != null) {
try {
oldJobThread.join();
} catch (InterruptedException e) {
logger.error(">>>>>>>>>>> xxl-job, JobThread destroy(join) error, jobId:{}", item.getKey(), e);
}
}
}
jobThreadRepository.clear();
}
jobHandlerRepository.clear();
// 停止日志清理
JobLogFileCleanThread.getInstance().toStop();
// 停止回调线程
TriggerCallbackThread.getInstance().toStop();
}
上面这段代码,描述了客户端在启动和停止的流程。
启动的时候,需要初始化日志路径、初始化调用的客户端对象(类方法,后面会有想想讲解)、启动日志文件清理线程、启动回调线程、初始化客户端服务(类方法,这里采用了Netty进行网络传输,后面会说到)。
停止的时候,就和启动是相反了,启动了什么,就需要关闭什么,包括停止客户端服务、销毁处理的线程、停止日志清理和停止回调线程。
这里面很多具体的实现并没有直接写在启动和停止的内部,而是拆分成了多个小方法。这样让代码看起来更加的精简,也方便我们快速寻找我们所关系的逻辑。
因为初始化日志路径,启动日志文件清理线程,这两个都不是重点。都是见名知意,不是很复杂,大家可以自行查看研究。这里只是介绍一下启动回调线程。这个回调线程是在执行器执行完任务后,回调任务结果时使用的。
启动回调线程
public void start() {
// 校验注册到中心的客户端是否为空
if (XxlJobExecutor.getAdminBizList() == null) {
logger.warn(">>>>>>>>>>> xxl-job, executor callback config fail, adminAddresses is null.");
return;
}
// callback
triggerCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
// normal callback
while(!toStop){
try {
// 线程死循环的获取阻塞队列中的回调到调度中心的参数,获取不到会进行阻塞
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
// 将阻塞队列中的参数一次性全部取出,防止生产过多,消费慢的情况
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
// 真正的回调方法
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// last callback
try {
// 如果线程停止了,最后的处理逻辑
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
if (callbackParamList!=null && callbackParamList.size()>0) {
doCallback(callbackParamList);
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>>>> xxl-job, executor callback thread destroy.");
}
});
// 设置为后台线程
triggerCallbackThread.setDaemon(true);
triggerCallbackThread.setName("xxl-job, executor TriggerCallbackThread");
triggerCallbackThread.start();
// 启动一个重试线程
triggerRetryCallbackThread = new Thread(new Runnable() {
@Override
public void run() {
while(!toStop){
try {
// 重试失败的回调文件
retryFailCallbackFile();
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 等待30s
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, executor retry callback thread destroy.");
}
});
triggerRetryCallbackThread.setDaemon(true);
triggerRetryCallbackThread.start();
}
在启动回调线程的方法里,其实不仅仅是启动了回调线程,还启动了失败重试的线程。回调线程会利用阻塞队列,一直获取阻塞队列中需要回调到中心节点的内容,获取到了就进行回调。而重试失败的回调文件的线程会获取失败重试的文件,因为当回调失败的时候,会写入失败重试的文件。这个重试的线程则会每隔30s去检查下是否有需要重试的文件信息,如果有的话,就需要获取文件解析成报文继续回调。
admin-client (rpc invoker)
// ---------------------- admin-client (rpc invoker) ----------------------
private static List<AdminBiz> adminBizList;
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
// 配置中心的地址,根据逗号分隔
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
// 根据每一个地址生成一个连接中心的客户端
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
// 存放的链表为空,则先创建一个,然后将客户端放入链表缓存起来
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
adminBizList.add(adminBiz);
}
}
}
}
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
从上面的注释中可以看出。这段代码根据配置的adminAddresses
创建AdminBizClient
。下面继续看下AdminBizClient
的代码。
public class AdminBizClient implements AdminBiz {
public AdminBizClient() {
}
public AdminBizClient(String addressUrl, String accessToken) {
this.addressUrl = addressUrl;
this.accessToken = accessToken;
// valid
if (!this.addressUrl.endsWith("/")) {
this.addressUrl = this.addressUrl + "/";
}
}
private String addressUrl ;
private String accessToken;
private int timeout = 3;
@Override
public ReturnT<String> callback(List<HandleCallbackParam> callbackParamList) {
return XxlJobRemotingUtil.postBody(addressUrl+"api/callback", accessToken, timeout, callbackParamList, String.class);
}
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
@Override
public ReturnT<String> registryRemove(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registryRemove", accessToken, timeout, registryParam, String.class);
}
}
AdminBizClient
的代码也非常简单,它实现了AdminBiz
,也就是实现了对调度中心的回调,注册,取消注册的功能。而如果继续追踪XxlJobRemotingUtil
的postBody
方法,就可以发现它其实就是远程http
的post
方法。所以在注释中写着rpc invoker
也就是这层含义。
executor-server (rpc provider)
// ---------------------- executor-server (rpc provider) ----------------------
// 内嵌的服务端,需要和中心节点通信
private EmbedServer embedServer = null;
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// 填充ip和端口
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// 生成地址
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// 访问Token
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// 启动服务
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
private void stopEmbedServer() {
// 停止服务
if (embedServer != null) {
try {
embedServer.stop();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
不知道大家看了上面的代码会不会迷惑,怎么刚刚创建了中心节点的客户端,又要创建一个服务端。自己怎么既是客户端又是服务端呢?其实这两点并不冲突,因为它们的功能作用不一样。作为调用中心的客户端我们不难理解,因为我们需要找到中心节点。而作为服务端,是为了接受调度中心的指令。比如调度中心在界面上进行操作,需要服务端执行某个Job
的话,那它如何接受指令呢?那就得开放一个地址,端口,接收指令,这样一来,它不就又是一个服务端,作为服务的提供者,等待请求调用。
作为一个服务,大部分情况下他即使生产者,提供自身的功能给他人使用,又是一个消费者,需要用到其他服务的能力。
job handler repository
// 创建一个ConcurrentMap,根据名称存储IJobHandler
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
// 获取一个IJobHandler的方法
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
// 注册一个IJobHandler
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
// 根据注解XxlJob,带有注解的bean对象,执行的方法注册
protected void registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod){
if (xxlJob == null) {
return;
}
String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
executeMethod.setAccessible(true);
// init and destroy
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
// registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
}
因为上一篇1. xxl-job源码分析-从demo开始 已经分析过registJobHandler(XxlJob xxlJob, Object bean, Method executeMethod)
方法了,这里只需要知道有一个ConcurrentMap<String, IJobHandler> jobHandlerRepository
存储着这些IJobHandler
,在需要的时候能够通过名称获取即可。
job thread repository
// 创建一个ConcurrentMap,根据jobId存储JobThread
private static ConcurrentMap<Integer, JobThread> jobThreadRepository = new ConcurrentHashMap<Integer, JobThread>();
// 用JobId和IJobHandler注册JobThread
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
// JobId和IJobHandler创建一个JobThread
JobThread newJobThread = new JobThread(jobId, handler);
// 线程启动
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
// 存入jobThreadRepository,存在返回旧的值,返回了旧的值则需要销毁
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
// 反馈新的线程
return newJobThread;
}
// 根据JobId和理由移除一个存在的线程
public static JobThread removeJobThread(int jobId, String removeOldReason){
JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
return oldJobThread;
}
return null;
}
// 根据jobId获取JobThread
public static JobThread loadJobThread(int jobId){
return jobThreadRepository.get(jobId);
}
jobThreadRepository
和jobHandlerRepository
有着类似的功能,都是作为一个缓存使用。方便在后续的时候的使用。
总结
本篇介绍了XxlJobExecutor
类的参数和一些重要的方法,主要还是跟着作者的思路,介绍了start+stop
,admin-client
,executor-server
,job handler repository
和job thread repository
这几个部分。然后对这些内容做了一些简单的分析。大概了解到了Executor是如何和中心节点进行传输通信的。不过还有一部分executor-server
没有展开讲解,这部分的内容也非常重要,这部分的内容留到下篇继续,敬请期待。