news 2026/4/18 10:00:30

一个只能通过压测发现Bug

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
一个只能通过压测发现Bug

🍅点击文末小卡片,免费获取软件测试全套资料,资料在手,涨薪更快

最近在做大项目性能测试的工作,又发现了几个比较有意思的Bug,本期分享其中的一个,涉及Redis在并发场景下的应用。

有意思的是,这个Bug因为没有代码语法错误,并发量少的情况下,下游的监控也不会出现报警,所以光靠功能测试是没有办法发现,只能通过压测(性能测试)或者下游的监控报警才能发现

我们先来看一段Go语言实现的代码,没有代码基础也没关系,先解释一下这段代码的意思,就是先获取(Get) Redis Key 的值,这个值只有true 或者 false 两种情况 ,如果是true 则直接返回,不执行后续代码逻辑,如果是 false 则 先设置(Set) Redis Key 的值为true,再执行后续代码逻辑

var flag = false // 获取锁 redisCache := redis.NewCache() if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil { ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err) } // flag = true说明已经有实例在请求了, 直接返回 // 否则 设置redis锁 if flag { ctx.Notice("request_user_size_flag is true, return") return } else { ctx.Notice("request_user_size_flag is false, request im to flush room user size") if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil { ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err) } } // ... 后续具体的业务代码逻辑,可忽略

然而单纯利用从Redis 当中获取一个Bool值,以此来充当互斥锁,这种实现方案,在同一时刻只有一个用户请求能满足需求,但是在并发场景会出现无法锁住的情况,如下图,在初始条件下,即Redis Key 还从来没有被Set时(Key不存在时),当3个用户同时从Redis 读取到的值均为False ,就有3个用户同时去Set Redis Key,并且走到后续的代码逻辑

所以并发场景下,“锁”失效了

"锁"失效了有什么影响,继续给出完整代码逻辑,这段代码其实是定时任务的一部分,在执行期间,会请求下游服务获得相关数据

在并发场景下,“锁”失效了会导致下游的服务压力上涨,假设下游只能抗50QPS,现在QPS 已经到5000了,严重情况下还会出现IO打满,CPU和内存打满,服务宕机等风险

