一、redis 发布订阅
1.1 redis 发布订阅
redis 发布订阅,消息发布后,如果订阅者没有启动,错过即错过。不会再数据库中持久化
在 Redis 的发布-订阅系统中,消息类型有两种主要形式:message和pmessage。这两种类型的消息分别对应不同的订阅方式:
message:这是标准的消息类型,出现这种类型的消息是因为订阅者使用了标准的subscribe方法订阅了具体的频道。
{'type':'message','pattern':None,'channel':b'111','data':b'{"content": "1"}'}pmessage:这种消息类型出现是因为订阅者使用了psubscribe方法订阅了频道模式(pattern)。模式订阅允许使用通配符订阅多个频道,比如psubscribe('news.*')会订阅所有以news.开头的频道。
如果你收到的是pmessage类型的消息,说明你是在使用模式订阅(psubscribe),而不是直接订阅具体的频道(subscribe)。
{'type':'pmessage','pattern':b'111','channel':b'111','data':b'{"content": "1"}'}importtimefrom redisimportStrictRedis,ConnectionPool from typingimportOptionalclassRedisManager:"""管理 Redis 发布/订阅操作的类"""def__init__(self,host:str='127.0.0.1',port:int=6379,db:int=0,password:str='123456789'):"""初始化 Redis 连接池和客户端"""pool=ConnectionPool(host=host,port=port,db=db,password=password,decode_responses=True # 自动将响应解码为字符串)self.redis=StrictRedis(connection_pool=pool)defpublish(self,channel:str,message:str)->bool:"""向指定频道发布消息"""try:self.redis.publish(channel,message)returnTrue except Exception as e:# 发布消息失败returnFalse defsubscribe(self,channel:str,callback:Optional[callable]=None)->None:"""标准模式-订阅频道并处理消息"""pubsub=self.redis.pubsub()pubsub.subscribe(channel)# 已订阅频道formessage in pubsub.listen():ifmessage['type']=='message':data=message['data']self.logger.info(f"在 {channel} 接收到消息: {data}")ifcallback:callback(data)else:print(f"消息: {data}")defpsubscribe(self,channel:str,callback:Optional[callable]=None)->None:"""频道模式-订阅频道并处理消息"""pubsub=self.redis.pubsub()pubsub.psubscribe(channel)# 已订阅频道formessage in pubsub.listen():print(message)defunsubscribe(self,channel:str)->None:"""取消订阅频道"""pubsub=self.redis.pubsub()pubsub.unsubscribe(channel)# 已取消订阅频道 defmain():# 初始化 Redis 管理器 redis_manager=RedisManager()channel="er_test"# 示例:发布消息fori inrange(5):redis_manager.publish(channel,str(i))time.sleep(1)# 示例:订阅消息 redis_manager.subscribe(channel)if__name__=='__main__':main()二、redis_集群链接
卸载冲突的 redis-py-cluster
pip install redis-py-cluster确保 redis 是最新版(7.1.0 已内置集群和异步功能)
pip install --upgrade redis
importtracebackfromredis.clusterimportRedisCluster,ClusterNode# 注意这里fromredis.exceptionsimportRedisErrordefadd_devices_to_online_set(device_ids,set_key="online_devices"):""" 连接到 Redis 集群,并使用 pipeline 批量将设备ID添加到 Set 中。 :param device_ids: List[str] 设备ID列表 :param set_key: str Set的Key名称 :return: dict 添加结果统计(成功新增、已存在) """result_summary={"added":0,"existed":0,"errors":0,"details":[]}try:startup_nodes2=[ClusterNode("11.22.33.44",6381),ClusterNode("11.22.33.44",6382),ClusterNode("11.22.33.44",6383),]redis_db=RedisCluster(startup_nodes=startup_nodes,decode_responses=True,skip_full_coverage_check=True,password='xxxxxxxxxx')redis_db.ping()print("成功连接到 Redis 集群!")withredis_db.pipeline(transaction=False)aspipe:fordevice_idindevice_ids:print(f"准备向 Set '{set_key}' 添加设备 '{device_id}'...")pipe.sadd(set_key,device_id)pipe_result=pipe.execute()# pipe_result是设备列表对应的结果,每个是0或1fordevice_id,sadd_resinzip(device_ids,pipe_result):ifsadd_res==1:print(f"成功添加新设备 '{device_id}' 到 '{set_key}'。")result_summary["added"]+=1result_summary["details"].append({"device_id":device_id,"status":"added"})elifsadd_res==0:print(f"设备 '{device_id}' 已存在于 '{set_key}' 中。")result_summary["existed"]+=1result_summary["details"].append({"device_id":device_id,"status":"existed"})else:print(f"设备 '{device_id}' 添加出现未知结果:{sadd_res}")fromredisclusterimportRedisClusterdefprint_hi():# 构建所有的节点startup_nodes=[{"host":"127.0.0.1","port":6370},{"host":"127.0.0.2","port":6371},{"host":"127.0.0.3","port":6373}]# 构建StrictRedisCluster对象redis_db=RedisCluster(startup_nodes=startup_nodes,decode_responses=True,password='123456789')withredis_db.pipeline(transaction=False)aspipe:withopen('./Port.csv','r')asf:pipe.sadd('DN2010161015893863')result=pipe.execute()if__name__=='__main__':print_hi()三、全新一键部署正常集群(解决所有连接拒绝问题)
0. 编写一键部署脚本(redis-set-password.sh)
vimredis-set-password.shchmod+x redis-set-password.sh ./redis-set-password.sh# 1. 清理之前的错误容器dockerrm-f$(dockerps-aq-f"name=redis-node")2>/dev/null# 2. 启动 6 个容器,使用宿主机网络(--network host 代替桥接,不再使用 -p 映射)echo"使用宿主机网络启动 6 个共享 127.0.0.1 的 Redis 节点..."forportin{7001..7006};dodockerrun-d--nameredis-node-${port}\--networkhost\redis:7.2.5\redis-server--port${port}\--cluster-enabledyes\--cluster-config-file nodes-${port}.conf\--cluster-node-timeout5000\--appendonlyyes\--requirepass"mock@mock"\--masterauth"mock@mock"\--protected-mode nodoneecho"等待容器启动完成..."1. 组建集群
在宿主机终端直接执行以下命令,将这 6 个独立节点合并为集群(3 主 3 从):
redis-cli-a"mock@mock"--clustercreate\127.0.0.1:7001127.0.0.1:7002127.0.0.1:7003\127.0.0.1:7004127.0.0.1:7005127.0.0.1:7006\--cluster-replicas1--cluster-yes执行成功后,控制台会输出[OK] All 16384 slots covered.
2. 验证集群是否创建成功
组建完成后,使用以下命令进行验证:
查看集群状态(首选验证):
redis-cli-p7001-a"mock@mock"cluster info如果输出中的第一行是cluster_state:ok,说明集群已完全正常。
查看节点主从分配信息:
redis-cli-p7001-a"mock@mock"cluster nodes此命令会列出所有 6 个节点,并明确标识哪些是master,哪些是slave。
写入/读取数据测试:
redis-cli-c-p7001-a"mock@mock"settest_key123redis-cli-c-p7001-a"mock@mock"get test_key如果集群或者容器挂掉(处于停止状态),你不需要重新组建集群。
因为节点配置文件(nodes-*.conf)和持久化数据仍然保留在对应的容器内部,只需重新启动这些容器,集群就会自动恢复
dockerstart redis-node-7001 redis-node-7002 redis-node-7003 redis-node-7004 redis-node-7005 redis-node-7006启动完成后,等待几秒钟让节点间完成握手,即可再次验证集群状态:
redis-cli-p7001-a"mock@mock"cluster info