news 2026/4/16 13:27:05

mediasoup源码走读(五)——RTP流处理

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
mediasoup源码走读(五)——RTP流处理

🧩 5.1、整体架构图

发送RTP包
接收RTP包
路由决策
维护关系
维护关系
存储流
请求流
转发数据
发送RTP包
推流客户端
WebRtcTransport
Worker进程
Router
Producer
Consumer
WebRtcTransport
观看客户端

说明

  • Worker进程:C++子进程,负责媒体处理(ICE/DTLS/RTP)
  • Router:媒体流逻辑中枢,维护Producer-Consumer关系
  • Producer:媒体源(推流端),存储并管理RTP流
  • Consumer:媒体接收端(观看端),请求并消费RTP流
  • WebRtcTransport:网络传输层,处理RTP/RTCP协议

📦 5.2、RTP包流转全流程

🌟 关键流程图(含函数名中文注释)

推流客户端
WebRtcTransport
Router
Producer
Consumer
WebRtcTransport
观看客户端
sendRtpPacket
onTransportProducerRtpPacketReceived
addRtpPacket
遍历Consumer列表
onRtpPacket

说明

  1. sendRtpPacket():推流客户端发送RTP包
  2. onTransportProducerRtpPacketReceived():Worker接收RTP包后触发
  3. addRtpPacket():Producer存储RTP包
  4. onRtpPacket():Consumer处理RTP包
  5. sendRtpPacket():Consumer转发RTP包给观看端

⏱️ 5.3、关键时序图

推流客户端WebRtcTransportWorkerRouterProducerConsumer观看客户端sendRtpPacket(rtpPacket)onTransportProducerRtpPacketReceived(producer, rtpPacket)onTransportProducerRtpPacketReceived(producer, rtpPacket)addRtpPacket(rtpPacket)遍历mapProducerConsumers[producer]onRtpPacket(rtpPacket)sendRtpPacket(rtpPacket)发送RTP包loop[每个Consumer]解码并渲染推流客户端WebRtcTransportWorkerRouterProducerConsumer观看客户端

说明

  • onTransportProducerRtpPacketReceived():Worker接收RTP包的核心入口
  • addRtpPacket():Producer存储RTP包,用于RTX重传
  • mapProducerConsumers[producer]:Router维护的Producer-Consumer映射
  • onRtpPacket():Consumer处理RTP包的核心方法

📊 5.4、关键类图关联

包含
关联
关联
包含
包含
关联
«interface»
EnhancedEventEmitter
+emit(event, data)
+on(event, callback)
Worker
-channel: Channel 与JS层通信
-routerMap: Map 管理多个Router
+createRouter(options)
+onTransportProducerRtpPacketReceived(producer, packet)
Router
-producerMap: Map 所有Producer
-consumerMap: Map 所有Consumer
-mapProducerConsumers: Map> Producer-Consumer关系
-mapConsumerProducer: Map Consumer-Producer关系
+onTransportProducerRtpPacketReceived(producer, packet)
+onTransportNewProducer(transport, producer)
+onTransportNewConsumer(transport, consumer, producerId)
Producer
-rtpStreamMap: Map 存储RTP流
-producerId: string Producer唯一ID
+addRtpPacket(packet)
+getRtpStream(ssrc)
+requestRtx(ssrc, seq)
Consumer
-rtpStreamMap: Map 存储RTP流
-producer: Producer 关联的Producer
-consumerId: string Consumer唯一ID
+onRtpPacket(packet)
+requestNack(seq)
WebRtcTransport
-localSdp: string 本地SDP
-remoteSdp: string 远端SDP
+sendRtpPacket(packet)
+sendRtcpPacket(packet)
RtpStream
-ssrc: uint32 SSRC
-sequenceNumber: uint16 序列号
-timestamp: uint32 时间戳
-packetList: Queue 包队列
+addPacket(packet)
+getPacket(seq)

