XXL-JOB 客户端执行流程

kyaa111 5月前 ⋅ 154 阅读

客户端内嵌服务器
com.xxl.job.core.server.EmbedServer: 内置业务线程池(bizThreadPool), 基于 Netty 的事件循环模型, 异步处理调度端发来的请求

// Thread factory: every worker thread is named after the hash code of the runnable it is created for.
ThreadFactory workerThreadFactory = new ThreadFactory() {
    @Override
    public Thread newThread(Runnable task) {
        String threadName = "xxl-rpc, EmbedServer bizThreadPool-" + task.hashCode();
        return new Thread(task, threadName);
    }
};

// Rejection policy: a task the pool cannot accept is surfaced as a RuntimeException to the submitter.
RejectedExecutionHandler exhaustedHandler = new RejectedExecutionHandler() {
    @Override
    public void rejectedExecution(Runnable rejectedTask, ThreadPoolExecutor pool) {
        throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
    }
};

// Business pool: 0 core threads, at most 200 threads, 60 seconds keep-alive,
// and a bounded queue holding up to 2000 pending tasks.
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
        0, 200, 60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        workerThreadFactory,
        exhaustedHandler);

com.xxl.job.core.server.EmbedServer.EmbedHttpServerHandler#channelRead0

// Build the unit of work for one scheduler request; it is executed by bizThreadPool
// rather than by the thread that received the request.
Runnable dispatchTask = new Runnable() {
    @Override
    public void run() {
        // handle the scheduling request and serialize whatever it returns to JSON
        String responseJson = GsonTool.toJson(process(httpMethod, uri, requestData, accessTokenReq));
        // write response
        writeResponse(ctx, keepAlive, responseJson);
    }
};
bizThreadPool.execute(dispatchTask);

process 方法根据请求的 uri 进行匹配, 调度(run)请求会跳转到这里
com.xxl.job.core.biz.impl.ExecutorBizImpl#run

/**
 * Handles one trigger request: resolves the job handler, reconciles it with any job thread
 * already registered for the job id, applies the block strategy, and finally enqueues the
 * trigger on the job thread.
 *
 * NOTE(review): this excerpt is elided after the BEAN branch (see the "..." below), so the
 * handling of the other glue types is not visible here.
 *
 * @param triggerParam the trigger request; the job id, glue type, executor handler name and
 *                     block strategy are read from it in the code shown
 * @return in the code shown: a FAIL_CODE result when the BEAN handler cannot be found or when
 *         DISCARD_LATER rejects the trigger; otherwise whatever pushTriggerQueue returns
 */
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    // one thread per job id
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // match the run mode (glue type)
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // the handler looked up by the executor handler name carried in this request
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        // the thread exists but its handler no longer matches the newly loaded one
        // (this happens when the JobHandler name was changed on the scheduler side);
        // jobThread is set to null here so that it gets replaced further down
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            // check for null again: the requested handler may not exist at all
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else
    ...

    // a non-null jobThread means a thread already exists for this job
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard the later trigger: return failure directly while the thread is busy
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // cover the earlier trigger: set jobThread to null so it is replaced below
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {
            // serial execution on a single machine:
            // nothing to do here; the trigger is simply queued behind the previous one
        }
    }
    // replace thread (new or exists invalid)
    if (jobThread == null) {
        // per the original note: the old thread is terminated and the new thread takes its
        // place in the job-thread mapping (registJobThread itself is not shown here)
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }

    // add this trigger to the job thread's queue (LinkedBlockingQueue) and return the push result
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

com.xxl.job.core.thread.JobThread#run

/**
 * Main loop of the job thread.
 *
 * Initializes the handler, then polls the trigger queue until {@code toStop} is set,
 * executing each trigger (optionally bounded by a timeout) and pushing a callback with the
 * outcome. After the loop ends, every trigger still in the queue is reported as failed and
 * the handler is destroyed.
 */
