数据批量处理优化之 Spring Batch


在企业级生产场景中数据的批量处理往往是一个具有广泛性的需求,例如用户数据的导入导出,内部数据的迁移,业务数据的统计计算等,这样的数据往往都是较大体量,需要从一种形式转换为另一种形式,或是从一个数据源迁移到另一个目的地。显然这一类问题是具有高度抽象性的,那就意味着我们可以抽象出一些关键组件解决这一类问题,我了解到 Spring Batch 正是这一类问题的解决方案,与我当时的实现对比,我发现有些设计想法颇为相似,当然 Spring Batch 在数据处理的容错性、并发性、扩展性、复用性上有更深入的考虑。我发现当时的实现有些重复造轮子的感觉,不过每一次造轮子总能让自己有所收获,同时从 Spring Batch 的设计中也可以反思自己在实现过程中的缺陷。

Spring Batch 整体架构设计


在我们设计一个框架时,首先我们需要规划其整体架构,由简入繁,由抽象到具体,我们需要针对一个问题抓到其本质,并将其抽象为程序语言来表达,下图是 Spring 官方文档中对 Spring Batch 模型的整体描绘,可以帮助我们理解 Spring Batch 的运作机制。

Spring Batch 架构模型

  • Job 是对一个批处理任务的抽象,代表着我们执行任务的最大粒度。
  • Job 代表的是我们所定义一类具体化的批处理任务,例如订单导出任务,在实际的场景中一个批处理任务往往具备一定筛选条件,例如我想导出2020年10月的订单,也就是我需要给 Job 设定参数,这些参数就叫做 JobParametersJobLauncher 是任务加载器,它帮助我们将 JobJobParameters 组合起来成为 JobInstanceJobInstance 代表 Job 的一个实例,它的条件已经是确定的。而 JobInstance 又可能被执行多次,例如一次导出任务失败后,系统可能具备重试机制,这时候每一次执行叫做 JobExecution,所以当我们实际执行一个 Job 时,最终我们运行的是 JobExecution 对象。
  • Step 代表 Job 中的每一个步骤,一个 Job 可以只有一个步骤组成,也可以是多个步骤组成,这个取决于我们业务的复杂性,Spring Batch 为我们提供了这样的扩展性。
  • 我们可以把 Step 看成是一个批处理中的子任务,那么一个子任务往往需要三个组成部分,数据的读取(ItemReader),数据的处理(ItemProcessor),数据的写出(ItemWriter),Spring Batch 把一个 Step 切割为这三个部分,可以帮助我们很好地划分一个任务,做到单一责任制,每一个组件可各司其职完成一次任务。
  • JobRepository 是一个任务持久化仓库,在一个任务的执行过程中,我们总是需要记录一些任务的执行进度,执行状态以及执行结果,所以我们需要将这些数据持久化,Spring Batch 为我们提供了不同的实现,例如基于 JDBC 的实现或是单纯基于内存的实现,一般来讲推荐将数据持久化到数据库,保证任务状态的可靠性。

构建一个 Spring Batch 任务


要构建一个 Spring Batch 任务,我们需要由外而内地去分解问题,例如我想构建一个 Job,首先我需要分析它应该由几个 Step 组成,每一个 Step 分别承载什么逻辑。Spring Batch 支持使用 Java Bean Configuration 的方式构建任务,以代码为例:

@Configuration
@EnableBatchProcessing
public class BatchDemoConfiguration {

	@Resource
    private ThreadPoolTaskExecutor batchTaskExecutor;
    @Resource
    private JobBuilderFactory jobBuilderFactory;
    @Resource
    private StepBuilderFactory stepBuilderFactory;

	@Bean
    public Job parallelJob(Step demoSecondStep) {
        return jobBuilderFactory
                .get("parallelJob")
                .start(demoSecondStep)
                .build();
    }

	@Bean
    public Step demoSecondStep(DemoInputItemReader demoInputItemReader,
                               DemoInputOutputItemProcessor demoInputOutputItemProcessor,
                               EasyExcelItemWriter<DemoOutput> easyExcelItemWriter) {
        return stepBuilderFactory
                .get("demoSecondStep")
                .<DemoInput, DemoOutput>chunk(500)
                .reader(demoInputItemReader)
                .processor(demoInputOutputItemProcessor)
                .writer(easyExcelItemWriter)
                .taskExecutor(batchTaskExecutor)
                .throttleLimit(batchTaskExecutor.getMaxPoolSize())
                .build();
    }

	@StepScope
    @Bean
    public DemoInputItemReader demoInputItemReader() {
        DemoInputItemReader demoInputItemReader = new DemoInputItemReader();
        demoInputItemReader.setName("demoInputItemReader");
        demoInputItemReader.setPageSize(1000);
        demoInputItemReader.setTotalPage(100);
        return demoInputItemReader;
    }