说明

  • mapProducerConsumers:Router维护的Producer-Consumer映射(一对多)
  • mapConsumerProducer:Router维护的Consumer-Producer映射(一对一)
  • RtpStream:RTP流管理单元,存储RTP包队列
  • addPacket():RtpStream存储RTP包

🔧 5.5、关键代码片段

5.5.1. Router核心处理逻辑(C++层)

// 文件: src/Router.cpp/** * 处理接收到的RTP包(核心入口函数) * @param producer 关联的Producer * @param packet 接收到的RTP包 */voidRouter::onTransportProducerRtpPacketReceived(Producer*producer,RtpPacket*packet){// 1. 检查是否为RTX包(重传包)if(packet->isRtx()){handleRtxPacket(producer,packet);// 处理RTX重传包return;}// 2. 检查RTP包是否为新流(SSRC首次出现)if(!producer->hasRtpStream(packet->ssrc())){// 2.1 创建新的RTP流RtpStream*rtpStream=newRtpStream(packet->ssrc(),packet);producer->addRtpStream(rtpStream);// 2.2 通知所有关联的Consumer新流已建立auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onNewRtpStream(rtpStream);// Consumer处理新流}}// 3. 将RTP包添加到Producer的RTP流中producer->addRtpPacket(packet);// 4. 遍历所有关联的Consumer,转发RTP包auto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(packet);// Consumer处理RTP包}}/** * 处理RTX重传包 * @param producer 关联的Producer * @param rtxPacket RTX重传包 */voidRouter::handleRtxPacket(Producer*producer,RtpPacket*rtxPacket){// 1. 从RTX包中提取原始RTP包信息RtpPacket*originalPacket=rtxPacket->getOriginalPacket();// 2. 检查原始包是否已存在(是否已接收过)if(!producer->hasRtpStream(originalPacket->ssrc())){// 2.1 如果原始包不存在,请求重传producer->requestRtx(originalPacket->ssrc(),originalPacket->sequenceNumber());return;}// 3. 将原始RTP包转发给所有关联的Consumerauto&consumers=mapProducerConsumers[producer];for(auto*consumer:consumers){consumer->onRtpPacket(originalPacket);// 转发原始包}}/** * 新Producer创建时的处理 * @param transport 关联的Transport * @param producer 新创建的Producer */voidRouter::onTransportNewProducer(Transport*transport,Producer*producer){// 1. 将Producer添加到Router的Producer列表producerMap[producer->id()]=producer;// 2. 初始化Producer的Consumer集合mapProducerConsumers[producer]=std::unordered_set<Consumer*>();// 3. 通知应用层(可选)emit("producer",producer);}/** * 新Consumer创建时的处理 * @param transport 关联的Transport * @param consumer 新创建的Consumer * @param producerId 要消费的Producer ID */voidRouter::onTransportNewConsumer(Transport*transport,Consumer*consumer,conststd::string&producerId){// 1. 查找目标ProducerautoproducerIt=producerMap.find(producerId);if(producerIt==producerMap.end()){throwstd::runtime_error("Producer not found: "+producerId);}Producer*producer=producerIt->second;// 2. 建立Consumer与Producer的关联mapConsumerProducer[consumer]=producer;mapProducerConsumers[producer].insert(consumer);// 3. 通知应用层(可选)emit("consumer",consumer);// 4. 如果Producer已有数据,立即发送if(producer->isRtpStreamActive()){producer->sendRtpStreamToConsumer(consumer);}}

5.5.2. Producer核心处理逻辑(C++层)

