news 2026/6/23 1:14:49

从零实现一个分布式锁:Redis与Zookeeper

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
从零实现一个分布式锁:Redis与Zookeeper

前言

你有没有想过:在分布式系统中,多个服务同时操作同一份数据时,怎么保证数据一致性?

比如秒杀系统中,1000个人同时抢1个商品,怎么保证不会超卖?

分布式锁是解决分布式环境下资源竞争的核心方案。

今天我们用C语言从零实现两种分布式锁:

1. 基于Redis的分布式锁(Redlock算法)
2. 基于Zookeeper的分布式锁(临时顺序节点)

---

一、分布式锁核心原理

1. Redis分布式锁

```
┌─────────┐ SET lock_key value NX PX 10000 ┌─────────┐
│ 客户端A │ ───────────────────────────────────────→ │ Redis │
└─────────┘ └─────────┘
│ │
│ 执行业务逻辑 │
│ │
▼ ▼
┌─────────┐ DEL lock_key ┌─────────┐
│ 客户端A │ ───────────────────────────────────────→ │ Redis │
└─────────┘ └─────────┘
```

2. Zookeeper分布式锁

```
┌─────────┐ 创建临时顺序节点 ┌─────────────┐
│ 客户端A │ ──────────────────────────────────────→ │ Zookeeper │
└─────────┘ │ /locks/ │
│ │ /lock-001│
│ 检查是否最小节点 │ /lock-002│
▼ └─────────────┘
┌─────────┐ 是 → 获得锁
│ 客户端A │ ─── 否 → 监听前一个节点
└─────────┘
```

3. 核心要求

要求 说明
互斥性 同一时刻只有一个客户端持有锁
防死锁 锁有超时机制,客户端异常时自动释放
可重入 同一客户端可重复获取同一把锁
高可用 锁服务本身不能单点故障

---

二、完整代码实现

1. 通用数据结构

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>

#define MAX_KEY_LEN 128
#define MAX_VALUE_LEN 256
#define MAX_HOST_LEN 64
#define MAX_RETRY 3

// 分布式锁统一接口
typedef struct distributed_lock {
char lock_key[MAX_KEY_LEN];
char lock_value[MAX_VALUE_LEN];
int acquired;
time_t expire_time;
void *backend_data;
int (*lock)(struct distributed_lock *self, int timeout_ms);
int (*unlock)(struct distributed_lock *self);
int (*renew)(struct distributed_lock *self);
void (*destroy)(struct distributed_lock *self);
} distributed_lock_t;
```

2. Redis客户端基础

```c
// Redis连接结构
typedef struct redis_connection {
char host[MAX_HOST_LEN];
int port;
int sock_fd;
} redis_conn_t;

// Redis命令执行
int redis_connect(redis_conn_t *conn, const char *host, int port) {
conn->sock_fd = socket(AF_INET, SOCK_STREAM, 0);
if (conn->sock_fd < 0) return -1;

struct hostent *server = gethostbyname(host);
if (!server) {
close(conn->sock_fd);
return -1;
}

struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
memcpy(&addr.sin_addr.s_addr, server->h_addr, server->h_length);

if (connect(conn->sock_fd, (struct sockaddr*)&addr, sizeof(addr)) < 0) {
close(conn->sock_fd);
return -1;
}

strcpy(conn->host, host);
conn->port = port;
return 0;
}

int redis_command(redis_conn_t *conn, const char *cmd, char *response, int resp_size) {
send(conn->sock_fd, cmd, strlen(cmd), 0);
send(conn->sock_fd, "\r\n", 2, 0);

int n = recv(conn->sock_fd, response, resp_size - 1, 0);
if (n > 0) {
response[n] = '\0';
return n;
}
return -1;
}

void redis_disconnect(redis_conn_t *conn) {
if (conn->sock_fd > 0) {
close(conn->sock_fd);
conn->sock_fd = -1;
}
}
```

3. Redis分布式锁实现

```c
// Redis锁结构
typedef struct redis_lock {
distributed_lock_t base;
redis_conn_t *connections; // 多Redis节点(Redlock)
int node_count;
int quorum; // 多数派
long long expire_ms;
} redis_lock_t;

// 生成唯一锁值(客户端标识)
void generate_lock_value(char *buf, int size) {
pid_t pid = getpid();
time_t now = time(NULL);
snprintf(buf, size, "%d-%ld-%d", pid, now, rand());
}

