前言:
如今很多朋友都比较关心“java的join使用方法”,也想了解更多相关内容。小编在网络上汇集了一些关于“java的join使用方法”的知识,希望大家喜欢,快来一起学习吧!下面记录 ForkJoinPool 在项目中的使用。ForkJoinPool 可以配置最大工作线程数、工作线程工厂、拒绝任务的 Handler 和同步模式,让我们很方便地将一个大任务拆散成小任务并行地执行,提高 CPU 的使用率。
个人体会
优点:处理速度提升N倍
缺点:突发报错不容易发现问题,需要跟踪观察处理
在入口类中通过 @Bean 注解配置线程池的线程数量:
/**
 * Registers the {@link ForkJoinPool} bean named {@code taxreFundforkJoinPool}.
 *
 * @return a new pool created with a parallelism level of 80
 */
@Bean("taxreFundforkJoinPool")
public ForkJoinPool taxreFundforkJoinPool() {
    // Target parallelism level passed to the ForkJoinPool constructor.
    final int parallelism = 80;
    return new ForkJoinPool(parallelism);
}
// Context handed to every per-group confirmation call below.
DataConfirmContext dataConfirmContext = new DataConfirmContext(getDataItemAddMap(checkTmp), userName);

// Group the rows by data key, then confirm each group. The work is submitted to the dedicated
// ForkJoinPool and join() blocks the caller until the whole task has finished.
// NOTE(review): this relies on parallel streams started from inside a ForkJoinPool task running
// in that same pool, which is implementation behaviour rather than a documented guarantee.
// NOTE(review): the grouping key contains item, positionType and val, which dataKeyIsExists()
// does not query on, so several groups may target the same TaxRefundData row concurrently and
// race on the "count == 0 -> insert" check. Confirm this is intended.
// To run single-threaded, replace both parallelStream() calls with stream().
forkJoinPool.submit(
        () ->
                checkTmp.parallelStream().collect(Collectors.groupingBy(t ->
                        TaxRefundDataKey.builder()
                                .areaCode(t.getAreaCode())
                                .contactNo(t.getContactNo())
                                // Locale.ROOT keeps the upper-casing independent of the JVM default
                                // locale (e.g. Turkish dotless i), so the key is stable everywhere.
                                .netCode(org.apache.commons.lang3.StringUtils.defaultString(t.getNetCode())
                                        .toUpperCase(java.util.Locale.ROOT))
                                .serviceCode(t.getServiceCode())
                                .supplyCode(t.getSupplyCode())
                                .month(t.getMonth())
                                .item(t.getItem())
                                .refundType(t.getRefundType())
                                .positionType(t.getPositionType())
                                .val(t.getVal())
                                .build()
                ))
                .entrySet().parallelStream().forEach(kv ->
                        taxRefunDataConfirmService.comfirmForSingleEmploy(kv, dataConfirmContext)
                )
).join();
/**
 * Confirms the {@code TaxRefundDataTmp} rows of a single employee into the TaxRefundData table
 * within one transaction, to keep the data consistent.
 *
 * @param kv the data key together with the temporary rows grouped under it
 * @param dataConfirmContext context providing the user name and the item map used for conversion
 */
@Transactional
public void comfirmForSingleEmploy(Entry<TaxRefundDataKey, List<TaxRefundDataTmp>> kv, DataConfirmContext dataConfirmContext) {
    log.info("comfirmForSingleEmploy001确认税返{}数据", JsonUtil.toJson(kv));
    log.info("comfirmForSingleEmploy002确认税返{}数据", JsonUtil.toJson(dataConfirmContext));

    // Adapt the temporary rows to the item/value form expected by the merge step.
    Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> itemsForKey =
            convert2DataItemKv(kv, dataConfirmContext);

    // The first row of the group serves as the data template.
    List<TaxRefundDataTmp> groupedRows = kv.getValue();
    TaxRefunConfirmDataTemplate dataTemplate = TaxRefunConfirmDataTemplate.from(groupedRows.get(0));

    mergeDataItemsToImportBillDataByDataKey(itemsForKey, dataConfirmContext.getUserName(), dataTemplate);
    updateImportBillDataTmpStatus(kv, dataConfirmContext.getUserName());
}
/**
 * Converts the grouped temporary rows into item/value objects before confirmation, so that the
 * merge step works against one uniform type.
 *
 * @param kv the data key together with the temporary rows grouped under it
 * @param dataConfirmContext context whose item map is looked up by each row's item
 * @return an immutable entry pairing the same key with the converted items
 */
public Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> convert2DataItemKv(Entry<TaxRefundDataKey, List<TaxRefundDataTmp>> kv, DataConfirmContext dataConfirmContext) {
    List<TaxRefundDataItemCodeAndValue> convertedItems = kv.getValue()
            .stream()
            .map(row -> new TaxRefundDataItemCodeAndValue(
                    row.getItem(),
                    row.getVal(),
                    // Third argument is whatever the context's item map holds for this item.
                    dataConfirmContext.getDataItemAddMap().get(row.getItem())))
            .collect(Collectors.toList());
    return Maps.immutableEntry(kv.getKey(), convertedItems);
}
/**
 * Merges all items sharing one data key into the TaxRefundData table: updates the existing row
 * when the key is already present, otherwise inserts a new one.
 *
 * @param sameDataKeyItems the data key and the items that share it
 * @param updateBy user recorded as creator/updater of the row
 * @param template template values used only when a new row is inserted
 */
@Transactional
public void mergeDataItemsToImportBillDataByDataKey(Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> sameDataKeyItems,
        String updateBy, TaxRefunConfirmDataTemplate template) {
    // NOTE(review): the original comment here mentioned switching detected rows to the
    // "calculated" state; no such logic is visible in this method.
    int existingRows = dataKeyIsExists(sameDataKeyItems.getKey());
    log.info("mergeDataItemsToImportBillDataByDataKey数据{}", JsonUtil.toJson(existingRows));
    log.info("mergeDataItemsToImportBillDataByDataKey001{}数据", JsonUtil.toJson(sameDataKeyItems));

    if (existingRows != 0) {
        updateSameDataKeyItemsToBillImportData(sameDataKeyItems, updateBy);
    } else {
        insertSameDataKeyItemsToBillImportData(sameDataKeyItems, updateBy, template);
    }
}
/**
 * Sums the items of one data key and adds the total onto the existing TaxRefundData row.
 *
 * @param dataItemKv the data key and the items that share it
 * @param updateBy user recorded as the updater
 */
private void updateSameDataKeyItemsToBillImportData(Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> dataItemKv, String updateBy) {
    log.info("updateSameDataKeyItem00001数据{}", JsonUtil.toJson(dataItemKv));
    TaxRefundData existingRow = taxRefundDataMapper.selectForUpdate(dataItemKv.getKey());
    log.info("updateSameDataKeyItem00002数据{}", JsonUtil.toJson(existingRow));

    // Only items flagged via getOptAdd() contribute to the running total.
    BigDecimal runningTotal = BigDecimal.ZERO;
    for (TaxRefundDataItemCodeAndValue entry : dataItemKv.getValue()) {
        log.info("updateSameDataKeyItem00002数据{}", JsonUtil.toJson(entry));
        if (entry.getOptAdd()) {
            BigDecimal amount = new BigDecimal(entry.getValue().toString());
            runningTotal = runningTotal.add(amount);
            log.info("updateSameDataKeyItem00AA数据{}", JsonUtil.toJson(runningTotal));
        }
        log.info("updateSameDataKeyItem00003数据{}", JsonUtil.toJson(runningTotal));
    }

    // Calculated vs. not-calculated handling: "Y" stays "Y", anything else becomes "N".
    existingRow.setState("Y".equals(existingRow.getState()) ? "Y" : "N");
    existingRow.setUpdateTime(new Date());
    existingRow.setUpdateBy(updateBy);

    // New stored value = sum of the incoming items + value already on the row.
    runningTotal = runningTotal.add(existingRow.getItem());
    existingRow.setItem(runningTotal);
    taxRefundDataMapper.updateById(existingRow);
}
/**
 * Updates the status of the temporary rows in one group, addressed by their ids and the key's
 * month.
 *
 * @param kv the data key together with the temporary rows grouped under it
 * @param userName user passed on to the mapper for the update
 */
