일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
Tags
- AWS
- Spring Boot
- it
- jenkins
- JVM
- php
- redis
- 요리
- jsp
- Spring
- linux
- devops
- db
- tool
- ubuntu
- elasticsearch
- Web Server
- IntelliJ
- springboot
- 맛집
- Spring Batch
- ReactJS
- Gradle
- javascript
- laravel
- MySQL
- java
- Oracle
- Git
- Design Patterns
Archives
- Today
- Total
아무거나
Spring Batch + Redis Pipeline 으로 구현한 성능 개선 본문
반응형
Spring Batch + Redis Pipeline 으로 구현한 성능 개선
코드 참고는 https://github.com/bkjeon1614/java-example-code/tree/develop/spring-batch-mybatis-codebase 에서 참고 부탁드립니다.
Redis Pipeline 이란
Redis의 pipeline은 여러 개의 명령어를 한 번에 보내고, 그 결과를 한 번에 받아올 수 있는 메커니즘입니다. 이를 통해 네트워크 오버헤드를 줄이고 Redis 서버의 처리 성능을 최적화할 수 있다. 또한 주의해야할 점은, Redis 서버의 처리량(capacity)을 고려하여 pipeline의 chunk size를 결정해야 한다.
주의사항
- Request Chunk Size: 먼저 요청하는 chunk size 에 대해 설명하자면 너무 작은 chunk size는 네트워크 오버헤드를 줄이지만 Redis 서버에 부하를 증가시킬 수 있고, 너무 큰 chunk size는 한 번에 처리해야 할 명령어 개수가 많아져서 오히려 성능을 저하시킬 수 있다. 공식문서 상에서는 명시적으로 정해진 값은 없지만, 일반적으로 100에서 1000개 사이의 명령어를 포함하는 것이 효율적인 경우가 많다고 한다.
- Command Timeout: 네트워크 레이턴시를 고려하자면 Pipeline을 사용할 때는 네트워크 지연에 의한 영향을 줄일 수 있지만, 여전히 Redis 서버의 응답 시간은 중요하다. 일반적으로 5초에서 10초 사이의 commandTimeout 값을 고려할 수 있습니다. (너무 짧게 설정하면 Pipeline 이 타임아웃이 될 수 있다.)
- Error Example: org.springframework.data.redis.connection.RedisPipelineException: Pipeline contained one or more invalid commands...
실습
RedisTemplate 의 executePipelined 메소드와 RedisCallback을 사용하여 파이프라이닝을 사용할 수 있다.
실습사양
- Java 17
- Gradle 8.8
- Spring Batch 5.1.2
1. table schema 정의
create table sample (
id bigint not null auto_increment,
amount bigint,
tx_name varchar(255),
tx_date_time datetime,
primary key (id)
) engine = InnoDB;
create table sample_out (
id bigint not null auto_increment,
amount bigint,
tx_name varchar(255),
tx_date_time datetime,
primary key (id)
) engine = InnoDB;
2. RedisConfig 에 redisTemplate bean 추가
...
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new StringRedisSerializer());
return redisTemplate;
}
3. Job 생성
[MybatisSampleRedisJobConfig.java]
package com.bkjeon.job;
import com.bkjeon.feature.entity.sample.Sample;
import com.bkjeon.feature.entity.sample.SampleOut;
import com.bkjeon.feature.mapper.sample.SampleMapper;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobScope;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.StringRedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.transaction.PlatformTransactionManager;
/**
* --job.name=MYBATIS_SAMPLE_REDIS_JOB requestDate=20240701
*/
@Slf4j
@Configuration
@RequiredArgsConstructor
public class MybatisSampleRedisJobConfig {
private static final String JOB_NAME_PREFIX = "MYBATIS_SAMPLE_REDIS";
private static final String REDIS_KEY_PREFIX = "SAMPLE_";
private static final int CHUNK_SIZE = 1000;
private final StringRedisTemplate redisTemplate;
private final SqlSessionFactory sqlSessionFactory;
@Bean
public Job mybatisSampleRedisJob(JobRepository jobRepository,
Step mybatisSampleRedisJobStep1, Step mybatisSampleRedisJobStep2) {
return new JobBuilder(JOB_NAME_PREFIX + "_JOB", jobRepository)
.start(mybatisSampleRedisJobStep1)
.next(mybatisSampleRedisJobStep2)
.build();
}
@Bean
@JobScope
public Step mybatisSampleRedisJobStep1(JobRepository jobRepository,
Tasklet mybatisSampleDataToRedisTasklet, PlatformTransactionManager platformTransactionManager) {
log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mybatisSampleRedisJobStep1");
return new StepBuilder(JOB_NAME_PREFIX + "_JOB_STEP2", jobRepository)
.tasklet(mybatisSampleDataToRedisTasklet, platformTransactionManager).build();
}
/**
* Redis Pipeline Test (pipeline 1000 단위로 처리)
* [사양]
* - redis version: 7.0.5
* 1. 10만건
* - asis: 117초
* - tobe: 7초(1000)
* 2. 100만건
* - asis: 1225초
* - tobe: 64초(1000) - Redis 사용량에 따라 성능이 달라질 수 있다.
* * 약 19배 증가
* 또한 레디스를 사용하여 DB 에서의 집계가 아닌 코드에서 Aggregation 결과를 Redis 에 넣는 방식으로 성능 개선한 사례들이 있다.
*/
@Bean
public Tasklet mybatisSampleDataToRedisTasklet() {
return ((contribution, chunkContext) -> {
log.info(">>>>> This is mybatisSampleDataToRedisTasklet");
// Mock Data
List<Sample> sampleList = new ArrayList<>();
for (long c=1; c <= 1000000; c++) {
sampleList.add(
Sample.builder().id(c).txName("TEST" + c).amount(c * 1000).txDateTime(LocalDateTime.now()).build());
}
long beforeTime = System.currentTimeMillis();
int size = sampleList.size();
for (int i = 0; i < size; i += CHUNK_SIZE) {
List<Sample> chunk = sampleList.subList(i, Math.min(size, i + CHUNK_SIZE));
redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
for (Sample sample: chunk) {
stringRedisConn.set(REDIS_KEY_PREFIX + sample.getId(), "value" + sample.getTxName());
}
return null;
});
}
long afterTime = System.currentTimeMillis();
long secDiffTime = (afterTime - beforeTime) / 1000;
log.info(">>>>>>>>>>>>>>>>>>>>>>>>>> Redis 실행시간(s): {}", secDiffTime);
return RepeatStatus.FINISHED;
});
}
@Bean
@JobScope
public Step mybatisSampleRedisJobStep2(JobRepository jobRepository,
PlatformTransactionManager platformTransactionManager,
@Value("#{jobParameters[requestDate]}") String requestDate) {
log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mybatisSampleRedisJobStep2 requestDate:{} ", requestDate);
return new StepBuilder(JOB_NAME_PREFIX + "_JOB_STEP2", jobRepository)
.<Sample, SampleOut>chunk(CHUNK_SIZE, platformTransactionManager)
.reader(mybatisSampleRedisToRdbPagingItemReader())
.processor(mybatisSampleToSampleOutItemProcessor())
.writer(mybatisSampleRedisToRdbItemWriter())
.build();
}
@Bean
public MyBatisPagingItemReader<Sample> mybatisSampleRedisToRdbPagingItemReader() {
return new MyBatisPagingItemReaderBuilder<Sample>()
.pageSize(CHUNK_SIZE)
.sqlSessionFactory(sqlSessionFactory)
.queryId("com.bkjeon.feature.mapper.sample.SampleMapper.selectZeroOffsetSampleList")
.build();
}
@Bean
public ItemProcessor<Sample, SampleOut> mybatisSampleToSampleOutItemProcessor() {
return item -> SampleOut.builder()
.id(item.getId())
.amount(item.getAmount())
.txName(redisTemplate.opsForValue().get(REDIS_KEY_PREFIX + item.getId()))
.txDateTime(LocalDateTime.now())
.build();
}
@Bean
public MyBatisBatchItemWriter<SampleOut> mybatisSampleRedisToRdbItemWriter() {
return new MyBatisBatchItemWriterBuilder<SampleOut>()
.sqlSessionFactory(sqlSessionFactory)
.statementId("com.bkjeon.feature.mapper.sample.SampleMapper.insertSample")
.build();
}
}
4. Mapper
[SampleMapper.java]
import com.bkjeon.feature.entity.sample.Sample;
import com.bkjeon.feature.entity.sample.SampleOut;
import java.util.List;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface SampleMapper {
...
List<Sample> selectZeroOffsetSampleList();
void insertSample(SampleOut sample);
}
5. Mapper xml
...
<select id="selectZeroOffsetSampleList" resultMap="selectSampleListMap">
SELECT
id,
amount,
tx_name,
tx_date_time
FROM sample
WHERE 1=1
AND id > #{_skiprows}
LIMIT 0, #{_pagesize}
</select>
<insert id="insertSample" parameterType="com.bkjeon.feature.entity.sample.SampleOut">
INSERT INTO sample_out (
amount,
tx_name,
tx_date_time
) VALUES (
#{amount},
#{txName},
#{txDateTime}
)
</insert>
6. Entity 생성
[Sample.java]
package com.bkjeon.feature.entity.sample;
import java.time.LocalDateTime;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
@ToString
@Getter
@NoArgsConstructor
public class Sample {
private Long id;
private Long amount;
private String txName;
private LocalDateTime txDateTime;
@Builder
public Sample(Long id, Long amount, String txName, LocalDateTime txDateTime) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = txDateTime;
}
}
[SampleOut.java]
package com.bkjeon.feature.entity.sample;
import java.time.LocalDateTime;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
@ToString
@Getter
@NoArgsConstructor
public class SampleOut {
private Long id;
private Long amount;
private String txName;
private LocalDateTime txDateTime;
@Builder
public SampleOut(Long id, Long amount, String txName, LocalDateTime txDateTime) {
this.id = id;
this.amount = amount;
this.txName = txName;
this.txDateTime = txDateTime;
}
}
완료 후 batch 를 동작시켜서 pipeline 을 사용한것과 안한것의 차이를 비교해보면 chunk size 1000 개 기준 약 19배의 성능이 증가한걸 확인할 수 있다.
반응형
'Java & Kotlin > Spring Batch & SCDF' 카테고리의 다른 글
Spring Batch Partitioning 구현 (0) | 2024.10.07 |
---|---|
Spring Batch 성능 개선 사례 정리 (5) | 2024.07.22 |
Spring Batch 5 + Mybatis (JdbcItem Reader/Writer) 구현 (0) | 2024.07.16 |
SCDF(=Spring Cloud Data Flow) 입문 (1) | 2023.01.02 |
관리 도구로서의 Jenkins 를 통한 Spring Batch 운영 (0) | 2022.11.11 |
Comments