news 2026/6/10 14:23:04

ue 5.5 c++ mqtt 订阅/发布消息 字符串

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ue 5.5 c++ mqtt 订阅/发布消息 字符串

插件 mqtt支持

测试可以发送,接收长度小于100的字符串消息,长消息,会崩溃。

PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "InputCore", "EnhancedInput", "WebSockets", "Json", "JsonUtilities", "MQTTCore", "AudioMixer"});

MyObject.h

// Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #include "UObject/NoExportTypes.h" #include "IMQTTClient.h" #include "MyObject.generated.h" /** * */ UCLASS(BlueprintType, Blueprintable) class METAHUMANCHARACTERHEIXI_API UMyObject : public UObject { GENERATED_BODY() public: TSharedPtr<IMQTTClient, ESPMode::ThreadSafe> MQTTClient; UFUNCTION(BlueprintCallable, Category = "Demo") void HelloWorld(); void SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels); TMap<int32, FString> AudioChunks; // key = 片索引 int32 TotalChunks = 0; int32 FileIndex = 0; };

MyObject.cpp

// Fill out your copyright notice in the Description page of Project Settings. #include "MyObject.h" #include "MQTTSubsystem.h" #include "IMQTTCoreModule.h" #include "MQTTClientSettings.h" #include "MQTTShared.h" #include "Engine/Engine.h" #include "Misc/FileHelper.h" #include "Misc/Paths.h" #include "Async/Async.h" void UMyObject::SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } inline void SaveWavToFile(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } void UMyObject::HelloWorld() { UE_LOG(LogTemp, Warning, TEXT("Hello World from UMyObject!")); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Hello World from UMyObject!")); } IMQTTCoreModule& MQTTModule = FModuleManager::LoadModuleChecked<IMQTTCoreModule>("MQTTCore"); FMQTTURL URL; URL.Host = TEXT("127.0.0.1"); URL.Port = 1883; MQTTClient = MQTTModule.GetOrCreateClient(URL); if (!MQTTClient.IsValid()) { UE_LOG(LogTemp, Error, TEXT("MQTTClient 无效!")); return; } //TMap<int32, FString> AudioChunks; // key = 片索引 //int32 TotalChunks = 0; MQTTClient->OnMessage().AddLambda([this](const FMQTTClientMessage& Msg) { FString Topic = Msg.Topic; if (Topic == TEXT("ue/audio")) { FString Payload = Msg.GetPayloadAsString(); // 格式解析: "index/total:chunk" FString IndexStr, TotalStr, ChunkData; if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) { int32 Index = FCString::Atoi(*IndexStr); int32 Total = FCString::Atoi(*TotalStr); TotalChunks = Total; AudioChunks.Add(Index, ChunkData); // 检查是否收到所有分片 if (AudioChunks.Num() == TotalChunks) { // 拼接 FString FullB64; for (int32 i = 0; i < TotalChunks; ++i) { FullB64 += AudioChunks[i]; } // Base64 decode TArray<uint8> AudioBytes; if (FBase64::Decode(FullB64, AudioBytes)) { //UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); FString FilePath = FString::Printf(TEXT("D:/tmp/received_%03d.wav"), FileIndex++); SaveWavToFile(FilePath, AudioBytes, 16000, 1); } AudioChunks.Empty(); // 清理,准备下一条音频 } } } }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg_a) // { // FMQTTClientMessage Msg = Msg_a; // const FString Topic = Msg_a.Topic; // //const FString PayloadStr = Msg_a.GetPayloadAsString(); // //TArray<uint8> PayloadBytes = Msg_a.Payload; // AsyncTask(ENamedThreads::GameThread, [Topic, Msg]() // { // if (Topic == TEXT("ue/command")) // { // // 只在“明确是文本 topic”时才解析字符串 // FString TextPayload = Msg.GetPayloadAsString(); // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // } // else if (Topic == TEXT("ue/audio")) // { // // Base64 一定用 string // FString Payload = Msg.GetPayloadAsString(); // // 格式解析: "index/total:chunk" // FString IndexStr, TotalStr, ChunkData; // if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && // IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) // { // int32 Index = FCString::Atoi(*IndexStr); // int32 Total = FCString::Atoi(*TotalStr); // TotalChunks = Total; // AudioChunks.Add(Index, ChunkData); // // 检查是否收到所有分片 // if (AudioChunks.Num() == TotalChunks) // { // // 拼接 // FString FullB64; // for (int32 i = 0; i < TotalChunks; ++i) // { // FullB64 += AudioChunks[i]; // } // // Base64 decode // TArray<uint8> AudioBytes; // if (FBase64::Decode(FullB64, AudioBytes)) // { // UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); // FString FilePath = TEXT("D:/tmp/received.wav"); // SaveWavToFile(FilePath, AudioBytes, 44100, 1); // } // AudioChunks.Empty(); // 清理,准备下一条音频 // } // ////// 再 decode // //TArray<uint8> AudioBytes; // //if (FBase64::Decode(B64, AudioBytes)) // //{ // // UE_LOG(LogTemp, Error, TEXT("Audio bytes: %s"), *B64); // // //UE_LOG(LogTemp, Error, TEXT("Audio bytes")); // // // 这里可以调用 SaveWav 函数保存 // // FString FilePath = TEXT("D:/tmp/received.wav"); // // //SaveWavToFile(FilePath, AudioBytes, 44100, 1); // //} // //else // //{ // // UE_LOG(LogTemp, Warning, TEXT("Base64 decode failed!")); // //} // } // else // { // UE_LOG(LogTemp, Warning, TEXT("Unknown topic: %s"), *Topic); // } // } // }); // }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg) // { // const FString TopicCopy = Msg.Topic; // const bool bIsCommand = (TopicCopy == TEXT("ue/command")); // FString TextPayload; // TArray<uint8> BinaryPayload; // if (bIsCommand) // { // TextPayload = Msg.GetPayloadAsString(); // 文本 OK // } // else // { // BinaryPayload.SetNumUninitialized(Msg.Payload.Num()); // FMemory::Memcpy( // BinaryPayload.GetData(), // Msg.Payload.GetData(), // Msg.Payload.Num() // ); // // } // AsyncTask(ENamedThreads::GameThread, // [TopicCopy, bIsCommand, TextPayload, BinaryPayload]() // { // if (bIsCommand) // { // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // /*if (GEngine) // { // GEngine->AddOnScreenDebugMessage( // -1, 5.f, FColor::Yellow, TextPayload); // }*/ // } // else // { // UE_LOG(LogTemp, Log, // TEXT("Audio received, bytes=%d"), // BinaryPayload.Num()); // // 这里再存 wav / 播放 / 推 Audio2Face // } // }); // //AsyncTask(ENamedThreads::GameThread, [Msg](){ // // const FString& TopicCopy = Msg.Topic; // // //FString Topic = Msg.Topic; // // if (TopicCopy == "ue/command") { // // FString Payload = Msg.GetPayloadAsString(); // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, Payload=%s"), *TopicCopy, *Payload); // // if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s, Payload=%s"), *TopicCopy, *Payload)); // // } // // } // // else { // // // 拷贝音频数据,避免后台线程释放导致崩溃 // // //TArray<uint8> PayloadCopy = Msg.Payload; // // //FString FilePath = FPaths::ProjectSavedDir() / TEXT("received.wav"); // // //FString FilePath = FPaths::ProjectSavedDir() + TEXT("received.wav"); // // FString FilePath = TEXT("C:/Users/ChanJing-01/Desktop/tmp/0121/received.wav"); // // //if (PayloadCopy.Num() > 0) // // //{ // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] start Saved audio ")); // // // const int32 SampleRate = 44100; // // // const int32 NumChannels = 1; // // // const int32 BitsPerSample = 16; // // // const int32 BlockAlign = NumChannels * BitsPerSample / 8; // // // const int32 ByteRate = SampleRate * BlockAlign; // // // const int32 DataSize = PayloadCopy.Num(); // // // const int32 ChunkSize = 36 + DataSize; // // // //TArray<uint8> Wav; // // // //Wav.Reserve(44 + DataSize); // // // //auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; // // // //auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // // // //// WAV header // // // //Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); // // // //AppendInt32(ChunkSize); // // // //Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); // // // //Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); // // // //AppendInt32(16); // // // //AppendInt16(1); // // // //AppendInt16(NumChannels); // // // //AppendInt32(SampleRate); // // // //AppendInt32(ByteRate); // // // //AppendInt16(BlockAlign); // // // //AppendInt16(BitsPerSample); // // // //Wav.Append(reinterpret_cast<const uint8*>("data"), 4); // // // //AppendInt32(DataSize); // // // //// PCM 数据 // // // //Wav.Append(PayloadCopy); // // // //FFileHelper::SaveArrayToFile(Wav, *FilePath); // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] Saved audio, size=%d bytes"), DataSize); // // // // 调用你的 SaveWav 函数保存 // // // SaveWavToFile(FilePath, PayloadCopy, 44100, 1); // // // if (GEngine) // // // { // // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, // // // FString::Printf(TEXT("音频已保存: %s"), *FilePath)); // // // } // // //} // // /* const TArray<uint8>& AudioBytes = Msg.Payload; // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, audio"), *Topic);*/ // // /*if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s,audio"), *Topic)); // // }*/ // // } // // }); // }); MQTTClient->OnConnect().AddLambda([this](EMQTTConnectReturnCode ReturnCode) { if (ReturnCode == EMQTTConnectReturnCode::Accepted) { UE_LOG(LogTemp, Error, TEXT("IMQTTCoreModule 127.0.0.1--已连接---")); UE_LOG(LogTemp, Log, TEXT("MQTT 已连接!")); TArray<TPair<FString, EMQTTQualityOfService>> TopicsToSubscribe; TopicsToSubscribe.Add(MakeTuple(FString("ue/command"), EMQTTQualityOfService::Once)); TopicsToSubscribe.Add(MakeTuple(FString("ue/audio"), EMQTTQualityOfService::Once)); this->MQTTClient->Subscribe(TopicsToSubscribe); } else { UE_LOG(LogTemp, Warning, TEXT("MQTT 连接失败,ReturnCode=%d"), static_cast<int32>(ReturnCode)); } }); MQTTClient->Connect(); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("IMQTTCoreModule 127.0.0.1----------!")); } }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/6/10 11:41:42

