【Redis】哨兵初始化和主观下线
在的redis启动函数main(server.c文件)中,对哨兵模式进行了检查,如果是哨兵模式,将调用initSentinelConfig和initSentinel进行初始化,initServer函数中会注册哨兵的时间事件,最后调用sentinelIsRunning运行哨兵实例,
int main(int argc, char **argv) { // 省略... // 检查哨兵模式 server.sentinel_mode = checkForSentinelMode(argc,argv); initServerConfig(); // 省略... if (server.sentinel_mode) { initSentinelConfig(); // 初始化哨兵配置 initSentinel(); // 初始化哨兵 } // 省略... // 初始化服务 initServer(); // 省略... if (!server.sentinel_mode) { // 非哨兵模式 // 省略... } else { ACLLoadUsersAtStartup(); InitServerLast(); // 运行哨兵实例 sentinelIsRunning(); if (server.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd(STATUS=Ready to accept connections\n); redisCommunicateSystemd(READY=1\n); } } // 省略... aeMain(server.el); aeDeleteEventLoop(server.el); return 0; }
哨兵初始化
哨兵模式校验
checkForSentinelMode
checkForSentinelMode函数在server.c文件中,用于校验是否是哨兵模式,可以看到有两种方式校验哨兵模式:
- 直接执行
redis-sentinel
命令 - 执行
redis-server
命令,命令参数中带有–sentinel
参数
int checkForSentinelMode(int argc, char **argv) { int j; // 直接执行redis-sentinel if (strstr(argv[0],redis-sentinel) != NULL) return 1; for (j = 1; j < argc; j++) if (!strcmp(argv[j],--sentinel)) return 1; // 执行的命令参数中,是否有–sentinel return 0; }
初始化配置项
initSentinelConfig
initSentinelConfig函数在sentinel.c文件中,用于初始化哨兵配置:
- 将哨兵实例的端口号设置为
REDIS_SENTINEL_PORT
,默认值26379 - 将
protected_mode
设置为0,表示允许外部链接哨兵实例,而不是只能通过127.0.0.1本地连接 server
#define REDIS_SENTINEL_PORT 26379 void initSentinelConfig(void) { server.port = REDIS_SENTINEL_PORT; /* 设置默认端口 */ server.protected_mode = 0; /* 允许外部链接哨兵实例 */ }
initSentinel
在看initSentinel函数之前,首先看下Redis中哨兵sentinel对象对应的结构体sentinelState:
- current_epoch:当前纪元,投票选举Leader时使用,纪元可以理解为投票的轮次
- masters:监控的master节点哈希表,Key为master节点名称, value为master节点对应sentinelRedisInstance实例的指针
struct sentinelState { char myid[CONFIG_RUN_ID_SIZE+1]; /* sentinel ID */ uint64_t current_epoch; /* 当前的纪元(投票轮次)*/ dict *masters; /* 监控的master节点哈希表,Key为master节点名称, value为master节点对应的实例对象的指针 */ int tilt; /* TILT模式 */ int running_scripts; mstime_t tilt_start_time; mstime_t previous_time; list *scripts_queue; char *announce_ip; int announce_port; unsigned long simfailure_flags; int deny_scripts_reconfig; char *sentinel_auth_pass; char *sentinel_auth_user; int resolve_hostnames; int announce_hostnames; } sentinel;
sentinelRedisInstance是一个通用的结构体,在sentinel.c文件中定义,它既可以表示主节点,也可以表示从节点或者其他哨兵实例,从中选选出了一些主要的内容:
typedef struct { int flags; /* 一些状态标识 */ char *name; /* Master name from the point of view of this sentinel. */ char *runid; /* 实例的运行ID */ uint64_t config_epoch; /* 配置的纪元. */ mstime_t s_down_since_time; /* 主观下线时长 */ mstime_t o_down_since_time; /* 客观下线时长 */ dict *sentinels; /* 监控同一主节点的其他哨兵实例 */ dict *slaves; /* slave节点(从节点) */ /* 故障切换 */ char *leader; /* 如果是master节点,保存了需要执行故障切换的哨兵leader的runid,如果是一个哨兵,保存的是这个哨兵投票选举的leader*/ uint64_t leader_epoch; /* leader纪元 */ uint64_t failover_epoch; int failover_state; /* 故障切换状态 */ // 省略... } sentinelRedisInstance;
initSentinel函数同样在sentinel.c文件中,用于初始化哨兵,由于哨兵实例与普通Redis实例不一样,所以需要替换Redis中的命令,添加哨兵实例命令,哨兵实例使用的命令在sentinelcmds中定义:
- 将server.commands和server.orig_commands保存的常规Redis命令清除
- 遍历sentinelcmds哨兵实例专用命令,将命令添加到server.commands和server.orig_commands中
- 初始化sentinel实例中的数据项
// 哨兵实例下的命令 struct redisCommand sentinelcmds[] = { {ping,pingCommand,1,fast @connection,0,NULL,0,0,0,0,0}, {sentinel,sentinelCommand,-2,admin,0,NULL,0,0,0,0,0}, {subscribe,subscribeCommand,-2,pub-sub,0,NULL,0,0,0,0,0}, {unsubscribe,unsubscribeCommand,-1,pub-sub,0,NULL,0,0,0,0,0}, {psubscribe,psubscribeCommand,-2,pub-sub,0,NULL,0,0,0,0,0}, {punsubscribe,punsubscribeCommand,-1,pub-sub,0,NULL,0,0,0,0,0}, {publish,sentinelPublishCommand,3,pub-sub fast,0,NULL,0,0,0,0,0}, {info,sentinelInfoCommand,-1,random @dangerous,0,NULL,0,0,0,0,0}, {role,sentinelRoleCommand,1,fast read-only @dangerous,0,NULL,0,0,0,0,0}, {client,clientCommand,-2,admin random @connection,0,NULL,0,0,0,0,0}, {shutdown,shutdownCommand,-1,admin,0,NULL,0,0,0,0,0}, {auth,authCommand,-2,no-auth fast @connection,0,NULL,0,0,0,0,0}, {hello,helloCommand,-1,no-auth fast @connection,0,NULL,0,0,0,0,0}, {acl,aclCommand,-2,admin,0,NULL,0,0,0,0,0,0}, {command,commandCommand,-1, random @connection, 0,NULL,0,0,0,0,0,0} }; /* 初始化哨兵 */ void initSentinel(void) { unsigned int j; /* 将常规的Redis命令移除,增加哨兵实例专用的命令 */ dictEmpty(server.commands,NULL); dictEmpty(server.orig_commands,NULL); ACLClearCommandID(); for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) { int retval; struct redisCommand *cmd = sentinelcmds+j; cmd->id = ACLGetCommandID(cmd->name); // 添加到server.commands retval = dictAdd(server.commands, sdsnew(cmd->name), cmd); serverAssert(retval == DICT_OK); // 添加到server.orig_commands retval = dictAdd(server.orig_commands, sdsnew(cmd->name), cmd); serverAssert(retval == DICT_OK); if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR) serverPanic(Unsupported command flag); } /* 初始化其他数据项 */ // current_epoch初始化为0 sentinel.current_epoch = 0; // 监控的master节点实例对象 sentinel.masters = dictCreate(&instancesDictType,NULL); sentinel.tilt = 0; sentinel.tilt_start_time = 0; sentinel.previous_time = mstime(); sentinel.running_scripts = 0; sentinel.scripts_queue = listCreate(); sentinel.announce_ip = NULL; sentinel.announce_port = 0; sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE; sentinel.deny_scripts_reconfig = SENTINEL_DEFAULT_DENY_SCRIPTS_RECONFIG; sentinel.sentinel_auth_pass = NULL; sentinel.sentinel_auth_user = NULL; sentinel.resolve_hostnames = SENTINEL_DEFAULT_RESOLVE_HOSTNAMES; sentinel.announce_hostnames = SENTINEL_DEFAULT_ANNOUNCE_HOSTNAMES; memset(sentinel.myid,0,sizeof(sentinel.myid)); server.sentinel_config = NULL; }
启动哨兵实例
sentinelIsRunning
sentinelIsRunning函数在sentinel.c文件中,用于启动哨兵实例:
- 校验是否设置了哨兵实例的ID,如果未设置,将随机生成一个ID
- 调用sentinelGenerateInitialMonitorEvents向监控的主节点发送+monitor事件
void sentinelIsRunning(void) { int j; /* 校验myid是否为0 */ for (j = 0; j < CONFIG_RUN_ID_SIZE; j++) if (sentinel.myid[j] != 0) break; if (j == CONFIG_RUN_ID_SIZE) { /* 随机生成ID */ getRandomHexChars(sentinel.myid,CONFIG_RUN_ID_SIZE); sentinelFlushConfig(); } serverLog(LL_WARNING,Sentinel ID is %s, sentinel.myid); /* 向监控的主节点发送+monitor事件 */ sentinelGenerateInitialMonitorEvents(); } /* 向监控的主节点发布事件 */ void sentinelGenerateInitialMonitorEvents(void) { dictIterator *di; dictEntry *de; // 获取监控的主节点 di = dictGetIterator(sentinel.masters); while((de = dictNext(di)) != NULL) { sentinelRedisInstance *ri = dictGetVal(de); // 向主节点发送监控事件 sentinelEvent(LL_WARNING,+monitor,ri,%@ quorum %d,ri->quorum); } dictReleaseIterator(di); }
哨兵时间事件
在initServer函数中,调用aeCreateTimeEvent注册了时间事件,周期性的执行serverCron函数,serverCron函数中通过server.sentinel_mode判断是否是哨兵模式,如果是哨兵模式,调用sentinelTimer执行哨兵事件:
void initServer(void) { // 省略... if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic(Can't create event loop timers.); exit(1); } // 省略... } int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { // 省略... // 如果是哨兵模式,调用sentinelTimer执行哨兵事件 if (server.sentinel_mode) sentinelTimer(); // 省略... }
sentinelTimer
sentinelTimer在sentinel.c文件中,sentinelTimer函数会周期性的执行:
void sentinelTimer(void) { sentinelCheckTiltCondition(); // 处理RedisInstances,传入的参数是当前哨兵实例维护的主节点的哈希表,里面记录当前节点监听的主节点 sentinelHandleDictOfRedisInstances(sentinel.masters); sentinelRunPendingScripts(); sentinelCollectTerminatedScripts(); sentinelKillTimedoutScripts(); // 调整sentinelTimer的执行频率 server.hz = CONFIG_DEFAULT_HZ + rand() % CONFIG_DEFAULT_HZ; }
sentinelHandleDictOfRedisInstances
sentinelHandleDictOfRedisInstances函数中会对传入的当前哨兵实例监听的主节点哈希表进行遍历:
- 获取哈希表中的每一个节点,节点类型是sentinelRedisInstance
- 调用sentinelHandleRedisInstance检测哨兵监听节点的状态
- 如果sentinelHandleRedisInstance是主节点,由于主节点里面保存了监听该主节点的其他哨兵实例以及从节点,所以递归调用sentinelHandleDictOfRedisInstances对其他的节点进行检测
/* sentinelHandleDictOfRedisInstances */ void sentinelHandleDictOfRedisInstances(dict *instances) { dictIterator *di; dictEntry *de; sentinelRedisInstance *switch_to_promoted = NULL; di = dictGetIterator(instances); // 遍历所有的sentinelRedisInstance实例 while((de = dictNext(di)) != NULL) { // 获取每一个sentinelRedisInstance sentinelRedisInstance *ri = dictGetVal(de); // 调用sentinelHandleRedisInstance检测哨兵监听节点的状态 sentinelHandleRedisInstance(ri); // 如果是sentinelRedisInstance是主节点,主节点里面保存了监听该主节点的其他哨兵实例以及从节点 if (ri->flags & SRI_MASTER) { // 递归调用,处理从节点 sentinelHandleDictOfRedisInstances(ri->slaves); // 递归调用,处理其他哨兵实例 sentinelHandleDictOfRedisInstances(ri->sentinels); if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) { switch_to_promoted = ri; } } } if (switch_to_promoted) sentinelFailoverSwitchToPromotedSlave(switch_to_promoted); dictReleaseIterator(di); }
检测哨兵监听的节点状态
sentinelHandleRedisInstance
sentinelHandleRedisInstance函数对传入的sentinelRedisInstance实例,进行状态检测,主要处理逻辑如下:
- 调用sentinelReconnectInstance对实例的连接状态进行判断,如果**连接中断尝试重新与实例建立连接 **
- 调用sentinelSendPeriodicCommands向实例发送PING INFO等命令
- sentinelCheckSubjectivelyDown判断实例是否主观下线
- 如果实例是master节点,调用sentinelCheckObjectivelyDown判断是否客观下线、是否需要执行故障切换
/* Perform scheduled operations for the specified Redis instance. */ void sentinelHandleRedisInstance(sentinelRedisInstance *ri) { // 如果监听的节点连接中断,尝试重新建立连接 sentinelReconnectInstance(ri); // 发送PING INFO等命令 sentinelSendPeriodicCommands(ri); // 检查是否是TILT模式 if (sentinel.tilt) { if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return; sentinel.tilt = 0; sentinelEvent(LL_WARNING,-tilt,NULL,#tilt mode exited); } // 判断主观下线 sentinelCheckSubjectivelyDown(ri); /* Masters and slaves */ if (ri->flags & (SRI_MASTER|SRI_SLAVE)) { } // 如果是master节点 if (ri->flags & SRI_MASTER) { // 判断客观下线 sentinelCheckObjectivelyDown(ri); // 是否需要启动故障切换 if (sentinelStartFailoverIfNeeded(ri)) sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED); // 执行故障切换 sentinelFailoverStateMachine(ri); // 获取其他哨兵实例对master节点状态的判断 sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS); } }
重新连接
sentinelReconnectInstance
sentinelReconnectInstance函数用于检测实例的连接状态,如果中断进行重连,主要处理逻辑如下:
-
检测连接是否中断,如果未中断直接返回
-
检查端口是否为0,0被认为是不合法的端口
-
从sentinelRedisInstance实例中获取instanceLink,instanceLink的定义在sentinel.c文件中,里面记录了哨兵和主节点的两个连接,分别为用于发送命令的连接cc和用于发送Pub/Sub消息的连接pc:
typedef struct instanceLink { int refcount; int disconnected; int pending_commands; redisAsyncContext *cc; /* 用于发送命令的连接 */ redisAsyncContext *pc; /* 用于发送Pub/Sub消息的连接 */ mstime_t cc_conn_time; /* cc 连接时间 */ mstime_t pc_conn_time; /* pc 连接时间 */ mstime_t pc_last_activity; mstime_t last_avail_time; /* 上一次收到实例回复PING命令(需要被认定为合法)的时间 */ mstime_t act_ping_time; /* 当收到PONG消息的时候会设置为0,在下次发送PING命令时设置为当前时间 */ mstime_t last_ping_time; /* 上次发送PING命令时间,在出现故障时可以通过判断发送时间避免多次发送PING命令 */ mstime_t last_pong_time; /* 上次收到PONG消息的时间 */ mstime_t last_reconn_time; /* 上次执行重连的时间 */ } instanceLink;
-
校验距离上次重连时间是否小于PING的检测周期SENTINEL_PING_PERIOD,如果小于说明距离上次重连时间过近,直接返回即可
SENTINEL_PING_PERIOD在server.c中定义,默认1000毫秒
#define SENTINEL_PING_PERIOD 1000
-
对用于发送命令的连接判断,如果连接为NULL,调用redisAsyncConnectBind函数进行重连
-
对用于处理发送 Pub/Sub 消息的连接进行判断,如果连接为NULL,调用redisAsyncConnectBind函数进行重连
void sentinelReconnectInstance(sentinelRedisInstance *ri) { // 检查连接是否中断 if (ri->link->disconnected == 0) return; if (ri->addr->port == 0) return; /* 检查端口是否为0,0被认为是不合法的端口 */ // 获取instanceLink instanceLink *link = ri->link; mstime_t now = mstime(); // 校验距离上次重连时间是否小于哨兵PING的周期设置 if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return; ri->link->last_reconn_time = now; /* 处理用于发送命令的连接 */ if (link->cc == NULL) { // 进行连接 link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); if (link->cc && !link->cc->err) anetCloexec(link->cc->c.fd); // 省略... } /* 处理用于发送 Pub/Sub 消息的连接 */ if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) { link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR); if (link->pc && !link->pc->err) anetCloexec(link->pc->c.fd); // 省略... } if (link->cc && (ri->flags & SRI_SENTINEL || link->pc)) link->disconnected = 0; }
发送命令
sentinelSendPeriodicCommands
sentinelSendPeriodicCommands用于向实例发送命令:
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) { mstime_t now = mstime(); mstime_t info_period, ping_period; int retval; // 省略... /* 向主节点和从节点发送INFO命令 */ if ((ri->flags & SRI_SENTINEL) == 0 && (ri->info_refresh == 0 || (now - ri->info_refresh) > info_period)) { // 发送INFO命令 retval = redisAsyncCommand(ri->link->cc, sentinelInfoReplyCallback, ri, %s, sentinelInstanceMapCommand(ri,INFO)); if (retval == C_OK) ri->link->pending_commands++; } if ((now - ri->link->last_pong_time) > ping_period && (now - ri->link->last_ping_time) > ping_period/2) { // 向实例发送PING命令 sentinelSendPing(ri); } /* PUBLISH hello messages to all the three kinds of instances. */ if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) { // 发送PUBLISH命令 sentinelSendHello(ri); } }
主观下线
sentinelCheckSubjectivelyDown
sentinelCheckSubjectivelyDown函数用于判断是否主观下线。
标记主观下线的两个条件
- 距离上次发送PING命令的时长超过了down_after_period的值,down_after_period的值在sentinel.conf 配置文件中配置,对应的配置项为down-after-milliseconds ,默认值30s
- 哨兵认为实例是主节点(ri->flags & SRI_MASTE),但是实例向哨兵返回的角色是从节点(ri->role_reported == SRI_SLAVE) 并且当前时间-实例报告消息的时间role_reported_time大于down_after_period加上SENTINEL_INFO_PERIOD乘以2的时间 ,SENTINEL_INFO_PERIOD 是发送INFO命令的时间间隔,也就是说实例上次成功向哨兵报告角色的时间,已经超过了限定时间(down_after_period加上SENTINEL_INFO_PERIOD*2)
满足以上两个条件之一哨兵将会把sentinelRedisInstance判断为主观下线,flag标记会添加SRI_S_DOWN状态。
void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) { mstime_t elapsed = 0; // 如果act_ping_time不为0 if (ri->link->act_ping_time) elapsed = mstime() - ri->link->act_ping_time; // 计算距离上次发送PING命令的间隔时间 else if (ri->link->disconnected) // 如果连接断开 elapsed = mstime() - ri->link->last_avail_time; // 计算距离最近一次收到PING命令回复的间隔时间 if (ri->link->cc && (mstime() - ri->link->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && ri->link->act_ping_time != 0 && (mstime() - ri->link->act_ping_time) > (ri->down_after_period/2) && (mstime() - ri->link->last_pong_time) > (ri->down_after_period/2)) { instanceLinkCloseConnection(ri->link,ri->link->cc); } if (ri->link->pc && (mstime() - ri->link->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD && (mstime() - ri->link->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3)) { instanceLinkCloseConnection(ri->link,ri->link->pc); } /* * 标记主观下线的两个条件(或的关系) * 1) 距离上次发送PING命令的时长超过了down_after_period * 2) 哨兵认为实例是主节点(ri->flags & SRI_MASTE),但是实例向哨兵返回的角色是从节点(ri->role_reported == SRI_SLAVE) 并且当前时间-实例返回消息的时间大于down_after_period加上SENTINEL_INFO_PERIOD*2的时间 */ if (elapsed > ri->down_after_period || (ri->flags & SRI_MASTER && ri->role_reported == SRI_SLAVE && mstime() - ri->role_reported_time > (ri->down_after_period+SENTINEL_INFO_PERIOD*2))) { /* 主观下线 */ if ((ri->flags & SRI_S_DOWN) == 0) { // 发送+sdown事件 sentinelEvent(LL_WARNING,+sdown,ri,%@); ri->s_down_since_time = mstime(); ri->flags |= SRI_S_DOWN; // 更改状态 } } else { /* Is subjectively up */ if (ri->flags & SRI_S_DOWN) { sentinelEvent(LL_WARNING,-sdown,ri,%@); ri->flags &= ~(SRI_S_DOWN|SRI_SCRIPT_KILL_SENT); } } }
客观下线
如果是主节点,将会调用sentinelCheckObjectivelyDown函数判断客观下线,之后调用sentinelStartFailoverIfNeeded判断是否需要执行故障切换。
总结
参考
Redis版本:redis-6.2.5