news 2026/4/24 21:18:54

python之redis 发布订阅、集群连接、Redis 集群Docker版常用命令、集群的创建、删除、重启、常用运维命令等等

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
python之redis 发布订阅、集群连接、Redis 集群Docker版常用命令、集群的创建、删除、重启、常用运维命令等等

一、redis 发布订阅

1.1 redis 发布订阅

redis 发布订阅,消息发布后,如果订阅者没有启动,错过即错过。不会再数据库中持久化
在 Redis 的发布-订阅系统中,消息类型有两种主要形式:messagepmessage。这两种类型的消息分别对应不同的订阅方式:

message:这是标准的消息类型,出现这种类型的消息是因为订阅者使用了标准的subscribe方法订阅了具体的频道。

{'type':'message','pattern':None,'channel':b'111','data':b'{"content": "1"}'}

pmessage:这种消息类型出现是因为订阅者使用了psubscribe方法订阅了频道模式(pattern)。模式订阅允许使用通配符订阅多个频道,比如psubscribe('news.*')会订阅所有以news.开头的频道。

如果你收到的是pmessage类型的消息,说明你是在使用模式订阅(psubscribe),而不是直接订阅具体的频道(subscribe)。

{'type':'pmessage','pattern':b'111','channel':b'111','data':b'{"content": "1"}'}
importtimefrom redisimportStrictRedis,ConnectionPool from typingimportOptionalclassRedisManager:"""管理 Redis 发布/订阅操作的类"""def__init__(self,host:str='127.0.0.1',port:int=6379,db:int=0,password:str='123456789'):"""初始化 Redis 连接池和客户端"""pool=ConnectionPool(host=host,port=port,db=db,password=password,decode_responses=True # 自动将响应解码为字符串)self.redis=StrictRedis(connection_pool=pool)defpublish(self,channel:str,message:str)->bool:"""向指定频道发布消息"""try:self.redis.publish(channel,message)returnTrue except Exception as e:# 发布消息失败returnFalse defsubscribe(self,channel:str,callback:Optional[callable]=None)->None:"""标准模式-订阅频道并处理消息"""pubsub=self.redis.pubsub()pubsub.subscribe(channel)# 已订阅频道formessage in pubsub.listen():ifmessage['type']=='message':data=message['data']self.logger.info(f"在 {channel} 接收到消息: {data}")ifcallback:callback(data)else:print(f"消息: {data}")defpsubscribe(self,channel:str,callback:Optional[callable]=None)->None:"""频道模式-订阅频道并处理消息"""pubsub=self.redis.pubsub()pubsub.psubscribe(channel)# 已订阅频道formessage in pubsub.listen():print(message)defunsubscribe(self,channel:str)->None:"""取消订阅频道"""pubsub=self.redis.pubsub()pubsub.unsubscribe(channel)# 已取消订阅频道 defmain():# 初始化 Redis 管理器 redis_manager=RedisManager()channel="er_test"# 示例:发布消息fori inrange(5):redis_manager.publish(channel,str(i))time.sleep(1)# 示例:订阅消息 redis_manager.subscribe(channel)if__name__=='__main__':main()

二、redis_集群链接

卸载冲突的 redis-py-cluster
pip install redis-py-cluster

确保 redis 是最新版(7.1.0 已内置集群和异步功能)
pip install --upgrade redis

importtracebackfromredis.clusterimportRedisCluster,ClusterNode# 注意这里fromredis.exceptionsimportRedisErrordefadd_devices_to_online_set(device_ids,set_key="online_devices"):""" 连接到 Redis 集群,并使用 pipeline 批量将设备ID添加到 Set 中。 :param device_ids: List[str] 设备ID列表 :param set_key: str Set的Key名称 :return: dict 添加结果统计(成功新增、已存在) """result_summary={"added":0,"existed":0,"errors":0,"details":[]}try:startup_nodes2=[ClusterNode("11.22.33.44",6381),ClusterNode("11.22.33.44",6382),ClusterNode("11.22.33.44",6383),]redis_db=RedisCluster(startup_nodes=startup_nodes,decode_responses=True,skip_full_coverage_check=True,password='xxxxxxxxxx')redis_db.ping()print("成功连接到 Redis 集群!")withredis_db.pipeline(transaction=False)aspipe:fordevice_idindevice_ids:print(f"准备向 Set '{set_key}' 添加设备 '{device_id}'...")pipe.sadd(set_key,device_id)pipe_result=pipe.execute()# pipe_result是设备列表对应的结果,每个是0或1fordevice_id,sadd_resinzip(device_ids,pipe_result):ifsadd_res==1:print(f"成功添加新设备 '{device_id}' 到 '{set_key}'。")result_summary["added"]+=1result_summary["details"].append({"device_id":device_id,"status":"added"})elifsadd_res==0:print(f"设备 '{device_id}' 已存在于 '{set_key}' 中。")result_summary["existed"]+=1result_summary["details"].append({"device_id":device_id,"status":"existed"})else:print(f"设备 '{device_id}' 添加出现未知结果:{sadd_res}")
fromredisclusterimportRedisClusterdefprint_hi():# 构建所有的节点startup_nodes=[{"host":"127.0.0.1","port":6370},{"host":"127.0.0.2","port":6371},{"host":"127.0.0.3","port":6373}]# 构建StrictRedisCluster对象redis_db=RedisCluster(startup_nodes=startup_nodes,decode_responses=True,password='123456789')withredis_db.pipeline(transaction=False)aspipe:withopen('./Port.csv','r')asf:pipe.sadd('DN2010161015893863')result=pipe.execute()if__name__=='__main__':print_hi()

三、全新一键部署正常集群(解决所有连接拒绝问题)

0. 编写一键部署脚本(redis-set-password.sh)

vimredis-set-password.shchmod+x redis-set-password.sh ./redis-set-password.sh# 1. 清理之前的错误容器dockerrm-f$(dockerps-aq-f"name=redis-node")2>/dev/null# 2. 启动 6 个容器,使用宿主机网络(--network host 代替桥接,不再使用 -p 映射)echo"使用宿主机网络启动 6 个共享 127.0.0.1 的 Redis 节点..."forportin{7001..7006};dodockerrun-d--nameredis-node-${port}\--networkhost\redis:7.2.5\redis-server--port${port}\--cluster-enabledyes\--cluster-config-file nodes-${port}.conf\--cluster-node-timeout5000\--appendonlyyes\--requirepass"mock@mock"\--masterauth"mock@mock"\--protected-mode nodoneecho"等待容器启动完成..."

1. 组建集群

在宿主机终端直接执行以下命令,将这 6 个独立节点合并为集群(3 主 3 从):

redis-cli-a"mock@mock"--clustercreate\127.0.0.1:7001127.0.0.1:7002127.0.0.1:7003\127.0.0.1:7004127.0.0.1:7005127.0.0.1:7006\--cluster-replicas1--cluster-yes

执行成功后,控制台会输出[OK] All 16384 slots covered.

2. 验证集群是否创建成功

组建完成后,使用以下命令进行验证:

查看集群状态(首选验证):

redis-cli-p7001-a"mock@mock"cluster info

如果输出中的第一行是cluster_state:ok,说明集群已完全正常。

查看节点主从分配信息:

redis-cli-p7001-a"mock@mock"cluster nodes

此命令会列出所有 6 个节点,并明确标识哪些是master,哪些是slave

写入/读取数据测试:

redis-cli-c-p7001-a"mock@mock"settest_key123redis-cli-c-p7001-a"mock@mock"get test_key

如果集群或者容器挂掉(处于停止状态),你不需要重新组建集群。
因为节点配置文件(nodes-*.conf)和持久化数据仍然保留在对应的容器内部,只需重新启动这些容器,集群就会自动恢复

dockerstart redis-node-7001 redis-node-7002 redis-node-7003 redis-node-7004 redis-node-7005 redis-node-7006

启动完成后,等待几秒钟让节点间完成握手,即可再次验证集群状态:

redis-cli-p7001-a"mock@mock"cluster info
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/24 21:06:57

c++模板初识

目录 一、 泛型编程 二、 函数模板 1.函数模板概念 2.函数模板格式 3.函数模板的原理 4.函数模板的实例化 1. 隐式实例化&#xff1a;让编译器根据实参推演模板参数的实际类型 2.显式实例化&#xff1a;在函数名后的<>中指定模板参数的实际类型 5.模板参数的…

作者头像 李华
网站建设 2026/4/24 21:04:20

手机信令数据

01、数据简介数据整理2023年全国手机信令OD数据&#xff0c;精度到区县&#xff0c;时间段为春节&#xff08;1月21日-1月27日&#xff09;、工作日时段4月10日-4月16日&#xff09;、五一&#xff08;4月29日-5月3日&#xff09;、十一&#xff08;9月29日-10月6日&#xff09…

作者头像 李华
网站建设 2026/4/24 21:02:55

告别轮询卡顿!在iMX6ULL上实战Linux串口中断接收(附完整C代码)

嵌入式Linux串口性能优化实战&#xff1a;iMX6ULL中断接收全解析 在资源受限的嵌入式系统中&#xff0c;串口通信的实时性和效率往往成为系统性能的关键瓶颈。当开发者使用传统轮询方式处理串口数据时&#xff0c;经常会遇到CPU占用率飙升、响应延迟等问题。本文将深入探讨如何…

作者头像 李华