공부/Spring

[Spring Reactive Redis] Sorted Set 데이터 추가시 nx 옵션 설정하는 방법 (대기열 시스템 with Spring Webflux)

leejinwoo1126 2025. 5. 6. 22:26
반응형

📋 개요

 최근 통신사 이슈로 인해 대기열 시스템을 접할 기회가 생겼고, 개인적인 호기심에 직접 대기열 시스템을 만들어보기로 했다. 아키텍처는 간단히 Redis(Sorted Set), Spring Webflux, Mvc를 활용해 구성했다. 먼저 Webflux 학습 테스트 후 개발을 진행하였고, 테스트와 함께 점진적으로 기능 구현을 수행했다.

 

그런데 개발을 하던 중 대기 페이지에서 새로고침을 할 때마다 사용자 (userId = 105)의 순서가 뒤로 밀리는 현상을 확인💩했다.

 

 

🔎 코드 레벨 분석

Webflux 모듈에 대기열 추가 로직대기열 순위 확인 로직을 아래와 같이 구현했다. 대기열 큐(WAITING_QUEUE)에 사용자 추가를 하게 되면 Mono<Boolean>을 반환하게 된다. 이때 대기열 큐에 이미 추가된 사용자라면 예외를 던지고, 처음 등록한 사용자라면 대기열 순위를 반환한다.

@Service
@RequiredArgsConstructor
public class QueueService {
    private final RedisRepository redisRepository;

    public Mono<Long> enqueueWaitingQueue(Long userId) {
        long unixTimestamp = Instant.now().getEpochSecond();
        String queue = WAITING_QUEUE.getKey();
        return redisRepository.addZSet(queue, userId, unixTimestamp)
                .filter(i -> i)
                .switchIfEmpty(Mono.error(ALREADY_RESISTER_USER.build()))
                .flatMap(i -> redisRepository.zRank(queue, userId))
                .map(i -> i >= 0 ? i + 1 : i);
    }
    
    // ..
    
    public Mono<Long> checked(Long userId) {
        return isAllowed(userId)     // 진입 허용된 사용자인가?
                .filter(Boolean::booleanValue)
                .flatMap(allowed -> Mono.just(0L)) // 맞다면 O을 반환
                .switchIfEmpty(enqueueWaitingQueue(userId)
                                .onErrorResume(ex -> redisRepository.zRank(WAITING_QUEUE.getKey(), userId).map(i -> i >= 0 ? i + 1 : i))
                ); // 아닌 경우 WAITING QUEUE에 추가하거나 WAITING QUEUE에서 순위를 반환
    }
}

 

 

ReactiveRedisTemplate를 사용해서 대기열(Sorted Set)에 사용자를 추가한다

@Repository
@RequiredArgsConstructor
public class RedisRepositoryImpl implements RedisRepository {
    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

    @Override
    public Mono<Boolean> addZSet(String queue, Long userId, Long timestamp) {
        return reactiveRedisTemplate.opsForZSet()
                .add(queue, userId.toString(), timestamp);
    }
    
    
    // ..
}

 

 

테스트 재실행해도 정상적으로 통과해서 이상없는 줄 알았다.

@SpringBootTest
@Import(EmbeddedRedis.class)
@ActiveProfiles("test")
public class RedisRepositoryTest {

    @Autowired
    private ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

    private RedisRepositoryImpl redisRepository;

    @BeforeEach
    void setUp() {
        redisRepository = new RedisRepositoryImpl(reactiveRedisTemplate);

        ReactiveRedisConnection reactiveConnection = reactiveRedisTemplate.getConnectionFactory().getReactiveConnection();
        reactiveConnection.serverCommands().flushAll().subscribe();
    }

    @DisplayName("대기열 큐에 등록된 사용자의 경우 false를 반환한다")
    @Test
    void addZSetWhenDuplicated() {
        Long userId = 1L;
        String queue = QueueManager.WAITING_QUEUE.getKey();

        StepVerifier.create(redisRepository.addZSet(queue, userId, Instant.now().getEpochSecond()))
                .expectNext(true)
                .verifyComplete();

        StepVerifier.create(redisRepository.addZSet(queue, userId, 100L))
                .expectNext(false)
                .verifyComplete();
    }
}

 

 