// 单个Redis节点加锁
int redis_node_lock(redis_conn_t *conn, const char *key, const char *value,
long long ttl_ms, char *error) {
char cmd[512];
snprintf(cmd, sizeof(cmd),
"SET %s %s NX PX %lld", key, value, ttl_ms);

char response[256];
if (redis_command(conn, cmd, response, sizeof(response)) < 0) {
strcpy(error, "redis command failed");
return -1;
}

// Redis返回+OK表示成功
if (strncmp(response, "+OK", 3) == 0) {
return 0;
}

strcpy(error, response);
return -1;
}

// Redis节点解锁
int redis_node_unlock(redis_conn_t *conn, const char *key, const char *value) {
// 使用Lua脚本保证原子性(检查value匹配才删除)
char cmd[512];
snprintf(cmd, sizeof(cmd),
"EVAL \"if redis.call('get', KEYS[1]) == ARGV[1] then "
"return redis.call('del', KEYS[1]) else return 0 end\" 1 %s %s",
key, value);

char response[256];
if (redis_command(conn, cmd, response, sizeof(response)) < 0) {
return -1;
}

return (strstr(response, ":1") != NULL) ? 0 : -1;
}

// Redlock加锁(多Redis节点)
int redlock_lock(distributed_lock_t *base, int timeout_ms) {
redis_lock_t *lock = (redis_lock_t*)base;
time_t start = time(NULL);
int acquired_count = 0;

// 计算每个节点的超时时间(总超时/节点数)
int per_node_timeout = timeout_ms / lock->node_count;
if (per_node_timeout < 10) per_node_timeout = 10;

// 尝试在所有节点上加锁
for (int i = 0; i < lock->node_count; i++) {
char error[256];
int ret = redis_node_lock(&lock->connections[i],
lock->base.lock_key,
lock->base.lock_value,
lock->expire_ms, error);
if (ret == 0) {
acquired_count++;
}

// 检查是否超时
time_t now = time(NULL);
if ((now - start) * 1000 > timeout_ms) {
break;
}
}

// 判断是否获得多数派
if (acquired_count >= lock->quorum) {
lock->base.acquired = 1;
lock->base.expire_time = time(NULL) + lock->expire_ms / 1000;
return 0;
}

// 加锁失败,释放已获取的锁
for (int i = 0; i < lock->node_count; i++) {
redis_node_unlock(&lock->connections[i],
lock->base.lock_key,
lock->base.lock_value);
}

return -1;
}

// Redlock解锁
int redlock_unlock(distributed_lock_t *base) {
redis_lock_t *lock = (redis_lock_t*)base;
if (!lock->base.acquired) return -1;

for (int i = 0; i < lock->node_count; i++) {
redis_node_unlock(&lock->connections[i],
lock->base.lock_key,
lock->base.lock_value);
}

lock->base.acquired = 0;
return 0;
}

// 锁续期
int redlock_renew(distributed_lock_t *base) {
redis_lock_t *lock = (redis_lock_t*)base;
if (!lock->base.acquired) return -1;

// 在大部分节点上续期
int renewed = 0;
for (int i = 0; i < lock->node_count; i++) {
// 使用PEXPIRE续期
char cmd[256];
snprintf(cmd, sizeof(cmd), "PEXPIRE %s %lld",
lock->base.lock_key, lock->expire_ms);
char response[64];
if (redis_command(&lock->connections[i], cmd, response, sizeof(response)) >= 0) {
if (strstr(response, ":1")) renewed++;
}
}

if (renewed >= lock->quorum) {
lock->base.expire_time = time(NULL) + lock->expire_ms / 1000;
return 0;
}
return -1;
}

// 创建Redis分布式锁
distributed_lock_t *create_redis_lock(const char *key,
const char **hosts, const int *ports,
int node_count, long long expire_ms) {
redis_lock_t *lock = malloc(sizeof(redis_lock_t));
memset(lock, 0, sizeof(redis_lock_t));

strcpy(lock->base.lock_key, key);
generate_lock_value(lock->base.lock_value, sizeof(lock->base.lock_value));
lock->base.acquired = 0;
lock->base.expire_time = 0;
lock->node_count = node_count;
lock->quorum = node_count / 2 + 1;
lock->expire_ms = expire_ms;

// 连接所有Redis节点
lock->connections = malloc(sizeof(redis_conn_t) * node_count);
for (int i = 0; i < node_count; i++) {
if (redis_connect(&lock->connections[i], hosts[i], ports[i]) < 0) {
// 连接失败处理
lock->connections[i].sock_fd = -1;
}
}

lock->base.lock = redlock_lock;
lock->base.unlock = redlock_unlock;
lock->base.renew = redlock_renew;
lock->base.destroy = NULL; // 会在外部处理

return (distributed_lock_t*)lock;
}
```

4. Zookeeper分布式锁实现

```c
// Zookeeper节点结构(模拟)
typedef struct zk_node {
char path[256];
char data[256];
int ephemeral;
int sequential;
int seq_num;
struct zk_node *children;
struct zk_node *next;
} zk_node_t;

