目 录CONTENT

文章目录

Spring Batch 学习分享

码农街
2024-04-09 / 0 评论 / 1 点赞 / 200 阅读 / 19883 字 / 正在检测是否收录...

介绍

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

Spring Batch 使用内存缓冲机制,将读取的数据记录暂存于内存中,然后批量处理这些数据。通过减少对磁盘或数据库的频繁访问,内存缓冲可以提高读取和处理的效率,而且Spring Batch 提供了批量读取的机制,允许一次性读取和处理多个数据记录,这两点都减轻 I/O 压力。

业务方案

  1. 企业消息驱动处理;

  2. 自动化地处理大批量复杂的数据,如月结计算;

  3. 重复性地处理大批量数据,如费率计算;

  4. 消息驱动处理,充当内部系统和外部系统的数据纽带;

  5. 中间需要对数据进行格式化,校验,转换处理等;

  6. 那么还能用到哪些业务场景上呢?

架构图

任务启动器 Job Launcher 负责运行Job,任务存储仓库Job Repository存储着Job的执行状态,参数和日志等信息。Job处理任务又可以分为三大类:

  • 数据读取 Item Reader

  • 数据中间处理 Item Processor

  • 数据输出 Item Writer。

JobLauncher

任务启动器 Job Launcher 负责运行Job

Job

Spring Batch里最基本的单元就是任务Job,一个Job由若干个步骤Step组成。是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job所定义的内容。

Job相关概念如下:

  1. Job:封装处理实体,定义过程逻辑。

  2. JobInstance:Job的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。

  3. JobParameters:与JobInstance相关联的参数。

  4. JobExecution:代表Job的一次实际执行,可能成功、可能失败。

Step

Step是对Job某个过程的封装,一个Job可以包含一个或多个Step,一步步的Step按特定逻辑执行,才代表Job执行完成 。

定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item Reader、Item Processor和Item Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。

  1. Item Reader

  1. Item Processor

  1. Item Write

JobRepository

对整个Job的新增、更新、执行进行记录,存储着Job的执行状态,参数和日志等信息。

准备

  1. 创建一个 Springboot 项目

这里我就不多讲了

  1. 添加依赖

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-batch'
    implementation 'org.springframework.boot:spring-boot-starter-jdbc'
    implementation 'mysql:mysql-connector-java:8.0.33'
    implementation 'com.baomidou:mybatis-plus-boot-starter:3.5.4'
    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.springframework.boot:spring-boot-configuration-processor'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.batch:spring-batch-test'
}
  1. 数据库初始化

任务存储仓库支持多种结构化和非结构化数据库。

可以选择通过yaml配置进行初始化

spring:
  batch:
    jdbc:
      # 设置数据库表前缀
      table-prefix: test_
      # 初始化schema
      initialize-schema: always
      platform: mysql
      # 事务等级
      isolation-level-for-create: default

也可以自己选择执行:

我这里使用mysql,所以把 schema-mysql.sql 导入到数据库中。导入后,库表如下图所示:

  1. application.yml 配置

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/springbatch
    username: test
    password: 123456
  batch:
    job:
      # 启动服务的时候是否启动JOB
      enabled: false
      # 指定需要启动的JOB
      #names:
      #  - ""
    jdbc:
      # 设置数据库表前缀
      table-prefix: test_
      # 初始化schema
      initialize-schema: always
      platform: mysql
      # 事务等级
      isolation-level-for-create: default
  1. @EnableBatchProcessing

接着在Spring Boot的入口类上添加@EnableBatchProcessing注解,表示开启Spring Batch批处理功能:

@EnableBatchProcessing
@SpringBootApplication
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

至此,基本框架搭建好了,下面开始进行一些测试吧。

测试DEMO

一、入门实践

  1. 简单任务

  1. 多步骤任务

  1. Flow任务

  1. 并行执行

  1. 任务决策器

  1. 任务嵌套

二、读取数据

读取实现类参考:https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/appendix.html#itemReadersAppendix

  1. 简单数据读取

  1. 文本数据读取

  1. 数据库数据读取

  1. XML数据读取

  1. JSON数据读取

  1. 多文本数据读取

三、输出数据

输出实现类参考:https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/appendix.html#itemWritersAppendix

  1. 输出文本数据

  1. 输出XML数据

  1. 输出JSON数据

  1. 输出数据到数据库

  1. 多文本输出

四、处理数据

  1. 格式校验

  1. 数据过滤

  1. 数据转换

  1. 聚合处理

五、监听器

每种监听器都可以通过两种方式使用:

  1. 接口实现;

  2. 注解驱动。

使用注解标注的方法名必须和注解一致

六、异常处理

  1. 异常重试

