龙空技术网

如何实现快速同步亿级商品数据至 Elasticsearch?

互联网高级架构师 1870

前言:

今天姐妹们对“java计算sku算法”都比较重视,咱们都需要了解一些“java计算sku算法”的相关资讯。那么小编也在网上收集了一些有关“java计算sku算法””的相关资讯,希望同学们能喜欢,朋友们一起来学习一下吧!

在公司的实际业务场景中,初始化数据是个避不过去的话题。比如项目上线初期,将数据库的商品数据同步到搜索引擎 ElasticSearcgh 、缓存 Redis 或者其它的数据库。

在这种初始化同步的场景下,如果使用单线程同步,几千万或者上亿的数据可能需要同步很长时间,而且其中一些细节知识点也很值得考究。如何解决大数据深分页问题?如何防止应用内存溢出等。

数据同步目标

一句话可以概述,商品数据全量初始化任务需要快速执行完成,其次要稳定。

避免 MySQL 大数据量读取造成的深分页问题。初始化任务需要在短时间内同步大批量数据。规避因读取过多数据量而造成内存溢出问题。MySQL 深分页

让我们先来讨论深分页问题,这是一个经久不衰的问题。如果你还不了解 MySQL 深分页是什么,请继续阅读。

MySQL 大数据量深分页问题是指当使用 MySQL 数据库进行大数据量分页查询时,例如从一个包含数百万条记录的表中查询最后一小部分数据时,会出现性能问题。

具体来说,当使用 LIMIT 和 OFFSET 关键字来分页查询时,MySQL 会将查询结果集中的所有记录都加载到内存中,然后再将指定的分页数据返回给客户端。这种方式在处理小数据集时是有效的,但对于大数据集来说,它会导致大量的内存使用和性能问题。

当 OFFSET 数量很大时,MySQL 必须扫描并跳过所有之前的记录,这会导致非常慢的查询速度。此外,当使用 InnoDB 存储引擎时,随着 OFFSET 增加,I/O 操作会变得更加频繁,因为 MySQL 必须从磁盘中读取更多的数据页。

因此,当需要进行大数据量深分页查询时,应该避免使用 OFFSET 和 LIMIT,而是使用类似于“基于游标的分页”或“分段查询”等技术来解决性能问题。

1、书签关联

关于 LIMIT 深分页问题,核心在于 OFFSET 值,它会 导致 MySQL 扫描大量不需要的记录行然后抛弃掉。

我们可以先使用书签 记录获取上次取数据的位置,下次就可以直接从该位置开始扫描,这样可以 避免使用 OFFEST。

假设需要查询 3000000 行数据后的第 1 条记录,查询可以这么写。

select * from product where id < 3000000 limit 1;

因为我们的目标是同步所有商品库的数据,所以每次取 5000 条,记住最大的 ID,下次以该 ID 充当查询条件。

通过该方式完美解决深分页造成的性能损耗,也就是上文说的分段查询。

2、流式查询

上面的分段查询方式在我们流程中真的能够解决所有问题么?

直接得出结论:如果没有分表,该方法可以有效解决深度分页的问题。然而,一旦进行了分表,该方法将不再适用。这是因为商品表的分片键不是 ID 字段,如果执行此 SQL 语句,将会查询所有商品分表。

那么,就到了解决深分页的主角登场了,那就是流式查询。

流式查询通过在MySQL服务器端使用游标进行数据的逐行读取,可以避免一次性将所有数据加载到内存中,从而节省内存空间并提高查询效率。

简单来说,当你调用 MySQL 流式查询时,就像建立了一个“管道”一样,可以源源不断地将数据传输到客户端。

但是,流式查询不是银弹,当流式查询数据没有完全传输完时,当前的数据库连接是独占的,无法被其它线程所使用。

如果说需要频繁进行流式查询操作,可以单独拆一个数据库连接池,不要和正常业务逻辑共用一个连接池。

生产消费模型

1、模型定义