private void updateImportBillDataTmpStatus(Entry<TaxRefundDataKey, List<TaxRefundDataTmp>> kv, String userName) {
    List<TaxRefundDataTmp> groupedRows = kv.getValue();
    List<Long> rowIds = groupedRows.stream()
            .map(TaxRefundDataTmp::getId)
            .collect(Collectors.toList());
    // NOTE(review): ColumnConstants.E is the status written to the rows; its meaning is not
    // visible here.
    taxRefundDataTmpMapper.updateStatusByIdsAndMonth(
            rowIds, kv.getKey().getMonth(), ColumnConstants.E, userName);
}
/**
 * Sums the items of one data key and inserts them as a new row into the TaxRefundData table.
 *
 * @param sameDataKeyItems the data key and the items that share it
 * @param updateBy user recorded as both creator and updater of the new row
 * @param template supplies the company code of the new row
 */
private void insertSameDataKeyItemsToBillImportData(
        Entry<TaxRefundDataKey, List<TaxRefundDataItemCodeAndValue>> sameDataKeyItems,
        String updateBy, TaxRefunConfirmDataTemplate template) {
    // Only items flagged via getOptAdd() contribute to the stored value.
    BigDecimal newValue = BigDecimal.ZERO;
    for (TaxRefundDataItemCodeAndValue tmp : sameDataKeyItems.getValue()) {
        log.info("insertSameDataKeyItem00001数据{}", JsonUtil.toJson(tmp));
        if (tmp.getOptAdd()) {
            newValue = newValue.add(new BigDecimal(tmp.getValue().toString()));
            log.info("insertSameDataKeyItem00AA数据{}", JsonUtil.toJson(newValue));
        }
        log.info("insertSameDataKeyItem00002数据{}", JsonUtil.toJson(newValue));
    }

    TaxRefundDataKey dataKey = sameDataKeyItems.getKey();
    log.info("insertSameDataKeyItemsToBillImportData001数据{}", JsonUtil.toJson(dataKey));

    TaxRefundData billImportData = new TaxRefundData();
    // Columns copied from the data key.
    billImportData.setAreaCode(dataKey.getAreaCode());
    billImportData.setContactNo(dataKey.getContactNo());
    billImportData.setNetCode(dataKey.getNetCode());
    billImportData.setRefundType(dataKey.getRefundType());
    billImportData.setPositionType(dataKey.getPositionType());
    billImportData.setMonth(dataKey.getMonth());
    billImportData.setServiceCode(dataKey.getServiceCode());
    billImportData.setSupplyCode(dataKey.getSupplyCode());
    // Column taken from the template.
    billImportData.setCompanyCode(template.getCompanyCode());
    // A freshly inserted row always starts in state "N".
    billImportData.setState("N");
    billImportData.setItem(newValue);

    // First insert of this row: use one timestamp so createTime and updateTime are identical
    // instead of differing by the time between two separate new Date() calls.
    Date now = new Date();
    billImportData.setCreateTime(now);
    billImportData.setUpdateTime(now);
    billImportData.setCreateBy(updateBy);
    billImportData.setUpdateBy(updateBy);

    log.info("插入insertSameDataKeyItemsToTaxRefundData数据 {} - {}", billImportData.getSupplyCode(), billImportData.getContactNo());
    taxRefundDataMapper.insert(billImportData);
}
/**
 * Counts the TaxRefundData rows that match the given key on contact number, service code,
 * area code, supply code, net code, refund type and month.
 *
 * @param key the data key to look up
 * @return the number of matching rows; 0 means no row exists for the key yet
 */
private int dataKeyIsExists(TaxRefundDataKey key) {
    log.info("dataKeyIsExists000数据{}", JsonUtil.toJson(key));
    // Conditions are added in the same order as before; all are combined as equality filters.
    QueryWrapper<TaxRefundData> rowFilter = new QueryWrapper<TaxRefundData>()
            .eq(ColumnConstants.CONTACT_NO, key.getContactNo())
            .eq("service_code", key.getServiceCode())
            .eq("area_code", key.getAreaCode())
            .eq("supply_code", key.getSupplyCode())
            .eq("NET_CODE", key.getNetCode())
            .eq("refund_type", key.getRefundType())
            .eq(ColumnConstants.MONTH, key.getMonth());
    return taxRefundDataMapper.selectCount(rowFilter);
}
标签: #java的join使用方法