龙空技术网

最近java项目学会使用ForkJoinPool在业务场景中使用,多线程解决

不断进步中 451

前言:

而今各位老铁们对“java的join使用方法”大致比较关切,大家都想要了解一些“java的join使用方法”的相关内容。那么小编在网络上汇集了一些对于“java的join使用方法””的相关知识,希望兄弟们能喜欢,看官们快快来学习一下吧!

ForkJoinPool项目中使用记录下,最大工作线程数、工作线程工厂、拒绝任务的Handler和同步模式。可以让我们很方便地将一个大任务拆散成小任务,并行地执行,提高CPU的使用率

个人体会

优点:处理速度提升N倍

缺点:突发报错不容易发现问题,需要跟踪观察处理

入口类注解线程数量

@Bean("taxreFundforkJoinPool")

public ForkJoinPool taxreFundforkJoinPool(){

return new ForkJoinPool(80);

}

DataConfirmContext dataConfirmContext=new DataConfirmContext(getDataItemAddMap(checkTmp),userName);

//按确认数据

forkJoinPool.submit(

()->

checkTmp.parallelStream().collect(Collectors.groupingBy(t->

// checkTmp.stream().collect(Collectors.groupingBy(t-> //单线程

TaxRefundDataKey.builder()

.areaCode(t.getAreaCode())

.contactNo(t.getContactNo())

.netCode(org.apache.commons.lang3.StringUtils.defaultString(t.getNetCode()).toUpperCase())

.serviceCode(t.getServiceCode())

.supplyCode(t.getSupplyCode())

.month(t.getMonth())

.item(t.getItem())

.refundType(t.getRefundType())

.positionType(t.getPositionType())

.val(t.getVal())

.build()

))

.entrySet().parallelStream().forEach(kv->

// .entrySet().stream().forEach(kv->

taxRefunDataConfirmService.comfirmForSingleEmploy(kv,dataConfirmContext)

)

).join();

/**

* 在一个事物中确认单个雇员的TaxRefundDataTmp到TaxRefundData表,确保数据一致性

* @param kv

* @param dataConfirmContext

*/

@Transactional

public void comfirmForSingleEmploy(Entry<TaxRefundDataKey, List<TaxRefundDataTmp>> kv, DataConfirmContext dataConfirmContext) {

log.info("comfirmForSingleEmploy001确认税返{}数据",JsonUtil.toJson(kv));

log.info("comfirmForSingleEmploy002确认税返{}数据",JsonUtil.toJson(dataConfirmContext));

// if(kv.getValue().isEmpty()) {

// log.info("雇员{}数据为空",JsonUtil.toJson(kv.getKey()));

// return;

// }

//

Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> sameDataKeyItems = convert2DataItemKv(kv,dataConfirmContext);

//数据模版

TaxRefunConfirmDataTemplate template=TaxRefunConfirmDataTemplate.from(kv.getValue().get(0));

mergeDataItemsToImportBillDataByDataKey(sameDataKeyItems,dataConfirmContext.getUserName(),template);

updateImportBillDataTmpStatus(kv,dataConfirmContext.getUserName());

}

/**

* 数据确认前,统一进行类型转换做接口适配

* @param kv

* @param dataConfirmContext

* @return

*/

public Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> convert2DataItemKv(Entry<TaxRefundDataKey, List<TaxRefundDataTmp>> kv, DataConfirmContext dataConfirmContext) {

List<TaxRefundDataItemCodeAndValue> dataItemCodeAndValues = kv.getValue().stream().map(e->{

return new TaxRefundDataItemCodeAndValue(e.getItem(),e.getVal(),dataConfirmContext.getDataItemAddMap().get(e.getItem()));

}).collect(Collectors.toList());

return Maps.immutableEntry(kv.getKey(), dataItemCodeAndValues);

}

/**

* 合并同一个Key的数据到ImportBillData表,进ImportBillData的通用方法

* @param sameDataKeyItems

* @param updateBy 合并相关的额外数据

* @param employName

*/

@Transactional

public void mergeDataItemsToImportBillDataByDataKey(Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> sameDataKeyItems,

String updateBy,TaxRefunConfirmDataTemplate template) {

//检查到技术的修改为已计算的……

int count=dataKeyIsExists(sameDataKeyItems.getKey());

log.info("mergeDataItemsToImportBillDataByDataKey数据{}",JsonUtil.toJson(count));

log.info("mergeDataItemsToImportBillDataByDataKey001{}数据",JsonUtil.toJson(sameDataKeyItems));

if(count==0) {

insertSameDataKeyItemsToBillImportData(sameDataKeyItems,updateBy,template);

}

else {

updateSameDataKeyItemsToBillImportData(sameDataKeyItems,updateBy);

}

}

/**

* 汇总更新相同dataKey的items到BillImportData表

* @param sameDataKeyItems 相同dataKey的items

* @param updateBy 更新人

*/

