news 2026/4/17 18:00:26

ue 5.5 c++ mqtt 订阅/发布消息 字符串

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ue 5.5 c++ mqtt 订阅/发布消息 字符串

插件 mqtt支持

经测试，可以正常发送和接收长度小于 100 个字符的字符串消息；消息较长时程序会崩溃。

// Build.cs module dependencies. "MQTTCore" is the module MyObject.cpp loads via FModuleManager::LoadModuleChecked.
PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "InputCore", "EnhancedInput", "WebSockets", "Json", "JsonUtilities", "MQTTCore", "AudioMixer"});

MyObject.h

// Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #include "UObject/NoExportTypes.h" #include "IMQTTClient.h" #include "MyObject.generated.h" /** * */ UCLASS(BlueprintType, Blueprintable) class METAHUMANCHARACTERHEIXI_API UMyObject : public UObject { GENERATED_BODY() public: TSharedPtr<IMQTTClient, ESPMode::ThreadSafe> MQTTClient; UFUNCTION(BlueprintCallable, Category = "Demo") void HelloWorld(); void SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels); TMap<int32, FString> AudioChunks; // key = 片索引 int32 TotalChunks = 0; int32 FileIndex = 0; };

MyObject.cpp

// Fill out your copyright notice in the Description page of Project Settings. #include "MyObject.h" #include "MQTTSubsystem.h" #include "IMQTTCoreModule.h" #include "MQTTClientSettings.h" #include "MQTTShared.h" #include "Engine/Engine.h" #include "Misc/FileHelper.h" #include "Misc/Paths.h" #include "Async/Async.h" void UMyObject::SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } inline void SaveWavToFile(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { 
Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } void UMyObject::HelloWorld() { UE_LOG(LogTemp, Warning, TEXT("Hello World from UMyObject!")); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Hello World from UMyObject!")); } IMQTTCoreModule& MQTTModule = FModuleManager::LoadModuleChecked<IMQTTCoreModule>("MQTTCore"); FMQTTURL URL; URL.Host = TEXT("127.0.0.1"); URL.Port = 1883; MQTTClient = MQTTModule.GetOrCreateClient(URL); if (!MQTTClient.IsValid()) { UE_LOG(LogTemp, Error, TEXT("MQTTClient 无效!")); return; } //TMap<int32, FString> AudioChunks; // key = 片索引 //int32 TotalChunks = 0; MQTTClient->OnMessage().AddLambda([this](const FMQTTClientMessage& Msg) { FString Topic = Msg.Topic; if (Topic == TEXT("ue/audio")) { FString Payload = Msg.GetPayloadAsString(); // 格式解析: "index/total:chunk" FString IndexStr, TotalStr, ChunkData; if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) { int32 Index = FCString::Atoi(*IndexStr); int32 Total = FCString::Atoi(*TotalStr); TotalChunks = Total; AudioChunks.Add(Index, ChunkData); // 检查是否收到所有分片 if (AudioChunks.Num() == TotalChunks) { // 拼接 FString FullB64; for (int32 i = 0; i < TotalChunks; ++i) { FullB64 += AudioChunks[i]; } // Base64 decode TArray<uint8> AudioBytes; if 
(FBase64::Decode(FullB64, AudioBytes)) { //UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); FString FilePath = FString::Printf(TEXT("D:/tmp/received_%03d.wav"), FileIndex++); SaveWavToFile(FilePath, AudioBytes, 16000, 1); } AudioChunks.Empty(); // 清理,准备下一条音频 } } } }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg_a) // { // FMQTTClientMessage Msg = Msg_a; // const FString Topic = Msg_a.Topic; // //const FString PayloadStr = Msg_a.GetPayloadAsString(); // //TArray<uint8> PayloadBytes = Msg_a.Payload; // AsyncTask(ENamedThreads::GameThread, [Topic, Msg]() // { // if (Topic == TEXT("ue/command")) // { // // 只在“明确是文本 topic”时才解析字符串 // FString TextPayload = Msg.GetPayloadAsString(); // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // } // else if (Topic == TEXT("ue/audio")) // { // // Base64 一定用 string // FString Payload = Msg.GetPayloadAsString(); // // 格式解析: "index/total:chunk" // FString IndexStr, TotalStr, ChunkData; // if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && // IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) // { // int32 Index = FCString::Atoi(*IndexStr); // int32 Total = FCString::Atoi(*TotalStr); // TotalChunks = Total; // AudioChunks.Add(Index, ChunkData); // // 检查是否收到所有分片 // if (AudioChunks.Num() == TotalChunks) // { // // 拼接 // FString FullB64; // for (int32 i = 0; i < TotalChunks; ++i) // { // FullB64 += AudioChunks[i]; // } // // Base64 decode // TArray<uint8> AudioBytes; // if (FBase64::Decode(FullB64, AudioBytes)) // { // UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); // FString FilePath = TEXT("D:/tmp/received.wav"); // SaveWavToFile(FilePath, AudioBytes, 44100, 1); // } // AudioChunks.Empty(); // 清理,准备下一条音频 // } // ////// 再 decode // //TArray<uint8> AudioBytes; // //if (FBase64::Decode(B64, AudioBytes)) // //{ // // UE_LOG(LogTemp, Error, TEXT("Audio bytes: %s"), *B64); // // //UE_LOG(LogTemp, Error, TEXT("Audio bytes")); // // // 这里可以调用 SaveWav 函数保存 // // FString FilePath = 
TEXT("D:/tmp/received.wav"); // // //SaveWavToFile(FilePath, AudioBytes, 44100, 1); // //} // //else // //{ // // UE_LOG(LogTemp, Warning, TEXT("Base64 decode failed!")); // //} // } // else // { // UE_LOG(LogTemp, Warning, TEXT("Unknown topic: %s"), *Topic); // } // } // }); // }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg) // { // const FString TopicCopy = Msg.Topic; // const bool bIsCommand = (TopicCopy == TEXT("ue/command")); // FString TextPayload; // TArray<uint8> BinaryPayload; // if (bIsCommand) // { // TextPayload = Msg.GetPayloadAsString(); // 文本 OK // } // else // { // BinaryPayload.SetNumUninitialized(Msg.Payload.Num()); // FMemory::Memcpy( // BinaryPayload.GetData(), // Msg.Payload.GetData(), // Msg.Payload.Num() // ); // // } // AsyncTask(ENamedThreads::GameThread, // [TopicCopy, bIsCommand, TextPayload, BinaryPayload]() // { // if (bIsCommand) // { // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // /*if (GEngine) // { // GEngine->AddOnScreenDebugMessage( // -1, 5.f, FColor::Yellow, TextPayload); // }*/ // } // else // { // UE_LOG(LogTemp, Log, // TEXT("Audio received, bytes=%d"), // BinaryPayload.Num()); // // 这里再存 wav / 播放 / 推 Audio2Face // } // }); // //AsyncTask(ENamedThreads::GameThread, [Msg](){ // // const FString& TopicCopy = Msg.Topic; // // //FString Topic = Msg.Topic; // // if (TopicCopy == "ue/command") { // // FString Payload = Msg.GetPayloadAsString(); // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, Payload=%s"), *TopicCopy, *Payload); // // if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s, Payload=%s"), *TopicCopy, *Payload)); // // } // // } // // else { // // // 拷贝音频数据,避免后台线程释放导致崩溃 // // //TArray<uint8> PayloadCopy = Msg.Payload; // // //FString FilePath = FPaths::ProjectSavedDir() / TEXT("received.wav"); // // //FString FilePath = FPaths::ProjectSavedDir() + TEXT("received.wav"); // // FString FilePath = 
TEXT("C:/Users/ChanJing-01/Desktop/tmp/0121/received.wav"); // // //if (PayloadCopy.Num() > 0) // // //{ // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] start Saved audio ")); // // // const int32 SampleRate = 44100; // // // const int32 NumChannels = 1; // // // const int32 BitsPerSample = 16; // // // const int32 BlockAlign = NumChannels * BitsPerSample / 8; // // // const int32 ByteRate = SampleRate * BlockAlign; // // // const int32 DataSize = PayloadCopy.Num(); // // // const int32 ChunkSize = 36 + DataSize; // // // //TArray<uint8> Wav; // // // //Wav.Reserve(44 + DataSize); // // // //auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; // // // //auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // // // //// WAV header // // // //Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); // // // //AppendInt32(ChunkSize); // // // //Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); // // // //Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); // // // //AppendInt32(16); // // // //AppendInt16(1); // // // //AppendInt16(NumChannels); // // // //AppendInt32(SampleRate); // // // //AppendInt32(ByteRate); // // // //AppendInt16(BlockAlign); // // // //AppendInt16(BitsPerSample); // // // //Wav.Append(reinterpret_cast<const uint8*>("data"), 4); // // // //AppendInt32(DataSize); // // // //// PCM 数据 // // // //Wav.Append(PayloadCopy); // // // //FFileHelper::SaveArrayToFile(Wav, *FilePath); // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] Saved audio, size=%d bytes"), DataSize); // // // // 调用你的 SaveWav 函数保存 // // // SaveWavToFile(FilePath, PayloadCopy, 44100, 1); // // // if (GEngine) // // // { // // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, // // // FString::Printf(TEXT("音频已保存: %s"), *FilePath)); // // // } // // //} // // /* const TArray<uint8>& AudioBytes = Msg.Payload; // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, audio"), *Topic);*/ // // 
/*if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s,audio"), *Topic)); // // }*/ // // } // // }); // }); MQTTClient->OnConnect().AddLambda([this](EMQTTConnectReturnCode ReturnCode) { if (ReturnCode == EMQTTConnectReturnCode::Accepted) { UE_LOG(LogTemp, Error, TEXT("IMQTTCoreModule 127.0.0.1--已连接---")); UE_LOG(LogTemp, Log, TEXT("MQTT 已连接!")); TArray<TPair<FString, EMQTTQualityOfService>> TopicsToSubscribe; TopicsToSubscribe.Add(MakeTuple(FString("ue/command"), EMQTTQualityOfService::Once)); TopicsToSubscribe.Add(MakeTuple(FString("ue/audio"), EMQTTQualityOfService::Once)); this->MQTTClient->Subscribe(TopicsToSubscribe); } else { UE_LOG(LogTemp, Warning, TEXT("MQTT 连接失败,ReturnCode=%d"), static_cast<int32>(ReturnCode)); } }); MQTTClient->Connect(); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("IMQTTCoreModule 127.0.0.1----------!")); } }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/11 17:16:41

STL转STEP终极指南:5步精通3D模型格式转换

STL转STEP终极指南：5步精通3D模型格式转换 【免费下载链接】stltostp Convert stl files to STEP brep files 项目地址: https://gitcode.com/gh_mirrors/st/stltostp 在3D设计和制造领域，STL转STEP格式转换是连接3D打印与专业CAD设计的关键桥梁。…

作者头像 李华
网站建设 2026/4/10 9:28:38

BabelDOC完全指南:零基础掌握PDF智能翻译核心技术

BabelDOC完全指南：零基础掌握PDF智能翻译核心技术 【免费下载链接】BabelDOC Yet Another Document Translator 项目地址: https://gitcode.com/GitHub_Trending/ba/BabelDOC 还在为阅读外文PDF文档而头疼吗？BabelDOC作为专业的文档翻译工具…

作者头像 李华
网站建设 2026/4/14 23:51:24

计算机毕业设计springboot汽车租赁管理系统 基于SpringBoot的共享汽车运营平台 轻量级汽车在线租赁与订单追踪系统

计算机毕业设计springboot汽车租赁管理系统qd7xr （配套有源码 程序 mysql数据库 论文） 本套源码可以在文本联xi,先看具体系统功能演示视频领取，可分享源码参考。 注:以上是纯课题毕业设计功能介绍，并非实际开发完成，最…

作者头像 李华
网站建设 2026/4/16 19:08:56

Downkyi哔哩下载姬完整使用指南:5步轻松掌握B站视频下载技巧

Downkyi哔哩下载姬完整使用指南：5步轻松掌握B站视频下载技巧 【免费下载链接】downkyi 哔哩下载姬downkyi，哔哩哔哩网站视频下载工具，支持批量下载，支持8K、HDR、杜比视界，提供工具箱（音视频提取、去水印等…

作者头像 李华
网站建设 2026/4/6 17:20:33

PDF文档翻译终极指南:三步实现智能格式保留翻译

PDF文档翻译终极指南：三步实现智能格式保留翻译 【免费下载链接】BabelDOC Yet Another Document Translator 项目地址: https://gitcode.com/GitHub_Trending/ba/BabelDOC 还在为阅读外文PDF文档而烦恼吗？BabelDOC作为专业的PDF翻译工具…

作者头像 李华