    @StepScope
    @Bean
    public DemoInputOutputItemProcessor demoInputOutputItemProcessor() {
        return new DemoInputOutputItemProcessor();
    }

    @StepScope
    @Bean
    public EasyExcelItemWriter<DemoOutput> easyExcelItemWriter(@Value("#{jobParameters['writePath']}") String writePath,
                                                               @Value("#{jobParameters['template']}") String template) {
        EasyExcelItemWriter<DemoOutput> easyExcelItemWriter = new EasyExcelItemWriter<>();
        easyExcelItemWriter.setResource(new FileSystemResource(writePath));
        easyExcelItemWriter.setTargetClz(DemoOutput.class);
        easyExcelItemWriter.setMaxSheetLines(10000);
        return easyExcelItemWriter;
    }
    
}
  • 我需要构建一个 Job 名为 parallelJob,它只有一个 Step 组成名为 demoSecondStep,所以首先需要构建这个 Step 子任务。
  • 把这个 Stepchunk 设定为 500,这里的 chunk 意为数据块的大小,在程序设计中,数据的块状驱动是一种很重要的思想,例如当我们想把一组数据写入磁盘时,我们总是更倾向于先将数据写入某个 buffer 缓冲区,缓冲区积累一部分数据后再进行一次刷盘操作,这样的效率往往是更高的,这个缓冲区的大小就相当于这里的 chunk,至于 chunk 设定为多少合适取决于我们的系统情况,一般来讲当一次写出操作比较消耗资源时,我们倾向于将 chunk 设定得大一些,尽可能减少一些写入的执行次数,但是 chunk 设定得过大也意味着降低内存利用率,整体来讲要根据情况综合考虑。
  • 在这个例子中 demoSecondStepDemoInputItemReaderDemoInputOutputItemProcessorEasyExcelItemWriter 三部分组成,数据从 reader 中读取成功后,会立刻进入 processor 中间处理,之后放入缓冲区,直到 chunk 被写满后进行一次 writer 操作。
  • Step 支持多线程并发执行,在构建 Step 可以设定一个 TaskExecutor 类型的执行器,在这里我将 ThreadPoolTaskExecutor 基于线程池实现的执行器传入后,意味着整个 Step 过程会交给线程池执行,一般来说如果 Step 中具有大量的 IO 密集型操作,同时服务器的配置是多核 CPU,那么并发带来的提升是极其显著。不过不得不提的是并发意味着我们需要保证 readerprocessorwriter 这些组件必须是线程安全的,这给我们的代码设计带来更大的复杂性,在决定是否使用并发时我们需要根据其必要性,投入产出比等因素多方位考量。
  • StepScope 注解将 readerprocessorwriterbean 生命周期与 Step 捆绑在一起,这样的作用域可以让我们注入 JobStep 上下文中的参数,同时也有助于提高资源利用率,因为在一个 Step 开始时相关的 bean 才会被实例化。

结合 EasyExcel 扩展 Spring Batch


在数据的批处理需求中,针对文件的读取写入往往尤为普遍,在我工作的实际场景中也是如此,因此以 Excel 文件的读写为例,我针对 Spring Batch 中的 ItemReaderItemWriter 做了相应扩展,其中对于 Excel 文件的读写操作基于 EasyExcel 提供的 API,这个例子可以展示 Spring Batch 如何帮助我们方便地构建批处理任务,同时通过自定义相应的组件,我们可以高度定制个性化的业务任务,达到代码的高度扩展性及复用性。

自定义 ItemReader 之 EasyExcelItemReader

package priv.just.framework.batch.reader;

import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelReader;
import com.alibaba.excel.context.AnalysisContext;
import com.alibaba.excel.event.AnalysisEventListener;
import com.alibaba.excel.read.metadata.ReadSheet;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.item.ItemReader;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;

import javax.annotation.concurrent.ThreadSafe;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/**
 * EasyExcel 扩展 Spring Batch {@link ItemReader}
 * 支持多线程并发读取
 * 支持读取 excel 文件中的多个 sheet
 * 支持根据偏移量 {@link #sheetIndexes} 或名称 {@link #sheetNames} 过滤 sheet
 * @param <T> 目标读取类型
 * @author Ethan Zhang          
 */
@Slf4j
@Getter
@Setter
@ThreadSafe
public class EasyExcelItemReader<T> implements ItemReader<T>, InitializingBean, DisposableBean {

