news 2026/4/18 3:51:48

【RabbitMQ】RPC模式(请求/回复)

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
【RabbitMQ】RPC模式(请求/回复)

本章目标

理解RabbitMQ RPC模式的工作原理和适用场景。

掌握回调队列(Callback Queue)和关联ID(Correlation Id)的使用。

实现基于RabbitMQ的异步RPC调用。

学习RPC模式下的错误处理和超时机制。

构建完整的微服务间同步通信解决方案。

一、理论部分

1. RPC模式简介

RPC(Remote Procedure Call)模式允许客户端应用程序调用远程服务器上的方法,就像调用本地方法一样。在RabbitMQ中,RPC是通过消息队列实现的异步RPC。

与传统HTTP RPC的区别:

HTTP RPC:同步,直接连接,需要服务端在线

消息队列RPC:异步,通过消息代理,支持解耦和负载均衡

2. RabbitMQ RPC核心组件

请求队列(Request Queue):客户端发送请求的队列

回复队列(Reply Queue):服务器返回响应的队列

关联ID(Correlation Id):匹配请求和响应的唯一标识

消息属性:使用IBasicProperties.ReplyTo和IBasicProperties.CorrelationId

3. RPC工作流程

复制代码

Client端:

1. 生成唯一CorrelationId

2. 创建临时回复队列

3. 发送请求到请求队列,设置ReplyTo和CorrelationId

4. 监听回复队列,等待匹配的CorrelationId

Server端:

1. 监听请求队列

2. 处理请求

3. 将响应发送到请求中的ReplyTo队列

4. 设置相同的CorrelationId

Client端:

5. 收到响应,根据CorrelationId匹配请求

6. 处理响应

复制代码

4. 适用场景

需要同步响应的异步操作

微服务间的同步通信

计算密集型任务的分布式处理

需要负载均衡的同步调用

二、实操部分:构建分布式计算服务

我们将创建一个分布式斐波那契数列计算服务,演示完整的RPC模式实现。

第1步:创建项目结构

复制代码

# 创建解决方案

dotnet new sln -n RpcSystem

# 创建项目

dotnet new webapi -n RpcClient.API

dotnet new classlib -n RpcClient.Core

dotnet new classlib -n RpcServer.Service

dotnet new classlib -n RpcShared

# 添加到解决方案

dotnet sln add RpcClient.API/RpcClient.API.csproj

dotnet sln add RpcClient.Core/RpcClient.Core.csproj

dotnet sln add RpcServer.Service/RpcServer.Service.csproj

dotnet sln add RpcShared/RpcShared.csproj

# 添加项目引用

dotnet add RpcClient.API reference RpcClient.Core

dotnet add RpcClient.API reference RpcShared

dotnet add RpcClient.Core reference RpcShared

dotnet add RpcServer.Service reference RpcShared

# 添加NuGet包

cd RpcClient.API

dotnet add package RabbitMQ.Client

cd ../RpcClient.Core

dotnet add package RabbitMQ.Client

cd ../RpcServer.Service

dotnet add package RabbitMQ.Client

复制代码

第2步:定义共享模型(RpcShared)

Models/RpcRequest.cs

View Code

Models/RpcResponse.cs

View Code

Messages/FibonacciRequest.cs

View Code

第3步:RPC客户端核心库(RpcClient.Core)

Services/IRpcClient.cs

View Code

Services/RpcClient.cs

复制代码

using System.Collections.Concurrent;

using System.Text;

using System.Text.Json;

using Microsoft.Extensions.Logging;

using RabbitMQ.Client;

using RabbitMQ.Client.Events;

using RpcShared.Models;

namespace RpcClient.Core.Services