简单来说,生产者-消费者并发模型是由两类线程构成:

生产者线程:“生产”产品,并把产品放到一个队列里;消费者线程:“消费”产品。

有了这个队列,生产者就只需要关注生产,而不用管消费者的消费行为,更不用等待消费者线程执行完;消费者也只管消费,不用管生产者是怎么生产的,更不用等着生产者生产。

生产者生成一定量的数据放到缓冲区中,然后重复此过程;与此同时,消费者也在缓冲区消耗这些数据。生产者和消费者之间必须保持同步,要保证生产者不会在缓冲区满时放入数据,消费者也不会在缓冲区空时消耗数据。

2、阻塞队列实现

网上有很多种实现模式,基于 wait() / notify() 或者 await() / signal() 等等,这里直接通过阻塞队列充当商品容器。

import lombok.SneakyThrows;import java.util.concurrent.LinkedBlockingQueue;public class ProductSyncExecutor {    private static final LinkedBlockingQueue<Object> blockingQueue = new LinkedBlockingQueue<>(10);    public static void main(String[] args) {        Thread product1 = new Thread(new ProductProduce());        Thread product2 = new Thread(new ProductProduce());        Thread product3 = new Thread(new ProductProduce());        Thread consumer1 = new Thread(new ProductConsumer());        Thread consumer2 = new Thread(new ProductConsumer());        Thread consumer3 = new Thread(new ProductConsumer());        product1.setName("Thread-Product-1");        product1.start();        product2.setName("Thread-Product-2");        product2.start();        product3.setName("Thread-Product-3");        product3.start();        consumer1.setName("Thread-Consumer-1");        consumer1.start();        consumer2.setName("Thread-Consumer-2");        consumer2.start();        consumer3.setName("Thread-Consumer-3");        consumer3.start();    }    public static class ProductProduce implements Runnable {        @SneakyThrows        @Override        public void run() {            while (true) {                blockingQueue.put(new Object());                System.out.println("【生产者:" + Thread.currentThread().getName()                        + "】放入一个商品,现容量:" + blockingQueue.size());                Thread.sleep(100);            }        }    }    public static class ProductConsumer implements Runnable {        @SneakyThrows        @Override        public void run() {            while (true) {                blockingQueue.take();                System.out.println("【消费者:" + Thread.currentThread().getName()                        + "】消费了一个商品,现容量:" + blockingQueue.size());                Thread.sleep(200);            }        }    }}
流式查询&并发编程

上述内容已经聊清楚了,商品数据从数据库查出来需要使用流式查询,避免一次性把数据读入造成的 OOM 以及大数据量情况下的深分页慢查询。

其次,通过生产者-消费者并发模型,由流式查询充当生产者,消费者由线程池内的消费线程组成。

最终套用到商品大数据量同步三方数据库的代码就变成以下这种形态。

1、开发流式查询

import com.baomidou.mybatisplus.core.mapper.BaseMapper;import org.apache.ibatis.annotations.Options;import org.apache.ibatis.annotations.ResultType;import org.apache.ibatis.annotations.Select;import org.apache.ibatis.mapping.ResultSetType;import org.apache.ibatis.session.ResultHandler;import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO;/** * 商品 SKU 持久层 */public interface ProductSkuMapper extends BaseMapper<ProductSkuDO> {        /**     * 通过流式查询的方式获取所有商品 SKU     */    @Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = Integer.MIN_VALUE)    @ResultType(ProductSkuDO.class)    @Select("SELECT * FROM product_sku WHERE del_flag = '0'")    void listAllProductSkuStreamQuery(ResultHandler<ProductSkuDO> handler);}

2、并发同步程序

业务流程如下:

定义阻塞队列为 LinkedBlockingQueue,并设置最大容量,避免同步过程中因为消费缓慢造成内存溢出;流式查询执行返回数据插入到 blockingQueueCachePool 阻塞队列容器中;如果阻塞队列容器已满,则阻塞生产者线程,等待消费者线程消费阻塞容器;判断当前阻塞容器容量是否大于最小同步数量,如果大于则发起同步,不满足则跳过本次流程;如果流式查询已经将所有数据遍历一遍,那么 listAllProductSkuStreamQuery 流程就结束了。