// Zookeeper模拟服务器
typedef struct zk_server {
zk_node_t *root;
pthread_mutex_t mutex;
} zk_server_t;

zk_server_t *g_zk_server = NULL;

// 初始化模拟ZK
void zk_init() {
g_zk_server = malloc(sizeof(zk_server_t));
g_zk_server->root = malloc(sizeof(zk_node_t));
strcpy(g_zk_server->root->path, "/");
g_zk_server->root->children = NULL;
pthread_mutex_init(&g_zk_server->mutex, NULL);
}

// 创建ZNode
zk_node_t *zk_create_node(const char *path, int ephemeral, int sequential) {
zk_node_t *node = malloc(sizeof(zk_node_t));
strcpy(node->path, path);
node->ephemeral = ephemeral;
node->sequential = sequential;
node->seq_num = 0;
node->children = NULL;
node->next = NULL;

if (sequential) {
node->seq_num = rand() % 10000;
}

return node;
}

// 创建顺序节点
int zk_create_sequential(const char *base_path, const char *data, char *result_path) {
pthread_mutex_lock(&g_zk_server->mutex);

// 查找父节点
zk_node_t *parent = g_zk_server->root;
char *path_copy = strdup(base_path);
char *token = strtok(path_copy, "/");
while (token) {
zk_node_t *child = parent->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
parent = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
// 创建中间节点
char new_path[256];
snprintf(new_path, sizeof(new_path), "%s/%s", parent->path, token);
zk_node_t *new_node = zk_create_node(new_path, 0, 0);
new_node->next = parent->children;
parent->children = new_node;
parent = new_node;
}
token = strtok(NULL, "/");
}
free(path_copy);

// 创建顺序节点
int seq = parent->children ? parent->children->seq_num + 1 : 1;
char seq_path[256];
snprintf(seq_path, sizeof(seq_path), "%s/lock-%05d", base_path, seq);

zk_node_t *node = zk_create_node(seq_path, 1, 1);
node->seq_num = seq;
strcpy(node->data, data);
node->next = parent->children;
parent->children = node;

strcpy(result_path, seq_path);

pthread_mutex_unlock(&g_zk_server->mutex);
return seq;
}

// 获取子节点列表(按顺序)
void zk_get_children(const char *path, char ***children, int *count) {
pthread_mutex_lock(&g_zk_server->mutex);

// 查找节点
zk_node_t *node = g_zk_server->root;
char *path_copy = strdup(path);
char *token = strtok(path_copy, "/");
while (token) {
zk_node_t *child = node->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
node = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
free(path_copy);
*count = 0;
pthread_mutex_unlock(&g_zk_server->mutex);
return;
}
token = strtok(NULL, "/");
}
free(path_copy);

// 收集子节点
*count = 0;
zk_node_t *child = node->children;
while (child) {
(*count)++;
child = child->next;
}

*children = malloc(sizeof(char*) * (*count));
child = node->children;
int idx = 0;
while (child) {
(*children)[idx++] = strdup(child->path);
child = child->next;
}

pthread_mutex_unlock(&g_zk_server->mutex);
}

// 删除节点
void zk_delete(const char *path) {
pthread_mutex_lock(&g_zk_server->mutex);

// 查找父节点
zk_node_t *parent = g_zk_server->root;
char *path_copy = strdup(path);
char *last_token = NULL;
char *token = strtok(path_copy, "/");
while (token) {
last_token = token;
zk_node_t *child = parent->children;
int found = 0;
while (child) {
if (strcmp(child->path + 1, token) == 0) {
parent = child;
found = 1;
break;
}
child = child->next;
}
if (!found) {
free(path_copy);
pthread_mutex_unlock(&g_zk_server->mutex);
return;
}
token = strtok(NULL, "/");
}

// 删除节点
zk_node_t *prev = NULL;
zk_node_t *child = parent->children;
while (child) {
if (strcmp(child->path, path) == 0) {
if (prev) {
prev->next = child->next;
} else {
parent->children = child->next;
}
free(child);
break;
}
prev = child;
child = child->next;
}

free(path_copy);
pthread_mutex_unlock(&g_zk_server->mutex);
}