{

public class RpcClient : IRpcClient

{

private readonly IConnection _connection;

private readonly IModel _channel;

private readonly ILogger<RpcClient> _logger;

private readonly string _replyQueueName;

private readonly ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>> _pendingRequests;

private readonly AsyncEventingBasicConsumer _consumer;

private bool _disposed = false;

public RpcClient(

IConnectionFactory connectionFactory,

ILogger<RpcClient> logger)

{

_logger = logger;

_pendingRequests = new ConcurrentDictionary<string, TaskCompletionSource<RpcResponse>>();

// 建立连接和通道

_connection = connectionFactory.CreateConnection();

_channel = _connection.CreateModel();

// 声明临时回复队列(排他性,连接关闭时自动删除)

_replyQueueName = _channel.QueueDeclare(

queue: "",

durable: false,

exclusive: true,

autoDelete: true,

arguments: null).QueueName;

// 创建消费者监听回复队列

_consumer = new AsyncEventingBasicConsumer(_channel);

_consumer.Received += OnResponseReceived;

// 开始消费回复队列

_channel.BasicConsume(

queue: _replyQueueName,

autoAck: false,

consumer: _consumer);

_logger.LogInformation("RPC Client initialized with reply queue: {ReplyQueue}", _replyQueueName);

}

public async Task<RpcResponse> CallAsync(RpcRequest request, TimeSpan timeout)

{

if (_disposed)

throw new ObjectDisposedException(nameof(RpcClient));

var tcs = new TaskCompletionSource<RpcResponse>();

var cancellationTokenSource = new CancellationTokenSource(timeout);

// 注册超时取消

cancellationTokenSource.Token.Register(() =>

{

if (_pendingRequests.TryRemove(request.RequestId, out var removedTcs))

{

removedTcs.TrySetException(new TimeoutException($"RPC call timed out after {timeout.TotalSeconds} seconds"));

_logger.LogWarning("RPC request {RequestId} timed out", request.RequestId);

}

});

// 将请求添加到待处理字典

if (!_pendingRequests.TryAdd(request.RequestId, tcs))

{

throw new InvalidOperationException($"Request with ID {request.RequestId} is already pending");

}

try

{

// 序列化请求

var requestJson = JsonSerializer.Serialize(request);

var requestBody = Encoding.UTF8.GetBytes(requestJson);

// 设置消息属性

var properties = _channel.CreateBasicProperties();

properties.ReplyTo = _replyQueueName;

properties.CorrelationId = request.RequestId;

properties.Persistent = true;

_logger.LogDebug("Sending RPC request {RequestId} to queue: rpc_queue", request.RequestId);

// 发布请求到RPC队列

_channel.BasicPublish(

exchange: "",

routingKey: "rpc_queue",

basicProperties: properties,

body: requestBody);

_logger.LogInformation("RPC request {RequestId} sent successfully", request.RequestId);

// 等待响应

return await tcs.Task;

}

catch (Exception ex)

{

// 发生异常时移除待处理请求

_pendingRequests.TryRemove(request.RequestId, out _);

_logger.LogError(ex, "Error sending RPC request {RequestId}", request.RequestId);

throw;

}

}

public async Task<TResponse?> CallAsync<TResponse>(RpcRequest request, TimeSpan timeout) where TResponse : class

{

var response = await CallAsync(request, timeout);

if (!response.Success)

{

throw new InvalidOperationException($"RPC call failed: {response.Error}");

}

return response.GetData<TResponse>();

}

private async Task OnResponseReceived(object sender, BasicDeliverEventArgs ea)

{

var responseBody = ea.Body.ToArray();

var responseJson = Encoding.UTF8.GetString(responseBody);

var correlationId = ea.BasicProperties.CorrelationId;

_logger.LogDebug("Received RPC response for correlation ID: {CorrelationId}", correlationId);

try

{

var response = JsonSerializer.Deserialize<RpcResponse>(responseJson);

if (response == null)

{

_logger.LogError("Failed to deserialize RPC response for correlation ID: {CorrelationId}", correlationId);

return;

}

// 查找匹配的待处理请求

if (_pendingRequests.TryRemove(correlationId, out var tcs))

{

tcs.TrySetResult(response);

_logger.LogDebug("RPC response for {CorrelationId} delivered to waiting task", correlationId);

}

else

{

_logger.LogWarning("Received response for unknown correlation ID: {CorrelationId}", correlationId);

}

// 手动确认消息

_channel.BasicAck(ea.DeliveryTag, false);

}

catch (Exception ex)

{

_logger.LogError(ex, "Error processing RPC response for correlation ID: {CorrelationId}", correlationId);

// 处理失败时拒绝消息(不重新入队)

_channel.BasicNack(ea.DeliveryTag, false, false);

// 如果反序列化失败,仍然通知等待的任务

if (_pendingRequests.TryRemove(correlationId, out var tcs))

{

tcs.TrySetException(new InvalidOperationException("Failed to process RPC response"));

}

}

await Task.CompletedTask;

}

public void Dispose()

{

if (!_disposed)

{

_disposed = true;

// 取消所有待处理的请求

foreach (var (requestId, tcs) in _pendingRequests)

{

tcs.TrySetCanceled();

}

_pendingRequests.Clear();

_channel?.Close();

_channel?.Dispose();

_connection?.Close();

_connection?.Dispose();

_logger.LogInformation("RPC Client disposed");

}

}

}

}