// 文件: src/Producer.cpp/** * 添加RTP包到Producer的RTP流 * @param packet 要添加的RTP包 */voidProducer::addRtpPacket(RtpPacket*packet){// 1. 获取或创建RTP流RtpStream*rtpStream=getOrCreateRtpStream(packet->ssrc());// 2. 将RTP包添加到RTP流队列rtpStream->addPacket(packet);// 3. 更新统计信息updateStatistics(packet);}/** * 获取或创建RTP流 * @param ssrc RTP流的SSRC * @return RtpStream指针 */RtpStream*Producer::getOrCreateRtpStream(uint32_tssrc){// 1. 检查是否已存在该SSRC的RTP流autoit=rtpStreamMap.find(ssrc);if(it!=rtpStreamMap.end()){returnit->second;}// 2. 创建新的RTP流RtpStream*rtpStream=newRtpStream(ssrc);rtpStreamMap[ssrc]=rtpStream;returnrtpStream;}/** * 请求RTX重传(当Consumer请求重传时) * @param ssrc RTP流的SSRC * @param sequenceNumber 要重传的序列号 */voidProducer::requestRtx(uint32_tssrc,uint16_tsequenceNumber){// 1. 获取RTP流autoit=rtpStreamMap.find(ssrc);if(it==rtpStreamMap.end()){return;// 不存在该流}RtpStream*rtpStream=it->second;// 2. 检查序列号是否在范围内if(sequenceNumber<rtpStream->getFirstSequenceNumber()||sequenceNumber>rtpStream->getLastSequenceNumber()){return;// 序列号超出范围}// 3. 获取要重传的包RtpPacket*packet=rtpStream->getPacket(sequenceNumber);if(!packet){return;// 未找到包}// 4. 创建RTX包并发送RtpPacket*rtxPacket=createRtxPacket(packet);sendRtxPacket(rtxPacket);}/** * 发送RTP流到Consumer(用于新Consumer建立连接) * @param consumer 要发送的Consumer */voidProducer::sendRtpStreamToConsumer(Consumer*consumer){// 1. 遍历所有RTP流for(auto&pair:rtpStreamMap){RtpStream*rtpStream=pair.second;// 2. 获取当前RTP流的最新包RtpPacket*latestPacket=rtpStream->getLastPacket();if(latestPacket){// 3. 发送最新包给Consumerconsumer->onRtpPacket(latestPacket);}}}

5.5.3. Consumer核心处理逻辑(C++层)

// 文件: src/Consumer.cpp/** * 处理接收到的RTP包 * @param packet 接收到的RTP包 */voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计信息updateStatistics(packet);// 2. 检查是否需要NACK(丢包重传请求)if(isPacketLost(packet->sequenceNumber)){requestNack(packet->sequenceNumber());// 请求重传}// 3. 将RTP包转发给WebRtcTransporttransport->sendRtpPacket(packet);}/** * 检查序列号是否丢失 * @param sequenceNumber 要检查的序列号 * @return 是否丢失 */boolConsumer::isPacketLost(uint16_tsequenceNumber){// 1. 检查是否是第一个包if(firstSequenceNumber==-1){firstSequenceNumber=sequenceNumber;returnfalse;}// 2. 检查序列号是否连续if(sequenceNumber==nextExpectedSequenceNumber){nextExpectedSequenceNumber=(sequenceNumber+1)%65536;returnfalse;}// 3. 如果序列号不是下一个,说明有丢包returntrue;}/** * 请求NACK重传(发送NACK请求给Producer) * @param sequenceNumber 丢失的序列号 */voidConsumer::requestNack(uint16_tsequenceNumber){// 1. 创建NACK包RtcpPacket*nackPacket=createNackPacket(sequenceNumber);// 2. 发送NACK包到Producertransport->sendRtcpPacket(nackPacket);}/** * 处理新RTP流(当Producer有新流时) * @param rtpStream 新的RTP流 */voidConsumer::onNewRtpStream(RtpStream*rtpStream){// 1. 将RTP流添加到Consumer的RTP流集合rtpStreamMap[rtpStream->ssrc()]=rtpStream;// 2. 通知应用层emit("newRtpStream",rtpStream);}

5.5.4. RTP流管理核心逻辑(C++层)

