项目需求
近日需要实现用户推荐相关的功能,也就是说向用户推荐他可能喜欢的东西。
我们的数据分析工程师会将用户以及用户可能喜欢的东西整理成文档给我,我只需要将数据从文档中读取出来,然后对数据进行进一步的清洗(例如去掉特殊符号,长度如果太长则截取)。然后将处理后的数据存入数据库(Mysql)。
所以分为三步:
- 读取文档获得数据
- 对获得的数据进行处理
- 更新数据库(新增或更新)
考虑到这个数据量以后会越来越大,这里没有使用 poi 来读取数据,而直接使用了 SpringBatch。
实现步骤
本文假设读者已经能够使用 SpringBoot 连接处理 Mysql,所以这部分文中会省略。
1、创建 Maven 项目,并在 pom.xml 中添加依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.2.RELEASE</version> </parent> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>1.2.0</version> </dependency> <!-- 工具类依赖--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.12.6</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <!-- 数据库相关依赖 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.0.26</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies></div>
这里是这个小项目中用到的所有依赖,包括连接数据库的依赖以及工具类等。
2、编写 Model 类
我们要从文档中读取的有效列就是 uid,tag,type,就是用户 ID,用户可能包含的标签(用于推送),用户类别(用户用户之间互相推荐)。
UserMap.java 中的 @Entity,@Column 注解,是为了利用 JPA 生成数据表而写的,可要可不要。
UserMap.java
@Data
@EqualsAndHashCode
@NoArgsConstructor
@AllArgsConstructor
//@Entity(name = "user_map")
public class UserMap extends BaseModel {
@Column(name = "uid", unique = true, nullable = false)
private Long uid;
@Column(name = "tag")
private String tag;
@Column(name = "type")
private Integer type;
}
</div>
3、实现批处理配置类
BatchConfiguration.java
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("prodDataSource")
DataSource prodDataSource;
@Bean
public FlatFileItemReader<UserMap> reader() {
FlatFileItemReader<UserMap> reader = new FlatFileItemReader<>();
reader.setResource(new ClassPathResource("c152.txt"));
reader.setLineMapper(new DefaultLineMapper<UserMap>() {{
setLineTokenizer(new DelimitedLineTokenizer("|") {{
setNames(new String[]{"uid", "tag", "type"});
}});
setFieldSetMapper(new BeanWrapperFieldSetMapper<UserMap>() {{
setTargetType(UserMap.class);
}});
}});
return reader;
}
@Bean
public JdbcBatchItemWriter<UserMap> importWriter() {
JdbcBatchItemWriter<UserMap> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)");
writer.setDataSource(prodDataSource);
return writer;
}
@Bean
public JdbcBatchItemWriter<UserMap> updateWriter() {
JdbcBatchItemWriter<UserMap> writer = new JdbcBatchItemWriter<>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>());
writer.setSql("UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)");
writer.setDataSource(prodDataSource);
return writer;
}
@Bean
public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) {
return new UserMapItemProcessor(processStatus);
}
@Bean
public Job importUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("importUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(importStep())
.end()
.build();
}
@Bean
public Step importStep() {
return stepBuilderFactory.get("importStep")
.<UserMap, UserMap>chunk(100)
.reader(reader())
.processor(processor(IMPORT))
.writer(importWriter())
.build();
}
@Bean
public Job updateUserJob(JobCompletionNotificationListener listener) {
return jobBuilderFactory.get("updateUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener)
.flow(updateStep())
.end()
.build();
}
@Bean
public Step updateStep() {
return stepBuilderFactory.get("updateStep")
.<UserMap, UserMap>chunk(100)
.reader(reader())
.processor(processor(UPDATE))
.writer(updateWriter())
.build();
}
}
</div>
prodDataSource 是假设用户已经设置好的,如果不知道怎么配置,也可以参考之前的文章进行配置:Springboot 集成 Mybatis。
reader(),这方法从文件中读取数据,并且设置了一些必要的参数。紧接着是写操作 importWriter() 和 updateWriter() ,读者看其中一个就好,因为我这里是需要更新或者修改的,所以分为两个。
processor(ProcessStatus status) ,该方法是对我们处理数据的类进行实例化,这里我根据 status 是 IMPORT 还是 UPDATE 来获取不同的处理结果。
其他的看代码就可以看懂了,哈哈,不详细说了。
4、将获得的数据进行清洗
UserMapItemProcessor.java
public class UserMapItemProcessor implements ItemProcessor<UserMap, UserMap> {
private static final int MAX_TAG_LENGTH = 200;
private ProcessStatus processStatus;
public UserMapItemProcessor(ProcessStatus processStatus) {
this.processStatus = processStatus;
}
@Autowired
IUserMapService userMapService;
private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\\u4E00-\\u9FA5_-]+$";
public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR);
private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor.class);
@Override
public UserMap process(UserMap

