前言:
今天我们对“mysqlfor循环插入”大体比较关注,各位老铁们都需要分析一些“mysqlfor循环插入”的相关文章。那么小编也在网摘上网罗了一些对于“mysqlfor循环插入””的相关资讯,希望姐妹们能喜欢,各位老铁们一起来学习一下吧!问题:在业务中,有些接口只为了保存数据,而且是高频操作,比如日志处理、核心功能的记录上报。
解决方案:通过抽象将核心的队列,参数, 任务调度提取出来形成一个抽象类, 这样做的好处是:
其他的相同的批量处理任务可以重用方便管理,有问题排查起来比较方便抽象代码
/** * @Description: 定义批处理抽闲类,所有需要批处理的业务都继承这个基类。 */@Slf4jpublic abstract class AbstractBatchQueue<T> { /** * 定义队列的长度 */ protected Integer queueSize = 1000; /** * 最多批量存储1000条数据 */ protected BlockingQueue<T> blockingQueue = new LinkedBlockingQueue( queueSize ); /** * 延迟多长时间启用 , 单位毫秒 */ protected int initialDelay = 2000; /** * 间隔多长进行任务重新执行 */ protected int period = 500; /** * 初始化任务调度器, 定义模板由子类实现具体的批处理任务 */ @PostConstruct public void startTask(){ log.info( "初始化任务调度 , 200毫秒进行一次上传===> , 业务={}" , getServiceName() ); Executors.newScheduledThreadPool(1) .scheduleAtFixedRate(()->{ try { Long currentTime = System.currentTimeMillis(); int queueSize = blockingQueue.size() ; if( queueSize <= 0 ){ return; } log.info( "当前队列的数据大小 size={}" ,queueSize ); List<T> dataList = new LinkedList<>(); while ( queueSize -- > 0 ){ dataList.add( blockingQueue.poll() ); } int insertSize = batchInsertProcess( dataList ); log.info( "当前批量插入成功数量:size={} , 批量插入耗时={}ms ,业务 = 【{}】" , insertSize , System.currentTimeMillis() - currentTime, getServiceName() ); } catch (Exception e) { log.error( "任务调度执行异常{}" , e ); e.printStackTrace(); } } , initialDelay , period , TimeUnit.MILLISECONDS ); } /** * 批处理具体执行逻辑 , 由子类实现 * @param list * @return */ protected abstract int batchInsertProcess(List<T> list); /** * 定义业务名称, 方便后续多个子类的时候容易区分 * @return */ protected abstract String getServiceName(); /** * 将任务添加到消费队列 * @param t * @throws IllegalStateException */ protected void addQueue(T t) throws IllegalStateException{ blockingQueue.add( t ); }}测试结果
@Testpublic void batchSave() { long s1 = System.currentTimeMillis(); List<TAppStudyCompanion> list = new ArrayList<>(); for ( int i=0 ; i<1000 ; i++ ){ TAppStudyCompanion t = new TAppStudyCompanion(); t.setId(SnowflakeIdWorkerUtils.getId()); t.setQid( "VCCNBLVT" ); t.setUserId(7736126574367517497L); t.setCreateTime(LocalDateTime.now()); t.setUpdateTime(LocalDateTime.now()); t.setStartTime(LocalDateTime.now()); t.setEndTime(LocalDateTime.now()); t.setStartIsP2p(1); t.setEndIsP2p(1); t.setRate(455656L); t.setDuration(1000L); t.setDFlag(DataStatusEnums.NORNAL.getCode()); //studyCompanionService.save( t ); list.add( t ); } appStudyCompanionMapper.batchSave( list); System.out.println( "插入耗时 = " + (System.currentTimeMillis() - s1) + "ms" ); //批量处理1000条 耗时611ms //单次插入1000条 耗时45545ms}
批量处理1000条数据保存到mysql ,耗时611ms。
如果是单条数据循环1000插入,耗时45545ms。
从测试结论上看,批量处理的方式插入效率远远高于单条数据的插入,从链接上看批量跟服务器只做了一次的IO, 这样不管是传输效率还是上下文切换上面节省了大量的时间。
总结
使用阻塞队列与单线程任务调度的方案,可以避免线程安全的问题,实现方式简单高效。
你们有其他更好的批量数据处理解决方案吗?
标签: #mysqlfor循环插入