// 文件: src/RtpStream.cpp/** * RtpStream构造函数 * @param ssrc RTP流的SSRC * @param firstPacket 首个RTP包 */RtpStream::RtpStream(uint32_tssrc,RtpPacket*firstPacket):ssrc(ssrc),firstSequenceNumber(firstPacket->sequenceNumber()),lastSequenceNumber(firstPacket->sequenceNumber()){// 1. 将首个包加入队列packetList.push(firstPacket);}/** * 添加RTP包到流 * @param packet 要添加的RTP包 */voidRtpStream::addPacket(RtpPacket*packet){// 1. 检查序列号是否连续if(packet->sequenceNumber()==lastSequenceNumber+1){// 2. 序列号连续,添加到队列末尾packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}elseif(packet->sequenceNumber()>lastSequenceNumber){// 3. 序列号跳跃,可能有丢包// 4. 但不处理,先存储packetList.push(packet);lastSequenceNumber=packet->sequenceNumber();}else{// 5. 序列号回绕(超过65535)if(packet->sequenceNumber()<firstSequenceNumber){// 6. 重传包,直接覆盖packetList.push(packet);}}}/** * 获取指定序列号的RTP包 * @param sequenceNumber 要获取的序列号 * @return RtpPacket指针,若不存在返回nullptr */RtpPacket*RtpStream::getPacket(uint16_tsequenceNumber){// 1. 遍历包队列for(auto&packet:packetList){if(packet->sequenceNumber()==sequenceNumber){returnpacket;}}returnnullptr;}/** * 获取最新RTP包 * @return 最新RTP包指针 */RtpPacket*RtpStream::getLastPacket(){if(!packetList.empty()){returnpacketList.back();}returnnullptr;}

🌟 5.6、关键机制解析

5.6.1. RTX(重传)机制

ConsumerRouterProducer发送NACK请求请求RTX重传发送RTX包转发RTX包ConsumerRouterProducer

工作流程

  1. Consumer检测到丢包(序列号不连续)
  2. Consumer发送NACK请求给Router
  3. Router将NACK转发给Producer
  4. Producer生成RTX包(包含原始包信息)
  5. Router将RTX包转发给Consumer

5.6.2. NACK(丢包重传请求)机制

ConsumerRouterProducer检测到丢包发送NACK包请求重传发送RTX包转发RTX包ConsumerRouterProducer

关键点

  • NACK是Consumer向Producer请求重传的机制
  • Router作为中介转发NACK请求
  • Producer生成RTX包(重传包)并发送

5.6.3. 流量控制机制

// 在Consumer::onRtpPacket()中voidConsumer::onRtpPacket(RtpPacket*packet){// 1. 更新接收统计updateStatistics(packet);// 2. 检查是否超过带宽限制if(isBandwidthExceeded()){// 3. 请求降级(如降低码率)requestBandwidthReduction();return;}// 4. 正常转发transport->sendRtpPacket(packet);}

机制说明

  • Consumer实时监控接收带宽
  • 超过阈值时请求Producer降级
  • 通过RTCP反馈机制实现

📊 5.7、关键数据结构关系图

mapProducerConsumers
mapConsumerProducer
rtpStreamMap
rtpStreamMap
packetList
Router
Producer
Consumer
RtpStream
RtpPacket

关系说明

  1. mapProducerConsumers:Router维护Producer-Consumer映射(一对多)
  2. mapConsumerProducer:Router维护Consumer-Producer映射(一对一)
  3. rtpStreamMap:Producer/Consumer存储RTP流
  4. packetList:RtpStream存储RTP包队列

