龙空技术网

SpringBoot项目中应用Spring Batch批处理框架,处理大数据新方案

Spring全家桶实战案例 2965

前言:

现时大家对“spring事务分批提交”大约比较关怀,大家都需要学习一些“spring事务分批提交”的相关内容。那么小编也在网上网罗了一些有关“spring事务分批提交””的相关文章,希望看官们能喜欢,各位老铁们一起来学习一下吧!

环境:Springboot2.3.12RELEASE + Spring Batch4.2.7

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。Spring Batch可以提供大量的,可重复的数据处理功能,包括日志记录/跟踪,事务管理,作业处理统计工作重新启动、跳过,和资源管理等重要功能。

业务场景:

定期提交批处理。并行批处理:作业的并行处理分阶段、企业消息驱动的处理大规模并行批处理故障后手动或计划重新启动相关步骤的顺序处理(扩展到工作流驱动的批处理)部分处理:跳过记录(例如,回滚时)整批事务,适用于小批量或现有存储过程/脚本的情况

技术目标:

批处理开发人员使用Spring编程模型:专注于业务逻辑,让框架负责基础设施。基础架构、批处理执行环境和批处理应用程序之间的关注点清晰分离。提供通用的核心执行服务,作为所有项目都可以实现的接口。提供可“开箱即用”的核心执行接口的简单和默认实现。通过在所有层中利用spring框架,可以轻松配置、定制和扩展服务。所有现有的核心服务都应该易于替换或扩展,而不会对基础架构层造成任何影响。提供一个简单的部署模型,使用Maven构建的架构JAR与应用程序完全分离。

Spring Batch的结构:

此分层体系结构突出了三个主要的高级组件:应用程序、核心和基础架构。该应用程序包含开发人员使用SpringBatch编写的所有批处理作业和自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括JobLauncher、Job和Step的实现。应用程序和核心都构建在公共基础架构之上。此基础结构包含公共读写器和服务(如RetryTemplate),应用程序开发人员(读写器,如ItemReader和ItemWriter)和核心框架本身(retry,它是自己的库)都使用这些服务。

下面介绍开发流程

本例完成 读取文件内容,经过处理后,将数据保存到数据库中

引入依赖

<dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-batch</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-web</artifactId></dependency><dependency>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-data-jpa</artifactId></dependency><dependency>  <groupId>mysql</groupId>  <artifactId>mysql-connector-java</artifactId></dependency><dependency>  <groupId>org.hibernate</groupId>  <artifactId>hibernate-validator</artifactId>  <version>6.0.7.Final</version></dependency>
应用配置文件
spring:  datasource:    driverClassName: com.mysql.cj.jdbc.Driver    url: jdbc:mysql://localhost:3306/batch?serverTimezone=GMT%2B8    username: root    password: *******    type: com.zaxxer.hikari.HikariDataSource    hikari:      minimumIdle: 10      maximumPoolSize: 200      autoCommit: true      idleTimeout: 30000      poolName: MasterDatabookHikariCP      maxLifetime: 1800000      connectionTimeout: 30000      connectionTestQuery: SELECT 1---spring:  jpa:    generateDdl: false    hibernate:      ddlAuto: update    openInView: true    show-sql: true---spring:  batch:    job:      enabled: false #是否自动执行任务    initialize-schema: always  #自动为我们创建数据库脚本
开启批处理功能
@Configuration@EnableBatchProcessingpublic class BatchConfig extends DefaultBatchConfigurer{}
任务启动器

接着上一步的配置类BatchConfig重写对应方法

@Overrideprotected JobLauncher createJobLauncher() throws Exception {  SimpleJobLauncher jobLauncher = new SimpleJobLauncher();  jobLauncher.setJobRepository(createJobRepository());  jobLauncher.afterPropertiesSet();  return jobLauncher;}
任务存储

接着上一步的配置类BatchConfig重写对应方法

