Redis 지연된 큐의 간략화
Grace Collins
Solutions Engineer · Leapcell

지연된 큐는 기본적으로 실행을 지연시키는 메시지 큐입니다. 어떤 비즈니스 시나리오에서 유용할까요?
실제 시나리오
- 주문 결제 실패 시, 사용자에게 주기적으로 알림을 보냅니다.
- 사용자 동시성 발생 시, 사용자에게 이메일 발송을 2분 지연시킬 수 있습니다.
Redis를 사용하여 기본적인 메시지 큐 구현하기
아시다시피, Kafka 및 RabbitMQ와 같은 전문 메시지 큐 미들웨어의 경우, 소비자는 메시지를 소비하기 전에 일련의 복잡한 단계를 거쳐야 합니다.
예를 들어, RabbitMQ에서는 메시지를 보내기 전에 Exchange를 생성하고, Queue를 생성한 다음, 일부 라우팅 규칙으로 Queue와 Exchange를 바인딩하고, 메시지를 보낼 때 라우팅 키를 지정하고, 메시지 헤더 정보를 제어해야 합니다. 그러나 대부분의 경우 메시지 큐에 단일 소비자만 있는 경우에도 위의 프로세스를 거쳐야 합니다.
Redis를 사용하면 소비자 그룹이 하나뿐인 메시지 큐의 경우 훨씬 간단해집니다. Redis는 전문 메시지 큐가 아니며 고급 기능이 부족합니다. 즉, ACK 보장이 없습니다. 따라서 메시지에 대한 엄격한 안정성 요구 사항이 있는 경우 Redis가 적합하지 않을 수 있습니다.
비동기 메시지 큐의 기본 구현
Redis의 list
데이터 구조는 일반적으로 비동기 메시지 큐에 사용됩니다. rpush
또는 lpush
를 사용하여 항목을 큐에 넣고 lpop
또는 rpop
을 사용하여 큐에서 제거할 수 있습니다.
> rpush queue Leapcell_1 Leapcell_2 Leapcell_3 (integer) 3 > lpop queue "Leapcell_1" > llen queue (integer) 2
문제 1: 큐가 비어 있으면 어떻게 될까요?
클라이언트는 pop 작업을 사용하여 메시지를 가져와 처리합니다. 처리가 끝나면 다음 메시지를 가져와 계속 처리합니다. 이 주기가 반복됩니다. 이것이 큐 소비자의 수명 주기입니다.
그러나 큐가 비어 있으면 클라이언트는 pop 작업의 데드 루프에 진입합니다. 즉, 데이터 없이 계속해서 pop 작업을 반복합니다. 이것은 낭비적이고 비효율적인 폴링입니다. 클라이언트의 CPU 사용량이 급증할 뿐만 아니라 Redis QPS도 증가시킵니다. 수십 개의 클라이언트가 이처럼 폴링하면 Redis에 상당한 수의 느린 쿼리가 발생할 수 있습니다.
일반적으로 스레드를 1초 동안 절전 모드로 전환하는 sleep 작업을 통해 이 문제를 해결합니다. 이렇게 하면 클라이언트 측의 CPU 사용량이 줄어들고 Redis QPS도 낮아집니다.
문제 2: 큐 대기 시간
절전 모드는 문제를 해결하는 데 도움이 되지만 소비자가 하나뿐인 경우 지연 시간은 1초입니다. 소비자가 여러 명이면 절전 시간이 엇갈리기 때문에 지연 시간이 다소 줄어듭니다.
이 대기 시간을 획기적으로 줄일 수 있는 방법이 있을까요?
예, blpop
/brpop
을 사용하면 됩니다.
b
접두사는 blocking(차단)을 의미합니다. 즉, 읽기 차단입니다.
큐에 데이터가 없으면 읽기 차단으로 인해 스레드가 즉시 절전 모드로 전환되고 데이터가 도착하는 즉시 깨어납니다. 이렇게 하면 메시지 대기 시간이 거의 0으로 줄어듭니다. lpop
/rpop
을 blpop
/brpop
으로 바꾸면 위의 문제를 완벽하게 해결할 수 있습니다.
문제 3: 유휴 연결이 자동으로 끊어짐
해결해야 할 또 다른 문제가 있습니다. 바로 유휴 연결입니다.
스레드가 너무 오랫동안 차단된 상태로 유지되면 Redis 클라이언트 연결이 유휴 상태가 됩니다. 대부분의 서버는 리소스 사용량을 줄이기 위해 유휴 연결을 적극적으로 닫습니다. 이 경우 blpop
/brpop
에서 예외가 발생합니다.
따라서 클라이언트 측 소비자 로직을 작성할 때는 예외를 catch하고 재시도 로직을 구현해야 합니다.
분산 잠금 충돌 처리
요청을 처리하는 동안 클라이언트가 분산 잠금을 획득하지 못하면 어떻게 될까요?
일반적으로 잠금 획득 실패를 처리하는 세 가지 전략이 있습니다.
- 예외를 직접 throw하고 사용자에게 나중에 다시 시도하라고 알립니다.
- 잠시 절전 모드로 전환한 다음 다시 시도합니다.
- 요청을 지연된 큐로 이동하여 나중에 다시 시도합니다.
특정 유형의 예외 직접 Throw
이 접근 방식은 사용자 시작 요청에 적합합니다. 사용자에게 오류 대화 상자가 표시되면 일반적으로 메시지를 읽고 "다시 시도"를 클릭하여 자연스럽게 지연을 만듭니다. 사용자 경험을 개선하기 위해 프런트엔드 코드는 사용자에 의존하는 대신 이 재시도 지연을 처리할 수 있습니다. 기본적으로 이 전략은 현재 요청을 포기하고 다시 시작할지 여부를 사용자가 결정하도록 남겨둡니다.
Sleep
sleep
을 사용하면 현재 메시지 처리 스레드가 차단되어 큐의 후속 메시지 처리가 지연됩니다. 충돌이 자주 발생하거나 큐에 메시지가 많은 경우 sleep
이 적합하지 않을 수 있습니다. 잠금 획득 실패가 데드락된 키로 인해 발생한 경우 스레드가 완전히 멈춰 더 이상 메시지를 처리할 수 없습니다.
지연된 큐
이 전략은 비동기 메시지 처리에 더 적합합니다. 충돌하는 요청을 다른 큐에 넣어 나중에 처리하여 즉각적인 경합을 피합니다.
지연된 큐 구현
Redis의 zset
데이터 구조를 사용하여 요소를 정렬하기 위해 타임스탬프를 점수로 할당할 수 있습니다. zadd score1 value1 ...
명령을 사용하여 메모리에서 메시지를 계속 생성합니다. 그런 다음 zrangebyscore
를 사용하여 처리할 준비가 된 모든 작업을 쿼리합니다. 이러한 작업을 반복하여 하나씩 실행할 수 있습니다. zrangebyscore key min max withscores limit 0 1
을 사용하여 가장 빠른 작업만 쿼리하여 소비할 수도 있습니다.
private Jedis jedis; public void redisDelayQueueTest() { String key = "delay_queue"; // 실제 애플리케이션에서는 비즈니스 ID와 임의로 생성된 고유 ID를 값으로 사용하는 것이 좋습니다. // 고유 ID는 메시지 고유성을 보장하고 비즈니스 ID는 값에 너무 많은 데이터를 담는 것을 방지합니다. String orderId1 = UUID.randomUUID().toString(); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId1); String orderId2 = UUID.randomUUID().toString(); jedis.zadd(queueKey, System.currentTimeMillis() + 5000, orderId2); new Thread() { @Override public void run() { while (true) { Set<String> resultList; // 첫 번째 항목만 가져오기 (비파괴적 읽기) resultList = jedis.zrangebyscore(key, System.currentTimeMillis(), 0, 1); if (resultList.size() == 0) { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); break; } } else { // 가져온 데이터 제거 if (jedis.zrem(key, resultList.iterator().next()) > 0) { String orderId = resultList.iterator().next(); log.info("orderId = {}", orderId); this.handleMsg(orderId); } } } } }.start(); } public void handleMsg(T msg) { System.out.println(msg); }
위의 구현은 다중 스레드 시나리오에서도 잘 작동합니다. 스레드가 T1과 T2, 그리고 더 많을 수도 있다고 가정해 보겠습니다. 로직은 다음과 같이 진행되어 하나의 스레드만 메시지를 처리하도록 보장합니다.
- T1, T2 및 기타 스레드는
zrangebyscore
를 호출하고 메시지 A를 검색합니다. - T1이 메시지 A 삭제를 시작합니다. 이는 원자적 작업이므로 T2 및 기타 스레드는 T1이
zrem
을 완료할 때까지 기다린 후 진행합니다. - T1이 메시지 A를 성공적으로 삭제하고 처리합니다.
- T2 및 기타 스레드는 메시지 A를 삭제하려고 시도하지만 이미 제거되었으므로 실패하고 처리를 포기합니다.
또한 handleMsg
에 예외 처리를 추가하여 하나의 잘못된 작업으로 인해 전체 처리 루프가 중단되지 않도록 해야 합니다.
추가 최적화
위의 알고리즘에서는 동일한 작업을 여러 프로세스에서 가져올 수 있으며 그 중 하나만 zrem
을 사용하여 삭제에 성공합니다. 다른 프로세스는 헛되이 작업을 가져왔습니다. 이는 낭비입니다. 이를 개선하기 위해 Lua 스크립팅을 사용하여 zrangebyscore
및 zrem
을 서버 측에서 단일 원자적 작업으로 결합하여 로직을 최적화할 수 있습니다. 이렇게 하면 동일한 작업을 두고 경쟁하는 여러 프로세스가 불필요한 페치를 수행하지 않습니다.
Lua 스크립트를 사용하여 추가 최적화
Lua 스크립트는 만료된 메시지를 확인하고 제거하며 삭제에 성공하면 메시지를 반환합니다. 그렇지 않으면 빈 문자열을 반환합니다.
String luaScript = "local resultArray = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'limit', 0, 1)\n" + "if #resultArray > 0 then\n" + " if redis.call('zrem', KEYS[1], resultArray[1]) > 0 then\n" + " return resultArray[1]\n" + " else\n" + " return ''\n" + " end\n" + "else\n" + " return ''\n" + "end"; jedis.eval(luaScript, ScriptOutputType.VALUE, new String[]{key}, String.valueOf(System.currentTimeMillis()));
Redis 기반 지연된 큐의 장점
Redis는 지연된 큐를 구현하는 데 사용할 때 다음과 같은 장점을 제공합니다.
- Redis
zset
은 고성능 점수 기반 정렬을 제공합니다. - Redis는 메모리에서 작동하므로 매우 빠릅니다.
- Redis는 클러스터링을 지원합니다. 메시지가 많으면 클러스터를 통해 메시지 처리 속도와 가용성을 향상시킬 수 있습니다.
- Redis는 영구성을 지원합니다. 오류 발생 시 AOF 또는 RDB를 사용하여 데이터를 복구하여 안정성을 보장할 수 있습니다.
Redis 기반 지연된 큐의 단점
그러나 Redis 기반 지연된 큐에는 몇 가지 제한 사항도 있습니다.
- 메시지 영구성 및 안정성은 여전히 문제입니다. Redis는 영구성을 지원하지만 전용 MQ만큼 안정적이지는 않습니다.
- 재시도 메커니즘 없음 – 메시지 처리 중에 예외가 발생하면 Redis는 기본 제공 재시도 메커니즘을 제공하지 않습니다. 재시도 횟수 관리를 포함하여 직접 구현해야 합니다.
- ACK 메커니즘 없음 – 예를 들어 클라이언트가 메시지를 검색하여 삭제했지만 처리 중에 충돌이 발생하면 메시지가 손실됩니다. 반대로 메시지 큐(MQ)는 메시지를 제거하기 전에 성공적인 처리를 확인하기 위해 승인이 필요합니다.
메시지 안정성이 중요한 경우에는 전용 MQ를 사용하는 것이 좋습니다.
Redisson으로 지연된 큐 구현
Redisson 기반 분산 지연된 큐(RDelayedQueue
)는 RQueue
인터페이스를 기반으로 구축되었으며 큐에 항목 추가를 지연시키는 기능을 제공합니다. 이는 기하급수적으로 증가하거나 감소하는 메시지 전달 전략을 구현하는 데 사용할 수 있습니다.
RQueue<String> destinationQueue = ... RDelayedQueue<String> delayedQueue = getDelayedQueue(destinationQueue); // 10초 후에 큐에 메시지 보내기 delayedQueue.offer("msg1", 10, TimeUnit.SECONDS); // 1분 후에 큐에 메시지 보내기 delayedQueue.offer("msg2", 1, TimeUnit.MINUTES);
개체가 더 이상 필요하지 않으면 적극적으로 제거해야 합니다. 연결된 Redisson 개체가 종료되는 경우에만 수동으로 제거하지 않는 것이 허용됩니다.
RDelayedQueue<String> delayedQueue = ... delayedQueue.destroy();
정말 편리하지 않나요?
저희 Leapcell은 백엔드 프로젝트 호스팅에 있어 최고의 선택입니다.
Leapcell은 웹 호스팅, 비동기 작업 및 Redis를 위한 차세대 서버리스 플랫폼입니다.
다국어 지원
- Node.js, Python, Go 또는 Rust로 개발하세요.
무제한 프로젝트를 무료로 배포
- 사용량에 대해서만 비용을 지불합니다. 요청도 없고 요금도 없습니다.
최고의 비용 효율성
- 유휴 요금 없이 사용한 만큼만 지불합니다.
- 예: 25달러로 평균 응답 시간 60ms에서 694만 건의 요청을 지원합니다.
간소화된 개발자 경험
- 간편한 설정을 위한 직관적인 UI
- 완전 자동화된 CI/CD 파이프라인 및 GitOps 통합
- 실행 가능한 통찰력을 위한 실시간 메트릭 및 로깅
손쉬운 확장성 및 고성능
- 높은 동시성을 쉽게 처리할 수 있도록 자동 확장됩니다.
- 운영 오버헤드가 없습니다. 구축에만 집중하세요.
설명서에서 자세히 알아보세요!
X에서 팔로우하세요: @LeapcellHQ