V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
爱意满满的作品展示区。
lien321
V2EX  ›  分享创造

[自研开源] MyData 数据集成任务的流程介绍 v0.7.1

  •  
  •   lien321 · 304 天前 · 1235 次点击
    这是一个创建于 304 天前的主题,其中的信息可能已经有所发展或是发生改变。

    开源地址:gitee | github

    详细介绍:MyData 基于 Web API 的数据集成平台 v0.7.0

    部署文档:[用 Docker 部署 MyData v0.7.1]( https://www.mydata.work/docs#/./docker/用 Docker 部署 MyData)

    使用手册:MyData 使用手册 v0.7.1

    交流 Q 群:430089673

    MyData 后端结构

    MyData 的后端由 3 个子服务组成,分别是管理服务任务服务业务数据服务

    • 管理服务:通过项目、数据标准、应用 API 、环境的管理 配置出同步业务数据的任务;
    • 任务服务:根据配置的任务 定时调用应用 API 和数据服务 实现业务数据的传输和存储;
    • 数据服务:封装业务数据的隔离机制和读写操作;

    依赖的组件:

    • MySQL:存储管理数据;
    • Redis:缓存管理数据和任务;
    • MongoDB ;存储业务数据;

    下图从数据流角度 展示 3 个子服务的关联: image.png 注:开源版本采用单体 SpringBoot ;

    任务服务

    配置任务

    任务主要包括:项目环境、数据标准、应用 API 、任务类型、字段映射、任务周期;

    • 项目环境:确定应用 API 的统一前缀地址;
    • 数据标准:明确集成的业务数据的数据结构;
    • 应用 API: 业务数据的传输通道;
    • 任务类型:明确数据的传输方向,提供数据表示从应用 API 读取业务员数据、消费数据表示向应用 API 发送业务数据;
    • 字段映射:配置接口响应结构中 与标准数据字段的映射关系;
    • 任务周期:定期执行任务的时间间隔,格式为 cron 表达式; image.png

    任务流程

    数据集成的任务执行流程如下图:

    1. 任务服务启动时(即 MyData 系统启动),查询所有运行状态的任务;

      public class JobExecutor implements ApplicationRunner {
          ...
      
          @Override
          public void run(ApplicationArguments args) {
              // 移除已有缓存
              jobCache.removeAll();
      
              // 查询已启动的任务
              List<Task> tasks = taskService.listRunningTasks();
              log.info("tasks.size() = " + tasks.size());
              if (CollUtil.isNotEmpty(tasks)) {
                  tasks.forEach(this::startTask);
              }
          }
      
          ...
      }
      
    2. 根据任务的 cron 表达式,计算任务的下次执行时间;

      /**
       * 根据 任务的上次执行时间 和 设定间隔规则,计算任务的 下次执行时间
       *
       * @param taskInfo 定时任务
       */
      private void calculateNextRunTime(TaskInfo taskInfo) {
          Assert.notNull(taskInfo);
          Assert.notEmpty(taskInfo.getTaskPeriod());
      
          Date date = taskInfo.getStartTime();
          if (taskInfo.getFailCount() > 0) {
              date = taskInfo.getNextRunTime();
          }
      
          CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod());
          Date nextRunTime = cronExpression.getNextValidTimeAfter(date);
          taskInfo.setNextRunTime(nextRunTime);
      }
      
    3. 计算任务的下次执行时间 与 当前时间的时间差,以时间差作为缓存失效期 将任务存入 redis 缓存;

      /**
       * 缓存任务
       *
       * @param taskInfo 任务对象
       * @throws IllegalArgumentException 缓存时长无效
       */
      public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException {
          // 计算任务缓存有效时长
          long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND);
          if (expire <= 0) {
              throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}"
                      , DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN)
                      , DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN)));
          }
      
          redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo);
          redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire);
          taskInfo.appendLog("任务存入 redis ,缓存时长 {} 秒", expire);
      }
      
    4. 通过监听 redis 的 key 失效事件,获得待执行的任务;

      public class RedisKeyExpiredListener implements MessageListener {
      
          private final JobExecutor jobExecutor;
      
          @Override
          public void onMessage(Message message, byte[] pattern) {
              String expiredKey = message.toString();
              if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) {
                  String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length());
                  jobExecutor.notify(taskId);
              }
          }
      }
      
    5. 将任务加入待执行的线程池,随后即可执行

      /**
       * 任务存入执行队列
       *
       * @param taskInfo 任务
       */
      private void executeJob(TaskInfo taskInfo) {
          taskInfo.appendLog("任务存入执行队列");
          Runnable runnable = new JobThread(taskInfo);
          getThreadPoolExecutor().execute(runnable);
      }
      
    6. 根据任务类型分别执行提供数据消费数据流程;

      1. 提供数据

        1. 调用应用 API ,获取 json 格式数据;
        2. 根据任务中字段映射 解析 json 为业务数据 Map 集合;
        3. 调用数据服务 将业务数据存入 MongoDB ;
        case MdConstant.DATA_PRODUCER:
            // 调用 api 获取 json
            String json = ApiUtil.read(taskInfo);
            // 将 json 按字段映射 解析为业务数据
            jobDataService.parseData(taskInfo, json);
            // 根据条件过滤数据
            jobDataFilterService.doFilter(taskInfo);
            // 保存业务数据
            jobDataService.saveTaskData(taskInfo);
            // 更新环境变量
            jobVarService.saveVarValue(taskInfo, json);
        
            break;
        
      2. 消费数据

        1. 根据任务所选数据标准,查询业务数据;
        2. 再根据字段映射,将业务数据 转为指定的 json 对象集合;
        3. 调用应用 API ,传输 json 数据;
        case MdConstant.DATA_CONSUMER:
            List<BizDataFilter> filters = taskInfo.getDataFilters();
            if (CollUtil.isNotEmpty(filters)) {
                // 解析过滤条件值中的 自定义字符串
                parseFilterValue(filters);
                // 排除值为 null 的条件
                filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
            }
            // 根据过滤条件 查询数据
            String dataCode = taskInfo.getDataCode();
            if (StrUtil.isNotEmpty(dataCode)) {
                List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
                taskInfo.setConsumeDataList(dataList);
                // 根据字段映射转换为 api 参数
                jobDataService.convertData(taskInfo);
            }
            // 调用 api 传输数据
            ApiUtil.write(taskInfo);
            break;
        
    7. 保存任务执行日志;

    目前尚无回复
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   1729 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 22ms · UTC 16:33 · PVG 00:33 · LAX 08:33 · JFK 11:33
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.