    private Resource resource;
    private Class<? extends T> targetClz;
    private EasyExcelItemReaderListener listener;
    private BlockingQueue<T> buffer;
    private int bufferSize = Integer.MAX_VALUE;
    private ExcelReader reader;
    private boolean finished;
    private AtomicInteger linesReaded = new AtomicInteger(0);
    private Set<Integer> sheetIndexes = new HashSet<>();
    private Set<String> sheetNames = new HashSet<>();
    private AtomicInteger sheetNum;

    @Override
    public T read() throws InterruptedException {
        if (finished && buffer.size() == 0) {
            return null;
        }
        linesReaded.incrementAndGet();
        return buffer.poll(1, TimeUnit.MINUTES);
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(resource, "The resource must be set!");
        Assert.notNull(targetClz, "The target class must be set!");
        buffer = new LinkedBlockingQueue<>(bufferSize);
        listener = new EasyExcelItemReaderListener();
        reader = EasyExcel.read(resource.getFile(), targetClz, listener).build();
        List<ReadSheet> readSheets = reader.excelExecutor().sheetList();
        if (CollectionUtils.isNotEmpty(sheetIndexes) || CollectionUtils.isNotEmpty(sheetNames)) {
            readSheets = readSheets.stream()
                    .filter(readSheet -> sheetIndexes.contains(readSheet.getSheetNo())
                            || sheetNames.contains(readSheet.getSheetName()))
                    .collect(Collectors.toList());
        }
        Assert.notEmpty(readSheets, "The resource file must have at least 1 sheet!");
        sheetNum = new AtomicInteger(readSheets.size());
        reader.read(readSheets);
    }

    @Override
    public void destroy() {
        if (Objects.nonNull(reader)) {
            reader.finish();
        }
    }

    private class EasyExcelItemReaderListener extends AnalysisEventListener<T> {

