龙空技术网

我是这样做批量插入数据模板设计的,你们呢?

歆绎 340

前言:

今天我们对“mysqlfor循环插入”大体比较关注,各位老铁们都需要分析一些“mysqlfor循环插入”的相关文章。那么小编也在网摘上网罗了一些对于“mysqlfor循环插入””的相关资讯,希望姐妹们能喜欢,各位老铁们一起来学习一下吧!

问题:在业务中,有些接口只为了保存数据,而且是高频操作,比如日志处理、核心功能的记录上报。

解决方案:通过抽象将核心的队列,参数, 任务调度提取出来形成一个抽象类, 这样做的好处是:

其他的相同的批量处理任务可以重用方便管理,有问题排查起来比较方便抽象代码

/** * @Description:  定义批处理抽闲类,所有需要批处理的业务都继承这个基类。 */@Slf4jpublic abstract class AbstractBatchQueue<T> {    /**     * 定义队列的长度     */    protected Integer queueSize = 1000;    /**     * 最多批量存储1000条数据     */    protected BlockingQueue<T> blockingQueue = new LinkedBlockingQueue( queueSize );    /**     * 延迟多长时间启用 , 单位毫秒     */    protected int initialDelay = 2000;    /**     * 间隔多长进行任务重新执行     */    protected int period = 500;    /**     * 初始化任务调度器, 定义模板由子类实现具体的批处理任务     */    @PostConstruct    public void startTask(){        log.info( "初始化任务调度 , 200毫秒进行一次上传===> , 业务={}" ,  getServiceName() );        Executors.newScheduledThreadPool(1)                .scheduleAtFixedRate(()->{                    try {                        Long currentTime = System.currentTimeMillis();                        int queueSize = blockingQueue.size() ;                        if( queueSize <= 0 ){                            return;                        }                        log.info( "当前队列的数据大小 size={}" ,queueSize  );                        List<T> dataList = new LinkedList<>();                        while ( queueSize -- > 0 ){                            dataList.add( blockingQueue.poll() );                        }                        int insertSize  = batchInsertProcess( dataList );                        log.info( "当前批量插入成功数量:size={} , 批量插入耗时={}ms ,业务 = 【{}】" ,                                insertSize ,                                System.currentTimeMillis() -  currentTime,                                getServiceName()                        );                    } catch (Exception e) {                        log.error( "任务调度执行异常{}" , e );                        e.printStackTrace();                    }                } , initialDelay , period , TimeUnit.MILLISECONDS );    }    /**     * 批处理具体执行逻辑 , 由子类实现     * @param list     * @return     */    protected abstract int batchInsertProcess(List<T> list);    /**     * 定义业务名称, 方便后续多个子类的时候容易区分     * @return     */    protected abstract String getServiceName();    /**     * 将任务添加到消费队列     * @param t     * @throws IllegalStateException     */    protected void addQueue(T t)  throws IllegalStateException{        blockingQueue.add( t );    }}
测试结果
@Testpublic void batchSave() {    long s1 = System.currentTimeMillis();    List<TAppStudyCompanion> list = new ArrayList<>();    for ( int i=0 ; i<1000 ; i++ ){        TAppStudyCompanion t = new TAppStudyCompanion();        t.setId(SnowflakeIdWorkerUtils.getId());        t.setQid( "VCCNBLVT" );        t.setUserId(7736126574367517497L);        t.setCreateTime(LocalDateTime.now());        t.setUpdateTime(LocalDateTime.now());        t.setStartTime(LocalDateTime.now());        t.setEndTime(LocalDateTime.now());        t.setStartIsP2p(1);        t.setEndIsP2p(1);        t.setRate(455656L);        t.setDuration(1000L);        t.setDFlag(DataStatusEnums.NORNAL.getCode());        //studyCompanionService.save( t );        list.add( t );    }    appStudyCompanionMapper.batchSave( list);    System.out.println( "插入耗时 = " +   (System.currentTimeMillis() - s1) + "ms" );    //批量处理1000条 耗时611ms    //单次插入1000条 耗时45545ms}

批量处理1000条数据保存到mysql ,耗时611ms。

如果是单条数据循环1000插入,耗时45545ms。

从测试结论上看,批量处理的方式插入效率远远高于单条数据的插入,从链接上看批量跟服务器只做了一次的IO, 这样不管是传输效率还是上下文切换上面节省了大量的时间。

总结

使用阻塞队列与单线程任务调度的方案,可以避免线程安全的问题,实现方式简单高效。

你们有其他更好的批量数据处理解决方案吗?

标签: #mysqlfor循环插入