// Zookeeper锁结构
typedef struct zk_lock {
distributed_lock_t base;
char lock_path[256];
char node_path[256];
char watch_path[256];
int node_seq;
} zk_lock_t;

// Zookeeper加锁
int zk_lock(distributed_lock_t *base, int timeout_ms) {
zk_lock_t *lock = (zk_lock_t*)base;
time_t start = time(NULL);

// 创建临时顺序节点
snprintf(lock->lock_path, sizeof(lock->lock_path), "/locks/%s",
lock->base.lock_key);

char data[256];
snprintf(data, sizeof(data), "client-%d", getpid());
char result_path[256];
int seq = zk_create_sequential(lock->lock_path, data, result_path);
if (seq < 0) return -1;

strcpy(lock->node_path, result_path);
lock->node_seq = seq;

// 检查是否是最小节点
while (1) {
char **children;
int count;
zk_get_children(lock->lock_path, &children, &count);

// 找到最小序号
int min_seq = 99999;
for (int i = 0; i < count; i++) {
int s;
sscanf(children[i], "%*[^-]-%d", &s);
if (s < min_seq) min_seq = s;
free(children[i]);
}
free(children);

if (lock->node_seq == min_seq) {
// 获得锁
lock->base.acquired = 1;
return 0;
}

// 监听前一个节点(简化:轮询)
usleep(100 * 1000);

time_t now = time(NULL);
if ((now - start) * 1000 > timeout_ms) {
zk_delete(lock->node_path);
return -1;
}
}
}

// Zookeeper解锁
int zk_unlock(distributed_lock_t *base) {
zk_lock_t *lock = (zk_lock_t*)base;
if (!lock->base.acquired) return -1;

zk_delete(lock->node_path);
lock->base.acquired = 0;
return 0;
}

// 创建Zookeeper分布式锁
distributed_lock_t *create_zk_lock(const char *key) {
zk_lock_t *lock = malloc(sizeof(zk_lock_t));
memset(lock, 0, sizeof(zk_lock_t));

strcpy(lock->base.lock_key, key);
lock->base.acquired = 0;
lock->base.lock = zk_lock;
lock->base.unlock = zk_unlock;
lock->base.renew = NULL; // ZK锁不需要续期(会话保持)

// 确保锁路径存在
char path[256];
snprintf(path, sizeof(path), "/locks/%s", key);
strcpy(lock->lock_path, path);

return (distributed_lock_t*)lock;
}
```

5. 使用示例

```c
// 模拟共享资源
int shared_counter = 0;
pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;

void *worker_thread(void *arg) {
distributed_lock_t *lock = (distributed_lock_t*)arg;

for (int i = 0; i < 10; i++) {
// 加锁
if (lock->lock(lock, 5000) == 0) {
// 临界区
pthread_mutex_lock(&counter_mutex);
shared_counter++;
int current = shared_counter;
pthread_mutex_unlock(&counter_mutex);

printf("Thread %lu: counter = %d\n", pthread_self(), current);
usleep(10000);

// 解锁
lock->unlock(lock);
} else {
printf("Thread %lu: 获取锁失败\n", pthread_self());
}
usleep(5000);
}
return NULL;
}

int main() {
printf("=== 分布式锁测试 ===\n\n");
srand(time(NULL));

// 初始化Zookeeper模拟服务器
zk_init();

// 测试Redis锁
printf("--- Redis分布式锁 ---\n");
const char *hosts[] = {"127.0.0.1", "127.0.0.1", "127.0.0.1"};
int ports[] = {6379, 6380, 6381};

distributed_lock_t *redis_lock = create_redis_lock("my_resource",
hosts, ports, 3, 10000);

// 创建多个线程竞争锁
pthread_t threads[5];
for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, worker_thread, redis_lock);
}

for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}

// 测试Zookeeper锁
printf("\n--- Zookeeper分布式锁 ---\n");
shared_counter = 0;

distributed_lock_t *zk_lock = create_zk_lock("my_resource");

for (int i = 0; i < 5; i++) {
pthread_create(&threads[i], NULL, worker_thread, zk_lock);
}