        @Override
        public void invoke(T data, AnalysisContext context) {
            try {
                buffer.put(data);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void doAfterAllAnalysed(AnalysisContext context) {
            if (sheetNum.decrementAndGet() == 0) {
                finished = true;
            }
        }

        @Override
        public void onException(Exception exception, AnalysisContext context) throws Exception {
            log.error("EasyExcelItemReaderListener execute error!", exception);
            finished = true;
            throw exception;
        }
        
    }

}
  • resource 代表所读取的资源,这里利用了 Spring 对资源的抽象。
  • targetClz 资源读取后转换成的 Java Bean 类型。
  • listenerEasyExcel 中异步读取 excel 的监听器。
  • buffer 阻塞式队列,数据读取后先写入这个缓冲区。
  • bufferSize 数据缓冲区大小限制,主要为了避免 OOM
  • readerEasyExcel 中的 excel 读取对象。
  • finished 标示本次读取是否完成,在 excel 中最后一行数据被读取成功后将其置为 true
  • linesReaded 用于实时记录当前被读取的行数。
  • sheetIndexes 根据偏移量过滤需要被读取的 sheet,默认读取文件中所有 sheet
  • sheetNames 根据 sheet 名称过滤需要被读取的 sheet
  • sheetNum 倒计数器,被递减为 0 后表示所有 sheet 读取完毕。

EasyExcelItemReader 的初始化过程中,表示一个 Step 已经开始执行,此时需要初始化所有读取 excel 所需要的资源,并且执行 EasyExcel#read 方法开始读取数据,此时被读取的数据会通过 EasyExcelItemReaderListener 被暂存到 buffer 缓冲区中,若 EasyExcelItemReader#read 方法被 Spring Batch 框架调用,此尝试从缓冲区中获取数据,若读取较慢,则阻塞等待最多一分钟。BlockingQueue 类型的缓冲区将 EasyExcel的读取过程和 Spring Batch 的读取过程隔离开来,同时保证了 Spring Batch 使用并发读取时线程安全。

自定义 ItemWriter 之 EasyExcelItemWriter

package priv.just.framework.batch.writer;

import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelWriter;
import com.alibaba.excel.write.metadata.WriteSheet;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
 * EasyExcel 扩展 Spring Batch {@link ItemWriter}
 * 支持三种写入模式
 * 常规 {@link WriteMode#NORMAL}
 * 模块 {@link WriteMode#TEMPLATE}
 * 填充 {@link WriteMode#FILL}
 * 常规模式下支持数据分片写入多个工作表 {@link #sheets}
 * 每个工作表最多写入行数 {@link #maxSheetLines}
 * 一个工作表写满后 {@link #ensureSheetSize()} 自动扩容
 * @param <T> 目标写入类型
 * @author Ethan Zhang
 */
@Slf4j
@Getter
@Setter
@ThreadSafe
public class EasyExcelItemWriter<T> implements ItemWriter<T>, InitializingBean, DisposableBean {

    private Resource resource;
    private ExcelWriter writer;
    private List<WriteSheet> sheets;
    private AtomicInteger curSheetIndex = new AtomicInteger(-1);
    private AtomicInteger linesWritten = new AtomicInteger(0);
    private String sheetNamePrefix = "sheet-";
    private int maxSheetLines = Integer.MAX_VALUE;
    private Class<? extends T> targetClz;
    private Resource template;
    private WriteMode writeMode = WriteMode.NORMAL;
    private AtomicReference<WriteSheet> curSheet = new AtomicReference<>();

    @Override
    public synchronized void write(List<? extends T> items) {
        switch (writeMode) {
            case NORMAL:
                ensureSheetSize();
                writer.write(items, curSheet.get());
                break;
            case TEMPLATE:
                writer.write(items, curSheet.get());
                break;
            case FILL:
                writer.fill(items, curSheet.get());
                break;
            default:
                break;
        }
        linesWritten.addAndGet(items.size());
    }

    @Override
    public void afterPropertiesSet() throws IOException {
        Assert.notNull(resource, "The resource must be set!");
        Assert.notNull(targetClz, "The target class must be set!");
        switch (writeMode) {
            case NORMAL:
                writer = EasyExcel.write(resource.getFile(), targetClz).build();
                break;
            case TEMPLATE:
            case FILL:
                Assert.notNull(template, "The template must be set!");
                Assert.isTrue(template.getFile().exists(), "The template file must exists!");
                writer = EasyExcel.write(resource.getFile(), targetClz).withTemplate(template.getFile()).build();
                break;
            default:
                break;
        }
        sheets = new ArrayList<>();
        ensureSheetSize();
    }

    @Override
    public void destroy() {
        if (Objects.nonNull(writer)) {
            writer.finish();
        }
    }

    private void ensureSheetSize() {
        if (linesWritten.get() >= sheets.size() * maxSheetLines) {
            synchronized (this) {
                if (linesWritten.get() >= sheets.size() * maxSheetLines) {
                    curSheetIndex.incrementAndGet();
                    WriteSheet writeSheet = EasyExcel.writerSheet(curSheetIndex.get(), sheetNamePrefix + curSheetIndex.get()).build();
                    sheets.add(writeSheet);
                    curSheet.set(sheets.get(curSheetIndex.get()));
                }
            }
        }
    }

    public enum WriteMode {
        NORMAL,
        TEMPLATE,
        FILL
    }

}
  • resource 目标写入资源,一般为系统稳健资源。
  • writerEasyExcel 中的 excel 写入对象。
  • sheets 被写入的 sheet 集合,根据需要自动扩容。
  • curSheetIndex 当前被写入的 sheet 偏移量,逐步递增。
  • linesWritten 记录当前已被写入的行数。
  • sheetNamePrefix 自动生成的 sheet 名称前缀,可自定义。
  • maxSheetLines 限制单个 sheet 所能容纳的最大行数,写入时超出该行数则自动切换 sheet,实现 sheet 的自动扩容。
  • targetClz 代表写入资源的目标 Java Bean 类型。
  • writeMode 同于标示 EasyExcel 所支持的三种写入模式,分别为常规模式,模板模式以及填充模式。
  • template 模板资源,在模板模式以及填充模式下必须提供,一般为 classpath 下的文件资源。

EasyExcelItemWriter 的初始化过程中,首先根据写入模式的不同分别校验并初始化所需要的资源,随后调用 ensureSheetSize 方法进行首次扩容,创建第一个 sheet,在 Spring Batch 的写入过程中,根据写入模式的不同分别调用 EasyExcel 中相应的 API,由于 ExcelWriter#write 方法本身不是线程安全的,暂时采用加锁同步写入的方式,考虑到文件的写入可能需要保持数据的顺序性,且数据的写入过程往往并不是一次数据批处理任务中的性能瓶颈所在,还是建议使用同步方式保持问题的简单化。


文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论
 上一篇
Spring Cache 缓存抽象 Spring Cache 缓存抽象
要构建一个高性能高吞吐量的系统,缓存是我们必须考虑的问题。几乎可以这样说,任何一个系统都需要 N 个缓存层来解决数据交互中的 IO 问题,因为磁盘 IO 和内存 IO 在耗时上不是一个数量级的,我们总是更倾向于去读写内存而非磁盘,但内存资
2020-11-17 Ethan Zhang
下一篇 
RocketMQ 与 Spring 生态的融合 RocketMQ 与 Spring 生态的融合
RocketMQ 作为阿里开源的一款优秀的消息中间件,已经成为很多互联网公司构建其分布式系统的重要组成部分。同时 RocketMQ 成为了 Apache 开源项目,意味着该项目越来越受到开源社区的关注,也意味着更多的人加入到对 Rocke
2020-10-20 Ethan Zhang