数据批量处理优化


整体概要设计


批量导出设计

  • 针对每一个导出请求,构建 ExportTask 导出任务。
  • 针对每一个导出任务,对其分配 BlockingQueue 阻塞式队列。(该队列主要用于生产者任务与消费者之间的缓冲,解耦两者逻辑,避免生产者线程与消费者线程串行相互等待,同时该队列具有最大容量限制,避免内存溢出)
  • 针对每一个导出任务,构建 ProducerTask 生产者任务,该任务负责从数据库中拉取原始数据,之后对数据进行组装,最后将结果数据排序后入队。(本功能要求导出数据有序)
  • 针对每一个导出任务,构建 ConsumerTask 消费者任务,该任务负责不断从队列中取出结果数据,将结果数据写入结果文件中。
  • 针对一个 ProducerTask 生产者任务,对其分配 N 个 ProcessTask 数据组装任务,并行处理,将原始数据组装为可导出的结果数据。(在本功能中此过程占据主要耗时,是性能瓶颈所在,故对其分配主要线程资源)

整体优化思路


  • 在此实现中 BlockingQueue 的使用至关重要,作为 JDK 给我们提供的阻塞式队列,此队列在异步编程中具有广发的使用价值(例如线程池 ThreadPoolExecutor 中的 workQueue 队列也是基于此实现)。在本次重构中,此队列的主要作用在于作为生产者线程与消费者线程间的缓冲区,生产者只管写入数据,无需关心消费者进度,消费者只管消费数据,无需关心生产者进度,但是生产消费速度本身不可控,两者无法彻底协调,因此队列必须是阻塞式的。假如队列满,生产者线程进入阻塞状态,消费者继续消费,假如队列空,消费者线程进入阻塞状态,生产者继续生产。在本实现中,由于生产者缓慢而消费者迅速,所以采用多线程写入,单线程取出的模型,尽可能协调两者速率,避免长时间相互等待。

  • 众所周知,我们可以把程序分为三种类型,计算密集型、内存密集型和 I/O密集型。在实际场景中,一个程序往往同时包含了这三种操作,但是其比重是不同的,一个程序必然具有某一种偏向。一般来讲,一个程序中 I/O 操作越频繁,那么我们可以说这个程序偏向于 I/O 密集型,对于这一类程序,多线程所带来的吞吐量提升往往是革命性的。原因很简单,大量的网络 I/O 或磁盘 I/O 必然导致线程频繁被挂起,被挂起的线程是不需要占用 CPU 时间片的,如果这时候程序中并没有其他并发任务需要被执行,那么相当于 CPU 被闲置了,性能没有得到充分压榨,从大局来看,也就意味着整个系统的吞吐量下降了。在本示例中,ProcessTask 所承担的职责是将数据库原始数据转换为可导出的结果数据,在此过程中需要请求大量接口获取所需数据(大量网络 I/O 操作),每一次接口请求都存在响应时间,线程被挂起,但采用多线程并发处理情况则不用,若 A 线程被挂起,CPU 依然可以切换到 B 线程进行计算操作,在线程上下文切换的过程中 CPU 整体的利用率是大幅提升的,在现代多核 CPU 下提升则更为明显。

  • 如何有效控制单个请求的内存占用量,大量的软件问题本质是对于时间复杂度和空间复杂度的权衡,在此示例中,无论一个请求需要导出几百量级的数据还是十万级的数据量,它所能占用的内存应该是有一个最大限度的。不能因为一个请求的失控触发OOM导致服务器宕机。因此这里将 BlockingQueue 的作用域和生命周期限定于一个请求中,不采用多请求共享的方式,队列随请求生而生,请求结束,无论成功与否,该请求所占用的资源都必须被释放。

编码实现


ExportTask

package com.wanshifu.export.common;