复制代码

Services/FibonacciRpcClient.cs

复制代码

using RpcClient.Core.Services;

using RpcShared.Messages;

using RpcShared.Models;

namespace RpcClient.Core.Services

{

public class FibonacciRpcClient

{

private readonly IRpcClient _rpcClient;

private readonly ILogger<FibonacciRpcClient> _logger;

public FibonacciRpcClient(IRpcClient rpcClient, ILogger<FibonacciRpcClient> logger)

{

_rpcClient = rpcClient;

_logger = logger;

}

public async Task<long> CalculateFibonacciAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)

{

var request = new RpcRequest

{

Method = "fibonacci.calculate",

Timestamp = DateTime.UtcNow

}

.WithParameter("number", number)

.WithParameter("useOptimized", useOptimized);

timeout ??= TimeSpan.FromSeconds(30);

try

{

_logger.LogInformation("Calculating Fibonacci({Number}) with timeout {Timeout}s",

number, timeout.Value.TotalSeconds);

var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);

if (response != null)

{

_logger.LogInformation(

"Fibonacci({Number}) = {Result} (calculated in {Time}ms)",

number, response.Result, response.CalculationTimeMs);

return response.Result;

}

throw new InvalidOperationException("Received null response from RPC server");

}

catch (TimeoutException ex)

{

_logger.LogError(ex, "Fibonacci calculation timed out for number {Number}", number);

throw;

}

catch (Exception ex)

{

_logger.LogError(ex, "Error calculating Fibonacci for number {Number}", number);

throw;

}

}

public async Task<FibonacciResponse> CalculateFibonacciDetailedAsync(int number, bool useOptimized = true, TimeSpan? timeout = null)

{

var request = new RpcRequest

{

Method = "fibonacci.calculate",

Timestamp = DateTime.UtcNow

}

.WithParameter("number", number)

.WithParameter("useOptimized", useOptimized);

timeout ??= TimeSpan.FromSeconds(30);

var response = await _rpcClient.CallAsync<FibonacciResponse>(request, timeout.Value);

return response ?? throw new InvalidOperationException("Received null response from RPC server");

}

}

}

复制代码

第4步:RPC客户端API(RpcClient.API)

Program.cs

View Code

Services/IMathRpcService.cs

View Code

Services/MathRpcService.cs

View Code

Controllers/MathController.cs

View Code

第5步:RPC服务器(RpcServer.Service)

Program.cs

View Code

Services/FibonacciCalculator.cs

View Code

Services/FibonacciRpcServer.cs

View Code

第6步:高级特性 - 带重试的RPC客户端

Services/ResilientRpcClient.cs

View Code

第7步:运行与测试

启动服务

复制代码

# 终端1:启动RPC服务器

cd RpcServer.Service

dotnet run

# 终端2:启动RPC客户端API

cd RpcClient.API

dotnet run

复制代码

测试API

复制代码

# 计算斐波那契数列

curl -X GET "https://localhost:7000/api/math/fibonacci/10"

curl -X GET "https://localhost:7000/api/math/fibonacci/20/detailed"

# 健康检查

curl -X GET "https://localhost:7000/api/math/health"

复制代码

测试错误场景

# 测试超时(设置很小的超时时间)

# 测试无效输入

curl -X GET "https://localhost:7000/api/math/fibonacci/-5"

curl -X GET "https://localhost:7000/api/math/fibonacci/100"

观察日志输出

客户端发送请求,生成CorrelationId

服务器接收请求,处理计算

服务器发送响应,使用相同的CorrelationId

客户端接收响应,匹配CorrelationId

第8步:性能测试和监控

创建性能测试控制器

复制代码

[ApiController]

[Route("api/[controller]")]

public class BenchmarkController : ControllerBase