Q:为什么会有兜底,将最后缓冲的任务执行操作?

A:为了避免频繁调用 ElasticSearch,所以将每次同步最大数据设置为 5000。所以,当最后 一条记录执行完后,可能当前阻塞容器里的数量不足五千,那么就可能不会同步。

余下的这些商品数据,会由兜底任务统一取出并同步 ElasticSearch。

Q:线程池参数如何设置?

A:核心线程数:CPU 核心线程数 / 0.2,最大线程数:核心线程数 + (核心线程数 / 2),阻塞队列:SynchronousQueue,拒绝策略:CallerRunsPolicy,使用当前主线程运行线程池任务。

线程池使用了 Hippo4j,可以在实际业务运行中去评估这个参数是否合理,如果合理,那么就不用变。不合理的话,比如 CPU 飙升,执行时间长等,再降低或者增加线程数。

多线程并发代码如下:

import com.alibaba.fastjson.JSON;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO;import org.opengoofy.congomall.biz.product.infrastructure.dao.mapper.ProductSkuMapper;import org.springframework.stereotype.Component;import org.springframework.web.bind.annotation.GetMapping;import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.LongAdder;import java.util.stream.Collectors;/** * 初始化商品任务,通过并发编程完成生产-消费模型,达到快速同步的效果 */@Slf4j@Component@RequiredArgsConstructorpublic class InitializeProductJobHandlerTwo {        /**     * 商品 SKU 持久层     */    private final ProductSkuMapper productSkuMapper;        /**     * Hippo4j 线程池,执行同步程序     */    private final ThreadPoolExecutor productSkuInitSyncThreadPoolExecutor;        /**     * 单次同步 ElasticSearch 数量     */    private static final Integer MAX_SYNC_SIZE = 5000;        /**     * 阻塞队列最大容量,相当于一个缓冲池大小     */    private static final Integer MAX_POOL_SIZE = 200000;    /**     * 记录同步     */    private static final AtomicInteger COUNT_NUM = new AtomicInteger(0);        /**     * 记录实际同步数量     */    private static final LongAdder SYNC_SUM = new LongAdder();        @GetMapping("/init/product")    public void execute() throws Exception {        BlockingQueue<ProductSkuDO> blockingQueueCachePool = new LinkedBlockingQueue<>(MAX_POOL_SIZE);        productSkuMapper.listAllProductSkuStreamQuery(resultContext -> {            // 记录流式查询总数量            COUNT_NUM.incrementAndGet();            // 每次向缓冲池添加 MAX_SYNC_SIZE 记录            try {                blockingQueueCachePool.put(resultContext.getResultObject());            } catch (Exception ex) {                log.error("商品SKU基础数据初始化流程, 添加阻塞队列缓冲池失败, 数据记录: {}",                        JSON.toJSONString(resultContext.getResultObject()), ex);            }            // 避免请求目标数据库(ElasticSearch 或其它)次数过多,所以建议每次 MAX_SYNC_SIZE 条数,虽然可能不够这个数            if (blockingQueueCachePool.size() >= MAX_SYNC_SIZE) {                productSkuInitSyncThreadPoolExecutor.execute(() -> executeTask(blockingQueueCachePool));            }        });        // 兜底,将最后缓冲的任务执行        productSkuInitSyncThreadPoolExecutor.execute(() -> lastOnceExecuteTask(blockingQueueCachePool));    }        private void executeTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {        List<ProductSkuDO> copyList = new ArrayList<>(MAX_SYNC_SIZE);        try {            int drainTo = blockingQueueCachePool.drainTo(copyList, MAX_SYNC_SIZE);            if (drainTo > 0) {                // 此处决定向何处同步数据                // ......                SYNC_SUM.add(drainTo);            }        } catch (Exception ex) {            log.error("商品SKU基础数据初始化流程执行失败", ex);        }    }        private void lastOnceExecuteTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {        List<ProductSkuDO> lastProductSkus = blockingQueueCachePool.stream().parallel().collect(Collectors.toList());        try {            SYNC_SUM.add(lastProductSkus.size());            // 此处决定向何处同步数据            // ......        } catch (Exception ex) {            log.error("商品SKU基础数据初始化流程执行最后一次同步失败", ex);        }    }}

3、任务进度监控

通过上述代码,已经可以完成咱们最初定的数据初始化同步目标。但是,总觉得缺点什么?

上亿条数据,显然不是短时间内可以同步完成的。那同步过程中,进度从哪里查看?如果不知道进度的话,总感觉心里没底。

这种同步进度可以通过向 Redis 这种中间件写入自增命令,或者通过定时线程池固定时间内打印同步进度,相对来说后者更容易些。

第二个版本更健壮的数据同步程序如下:

import com.alibaba.fastjson.JSON;import com.xxl.job.core.handler.IJobHandler;import com.xxl.job.core.handler.annotation.XxlJob;import lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.opengoofy.congomall.biz.product.infrastructure.dao.entity.ProductSkuDO;import org.opengoofy.congomall.biz.product.infrastructure.dao.mapper.ProductSkuMapper;import org.springframework.stereotype.Component;import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;import java.util.concurrent.atomic.LongAdder;import java.util.stream.Collectors;/** * 初始化商品任务,通过并发编程完成生产-消费模型,达到快速同步的效果 */@Slf4j@Component@RequiredArgsConstructorpublic class InitializeProductJobHandler extends IJobHandler {        /**     * 商品 SKU 持久层     */    private final ProductSkuMapper productSkuMapper;        /**     * Hippo4j 线程池,执行同步程序     */    private final ThreadPoolExecutor productSkuInitSyncThreadPoolExecutor;        /**     * 单次同步 ElasticSearch 数量     */    private static final Integer MAX_SYNC_SIZE = 5000;        /**     * 阻塞队列最大容量,相当于一个缓冲池大小     */    private static final Integer MAX_POOL_SIZE = 200000;        /**     * 记录开始时间     */    private static Long START_TIME = 0L;        /**     * 记录同步     */    private static final AtomicInteger COUNT_NUM = new AtomicInteger(0);        /**     * 记录实际同步数量     */    private static final LongAdder SYNC_SUM = new LongAdder();        /**     * 打印输出监控定时器     */    private static final ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newSingleThreadScheduledExecutor();        @XxlJob(value = "demoJobHandler")    @Override    public void execute() throws Exception {        // 定时打印执行进度        printPoolAndScheduledInfo();        // 执行商品 SKU 同步程序        executeProductSkuSync();        // 释放定时器、同步线程池资源        shutdownPoolAndPrintCountSize();    }        void executeProductSkuSync() {        BlockingQueue<ProductSkuDO> blockingQueueCachePool = new LinkedBlockingQueue<>(MAX_POOL_SIZE);        productSkuMapper.listAllProductSkuStreamQuery(resultContext -> {            // 记录流式查询总数量            COUNT_NUM.incrementAndGet();            // 每次向缓冲池添加 MAX_SYNC_SIZE 记录            try {                blockingQueueCachePool.put(resultContext.getResultObject());            } catch (Exception ex) {                log.error("商品SKU基础数据初始化流程, 添加阻塞队列缓冲池失败, 数据记录: {}",                        JSON.toJSONString(resultContext.getResultObject()), ex);            }            // 避免请求目标数据库(ElasticSearch 或其它)次数过多,所以建议每次 MAX_SYNC_SIZE 条数,虽然可能不够这个数            if (blockingQueueCachePool.size() >= MAX_SYNC_SIZE) {                productSkuInitSyncThreadPoolExecutor.execute(() -> executeTask(blockingQueueCachePool));            }        });        // 兜底,将最后缓冲的任务执行        productSkuInitSyncThreadPoolExecutor.execute(() -> lastOnceExecuteTask(blockingQueueCachePool));    }        private void executeTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {        List<ProductSkuDO> copyList = new ArrayList<>(MAX_SYNC_SIZE);        try {            int drainTo = blockingQueueCachePool.drainTo(copyList, MAX_SYNC_SIZE);            if (drainTo > 0) {                // 此处决定向何处同步数据                // ......                SYNC_SUM.add(drainTo);            }        } catch (Exception ex) {            log.error("商品SKU基础数据初始化流程执行失败", ex);        }    }        private void lastOnceExecuteTask(BlockingQueue<ProductSkuDO> blockingQueueCachePool) {        List<ProductSkuDO> lastProductSkus = blockingQueueCachePool.stream().parallel().collect(Collectors.toList());        try {            SYNC_SUM.add(lastProductSkus.size());            // 此处决定向何处同步数据            // ......        } catch (Exception ex) {            log.error("商品SKU基础数据初始化流程执行最后一次同步失败", ex);        }    }        private void printPoolAndScheduledInfo() {        START_TIME = System.currentTimeMillis();        SCHEDULED_EXECUTOR.scheduleAtFixedRate(() -> {            log.info("商品SKU基础数据初始化流程, 当前已同步总数量: {}", COUNT_NUM.get());            log.info("商品SKU基础数据初始化流程, 线程池状态打印, 当前活动线程数: {}, 当前排队任务数: {}, 执行完成线程数: {}, 线程池任务总数: {}",                    productSkuInitSyncThreadPoolExecutor.getActiveCount(),                    productSkuInitSyncThreadPoolExecutor.getQueue().size(),                    productSkuInitSyncThreadPoolExecutor.getCompletedTaskCount(),                    productSkuInitSyncThreadPoolExecutor.getTaskCount());        }, 30, 10, TimeUnit.SECONDS);    }        private void shutdownPoolAndPrintCountSize() {        // 关闭定时器线程池        SCHEDULED_EXECUTOR.shutdown();        // 关闭数据同步线程池        productSkuInitSyncThreadPoolExecutor.shutdown();        while (true) {            if (SCHEDULED_EXECUTOR.isTerminated() && productSkuInitSyncThreadPoolExecutor.isTerminated()) {                log.info("商品SKU基础数据初始化流程, 总条数: {}, 同步成功数: {}, 同步执行总耗时: {}",                        COUNT_NUM.get(),                        SYNC_SUM.longValue(),                        System.currentTimeMillis() - START_TIME);                break;            }        }    }}

细心的同学可能发现,除了增加定时任务线程池外,还添加调用 shutdownPoolAndPrintCountSize 关闭线程池资源的流程。使用过且不会再使用的资源,及时关闭可释放系统相关资源,这是个很好的编码习惯。

4、性能报告

之前生产环境通过文章中的流程跑过,大概一千多万的数据两分钟左右就搞定了。

大家在进行使用上述程序的时候,服务器资源、线程数量以及 ElasticSearch 配置都是性能考量点,每一个步骤都会影响最终完成时间,需要不断模拟各种参数进行逐步调优。

文末总结

在本文中,我们讨论了如何快速同步亿级商品数据到三方数据库的问题。

为了实现高效的数据同步,我们采用了并发编程和解决深分页问题的方法。具体来说,我们通过使用线程池和多线程技术,对数据同步过程进行了并发处理,从而提高了同步效率。同时,为了解决深分页问题,我们采用了 MySQL 流式查询技术,避免了一次性将大量数据加载到内存中的问题,提高了查询效率和减少了内存占用。

总之,通过合理的并发编程和深度分页技术,我们可以在处理大量数据时提高效率和性能,并有效地解决数据同步的问题。这些方法和技术可以在处理其他类似的大规模数据问题时发挥重要作用。

作者:马丁玩编程

链接:

来源:稀土掘金

标签: #java计算sku算法