package main import"time" var ( CrontabTime = 20// 每20s执行一次脚本 ScriptMaxRunTime = 150// 脚本最长运行时间150s ) func SetUserSize(ctx *gin.Context) { var flag = false // 获取锁 redisCache := redis.NewCache() if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil { ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err) } // flag = true说明已经有实例在请求了, 直接返回 // 否则 设置redis锁 if flag { ctx.Notice("request_user_size_flag is true, return") return } else { ctx.Notice("request_user_size_flag is false, request im to flush room user size") if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil { ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err) } } // 以下是定时任务的具体逻辑 ctx.NoticeF("run time start:%d", util.GetMilliSecond()) // 执行定时任务前,前置获取相关必要信息 info, err := GetInfo() if err != nil { ctx.WarningF("request info fail, error: %s", err) // 获取信息失败,提前释放锁 _ = redisCache.Del(SetUserSizeRedisKey) return } ctx.NoticeF("run time start req im:%d", util.GetMilliSecond()) // 定时任务具体逻辑 for _, people := range info { //请求下游 // ... } ctx.NoticeF("run time end:%d", util.GetMilliSecond()) }

如何解决请求下游数量超限这个问题呢,有两种解法:第一种是在请求下游前,增加判断当前QPS。第二种是使用Redis 分布式锁setnx

限定QPS

先看第一种方案是在请求下游前,判断是否超过最大的QPS,如何获取QPS呢,QPS是在做性能测试时,我们常用的性能指标,指每秒的查询数量,用来衡量系统每秒处理的请求数量

那么要获取QPS,自然要获得当前的秒数,如果是在同一秒请求,我们用当前秒数作为Redis Key ,值初始为0,同一秒内每有一次请求,就把Redis 的值加1,这样就拿到了QPS(见代码当中的GetLimitRequest方法)

now := util.GetSecond() nowMilli := util.GetMilliSecond() res := GetLimitRequest(ctx, now) // GetLimitRequest 获取当前qps func GetLimitRequest(now int64) int { key := fmt.Sprintf("limit_key_request_service_%v", now) redisCache := redis.NewCache(ctx) res, _ := redisCache.Incr(key) if res > 0 { go redisCache.Expire(key, 60) } return res }

那QPS超出限额了怎么办,得计算到下1秒还有多少时间,要精确计算的话,我们只能可以获取比秒更小的单位-毫秒进行计算,分别获取下1s的时间(毫秒为单位),以及当前这1s的时间(同样毫秒为单位),两者相减,这样就知道到下1秒还差多少毫秒(对应下面代码的变量gap),让系统sleep gap对应毫秒数,这样就可以使得请求的维持在最大的QPS范围内

完整的代码片段如下

package main import"time" var ( CrontabTime = 20// 每20s执行一次脚本 ScriptMaxRunTime = 150// 脚本最长运行时间150s ) func SetUserSize() { var flag = false // 获取锁 redisCache := redis.NewCache() if err := redisCache.Get(SetUserSizeRedisKey, &flag); err != nil { ctx.WarningF("request redis to get _user_size_flag fail, err: %s", err) } // flag = true说明已经有实例在请求了, 直接返回 // 否则 设置redis锁 if flag { ctx.Notice("request_user_size_flag is true, return") return } else { ctx.Notice("request_user_size_flag is false, request im to flush room user size") if err := redisCache.Set(SetUserSizeRedisKey, true, ScriptMaxRunTime); err != nil { ctx.WarningF("set request_user_size_flag to redis fail, err: %s", err) } } ctx.NoticeF("run time start:%d", util.GetMilliSecond()) // 执行定时任务前,前置获取相关必要信息 info, err := GetInfo() if err != nil { ctx.WarningF("request info fail, error: %s", err) // 获取信息失败,提前释放锁 _ = redisCache.Del(SetUserSizeRedisKey) return } ctx.NoticeF("run time start req im:%d", util.GetMilliSecond()) // 定时任务具体逻辑,可忽略 for _, people := range info { now := util.GetSecond() nowMilli := util.GetMilliSecond() res := GetLimitRequest(ctx, now) if res > MaxQps { gap := (now+1)*1000 - nowMilli if gap > 0 { time.Sleep(time.Duration(gap) * time.Millisecond) // 在sleep 期间 不再请求下游 } } elseif res == 0 { //异常 time.Sleep(time.Duration(40) * time.Millisecond) } } // 请求下游具体代码逻辑,可忽略 // ... // ... // 定时任务执行完毕,主动释放锁 _ = redisCache.Del(SetUserSizeRedisKey) ctx.NoticeF("run time end:%d", util.GetMilliSecond()) } / GetLimitRequest 获取当前qps func GetLimitRequest(now int64) int { key := fmt.Sprintf("limit_key_request_service_%v", now) redisCache := redis.NewCache(ctx) res, _ := redisCache.Incr(key) if res > 0 { go redisCache.Expire(key, 60) } return res }
使用Redis分布式锁

setnx是Redis的一个命令,它代表"Set if Not eXists"。这个命令尝试在Redis中设置一个键值对,但仅当指定的键不存在时才会成功。如果键已经存在,setnx操作将失败

我们可以使用setnx命令来创建Redis分布式锁,分布式锁是一种机制,用于确保在分布式系统中的多个节点或线程不会同时访问或修改共享资源,以避免竞态条件(race conditions)

分布式锁的主要目的是确保在分布式系统中,只有一个客户端(或线程)能够成功获得锁,以执行关键任务,而其他客户端必须等待

setnx的使用方式是,客户端通常会使用setnx命令尝试创建一个带有唯一标识的锁,然后在锁上设置一个过期时间,以防止锁被永久占用。当客户端不再需要锁时,可以使用del命令来释放锁

对于上面的并发问题,我们还可以使用SetNX来解决

func SetUserSize(ctx *gin.Context) { ExpireTime:= int64(3) client := redis.NewCache(ctx) key := fmt.Sprintf("set_locker_%s", "param_ex") res, err := client.SetNX(ctx, key, time.Now().Unix(), ExpireTime) //创建redis 分布式锁,ExpireTime过期时间为3秒 if err != nil { ctx.WarningF("SetQuestionStatus get lock fail, err: %v", err) errno.ErrRet(ctx, errno.ErrCallCacheFail) return } if res != true { errno.ErrRet(ctx, errno.ErrSetInfo) return } defer client.Del(ctx, key) //执行完删除Redis分布式锁,让其他线程能正常获取锁,避免永久等待 //... 执行后续逻辑 }

用一张图片再来对比一下两种实现方案的区别,使用Redis分布式锁能帮助解决高并发下互斥任务的问题,但需要注意设置过期时间,避免永久锁住资源


后续我会继续分享压测中发现的性能问题以及排查、调优实战解决方案

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于做【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴我走过了最艰难的路程,希望也能帮助到你!凡事要趁早,特别是技术行业,一定要提升技术功底。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/18 3:25:29

20万以内新能源SUV主动安全系统排行榜:实测满载跑高速,纯电动车型刹车与车道保持表现

随着新能源技术不断进化,20 万元以内的纯电 SUV 市场已经从单纯追求续航和性价比,转向对主动安全系统表现的深度考量。对于日常高速满载出行来说,刹车响应、车道保持稳定性、自适应巡航系统表现等主动安全功能,直接关系到行驶安全…

作者头像 李华
网站建设 2026/4/18 3:24:25

21、Python开发DB2应用程序全攻略

Python开发DB2应用程序全攻略 在Python开发中,与DB2数据库进行交互是一项常见的任务。为了实现高效、便捷的交互,我们可以使用 ibm_db 驱动,它能提供出色的性能和丰富的功能支持。下面将详细介绍如何使用 ibm_db 驱动进行DB2数据库的连接、数据操作等。 1. 环境准备 在…

作者头像 李华
网站建设 2026/4/18 3:25:52

我发现流式处理日志内存涨 后来才知道用流式分片并行解析

💓 博客主页:瑕疵的CSDN主页 📝 Gitee主页:瑕疵的gitee主页 ⏩ 文章专栏:《热点资讯》 目录被Node.js支配的痛,谁懂啊? 一、为什么我要和Node.js杠上 二、安装Node.js的血泪史 1. 官网下载的坑…

作者头像 李华
网站建设 2026/4/18 3:27:29

Qwen3-14B支持Function Calling,打通业务系统壁垒

Qwen3-14B支持Function Calling,打通业务系统壁垒 在企业智能化转型的浪潮中,一个现实问题始终困扰着技术团队:大语言模型看似“无所不能”,却常常止步于“说”,而无法真正“做”。用户问“我的订单到哪了?…

作者头像 李华
网站建设 2026/4/17 21:56:00

【收藏必备】智能体式RAG指南:从传统RAG到AI智能体驱动的检索增强生成

文章全面介绍了智能体式RAG(Agent-enhanced RAG)的概念、分类和实现方法。智能体式RAG通过引入AI智能体的自主决策和编排能力,解决了传统RAG在上下文整合、多步推理和可扩展性方面的局限。文章详细分析了单智能体、多智能体、层次化等多种智能…

作者头像 李华