{

private readonly IMathRpcService _mathService;

private readonly ILogger<BenchmarkController> _logger;

public BenchmarkController(IMathRpcService mathService, ILogger<BenchmarkController> logger)

{

_mathService = mathService;

_logger = logger;

}

[HttpPost("fibonacci/batch")]

public async Task<ActionResult> CalculateFibonacciBatch([FromBody] List<int> numbers)

{

var results = new List<object>();

var totalStopwatch = System.Diagnostics.Stopwatch.StartNew();

foreach (var number in numbers)

{

var stopwatch = System.Diagnostics.Stopwatch.StartNew();

try

{

var result = await _mathService.CalculateFibonacciAsync(number);

results.Add(new

{

number,

result,

success = true,

durationMs = stopwatch.ElapsedMilliseconds

});

}

catch (Exception ex)

{

results.Add(new

{

number,

success = false,

error = ex.Message,

durationMs = stopwatch.ElapsedMilliseconds

});

}

}

return Ok(new

{

totalDurationMs = totalStopwatch.ElapsedMilliseconds,

requests = numbers.Count,

results

});

}

}

复制代码

本章总结

在这一章中,我们完整实现了RabbitMQ的RPC模式:

RPC核心概念:理解了回调队列、关联ID、请求-响应模式。

客户端实现:创建了能够发送请求并异步等待响应的RPC客户端。

服务器实现:构建了处理请求并返回响应的RPC服务器。

错误处理:实现了超时控制、异常处理和重试机制。

性能优化:使用缓存和优化算法提高计算效率。

** resilience**:通过Polly实现了弹性重试策略。

RPC模式为微服务架构提供了强大的同步通信能力,结合消息队列的异步特性,既保持了

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/15 8:52:20

25、技术探索:从数据查询到包管理的全面指南

技术探索:从数据查询到包管理的全面指南 在软件开发和系统管理的领域中,数据查询和包管理是两个至关重要的方面。下面我们将深入探讨这两个领域的相关内容。 数据查询与路由 在数据处理中,从数据存储中获取特定记录是常见操作。以下是一段用于从数据存储中获取最后 10 条…

作者头像 李华
网站建设 2026/3/29 13:18:56

高频高速 PCB 能不能量产,打样阶段就能看出来吗?

本文基于深圳市充裕科技有限公司在高频高速 PCB 项目中的实际打样与量产经验整理。 很多高频高速 PCB 项目&#xff0c;在打样阶段都会经历一个“虚假的安全感”&#xff1a; 首版就过测试阻抗、插损数据漂亮项目推进顺利 但当项目进入量产后&#xff0c;却开始陆续出现&…

作者头像 李华
网站建设 2026/4/17 20:33:35

Go 语言未来会取代 Java 吗?

Go 语言&#xff08;Golang&#xff09;不会完全取代 Java&#xff0c;至少在可预见的未来&#xff08;包括 2025 年及以后&#xff09;不会。两者在编程领域有重叠&#xff0c;但各自的优势领域不同&#xff0c;Java 的庞大生态、遗留系统和企业主导地位让它难以被彻底取代&am…

作者头像 李华
网站建设 2026/4/17 7:44:34

1分钟原型:用快马快速验证Vue3生命周期想法

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 构建一个极简原型&#xff1a;1.单文件组件包含所有生命周期钩子&#xff1b;2.每个钩子触发时在界面显示阶段名称和时间戳&#xff1b;3.添加强制重新渲染按钮。要求&#xff1a;1…

作者头像 李华
网站建设 2026/4/12 12:18:50

AI一键配置:用快马自动下载安装MinGW-w64环境

快速体验 打开 InsCode(快马)平台 https://www.inscode.net输入框内输入如下内容&#xff1a; 创建一个自动化脚本&#xff0c;能够自动检测操作系统类型(Windows 10/11)&#xff0c;从官网下载最新版MinGW-w64安装包(64位)&#xff0c;自动完成安装并配置系统环境变量PATH。要…

作者头像 李华
网站建设 2026/4/4 4:57:36

8、Puppet编程:变量、表达式与系统信息的运用

Puppet编程:变量、表达式与系统信息的运用 1. Puppet资源创建与更新 在Puppet中,若将字符串数组作为资源的标题,Puppet会创建多个除标题外完全相同的资源。这种方式不仅适用于软件包,还适用于文件、用户等任何类型的资源。 在应用Puppet清单之前,通常会运行 sudo apt-…

作者头像 李华