XXL-JOB 客户端执行流程

2023-11-22 22:44:44 358


客户端内嵌服务器
com.xxl.job.core.server.EmbedServer, 内建线程池, 基于netty的事件循环模型, 异步处理调度端的请求

ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
0,
200,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
    }
},
new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
    }
});

com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0

bizThreadPool.execute(new Runnable() {
    @Override
    public void run() {
        // 接到调度请求后进行处理
        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);

        String responseJson = GsonTool.toJson(responseObj);
        // write response
        writeResponse(ctx, keepAlive, responseJson);
    }
});

通过请求参数匹配跳转到这里
com.xxl.job.core.biz.impl.ExecutorBizImpl#run

public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    // 一个任务id 一个线程
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // 匹配运行模式
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // 调度端发来的最新jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        // 线程不为空 且任务handler 和 新的匹配不上 
        // 调度端更改了JobHandler的名称, 会出现这种情况, 将jobThread置空, 稍后会将原进程终止掉
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            // 再次判空
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else
    ...

    // 不为空 就是当前任务已经有在执行了
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // 丢弃后续调度 直接返回失败
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // 覆盖之前调度 将jobThread置为空
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // 单机串行
            // 啥也不干 等待线程执行完上一次调度 再执行
        }
    }
    // replace thread (new or exists invalid)
    if (jobThread == null) {
        // 旧线程会终止, 并在映射map中替换为新的thread
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // 将本次调度数据加入队列(LinkedBlockingQueue) 并返回成功
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

com.xxl.job.core.thread.JobThread#run

public void run() {
    // 初始化
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // execute
    while(!toStop){
        running = false;
        // 空闲次数
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // 设置最大阻塞时间 因为需要判断toStop信号量 所以避免使用take一直阻塞
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                // triggerLogIdSet使用的是 Collections.synchronizedSet(new HashSet<Long>());
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // 初始化上下文 包括调度端传递的参数
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
                // 若设置了任务超时时间 则此次任务使用FutureTask执行
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {

                                // 再次设置上下文 因为开了一个新线程
                                XxlJobContext.setXxlJobContext(xxlJobContext);

                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        futureThread.interrupt();
                    }
                } else {
                    // 默认直接执行
                    handler.execute();
                }

                // 处理返回结果
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                            ?tempHandleMsg.substring(0, 50000).concat("...")
                            :tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

            } else {
                // 连续闲置 30*3 = 90s后 且没有待执行的任务, 线程关闭
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // 将异常输出
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // 上报执行结果
                if (!toStop) {
                    // commonm
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg() )
                    );
                } else {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_COCE_FAIL,
                            stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }

    // 走到这里说明线程被终止 若有剩余任务则全部标记失败 并上报
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_COCE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // 销毁逻辑
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}


Java操作Redis的常见误区

不能使用 keys * 命令不能在set中存放大量数据
2021-01-11

Java 单例模式