private void updateSameDataKeyItemsToBillImportData(Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> dataItemKv, String updateBy) {

log.info("updateSameDataKeyItem00001数据{}",JsonUtil.toJson(dataItemKv));

TaxRefundData billImportData= taxRefundDataMapper.selectForUpdate(dataItemKv.getKey());

log.info("updateSameDataKeyItem00002数据{}",JsonUtil.toJson(billImportData));

BigDecimal newValue = new BigDecimal(0);

for (TaxRefundDataItemCodeAndValue tmp :dataItemKv.getValue()) {

log.info("updateSameDataKeyItem00002数据{}",JsonUtil.toJson(tmp));

if(tmp.getOptAdd()) {

newValue= newValue.add(new BigDecimal(tmp.getValue().toString()));

log.info("updateSameDataKeyItem00AA数据{}",JsonUtil.toJson(newValue));

}

log.info("updateSameDataKeyItem00003数据{}",JsonUtil.toJson(newValue));

}

// dataItemKv.getValue().stream().forEach(tmp->{

// log.info("updateSameDataKeyItem00002数据{}",JsonUtil.toJson(tmp));

// if(tmp.getOptAdd()) {

// newValue= newValue.add(new BigDecimal(tmp.getValue().toString()));

// log.info("updateSameDataKeyItem00002数据{}",JsonUtil.toJson(newValue));

// }

// log.info("updateSameDataKeyItem00003数据{}",JsonUtil.toJson(newValue));

// });

//已计算与未计算处理

if("Y".equals(billImportData.getState())) {

billImportData.setState("Y");

}else {

billImportData.setState("N");

}

billImportData.setUpdateTime(new Date());

billImportData.setUpdateBy(updateBy);

newValue= newValue.add(billImportData.getItem());

billImportData.setItem(newValue);

taxRefundDataMapper.updateById(billImportData);

}

private void updateImportBillDataTmpStatus(Entry<TaxRefundDataKey, List<TaxRefundDataTmp>> kv, String userName) {

List<Long> ids = kv.getValue().stream().map(TaxRefundDataTmp::getId).collect(Collectors.toList());

taxRefundDataTmpMapper.updateStatusByIdsAndMonth(ids,kv.getKey().getMonth(),ColumnConstants.E,userName);

}

/**

* 汇总插入相同dataKey的items到BillImportData表

* @param sameDataKeyItems 相同dataKey的items

* @param updateBy 更新人

*/

private void insertSameDataKeyItemsToBillImportData(

Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> sameDataKeyItems,

String updateBy,TaxRefunConfirmDataTemplate template) {

// ExtJsonBuffer extBuffer=new ExtJsonBuffer();

// sameDataKeyItems.getValue().stream().forEach(tmp->{

// if(tmp.getOptAdd()) {

// extBuffer.add(tmp.getDataItemCode(),tmp.getValue());

// }

// else {

// extBuffer.replace(tmp.getDataItemCode(),tmp.getValue());

// }

// });

BigDecimal newValue = new BigDecimal(0);

for (TaxRefundDataItemCodeAndValue tmp :sameDataKeyItems.getValue()) {

log.info("insertSameDataKeyItem00001数据{}",JsonUtil.toJson(tmp));

if(tmp.getOptAdd()) {

newValue= newValue.add(new BigDecimal(tmp.getValue().toString()));

log.info("insertSameDataKeyItem00AA数据{}",JsonUtil.toJson(newValue));

}

log.info("insertSameDataKeyItem00002数据{}",JsonUtil.toJson(newValue));

}

TaxRefundDataKey dataKey=sameDataKeyItems.getKey();

log.info("insertSameDataKeyItemsToBillImportData001数据{}",JsonUtil.toJson(dataKey));

TaxRefundData billImportData= new TaxRefundData();

billImportData.setAreaCode(dataKey.getAreaCode());

billImportData.setCompanyCode(template.getCompanyCode());

billImportData.setContactNo(dataKey.getContactNo());

// billImportData.setDataRangeCode(template.getDataRangeCode());

// billImportData.setEmpCode(dataKey.getEmpCode());

// billImportData.setIsEmployer(template.getIsEmployer());

// billImportData.setEmpName(template.getEmployName());

billImportData.setNetCode(dataKey.getNetCode());

billImportData.setRefundType(dataKey.getRefundType());

billImportData.setPositionType(dataKey.getPositionType());

billImportData.setMonth(dataKey.getMonth());

billImportData.setServiceCode(dataKey.getServiceCode());

billImportData.setState("N");

billImportData.setSupplyCode(dataKey.getSupplyCode());

billImportData.setItem(newValue);

//billImportData.setSupplyName(template.getSupplyName());

//数据第一次插入

billImportData.setCreateTime(new Date());

billImportData.setUpdateTime(new Date());

billImportData.setCreateBy(updateBy);

billImportData.setUpdateBy(updateBy);

log.info("插入insertSameDataKeyItemsToTaxRefundData数据 {} - {}",billImportData.getSupplyCode(),billImportData.getContactNo());

taxRefundDataMapper.insert(billImportData);

}

/**

* 判断TaxRefundData表是否已经存在key对应的行

* @param key

* @return

*/

private int dataKeyIsExists(TaxRefundDataKey key) {

log.info("dataKeyIsExists000数据{}",JsonUtil.toJson(key));

QueryWrapper<TaxRefundData> query = new QueryWrapper<>();

query.eq(ColumnConstants.CONTACT_NO,key.getContactNo());

query.eq("service_code",key.getServiceCode());

query.eq("area_code",key.getAreaCode());

query.eq("supply_code",key.getSupplyCode());

query.eq("NET_CODE",key.getNetCode());

query.eq("refund_type",key.getRefundType());

query.eq(ColumnConstants.MONTH, key.getMonth());

int a=taxRefundDataMapper.selectCount(query);

return a;

}

标签: #java的join使用方法