아무거나

Spring Batch + Redis Pipeline 으로 구현한 성능 개선 본문

Java & Kotlin/Spring Batch & SCDF

Spring Batch + Redis Pipeline 으로 구현한 성능 개선

전봉근 2024. 8. 30. 14:31
반응형

Spring Batch + Redis Pipeline 으로 구현한 성능 개선

코드 참고는 https://github.com/bkjeon1614/java-example-code/tree/develop/spring-batch-mybatis-codebase 에서 참고 부탁드립니다.

Redis Pipeline 이란

Redis의 pipeline은 여러 개의 명령어를 한 번에 보내고, 그 결과를 한 번에 받아올 수 있는 메커니즘입니다. 이를 통해 네트워크 오버헤드를 줄이고 Redis 서버의 처리 성능을 최적화할 수 있다. 또한 주의해야할 점은, Redis 서버의 처리량(capacity)을 고려하여 pipeline의 chunk size를 결정해야 한다.

주의사항

  • Request Chunk Size: 먼저 요청하는 chunk size 에 대해 설명하자면 너무 작은 chunk size는 네트워크 오버헤드를 줄이지만 Redis 서버에 부하를 증가시킬 수 있고, 너무 큰 chunk size는 한 번에 처리해야 할 명령어 개수가 많아져서 오히려 성능을 저하시킬 수 있다. 공식문서 상에서는 명시적으로 정해진 값은 없지만, 일반적으로 100에서 1000개 사이의 명령어를 포함하는 것이 효율적인 경우가 많다고 한다.
  • Command Timeout: 네트워크 레이턴시를 고려하자면 Pipeline을 사용할 때는 네트워크 지연에 의한 영향을 줄일 수 있지만, 여전히 Redis 서버의 응답 시간은 중요하다. 일반적으로 5초에서 10초 사이의 commandTimeout 값을 고려할 수 있습니다. (너무 짧게 설정하면 Pipeline 이 타임아웃이 될 수 있다.)
    • Error Example: org.springframework.data.redis.connection.RedisPipelineException: Pipeline contained one or more invalid commands...

실습

RedisTemplate 의 executePipelined 메소드와 RedisCallback을 사용하여 파이프라이닝을 사용할 수 있다.

실습사양

  • Java 17
  • Gradle 8.8
  • Spring Batch 5.1.2

1. table schema 정의

    create table sample (
      id         bigint not null auto_increment,
      amount     bigint,
      tx_name     varchar(255),
      tx_date_time datetime,
      primary key (id)
    ) engine = InnoDB;

    create table sample_out (
      id         bigint not null auto_increment,
      amount     bigint,
      tx_name     varchar(255),
      tx_date_time datetime,
      primary key (id)
    ) engine = InnoDB;

 

 

2. RedisConfig 에 redisTemplate bean 추가

    ...

    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        return redisTemplate;
    }

 

 

3. Job 생성
[MybatisSampleRedisJobConfig.java]