💡 5.8、这样设计的优势

  1. 选择性转发:Router仅转发Consumer需要的RTP包,避免全量复制

    • 4人视频会议:4个Producer → 4×3=12个RTP包 → 实际转发12个包(不是48个)
  2. RTX重传优化:只重传丢失的包,而非整个流

    • 丢包率10% → 仅重传10%的包,而非100%
  3. 内存高效:RtpStream仅存储最近的RTP包(滑动窗口)

    • 通常只存储100个包(约2秒视频数据)
  4. 事件驱动:通过EnhancedEventEmitter实现解耦

    • Producer、Consumer、Router之间通过事件通信

🌟 5.9、总结:RTP流处理全生命周期

发送RTP包
onTransportProducerRtpPacketReceived
存储RTP包
遍历Consumer
处理RTP包
发送RTP包
检测丢包
转发
请求RTX
发送RTX包
转发
推流客户端
WebRtcTransport
Router
Producer
Consumer
WebRtcTransport
观看客户端
NACK请求

完整生命周期

  1. 推流客户端发送RTP包 → WebRtcTransport
  2. Router接收RTP包,存储到Producer
  3. Router遍历Consumer列表,转发RTP包
  4. Consumer处理RTP包,检测丢包
  5. Consumer请求NACK重传 → Router转发
  6. Producer生成RTX包 → Router转发给Consumer
  7. 观看客户端接收RTP包并渲染
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 12:20:50

Min浏览器:重新定义移动端隐私浏览体验的轻量级解决方案

在数字隐私日益受到重视的今天&#xff0c;Min浏览器以其极简设计和强大的隐私保护功能&#xff0c;为用户提供了全新的浏览体验。这款专注于隐私保护的轻量级浏览器&#xff0c;正在为移动端用户打造更安全的上网环境。 【免费下载链接】min A fast, minimal browser that pro…

作者头像 李华
网站建设 2026/4/15 22:48:32

java计算机毕业设计社区老人健康服务跟踪系统 基于SpringBoot的社区长者智慧健康照护平台 JavaWeb社区老年健康动态跟踪与干预系统

计算机毕业设计社区老人健康服务跟踪系统t86i39&#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。居家老人最怕的是“身体有状况&#xff0c;子女不在场&#xff1b;体检报告看不懂…

作者头像 李华
网站建设 2026/4/15 12:46:42

java计算机毕业设计社区人员信息管理系统设计与实现 基于SpringBoot的社区居民档案智慧管理平台 JavaWeb社区人口信息综合服务平台

计算机毕业设计社区人员信息管理系统设计与实现0146g9&#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。社区网格员最头疼的就是“人户分离”&#xff1a;Excel 里 3000 条记录&…

作者头像 李华
网站建设 2026/4/15 17:00:54

腾讯混元3D-Part:AI技术如何重塑游戏美术生产的未来格局

在数字内容创作领域&#xff0c;3D资产制作一直是制约游戏开发效率的关键瓶颈。传统制作流程中&#xff0c;美术师需要在十余个专业软件间反复切换&#xff0c;一个中等复杂度的角色模型从概念设计到最终绑定动画往往需要数周时间。腾讯混元3D-Part的出现&#xff0c;通过七大A…

作者头像 李华
网站建设 2026/4/8 21:33:42

Linux内核动态调试终极指南:从入门到实战精通

Linux内核动态调试终极指南&#xff1a;从入门到实战精通 【免费下载链接】linux Linux kernel source tree 项目地址: https://gitcode.com/GitHub_Trending/li/linux 还在为Linux内核崩溃后无从下手而苦恼&#xff1f;面对系统卡顿、死锁、内存泄漏等棘手问题&#xf…

作者头像 李华
网站建设 2026/4/16 10:40:15

12、Puppet模块使用与开发全解析

Puppet模块使用与开发全解析 1. Puppet Forge模块使用原则 在使用Puppet管理应用时,通常应用需要数据库来存储状态,以及用户凭证来访问它。以创建 cat_pictures 数据库并设置 greebo 用户账户访问为例,Puppet可以轻松完成这些操作,而 mysql 模块能让配置变得非常简…

作者头像 李华