@Resourceprivate PlatformTransactionManager transactionManager ;@Overrideprotected JobRepository createJobRepository() throws Exception {  JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();  factory.setDatabaseType("mysql");  factory.setTransactionManager(transactionManager);  factory.setDataSource(dataSource);  factory.afterPropertiesSet();  return factory.getObject();}
定义JOB
@Beanpublic Job myJob(JobBuilderFactory builder, @Qualifier("myStep")Step step){  return builder.get("myJob")             .incrementer(new RunIdIncrementer())             .flow(step)             .end()             .listener(jobExecutionListener)             .build();}
定义ItemReader读取器
@Beanpublic ItemReader<Person> reader(){  FlatFileItemReader<Person> reader = new FlatFileItemReader<>();  reader.setResource(new ClassPathResource("cvs/persons.cvs"));  reader.setLineMapper(new DefaultLineMapper<Person>() {    // 代码块    {      setLineTokenizer(new DelimitedLineTokenizer(",") {        {          setNames("id", "name");        }    }) ;  setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {    {      setTargetType(Person.class) ;    }    });  }  });  return reader;}
定义ItemProcessor处理器
@Beanpublic ItemProcessor<Person, Person2> processorPerson(){  return new ItemProcessor<Person, Person2>() {      @Override      public Person2 process(Person item) throws Exception {        Person2 p = new Person2() ;        p.setId(item.getId()) ;        p.setName(item.getName() + ", pk");        return p ;      }  } ;}
定义ItemWriter写数据
@Resourceprivate Validator<Person> validator ;@Resourceprivate EntityManagerFactory entityManagerFactory ;@Beanpublic ItemWriter<Person2> writerPerson(){  JpaItemWriter<Person2> writer = null ;  JpaItemWriterBuilder<Person2> builder = new JpaItemWriterBuilder<>() ;  builder.entityManagerFactory(entityManagerFactory) ;  writer = builder.build() ;  return writer;}
定义Step
@Beanpublic Step myStep(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor){  return stepBuilderFactory             .get("myStep")             .<Person, Person>chunk(2) // Chunk的机制(即每次读取一条数据,再处理一条数据,累积到一定数量后再一次性交给writer进行写入操作)             .reader(reader).faultTolerant().retryLimit(3).retry(Exception.class).skip(Exception.class).skipLimit(2)             .listener(new MyReadListener())             .processor(processor)             .writer(writer).faultTolerant().skip(Exception.class).skipLimit(2)             .listener(new MyWriteListener())             .build();}
定义相应的监听器
public class MyReadListener implements ItemReadListener<Person> {	private Logger logger = LoggerFactory.getLogger(MyReadListener.class);	@Override	public void beforeRead() {	}	@Override	public void afterRead(Person item) {		System.out.println("reader after: " + Thread.currentThread().getName()) ;	}	@Override	public void onReadError(Exception ex) {		logger.info("读取数据错误:{}", ex);	}}
@Componentpublic class MyWriteListener implements ItemWriteListener<Person> {	    private Logger logger = LoggerFactory.getLogger(MyWriteListener.class);     @Override    public void beforeWrite(List<? extends Person> items) {    }        @Override    public void afterWrite(List<? extends Person> items) {    	System.out.println("writer after: " + Thread.currentThread().getName()) ;    }        @Override    public void onWriteError(Exception exception, List<? extends Person> items) {        try {            logger.info(format("%s%n", exception.getMessage()));            for (Person item : items) {                logger.info(format("Failed writing BlogInfo : %s", item.toString()));            }        } catch (Exception e) {            e.printStackTrace();        }    }}

person.cvs文件内容

实体类:

@Entity@Table(name = "t_person")public class Person {		@Id	@GeneratedValue(strategy = GenerationType.IDENTITY)	private Integer id ;	private String name ;}

启动任务执行

@RestController@RequestMapping("/demo")public class DemoController {  @Resource	@Qualifier("myJob")	private Job job ;  @Resource	private JobLauncher launcher ;  @GetMapping("/index")	public Object index() {		JobParameters jobParameters = new JobParametersBuilder().toJobParameters() ;		try {			launcher.run(job, jobParameters) ;		} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException				| JobParametersInvalidException e) {			e.printStackTrace();		}		return "success" ;	}}

启动服务,自动为我们创建了表

执行任务

查看表情况

完毕!!!

公众:Springboot实战案例锦集

标签: #spring事务分批提交