package com.bkjeon.job;

    import com.bkjeon.feature.entity.sample.Sample;
    import com.bkjeon.feature.entity.sample.SampleOut;
    import com.bkjeon.feature.mapper.sample.SampleMapper;
    import java.time.LocalDateTime;
    import java.util.ArrayList;
    import java.util.List;
    import lombok.RequiredArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.ibatis.session.SqlSessionFactory;
    import org.mybatis.spring.batch.MyBatisBatchItemWriter;
    import org.mybatis.spring.batch.MyBatisPagingItemReader;
    import org.mybatis.spring.batch.builder.MyBatisBatchItemWriterBuilder;
    import org.mybatis.spring.batch.builder.MyBatisPagingItemReaderBuilder;
    import org.springframework.batch.core.Job;
    import org.springframework.batch.core.Step;
    import org.springframework.batch.core.configuration.annotation.JobScope;
    import org.springframework.batch.core.job.builder.JobBuilder;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.step.builder.StepBuilder;
    import org.springframework.batch.core.step.tasklet.Tasklet;
    import org.springframework.batch.item.ItemProcessor;
    import org.springframework.batch.repeat.RepeatStatus;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.StringRedisConnection;
    import org.springframework.data.redis.core.RedisCallback;
    import org.springframework.data.redis.core.StringRedisTemplate;
    import org.springframework.transaction.PlatformTransactionManager;

    /**
    * --job.name=MYBATIS_SAMPLE_REDIS_JOB requestDate=20240701
    */
    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    public class MybatisSampleRedisJobConfig {

        private static final String JOB_NAME_PREFIX = "MYBATIS_SAMPLE_REDIS";
        private static final String REDIS_KEY_PREFIX = "SAMPLE_";
        private static final int CHUNK_SIZE = 1000;

        private final StringRedisTemplate redisTemplate;
        private final SqlSessionFactory sqlSessionFactory;

        @Bean
        public Job mybatisSampleRedisJob(JobRepository jobRepository,
            Step mybatisSampleRedisJobStep1, Step mybatisSampleRedisJobStep2) {
            return new JobBuilder(JOB_NAME_PREFIX + "_JOB", jobRepository)
                .start(mybatisSampleRedisJobStep1)
                .next(mybatisSampleRedisJobStep2)
                .build();
        }

        @Bean
        @JobScope
        public Step mybatisSampleRedisJobStep1(JobRepository jobRepository,
            Tasklet mybatisSampleDataToRedisTasklet, PlatformTransactionManager platformTransactionManager) {
            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mybatisSampleRedisJobStep1");
            return new StepBuilder(JOB_NAME_PREFIX + "_JOB_STEP2", jobRepository)
                .tasklet(mybatisSampleDataToRedisTasklet, platformTransactionManager).build();
        }

        /**
        * Redis Pipeline Test (pipeline 1000 단위로 처리)
        * [사양]
        * - redis version: 7.0.5
        * 1. 10만건
        *    - asis: 117초
        *    - tobe: 7초(1000)
        * 2. 100만건
        *    - asis: 1225초
        *    - tobe: 64초(1000) - Redis 사용량에 따라 성능이 달라질 수 있다.
        * * 약 19배 증가
        * 또한 레디스를 사용하여 DB 에서의 집계가 아닌 코드에서 Aggregation 결과를 Redis 에 넣는 방식으로 성능 개선한 사례들이 있다.
        */
        @Bean
        public Tasklet mybatisSampleDataToRedisTasklet() {
            return ((contribution, chunkContext) -> {
                log.info(">>>>> This is mybatisSampleDataToRedisTasklet");

                // Mock Data
                List<Sample> sampleList = new ArrayList<>();
                for (long c=1; c <= 1000000; c++) {
                    sampleList.add(
                        Sample.builder().id(c).txName("TEST" + c).amount(c * 1000).txDateTime(LocalDateTime.now()).build());
                }

                long beforeTime = System.currentTimeMillis();
                int size = sampleList.size();
                for (int i = 0; i < size; i += CHUNK_SIZE) {
                    List<Sample> chunk = sampleList.subList(i, Math.min(size, i + CHUNK_SIZE));

                    redisTemplate.executePipelined(
                        (RedisCallback<Object>) connection -> {
                            StringRedisConnection stringRedisConn = (StringRedisConnection) connection;
                            for (Sample sample: chunk) {
                                stringRedisConn.set(REDIS_KEY_PREFIX + sample.getId(), "value" + sample.getTxName());
                            }
                            return null;
                        });
                }

                long afterTime = System.currentTimeMillis();
                long secDiffTime = (afterTime - beforeTime) / 1000;
                log.info(">>>>>>>>>>>>>>>>>>>>>>>>>> Redis 실행시간(s): {}", secDiffTime);
                return RepeatStatus.FINISHED;
            });
        }

        @Bean
        @JobScope
        public Step mybatisSampleRedisJobStep2(JobRepository jobRepository,
            PlatformTransactionManager platformTransactionManager,
            @Value("#{jobParameters[requestDate]}") String requestDate) {
            log.info(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mybatisSampleRedisJobStep2 requestDate:{} ", requestDate);
            return new StepBuilder(JOB_NAME_PREFIX + "_JOB_STEP2", jobRepository)
                .<Sample, SampleOut>chunk(CHUNK_SIZE, platformTransactionManager)
                .reader(mybatisSampleRedisToRdbPagingItemReader())
                .processor(mybatisSampleToSampleOutItemProcessor())
                .writer(mybatisSampleRedisToRdbItemWriter())
                .build();
        }

        @Bean
        public MyBatisPagingItemReader<Sample> mybatisSampleRedisToRdbPagingItemReader() {
            return new MyBatisPagingItemReaderBuilder<Sample>()
                .pageSize(CHUNK_SIZE)
                .sqlSessionFactory(sqlSessionFactory)
                .queryId("com.bkjeon.feature.mapper.sample.SampleMapper.selectZeroOffsetSampleList")
                .build();
        }

        @Bean
        public ItemProcessor<Sample, SampleOut> mybatisSampleToSampleOutItemProcessor() {
            return item -> SampleOut.builder()
                .id(item.getId())
                .amount(item.getAmount())
                .txName(redisTemplate.opsForValue().get(REDIS_KEY_PREFIX + item.getId()))
                .txDateTime(LocalDateTime.now())
                .build();
        }

        @Bean
        public MyBatisBatchItemWriter<SampleOut> mybatisSampleRedisToRdbItemWriter() {
            return new MyBatisBatchItemWriterBuilder<SampleOut>()
                .sqlSessionFactory(sqlSessionFactory)
                .statementId("com.bkjeon.feature.mapper.sample.SampleMapper.insertSample")
                .build();
        }

    }

 

 

4. Mapper

[SampleMapper.java]

    import com.bkjeon.feature.entity.sample.Sample;
    import com.bkjeon.feature.entity.sample.SampleOut;
    import java.util.List;
    import org.apache.ibatis.annotations.Mapper;

    @Mapper
    public interface SampleMapper {
        ...

        List<Sample> selectZeroOffsetSampleList();        
        void insertSample(SampleOut sample);

    }

 

 

5. Mapper xml

    ...

    <select id="selectZeroOffsetSampleList" resultMap="selectSampleListMap">
        SELECT
            id,
            amount,
            tx_name,
            tx_date_time
        FROM sample
        WHERE 1=1
        AND id > #{_skiprows}
        LIMIT 0, #{_pagesize}
    </select>   

    <insert id="insertSample" parameterType="com.bkjeon.feature.entity.sample.SampleOut">
        INSERT INTO sample_out (
            amount,
            tx_name,
            tx_date_time
        ) VALUES (
            #{amount},
            #{txName},
            #{txDateTime}
        )
    </insert>

 

 

6. Entity 생성

[Sample.java]

    package com.bkjeon.feature.entity.sample;

    import java.time.LocalDateTime;
    import lombok.Builder;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.ToString;

    @ToString
    @Getter
    @NoArgsConstructor
    public class Sample {

        private Long id;
        private Long amount;
        private String txName;
        private LocalDateTime txDateTime;

        @Builder
        public Sample(Long id, Long amount, String txName, LocalDateTime txDateTime) {
            this.id = id;
            this.amount = amount;
            this.txName = txName;
            this.txDateTime = txDateTime;
        }

    }

 

[SampleOut.java]

    package com.bkjeon.feature.entity.sample;

    import java.time.LocalDateTime;
    import lombok.Builder;
    import lombok.Getter;
    import lombok.NoArgsConstructor;
    import lombok.ToString;

    @ToString
    @Getter
    @NoArgsConstructor
    public class SampleOut {

        private Long id;
        private Long amount;
        private String txName;
        private LocalDateTime txDateTime;

        @Builder
        public SampleOut(Long id, Long amount, String txName, LocalDateTime txDateTime) {
            this.id = id;
            this.amount = amount;
            this.txName = txName;
            this.txDateTime = txDateTime;
        }

    }

 

 

완료 후 batch 를 동작시켜서 pipeline 을 사용한것과 안한것의 차이를 비교해보면 chunk size 1000 개 기준 약 19배의 성능이 증가한걸 확인할 수 있다.

반응형
Comments