中小企业AI转型:Qwen3-1.7B低门槛部署实战指南

中小企业AI转型&#xff1a;Qwen3-1.7B低门槛部署实战指南 在当前人工智能加速落地的背景下&#xff0c;越来越多中小企业开始探索如何将大模型技术融入日常运营。然而&#xff0c;高昂的算力成本、复杂的部署流程和专业人才的缺乏&#xff0c;常常成为阻碍其AI转型的主要瓶颈…

作者头像 李华
网站建设 2026/6/10 13:06:52

AI研发提效新方式:MinerU本地部署一文详解

AI研发提效新方式&#xff1a;MinerU本地部署一文详解 1. 为什么PDF提取需要AI&#xff1f;传统方法的瓶颈在哪 你有没有遇到过这种情况&#xff1a;手头有一份几十页的学术论文或技术报告PDF&#xff0c;想把内容复制到Markdown里整理笔记&#xff0c;结果一粘贴全是乱码、错…

作者头像 李华
网站建设 2026/6/10 11:37:42

百度文库文档免费获取工具:终极清理与打印优化指南

百度文库文档免费获取工具&#xff1a;终极清理与打印优化指南 【免费下载链接】baidu-wenku fetch the document for free 项目地址: https://gitcode.com/gh_mirrors/ba/baidu-wenku 想要免费获取百度文库的完整文档内容吗&#xff1f;这款百度文库文档获取工具通过智…

