레디스의 pub/sub은 순서가 보장되는 at-most-once(최대 한 번) 성격을 가진 pub/sub 메시징 구현으로, 구독자(subscriber)는 하나 이상의 채널을 구독하고 메시지를 전달받을 수 있다. 이때 채널을 구독한 RESP2 클라이언트는 SUBSCRIBE, UNSUBSCRIBE를 제외한 명령어를 수행하면 안 된다. (RESP3은 구독 중이라도 명령어를 수행할 수 있다.)
앞서 언급했듯 레디스의 pub/sub은 메시지 전달 시 누락될 가능성이 있다(at-most-once) 따라서 메시지 전송에 있어 조건이 있다면 레디스의 stream을 활용하는 것이 좋다.
레디스의 pub/sub은 데이터베이스와 상관없이 채널명으로 분류된다. 즉 DB 0에 게시해도 DB 10의 구독자는 메시지를 수신받을 수 있다. 따라서 특정 범위 대상에게만 메시지를 전달하기 위해선 채널명 앞에 그룹을 지정하기 위한 이름을 붙여야 한다. 여기서 채널을 구독할 경우 구독할 채널의 이름과 완벽히 일치한 경우와 패턴 매칭을 통해 구독할 수 있는데, 만약 두 경우 모두 채널을 구독한 경우 메시지를 중복해서 받을 수 있다.
코드 레벨 분석하기
pub/sub과 관련된 코드는 src/pubsub.c를 참고하면 된다.
Subscribe
/* Add the channel to the client -> channels hash table */
if (dictAdd(type.clientPubSubChannels(c),channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(*type.serverPubSubChannels, channel);
if (de == NULL) {
clients = listCreate();
dictAdd(*type.serverPubSubChannels, channel, clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
가장 먼저 구독 명령어를 분석하면 다음과 같다. 구독을 하게 되면 클라이언트는 자신의 구조체 내 pubsub_channels에 구독한 채널 정보를 저장하고, 서버에 해당 채널을 생성 후 구독 요청한 클라이언트를 해당 채널 목록에 추가한다.
Publish
/* Send to clients listening for that channel */
de = dictFind(*type.serverPubSubChannels, channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message,*type.messageBulk);
updateClientMemUsageAndBucket(c);
receivers++;
}
}
if (type.shard) {
/* Shard pubsub ignores patterns. */
return receivers;
}
/* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
list *clients = dictGetVal(de);
if (!stringmatchlen((char*)pattern->ptr,
sdslen(pattern->ptr),
(char*)channel->ptr,
sdslen(channel->ptr),0)) continue;
listRewind(clients,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = listNodeValue(ln);
addReplyPubsubPatMessage(c,pattern,channel,message);
updateClientMemUsageAndBucket(c);
receivers++;
}
}
decrRefCount(channel);
dictReleaseIterator(di);
}
발행을 하게 되면, 서버 채널 목록에서 클라이언트 목록을 조회한 후 순회하며 전송하게 된다. 이후 shard flag 유무에 따라서 추가적으로 패턴 매칭된 채널에 추가 메시지를 발행한다.
10년 간 redis를 혼자 관리하며 단순한 코드 구조를 유지할 수 있었던 만큼, 작은 코드지만 분석하기 쉬웠다. 향후 c언어를 공부하게 된다면 좋은 참고 자료가 될 것 같다. 고마워요 Salvatore!
'공부' 카테고리의 다른 글
교육 AI (0) | 2024.06.09 |
---|---|
스프링에서 빈 등록 과정 분석해보기 (0) | 2024.02.18 |
🍎 레디스 톺아보기 - 데이터 타입 이해하기 (0) | 2023.12.17 |
JWT 인가 처리, 이거 모르면 제발 Spring security 쓰지 마세요. (0) | 2023.11.19 |
제네릭 (0) | 2023.11.10 |