双重校验懒汉式单例public class Singleton { //1. 避免jvm指令重排 由于JVM具有指令重排的特性,执行顺序有可能变成 1-3-2。指令重排在单线程下不会出现问题,但是在多线程下会导致一个线程获得一个未初始化的实例。例如:线程T1执行了1和3,此时T2调用 ge
2021-01-06

Java 工厂模式

public class Rectangle implements Shape { @Override public void draw() { System.out.println("Inside Rectangle::draw() method."); } }
2021-01-02

Java 模板模式

public abstract class Template { public final void templ() { System.out.println("开始"); code(); System.out.println("结束"); } public abstract v
2021-01-02

Java 装饰者模式

接口Printerpublic interface Printer { void start(); void print(); void stop(); } 实现类 HPPrinter public class HPPrinter implements Printer {
2021-01-02
Java Proxy.newProxyInstance动态代理

Java Proxy.newProxyInstance动态代理

定义接口public interface Student { void buy(); String talk(); } 实现类public class Lisi implements Student { @Override public void buy() { System.ou
2021-01-02

Java 解析windows 快捷方式 lnk文件

转载自 https://www.oschina.net/code/snippet_12_274import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileInputStream; public c
2020-12-10

Spring Boot项目中使用RedisTemplate.delete() 删除指定key失败的解决办法

https://blog.csdn.net/hello_world_qwp/article/details/85763286上面这篇博客扯一大堆, 居然还分析源码实际只是自定义了key的序列化方式导致最终操作redis的时候序列化的key与预期的key不一致而已, 自然就删不掉redis中的数据了
2020-11-26

jpa方法参数必须加上@Param

void deleteByKl(String kl);线上可能报错原因可能是编译时没有加-parameters这个参数, 编译后丢失了参数名称, 使得反射拿不到对应参数需要加上注解void deleteByKl(@Param("kl") String kl);同理public ResultVO de
2020-09-28

日志规范

日志中要打印参数错误示例 @GetMapping("/share_coupon") public ActionResult shareCoupon(Long couponSn) { //validate code try { retur
2020-09-17

Java中的Date和时区转换

1.Date中保存的是什么在java中,只要我们执行 Date date = new Date(); 就可以得到当前时间。如:Date date = new Date(); System.out.println(date); 输出结果是: Thu Aug 24 10:15:29 CST 2017 也
2020-03-28
基于自定义注解手写权限控制

基于自定义注解手写权限控制

方法一: AOP 方法二: 拦截器项目结构项目依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-w
2020-03-28

如何在Java中替换多个if语句

1. 概述选择结构是任何编程语言的重要组成部分。但是我们编写了大量嵌套的if语句,这使得我们的代码更加复杂和难以维护。 在本教程中,我们将介绍替换嵌套if语句的各种方法。 让我们探索如何简化代码的不同选项。2. 案例研究我们经常遇到涉及很多条件的业务逻辑,并且每个都需要不同的处理。为了演示,我们以C
2020-03-28
Java中方法的参数传递机制

Java中方法的参数传递机制

来看一段代码 public class Man { private String name; private Integer age; public String getName() { return name; } publi
2020-03-28

freemarker 时间显示不正常 设置时区

项目在本地开发的时候显示正常,部署上服务器就一直差8个小时,最后发现freemarker官方文档有这样的说明time_zone:时区的名称来显示并格式化时间。 默认情况下,使用JVM的时区。 也可以是 Java 时区 API 接受的值,或者 "JVM default" (从 FreeMarker 2
2020-03-28
IDEA 2019.1 xml 不高亮

IDEA 2019.1 xml 不高亮

前几天更新了idea后,发现xml里的代码都没有了高亮,变得跟记事本一个德性了打开setting ,搜索 File Types,找到xml项, 查看下方的匹配格式,果然没有xml,(idea真是厉害)点击右方的+,输入*.xml,点击ok,解决问题
2020-03-28

npm install 淘宝镜像

npm install --registry=https://registry.npm.taobao.org
2020-03-28
Java中方法的参数传递机制

Java中方法的参数传递机制

来看一段代码 public class Man { private String name; private Integer age; public String getName() { return name; } publi
2020-03-28
基于自定义注解手写权限控制

基于自定义注解手写权限控制

方法一: AOP 方法二: 拦截器项目结构项目依赖<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-w
2020-03-28

Docker 部署 详细全过程 附代码

Docker 部署本站 全过程环境:CentOS7.61. 安装Docker其他版本CentOS可以参考这个https://help.aliyun.com/document_detail/187598.html查看本机内核版本,内核版本需高于 3.10uname -r 确保 yum 包最新yum u
2020-03-28

SpringBoot 启动普通java工程

引入依赖<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>2.0.9</version> </dependency>
2020-03-28

Vue.js DOM操作

<template> <input type="button" @click="reply($event)" value="回复"> </template> export default { methods: { replyFun(e) {
2020-03-29
CentOS7编译调试OpenJDK12

CentOS7编译调试OpenJDK12

1. 下载源码https://hg.openjdk.java.net/jdk/jdk12点击左侧的browse,再点击zip,就可以下载zip格式的源码压缩包。unzip xxx.zip 解压文件2. 安装jdkyum install java-11-openjdk-devel -y3. 运行con
2020-04-23
编写自己的Spring Boot Starter

编写自己的Spring Boot Starter

1.新建一个maven项目命名规则统一是xxx-spring-boot-starter完整pom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"
2020-06-29