作者头像 李华
网站建设 2026/6/10 11:37:17

Packmol分子结构构建工具深度解析与实战指南

Packmol分子结构构建工具深度解析与实战指南 【免费下载链接】packmol Packmol - Initial configurations for molecular dynamics simulations 项目地址: https://gitcode.com/gh_mirrors/pa/packmol Packmol作为分子动力学模拟领域的重要前置工具&#xff0c;为复杂分…

作者头像 李华
网站建设 2026/6/10 1:43:01

通义千问3-14B避坑指南:单卡部署常见问题全解

通义千问3-14B避坑指南&#xff1a;单卡部署常见问题全解 你是不是也和我一样&#xff0c;看到“14B体量、30B性能”、“单卡可跑”、“Thinking模式逼近QwQ-32B”这些关键词就心动不已&#xff1f;但一上手却发现&#xff1a;显存爆了、加载失败、响应卡顿、Ollama启动报错……

作者头像 李华
网站建设 2026/6/10 3:08:11

手把手教你部署OCR模型:cv_resnet18_ocr-detection保姆级教程

手把手教你部署OCR模型&#xff1a;cv_resnet18_ocr-detection保姆级教程 你是不是也遇到过这样的问题&#xff1a;一堆扫描件、截图、照片里的文字&#xff0c;手动一个个敲进电脑&#xff0c;费时又容易出错&#xff1f;别急&#xff0c;今天这篇文章就是为你准备的。我会带…

作者头像 李华