private Step step() {
        return stepBuilderFactory.get("step")
                .<String, String>chunk(2)
                .reader(listItemReader())
                .processor(myProcessor())
                .writer(list -> list.forEach(System.out::println))
                // 配置错误容忍
                .faultTolerant() 
                // 配置重试的异常类型
                .retry(MyJobExecutionException.class) 
                // 重试3次,三次过后还是异常的话,则任务会结束,
                .retryLimit(3) 
                // 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常重试
                .build();
    }
  1. 异常跳过

  private Step step() {
        return stepBuilderFactory.get("step")
                .<String, String>chunk(2)
                .reader(listItemReader())
                .processor(myProcessor())
                .writer(list -> list.forEach(System.out::println))
                // 配置错误容忍
                .faultTolerant() 
                 // 配置跳过的异常类型
                .skip(MyJobExecutionException.class)
                // 最多跳过1次,1次过后还是异常的话,则任务会结束,
                .skipLimit(1) 
                // 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
                .build();
    }

异常监听

@Component
public class MySkipListener implements SkipListener<String, String> {
    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println("在读取数据的时候遇到异常并跳过,异常:" + t.getMessage());
    }

    @Override
    public void onSkipInWrite(String item, Throwable t) {
        System.out.println("在输出数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
    }

    @Override
    public void onSkipInProcess(String item, Throwable t) {
        System.out.println("在处理数据的时候遇到异常并跳过,待输出数据:" + item + ",异常:" + t.getMessage());
    }
}
    @Autowired
    private MySkipListener mySkipListener;
    
    private Step step() {
        return stepBuilderFactory.get("step")
                .<String, String>chunk(2)
                .reader(listItemReader())
                .processor(myProcessor())
                .writer(list -> list.forEach(System.out::println))
                .faultTolerant() // 配置错误容忍
                // 配置跳过的异常类型
                .skip(MyJobExecutionException.class) 
                // 最多跳过1次,1次过后还是异常的话,则任务会结束
                .skipLimit(1) 
                // 异常的次数为reader,processor和writer中的总数,这里仅在processor里演示异常跳过
                .listener(mySkipListener)
                .build();
    }
  1. 事务问题

一次Step分为Reader、Processor和Writer三个阶段,这些阶段统称为Item。默认情况下如果错误不是发生在Reader阶段,那么没必要再去重新读取一次数据。但是某些场景下需要Reader部分也需要重新执行,比如Reader是从一个JMS队列中消费消息,当发生回滚的时候消息也会在队列上重放,因此也要将Reader纳入到回滚的事物中,根据这个场景可以使用readerIsTransactionalQueue()来配置数据重读:

private Step step() {
    return stepBuilderFactory.get("step")
            .<String, String>chunk(2)
            .reader(listItemReader())
            .writer(list -> list.forEach(System.out::println))
            // 消息队列数据重读
            .readerIsTransactionalQueue() 
            .build();
}

我们还可以在Step中手动配置事务属性,事物的属性包括隔离等级(isolation)、传播方式(propagation)以及过期时间(timeout)等:

private Step step() {
    DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
    attribute.setPropagationBehavior(Propagation.REQUIRED.value());
    attribute.setIsolationLevel(Isolation.DEFAULT.value());
    attribute.setTimeout(30);

    return stepBuilderFactory.get("step")
            .<String, String>chunk(2)
            .reader(listItemReader())
            .writer(list -> list.forEach(System.out::println))
            .transactionAttribute(attribute)
            .build();
}
  1. 重启机制

默认情况下,任务执行完毕的状态为COMPLETED,再次启动项目,该任务的Step不会再执行,我们可以通过配置allowStartIfComplete(true)来实现每次项目重新启动都将执行这个Step:

private Step step() {
    return stepBuilderFactory.get("step")
            .<String, String>chunk(2)
            .reader(listItemReader())
            .writer(list -> list.forEach(System.out::println))
            .allowStartIfComplete(true)
            .build();
}

某些Step可能用于处理一些先决的任务,所以当Job再次重启时这Step就没必要再执行,可以通过设置startLimit()来限定某个Step重启的次数。当设置为1时候表示仅仅运行一次,而出现重启时将不再执行:

private Step step() {
    return stepBuilderFactory.get("step")
            .<String, String>chunk(2)
            .reader(listItemReader())
            .writer(list -> list.forEach(System.out::println))
            .startLimit(1)
            .build();
}
  1. 关闭重启机制

要关闭Spring Batch启动项目自动运行任务的机制,需要在项目配置文件application.yml中添加如下配置:

spring:
  batch:
    job:
      enabled: false

七、API调度

  1. JobLauncher 任务创建启动

  1. JobOperator 任务操作

1

评论区