for (int i = 0; i < 5; i++) {
pthread_join(threads[i], NULL);
}

return 0;
}
```

---

三、编译和运行

```bash
gcc -o distributed_lock distributed_lock.c -lpthread
./distributed_lock
```

---

四、Redis vs Zookeeper分布式锁

特性 Redis Zookeeper
一致性 最终一致 强一致(ZAB协议)
性能 高(内存操作) 低(需要选举)
可用性 高(主从切换) 高(集群)
死锁预防 超时自动释放 临时节点自动删除
实现复杂度 简单 复杂

---

五、常见问题

问题 解决方案
锁过期业务未完成 锁续期(看门狗机制)
Redis主从切换锁丢失 Redlock算法
死锁 设置合理的超时时间
可重入 在锁值中记录持有者信息

---

六、总结

通过这篇文章,你学会了:

· 分布式锁的核心要求(互斥、防死锁、高可用)
· Redis分布式锁(SET NX PX + Redlock)
· Zookeeper分布式锁(临时顺序节点)
· 锁续期、超时处理
· 完整的测试示例

分布式锁是分布式系统的必备工具。掌握它,你就理解了秒杀系统、分布式任务调度的核心设计。

下一篇预告:《从零实现一个分布式事务:TCC与Saga模式》

---

评论区分享一下你用分布式锁解决过什么场景~

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/23 0:59:53

基于ATtiny28的RC5红外遥控发射器设计与实现

1. 项目缘起&#xff1a;为什么是ATtiny28和RC5&#xff1f;最近在整理工作室的旧零件箱&#xff0c;翻出来几片ATtiny28&#xff0c;这玩意儿现在可不多见了。它属于AVR家族里比较“古典”的一位&#xff0c;引脚少&#xff0c;资源也有限&#xff0c;但胜在结构简单&#xff…

作者头像 李华
网站建设 2026/6/23 0:59:12

微信聊天记录备份指南:使用WeChatExporter轻松保存您的珍贵回忆

微信聊天记录备份指南&#xff1a;使用WeChatExporter轻松保存您的珍贵回忆 【免费下载链接】WeChatExporter 一个可以快速导出、查看你的微信聊天记录的工具 项目地址: https://gitcode.com/gh_mirrors/wec/WeChatExporter 在数字时代&#xff0c;微信聊天记录承载着我…

作者头像 李华
网站建设 2026/6/23 0:58:57

Ubuntu 20.04 正确安装 Docker Compose v2 的完整指南

1. 项目概述&#xff1a;为什么 Ubuntu 20.04 用户必须亲手安装 Docker Compose&#xff08;而不是靠 apt&#xff09;“Comment installer Docker Compose sur Ubuntu 20.04 [Dmarrage rapide]”——这个法语标题直译是“如何在 Ubuntu 20.04 上快速安装 Docker Compose”&…

作者头像 李华
网站建设 2026/6/23 0:55:40

OpenVAS漏洞扫描结果精准评估:从海量告警到可行动风险矩阵

1. 项目概述&#xff1a;从“扫描完成”到“风险落地”的鸿沟“扫描完成&#xff0c;报告生成&#xff0c;然后呢&#xff1f;” 这大概是很多安全工程师和运维同学在收到一份动辄几百上千条告警的OpenVAS扫描报告后&#xff0c;内心最真实的独白。OpenVAS&#xff08;Open Vul…

作者头像 李华
网站建设 2026/6/23 0:54:34

MPC5121e嵌入式处理器:异构多核架构与图形显示系统开发实战

1. 项目概述与核心价值 在嵌入式开发领域&#xff0c;选对一颗“心脏”——也就是主控处理器——往往决定了整个项目的成败。尤其是在那些既要“看得见”又要“连得上”的应用里&#xff0c;比如工业HMI触摸屏、医疗监护仪、或者车载中控娱乐系统&#xff0c;开发者面临的挑战是…

作者头像 李华
网站建设 2026/6/23 0:51:44

微信小程序渗透测试实战指南:从环境搭建到漏洞挖掘

1. 项目概述&#xff1a;为什么微信小程序也需要渗透测试&#xff1f; 你可能觉得&#xff0c;微信小程序运行在微信这个“超级App”的沙箱里&#xff0c;天然就比独立的App或网站更安全。这种想法在几年前或许还说得通&#xff0c;但随着小程序生态的爆炸式增长&#xff0c;它…

作者头像 李华