import com.wanshifu.framework.core.BusException;
import com.wanshifu.user.importapi.domains.emums.ExportType;
import com.wanshifu.user.importapi.domains.vo.export.ExportContext;
import com.wanshifu.utils.ApplicationContextUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.io.File;
import java.time.Instant;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/**
 * 批量导出任务,一个导出任务拆分为
  * 生产者任务 {@link AbstractProducerTask} 可并发生产者任务 {@link AbstractConcurrentProducerTask}
 * 消费者任务 {@link AbstractConsumerTask}
 * 数据中间处理任务 {@link AbstractProcessTask}
 * 阻塞式队列作为中间缓冲 {@link BlockingQueue}
 * 数据源(DB) -> 生产者(producer) -> 数据中间处理(process) -> 队列(queue) -> 消费者(consumer) -> 结果文件(file)
  * @author zhangyifan@wshifu.com
 */ 
 @Getter 
 @Slf4j 
 public abstract class AbstractExportTask<T extends Exportable> implements Supplier<File> {

    /**
 * 队列容量
  */
  private static final int DEFAULT_QUEUE_CAPACITY = 1 << 10;

    /**
 * 最大等待时间(30分钟)
  */
  private static final int DEFAULT_MAX_EXPORT_MILLIS = (int) TimeUnit.SECONDS.toMillis(30);

    /**
 * 导出上下文
  */
  private volatile ExportContext exportContext;

    /**
 * 阻塞式队列
  */
  private volatile BlockingQueue<T> queue;

    /**
 * 生产者线程
  */
  private AbstractProducerTask<T> producer;

    /**
 * 消费者线程
  */
  private AbstractConsumerTask<T> consumer;

    /**
 * 结果文件
  */
  private volatile File file;

    /**
 * 线程池
  */
  private ThreadPoolTaskExecutor executor;

    /**
 * 缓存管理
  */
  private CacheManager cacheManager;

    public AbstractExportTask(long userId, long exportId, ExportType exportType, ExportContext exportContext, ThreadPoolTaskExecutor executor) {
        exportContext.setStart(Instant.now());
        int exportCount = getExportCount(exportContext);
        if (exportCount < 1) {
            throw new BusException("520", "没有查询到数据,无法导出");
        }
        exportContext.start(userId, exportId, exportType, exportCount);
        this.exportContext = exportContext;
        this.executor = executor;
        this.cacheManager = ApplicationContextUtil.getBean(CacheManager.class);
        this.queue = new ArrayBlockingQueue<>(Math.min(exportCount, DEFAULT_QUEUE_CAPACITY));
        this.file = generateFile(exportContext);
        this.producer = generateProducer(queue, exportContext, executor);
        this.consumer = generateConsumer(queue, exportContext, file);
    }

    @Override
  public File get() {
        preExport(exportContext);
        FutureTask<Boolean> producerTask = new FutureTask<>(producer);
        FutureTask<Boolean> consumerTask = new FutureTask<>(consumer);
        executor.execute(producerTask);
        executor.execute(consumerTask);
        /*
 * 此处阻塞,等待生产者消费者线程结束,最大等待时间:30分钟  * 超时则视为失败,避免导出长时间无结果  * 一旦生产者或消费者线程抛出异常,此处可以捕获,判定该次导出失败  */  try {
            producerTask.get(DEFAULT_MAX_EXPORT_MILLIS, TimeUnit.MILLISECONDS);
            consumerTask.get(DEFAULT_MAX_EXPORT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            throw new BusException("export producer/consumer error");
        }
        try {
            exportContext.getExportCountDown().await(DEFAULT_MAX_EXPORT_MILLIS, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            throw new BusException("export timeout (over 30 minutes)");
        }
        exportContext.finish();
        return postExport(file);
    }

    /**
 * 获取预期耗时(毫秒数)
  */
  public int getExpectedCostMillis() {
        return (int) (exportContext.getExportCount() * cacheManager.getUnitCost(exportContext.getExportType()));
    }

    /**
 * 批量导出前置处理
  */
  protected void preExport(ExportContext exportContext) {}

    /**
 * 批量导出后置处理
  */
  protected File postExport(File file) {
        return file;
    }

    /**
 * 获取批量导出总数
  */
  protected abstract int getExportCount(ExportContext exportContext);

    /**
 * 生成结果文件
  */
  protected abstract File generateFile(ExportContext exportContext);

    /**
 * 构建生产者任务
  * @param queue 队列
  * @param exportContext 导出上下文
  * @param executor 线程池
  */
  protected abstract AbstractProducerTask<T> generateProducer(BlockingQueue<T> queue, ExportContext exportContext, ThreadPoolTaskExecutor executor);

    /**
 * 构建消费者任务
  * @param queue 队列
  * @param exportContext 导出上下文
  * @param file 结果文件
  */
  protected abstract AbstractConsumerTask<T> generateConsumer(BlockingQueue<T> queue, ExportContext exportContext, File file);

}
  • executor:线程池
  • exportCount:导出总数据量
  • exportCountDown:导出任务倒计数器(每写入文件一条数据则递减,为零时代表批量导出任务完成)
  • queue:阻塞式队列
  • producer:生产者任务
  • consumer:消费者任务
  • file:结果文件
  • start:开始时间戳(用于计算任务总耗时)
  • conditions:数据查询条件(取决于具体业务)

Exportable 类似于标记性接口,没有实质内容,用于表示可导出对象,是 ExportTask 的目标结果。exportCountDown.await() 方法在批量导出任务未完成时会被阻塞,但是设定了最大等待时间,代表着业务所能接受的最长导出时间。若超时则认为任务失败,释放资源。getRate() 方法可以返回当前导出进度,便于对任务进行监控。generateProducer() 方法用于构建生产者任务,交给具体子类实现,generateConsumer() 方法同理。此处可以理解为是一种模板方法模式(Template Method),在抽象类中进行流程规范,子类进行业务实现。总之 ExportTask 中主要负责初始化资源,以及启动生产者消费者任务。

ProducerTask

package com.wanshifu.export.common;

import com.wanshifu.framework.core.BusException;
import com.wanshifu.user.importapi.domains.vo.export.ExportContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;

/**
 * 生产者任务
  * @author zhangyifan@wshifu.com
 */ 
 @Slf4j 
 @Getter 
 public abstract class AbstractProducerTask<T extends Exportable> implements Callable<Boolean> {

    private volatile BlockingQueue<T> queue;

    private volatile ExportContext exportContext;

    public AbstractProducerTask(BlockingQueue<T> queue, ExportContext exportContext) {
        this.queue = queue;
        this.exportContext = exportContext;
    }

    /**
 * 1.循环调用process()获取单批处理结果
  * 2.根据需要对结果数据进行排序
  * 3.数据依次入队
  */
  @Override
  public Boolean call() {
        try {
            do {
                List<T> exportableList = process().stream().filter(Objects::nonNull).collect(Collectors.toList());
                if (needSort()) {
                    exportableList.sort(getComparator());
                }
                for (T exportable : exportableList) {
                    queue.put(exportable);
                }
                exportableList.clear();
                postProcess();
            } while (hasMore());
            log.info("producer task finish");
            return true;
        } catch (Exception e) {
            log.error("producer task error", e);
            // 一旦生产者线程异常,将倒计数器归零,避免消费者线程长时间阻塞
  while (exportContext.getExportCountDown().getCount() > 0) {
                exportContext.getExportCountDown().countDown();
            }
            throw new BusException("producer task error");
        }
    }

    /**
 * 是否有更多数据
  */
  protected boolean hasMore() {
        return false;
    }

    /**
 * 后置处理
  */
  protected void postProcess() {}

    /**
 * 是否需要排序
  */
  protected boolean needSort() {
        return false;
    }

    /**
 * 获取比较器
  */
  protected Comparator<T> getComparator() {
        return Comparator.comparingInt(Object::hashCode);
    }

    /**
 * 处理单批数据
  */
  protected abstract List<T> process() throws Exception;

}

ProducerTask 中主要承担的职责为当 hasMore() 方法返回为 true 时,调用 process() 方法将一批原始数据处理为 Exportable 类型的结果数据,之后根据需求进行排序,最后入队。此处为抽象化的流程,不捆绑多线程的实现,提供拓展性。

ConcurrentProducerTask

package com.wanshifu.export.common;

import com.wanshifu.user.importapi.domains.vo.export.ExportContext;
import com.wanshifu.utils.ApplicationContextUtil;
import lombok.Getter;
import lombok.Setter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * 并发生产任务,将生产任务拆分为多个子任务,多线程执行后合并结果
  * 扩展 {@link AbstractProducerTask}
 * @author zhangyifan@wshifu.com */ 
 @Getter 
 @Setter 
 public abstract class AbstractConcurrentProducerTask<T extends Exportable> extends AbstractProducerTask<T> {

    /**
 * 默认单批处理数据量
  */
  private static final int DEFAULT_PROCESS_COUNT = 200;

    /**
 * 单批处理最大等待时间
  */
  private static final int DEFAULT_MAX_PROCESS_MILLIS = 5 * 60 * 1000;

    private ThreadPoolTaskExecutor executor;

    private CacheManager cacheManager;

    private int processCount;

    public AbstractConcurrentProducerTask(BlockingQueue<T> queue, ExportContext exportContext, ThreadPoolTaskExecutor executor) {
        super(queue, exportContext);
        this.executor = executor;
        this.cacheManager = ApplicationContextUtil.getBean(CacheManager.class);
        this.processCount = DEFAULT_PROCESS_COUNT;
    }

    /**
 * @see CompletableFuture 任务编排
  * 1.{@link #getProcessTaskList()} 获取子任务列表
  * 2.{@link CompletableFuture#allOf(CompletableFuture[])} 合并子任务
  * 3.{@link CompletableFuture#supplyAsync(Supplier)} 并发执行所有子任务
  * 4.{@link CompletableFuture#join()} 等待所有子任务完成后合并结果返回
  */
  @Override
  protected List<T> process() throws InterruptedException, ExecutionException, TimeoutException {
        List<AbstractProcessTask<T>> processTaskList = getProcessTaskList();
        List<CompletableFuture<T>> futureTaskList = processTaskList.stream().map(processTask ->
                CompletableFuture.supplyAsync(processTask, executor)).collect(Collectors.toList());
        CompletableFuture<?>[] futureTaskArr = new CompletableFuture[futureTaskList.size()];
        CompletableFuture<?>[] completableFutures = futureTaskList.toArray(futureTaskArr);
        CompletableFuture<Void> allFuture = CompletableFuture.allOf(completableFutures);
        CompletableFuture<List<T>> resFuture = allFuture.thenApply(v ->
                futureTaskList.stream().map(CompletableFuture::join).collect(Collectors.toList()));
        return resFuture.get(DEFAULT_MAX_PROCESS_MILLIS, TimeUnit.MILLISECONDS);
    }

    /**
 * 获取子任务列表,具体子任务由子类实现
  */
  protected abstract List<AbstractProcessTask<T>> getProcessTaskList();

}

ConcurrentProducerTask 继承了 ProducerTask,上文说过 ProducerTask 中不捆绑多线程并发实现,所以便在此实现。在单批数据 process() 过程中,针对单条数据,对其构建数据转换任务 ProcessTask,随后将任务统一交给 executor 线程池处理,处理完后返回处理结果。getProcessTaskList() 方法构建子任务列表,交给具体业务实现完成。

ProcessTask

package com.wanshifu.export.common;

import com.wanshifu.user.importapi.domains.vo.export.ExportContext;
import com.wanshifu.utils.ApplicationContextUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.util.function.Supplier;

/**
 * 数据处理任务
  * @author zhangyifan@wshifu.com
 */ 
 @Slf4j 
 @Getter 
 public abstract class AbstractProcessTask<T extends Exportable> implements Supplier<T> {

    private ExportContext exportContext;

    private CacheManager cacheManager;

    public AbstractProcessTask(ExportContext exportContext) {
        this.exportContext = exportContext;
        this.cacheManager = ApplicationContextUtil.getBean(CacheManager.class);
    }

    @Override
  public T get() {
        try {
            return process();
        } catch (Exception e) {
            // 单条数据处理异常,也需要更新进度,避免消费者线程长时间阻塞
  exportContext.getExportCountDown().countDown();
            cacheManager.incrementExportCount(exportContext.getExportId());
            log.error("process task error", e);
        }
        return null;
    }

    /**
 * 处理单位数据
  */
  protected abstract T process() throws Exception;

}

ProcessTask 中逻辑较为简单,将单条原始数据转换为 Exportable 结果数据,具体逻辑交给子类实现。

ConsumerTask

package com.wanshifu.export.common;

import com.wanshifu.framework.core.BusException;
import com.wanshifu.user.importapi.domains.vo.export.ExportContext;
import com.wanshifu.utils.ApplicationContextUtil;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

/**
 * 消费者任务
  * @author zhangyifan@wshifu.com
 */ 
 @Slf4j 
 @Getter 
 public abstract class AbstractConsumerTask<T extends Exportable> implements Callable<Boolean> {

    private volatile BlockingQueue<T> queue;

    private volatile ExportContext exportContext;

    private volatile File file;

    private CacheManager cacheManager;

    public AbstractConsumerTask(BlockingQueue<T> queue, ExportContext exportContext, File file) {
        this.queue = queue;
        this.exportContext = exportContext;
        this.file = file;
        this.cacheManager = ApplicationContextUtil.getBean(CacheManager.class);
    }

    /**
 * 1.循环从队列中取出数据
  * 2.调用process()处理数据,写入文件,更新导出进度
  * 3.直到倒计数器递减为0,所有数据消费完毕
  */
  @Override
  public Boolean call() {
        try {
            while (exportContext.getExportCountDown().getCount() > 0) {
                List<T> exportableList = new ArrayList<>();
                exportableList.add(queue.take());
                queue.drainTo(exportableList);
                process(exportableList, file);
                for (int i = 0; i < exportableList.size(); i++) {
                    exportContext.getExportCountDown().countDown();
                    // 将导出进度更新到redis,解决分布式问题
  cacheManager.incrementExportCount(exportContext.getExportId());
                }
            }
            postProcess();
            log.info("consumer task finish");
            return true;
        } catch (Exception e) {
            log.error("consumer task error", e);
            throw new BusException("consumer task error");
        } finally {
            release();
        }
    }

    /**
 * 后置处理
  */
  protected void postProcess() {}

    /**
 * 释放资源
  */
  protected void release() {}

    /**
 * 处理数据
  */
  protected abstract void process(List<T> exportableList, File file);

}

ConsumerTask 的职责为不断从阻塞式队列中 take() 数据,随后调用 process() 方法进行处理。

ConsumerToExcelTask

package com.wanshifu.export.common;

import com.alibaba.excel.EasyExcel;
import com.alibaba.excel.ExcelWriter;
import com.alibaba.excel.write.metadata.WriteSheet;
import com.alibaba.excel.write.metadata.fill.FillConfig;
import com.wanshifu.user.importapi.domains.emums.ExportType;
import com.wanshifu.user.importapi.domains.vo.export.ExportContext;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.net.URL;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;

/**
 * 消费数据至 EXCEL 任务
  * 扩展 {@link AbstractConsumerTask}
 * @author zhangyifan@wshifu.com */ 
 @Getter 
 @Slf4j 
 public class ConsumerToExcelTask<T extends Exportable> extends AbstractConsumerTask<T> {

    private ExcelWriter writer;

    private WriteSheet sheet;

    private FillConfig fillConfig;

    public ConsumerToExcelTask(BlockingQueue<T> queue, ExportContext exportContext, File file) {
        super(queue, exportContext, file);
        init();
    }

    public ConsumerToExcelTask(BlockingQueue<T> queue, ExportContext exportContext, File file, FillConfig fillConfig) {
        this(queue, exportContext, file);
        this.fillConfig = fillConfig;
    }

    @Override
  protected void process(List<T> exportableList, File file) {
        try {
            writer.fill(exportableList, fillConfig, sheet);
        } catch (Exception e) {
            log.error("export to excel error", e);
        }
    }

    @Override
  protected void release() {
        if (Objects.nonNull(writer)) {
            writer.finish();
        }
    }

    private void init() {
        ExportType exportType = getExportContext().getExportType();
        String templateSuffix = exportType.getCode();
        URL resource = getClass().getClassLoader().getResource(String.format("templates/export_template_%s.xlsx", templateSuffix));
        String templatePath = Objects.nonNull(resource) ? resource.getPath() : "";
        this.writer = EasyExcel.write(getFile().getAbsolutePath()).withTemplate(templatePath).build();
        this.sheet = EasyExcel.writerSheet().build();
    }

}

由于本业务中主要需求为将结果数据写至 Excel 中,所以 ConsumerToExcelTask 作为此实现,其中依赖了阿里开源组件 easyexcel 进行 Excel 读写操作,避免重复造轮子。本示例中提供的代码均为抽象上层代码,具体业务实现需对其进行拓展,此处不进行探讨。


文章作者: Ethan Zhang
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Ethan Zhang !
评论
 上一篇
微服务网关 Spring Cloud Gateway 之限流 微服务网关 Spring Cloud Gateway 之限流
Spring Cloud Gateway 作为 Spring 官方全新推出的微服务网关解决方案,用于替代原有的 Netflix Zuul 框架,相比于 Zuul,Spring Cloud Gateway 最大的不同点在于它是基于 Spri
2020-04-08 Ethan Zhang
下一篇 
事件驱动之 Spring Cloud Bus 事件驱动之 Spring Cloud Bus
在分布式系统中我们往往需要大量的状态广播,以通知系统中各个节点,触发相关行为,例如用于保证分布式数据一致性或进行相关补偿行为。这时候事件驱动型的编程模型有利于我们处理相关问题,在一个节点发布一个事件,另外N个节点监听该事件作出相关反应,这
2019-09-25 Ethan Zhang