그런데 add(..) 문서를 살펴보니 sorted set에 key에 해당하는 value가 있는 경우 score를 update 한다고 적혀있었다.

 

혹시나 싶어서 redis-cli로 직접 zadd 명령어를 실행했고, 두번째 입력시 score가 갱신되고 0이 반환되는 것을 확인했다. 결국 같은 사용자로 대기열 큐에 두번째 add가 발생할 경우 score가 업데이트 처리되고 0을 반환하기 때문에 테스트가 문제없이 통과되었다는 것을 알게 되었다.

 

🛠️ 리팩터링 

이미 대기열(Sorted Set)에 추가된 사용자의 경우 score가 갱신되지 않도록 해야 했다. 처음에는 대기열에 사용자가 있는지 우선 조회하는 방법을 고려했으나, 좋지 않다고 생각했다. 그래서 공식 문서를 찾아보니 NX 옵션이 있다는 것을 알게 되어 이를 사용하기로 했다.

 

 

그런데 ReactiveRedisTemplate.opsForZset().add(..)에서는 nx 옵션을 추가할 수 없었다. 그래서 API를 살펴보니 nx 옵션이 있는 ReactiveZSetCommands.ZAddCommon 클래스를 찾았고, 빌더 패턴처럼 내가 원하는 커맨드를 생성할 수 있도록 지원해 준다는 걸 알 수 있었다. (기본 생성자는 private 접근 제어자로 선언되어 사용 ❌) 

@Repository
@RequiredArgsConstructor
public class RedisRepositoryImpl implements RedisRepository {
    private final ReactiveRedisTemplate<String, String> reactiveRedisTemplate;

    @Override
    public Mono<Boolean> addZSetIfAbsent(String queue, Long userId, Long timestamp) {
        ReactiveZSetCommands.ZAddCommand zAddCommand = ReactiveZSetCommands.ZAddCommand.tuple(Tuple.of(userId.toString().getBytes(), timestamp.doubleValue()))
                .nx().to(ByteBuffer.wrap(queue.getBytes()));

        return reactiveRedisTemplate.getConnectionFactory()
                .getReactiveConnection()
                .zSetCommands()
                .zAdd(Mono.just(zAddCommand))
                .next()
                .map(response -> response.getOutput() != null && response.getOutput().intValue() == 1); // 추가 성공: 1, 실패: 0
    }
	
    //..
}

 

 

zAdd(..) 에는 Publisher<ZAddCommand> 타입이 필요했는데, Mono 자체가 Publisher 인터페이스를 상속하고 있어서 Mono.just(..)로 처리할 수 있었다.

 

 

 

이후 테스트 코드도 수정했고, 의도한대로 정상 통과했다.

@DisplayName("대기열 큐에 등록된 사용자의 경우 false를 반환한다")
@Test
void addZSetWhenDuplicated() {
    Long userId = 1L;
    String queue = QueueManager.WAITING_QUEUE.getKey();

    StepVerifier.create(redisRepository.addZSetIfAbsent(queue, userId, Instant.now().getEpochSecond()))
            .expectNext(true)
            .verifyComplete();

    StepVerifier.create(redisRepository.addZSetIfAbsent(queue, userId, Instant.now().getEpochSecond()))
            .expectNext(false)
            .verifyComplete();
}

 

 

🙌 결과 

 사용자 (userId = 105) 접속시 대기열 페이지를 새로고침하더라도 순위가 밀리지 않게 되었다. Redis Sorted Set에 데이터 추가할 때 score가 갱신되는 걸 redis-cli로 학습 테스트를 했었는데, 별 생각없이 *template 클래스 사용하다가 실수했던거 같다. 개인적으로 이번 기회에 코드 레벨에서 분석해 볼 수 있었고, Redis 자료구조를 다시 공부해보는 계기가 되어서 좋았다.

 

 

반응형