public void run() {
    // initialize the handler; a failure is only logged and the loop below still starts
    try {
        handler.init();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    // execute
    while(!toStop){
        running = false;
        // number of consecutive idle polls (reset to 0 as soon as a trigger arrives)
        idleTimes++;

        TriggerParam triggerParam = null;
        try {
            // poll with a bounded wait instead of take(): the toStop flag has to be re-checked,
            // so blocking indefinitely is avoided
            triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
            if (triggerParam!=null) {
                running = true;
                idleTimes = 0;
                // per the original note, triggerLogIdSet is a Collections.synchronizedSet(new HashSet<Long>())
                triggerLogIdSet.remove(triggerParam.getLogId());

                // log filename, like "logPath/yyyy-MM-dd/9999.log"
                String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                XxlJobContext xxlJobContext = new XxlJobContext(
                        triggerParam.getJobId(),
                        triggerParam.getExecutorParams(),
                        logFileName,
                        triggerParam.getBroadcastIndex(),
                        triggerParam.getBroadcastTotal());

                // initialize the context, including the parameters passed by the scheduler
                XxlJobContext.setXxlJobContext(xxlJobContext);

                // execute
                XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());
                // if an execution timeout is configured, this run goes through a FutureTask
                if (triggerParam.getExecutorTimeout() > 0) {
                    // limit timeout
                    Thread futureThread = null;
                    try {
                        FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                            @Override
                            public Boolean call() throws Exception {

                                // set the context again because this code runs on a new thread
                                XxlJobContext.setXxlJobContext(xxlJobContext);

                                handler.execute();
                                return true;
                            }
                        });
                        futureThread = new Thread(futureTask);
                        futureThread.start();

                        // wait at most the configured number of seconds for the handler to finish
                        Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                    } catch (TimeoutException e) {

                        XxlJobHelper.log("<br>----------- xxl-job job execute timeout");
                        XxlJobHelper.log(e);

                        // handle result
                        XxlJobHelper.handleTimeout("job execute timeout ");
                    } finally {
                        // always interrupt the worker thread, whether it finished or timed out
                        futureThread.interrupt();
                    }
                } else {
                    // default: execute directly on this thread
                    handler.execute();
                }

                // handle the result: a handle code <= 0 is treated as a lost result and marked failed
                if (XxlJobContext.getXxlJobContext().getHandleCode() <= 0) {
                    XxlJobHelper.handleFail("job handle result lost.");
                } else {
                    // cap the handle message at 50000 characters, appending "..." when truncated
                    String tempHandleMsg = XxlJobContext.getXxlJobContext().getHandleMsg();
                    tempHandleMsg = (tempHandleMsg!=null&&tempHandleMsg.length()>50000)
                            ?tempHandleMsg.substring(0, 50000).concat("...")
                            :tempHandleMsg;
                    XxlJobContext.getXxlJobContext().setHandleMsg(tempHandleMsg);
                }
                XxlJobHelper.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- Result: handleCode="
                        + XxlJobContext.getXxlJobContext().getHandleCode()
                        + ", handleMsg = "
                        + XxlJobContext.getXxlJobContext().getHandleMsg()
                );

            } else {
                // after more than 30 consecutive idle polls (3s each, roughly 90s) and with
                // nothing queued, ask the executor to remove this job thread
                if (idleTimes > 30) {
                    if(triggerQueue.size() == 0) {	// avoid concurrent trigger causes jobId-lost
                        XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            }
        } catch (Throwable e) {
            if (toStop) {
                XxlJobHelper.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
            }

            // render the exception's stack trace as text
            StringWriter stringWriter = new StringWriter();
            e.printStackTrace(new PrintWriter(stringWriter));
            String errorMsg = stringWriter.toString();

            XxlJobHelper.handleFail(errorMsg);

            XxlJobHelper.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
        } finally {
            if(triggerParam != null) {
                // report the execution result
                if (!toStop) {
                    // common case: report the handle code and message recorded in the context
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.getXxlJobContext().getHandleCode(),
                            XxlJobContext.getXxlJobContext().getHandleMsg() )
                    );
                } else {
                    // is killed
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                            triggerParam.getLogId(),
                            triggerParam.getLogDateTime(),
                            XxlJobContext.HANDLE_COCE_FAIL,
                            stopReason + " [job running, killed]" )
                    );
                }
            }
        }
    }

    // reaching here means the thread was told to stop; every trigger still in the queue
    // is marked as failed and reported
    while(triggerQueue !=null && triggerQueue.size()>0){
        TriggerParam triggerParam = triggerQueue.poll();
        if (triggerParam!=null) {
            // is killed
            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                    triggerParam.getLogId(),
                    triggerParam.getLogDateTime(),
                    XxlJobContext.HANDLE_COCE_FAIL,
                    stopReason + " [job not executed, in the job queue, killed.]")
            );
        }
    }

    // destroy the handler; a failure here is only logged
    try {
        handler.destroy();
    } catch (Throwable e) {
        logger.error(e.getMessage(), e);
    }

    logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}