news 2026/4/16 20:04:56

基于librtmp库封装拉流动态库

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
基于librtmp库封装拉流动态库

一 概述

该文章主要是实现对librtmp库的封装,封装一个动态库,作为以后实现拉流程序的依赖库.

二 代码实现

1.rtmp_pull.h实现

#ifndef RTMP_PULL_H #define RTMP_PULL_H #include <stdint.h> #include <stdbool.h> /************************** 跨平台导出宏 **************************/ // 编译动态库时定义RTMP_PULL_EXPORTS,使用时不定义 #ifdef _WIN32 #ifdef RTMP_PULL_EXPORTS #define RTMP_PULL_API __declspec(dllexport) #else #define RTMP_PULL_API __declspec(dllimport) #endif #else #define RTMP_PULL_API __attribute__((visibility("default"))) #endif /************************** 基础枚举定义 **************************/ // RTMP拉流状态(生命周期全覆盖) typedef enum { RTMP_PULL_STATE_UNINIT = 0, // 未初始化(仅创建句柄,未设置URL) RTMP_PULL_STATE_CONFIGED = 1, // 已配置(设置了URL/参数,未开始拉流) RTMP_PULL_STATE_CONNECTING = 2,// 连接中(正在与RTMP服务器握手/连接) RTMP_PULL_STATE_STREAMING = 3, // 拉流中(正常接收数据) RTMP_PULL_STATE_PAUSED = 4, // 已暂停(连接未断,暂时停止接收) RTMP_PULL_STATE_STOPPED = 5, // 已停止(主动调用停止,连接断开) RTMP_PULL_STATE_ERROR = 6 // 错误状态(任意步骤出错,需销毁重建) } RTMPPullState; // RTMP包类型(与librtmp原生类型对应,对外屏蔽librtmp枚举) typedef enum { RTMP_PULL_PACKET_VIDEO = 0, // 视频包(H.264/H.265等裸流) RTMP_PULL_PACKET_AUDIO = 1, // 音频包(AAC/MP3等裸流) RTMP_PULL_PACKET_SCRIPT = 2,// 脚本包(AMF元数据,如宽高/码率/帧率) RTMP_PULL_PACKET_OTHER = 3 // 其他类型包(暂不处理) } RTMPPullPacketType; // 错误码(覆盖所有接口的常见错误,便于问题排查) typedef enum { RTMP_PULL_ERR_OK = 0, // 成功 RTMP_PULL_ERR_PARAM = -1, // 参数错误(空指针/无效值) RTMP_PULL_ERR_MEM = -2, // 内存分配失败 RTMP_PULL_ERR_URL = -3, // URL无效/格式错误 RTMP_PULL_ERR_WSA = -4, // Windows Socket初始化失败 RTMP_PULL_ERR_CONNECT = -5, // 连接RTMP服务器失败 RTMP_PULL_ERR_HANDSHAKE = -6,// RTMP握手失败 RTMP_PULL_ERR_STREAM = -7, // 连接RTMP流失败(流不存在/权限问题) RTMP_PULL_ERR_READ = -8, // 读取数据失败 RTMP_PULL_ERR_TIMEOUT = -9, // 操作超时 RTMP_PULL_ERR_STATE = -10, // 状态错误(如未配置就开始拉流) RTMP_PULL_ERR_LOCK = -11, // 线程锁操作失败 RTMP_PULL_ERR_LIBRTMP = -12, // librtmp底层调用失败 RTMP_PULL_ERR_UNKNOWN = -99 // 未知错误 } RTMPPullErrCode; /************************** 对外数据结构 **************************/ // RTMP拉流包(用户读取数据的核心结构,包含裸数据+元信息) // 注意:数据指针为库内部缓冲区,用户**无需释放**,下次read会覆盖 typedef struct { RTMPPullPacketType type; // 包类型(视频/音频/脚本) uint32_t timestamp; // 时间戳(ms,相对时间) uint8_t* data; // 库内部分配可写缓冲区 uint32_t data_len; // 裸数据长度(字节) uint32_t buf_size; // 缓冲区实际大小(用于上层越界判断) } RTMPPullPacket; // RTMP流信息(拉流成功后可获取,包含基础媒体信息) typedef struct { char url[256]; // 拉流URL int width; // 视频宽(0表示无视频) int height; // 视频高(0表示无视频) int fps; // 视频帧率(0表示无视频) int audio_sample_rate; // 音频采样率(0表示无音频) int audio_channels; // 音频通道数(0表示无音频) uint64_t recv_bytes; // 已接收总字节数 uint64_t recv_packets; // 已接收总包数 } RTMPPullStreamInfo; /************************** 不透明句柄 **************************/ // 对外仅暴露句柄类型,内部结构完全封装,避免用户直接操作 typedef void* RTMPPullHandle; /************************** 对外核心接口(全功能) **************************/ #ifdef __cplusplus extern "C" { #endif /** * @brief 创建RTMP拉流句柄(初始化基础资源) * @return 成功返回句柄,失败返回NULL(可通过系统errno排查,一般是内存不足) */ RTMP_PULL_API RTMPPullHandle rtmp_pull_create(void); /** * @brief 销毁RTMP拉流句柄(释放所有资源,包括连接/缓冲区/锁) * @param handle 拉流句柄(不可为NULL) */ RTMP_PULL_API void rtmp_pull_destroy(RTMPPullHandle handle); /** * @brief 设置RTMP拉流URL(核心配置,必须在start前调用) * @param handle 拉流句柄 * @param url RTMP拉流URL(如rtmp://xxx/live/stream,不可为NULL/空) * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_set_url(RTMPPullHandle handle, const char* url); /** * @brief 设置操作超时时间(连接/读取的超时时间,默认3000ms) * @param handle 拉流句柄 * @param timeout_ms 超时时间(ms,必须>0,建议1000-10000) * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_set_timeout(RTMPPullHandle handle, int timeout_ms); /** * @brief 设置内部接收缓冲区大小(默认4096*1024=4M) * @param handle 拉流句柄 * @param buffer_size 缓冲区大小(字节,必须>1024,建议1M-16M) * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_set_buffer_size(RTMPPullHandle handle, uint32_t buffer_size); /** * @brief 开始RTMP拉流(建立连接+握手+订阅流,阻塞直到连接成功/失败/超时) * @param handle 拉流句柄(必须先设置URL) * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_start(RTMPPullHandle handle); /** * @brief 停止RTMP拉流(断开连接,释放流资源,句柄可重新配置后再次start) * @param handle 拉流句柄 * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_stop(RTMPPullHandle handle); /** * @brief 暂停RTMP拉流(连接不中断,暂时停止接收数据) * @param handle 拉流句柄(必须处于拉流中状态) * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_pause(RTMPPullHandle handle); /** * @brief 恢复RTMP拉流(从暂停状态恢复接收数据) * @param handle 拉流句柄(必须处于暂停状态) * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_resume(RTMPPullHandle handle); /** * @brief 读取RTMP包(阻塞式,直到读到数据/超时/出错) * @param handle 拉流句柄(必须处于拉流中状态) * @param packet 输出参数,接收的RTMP包(用户无需释放内部data) * @return 成功返回RTMP_PULL_ERR_OK,超时返回RTMP_PULL_ERR_TIMEOUT,其他为错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_read_packet(RTMPPullHandle handle, RTMPPullPacket* packet); /** * @brief 获取当前拉流状态 * @param handle 拉流句柄 * @return 成功返回当前状态,失败返回RTMP_PULL_STATE_UNINIT */ RTMP_PULL_API RTMPPullState rtmp_pull_get_state(RTMPPullHandle handle); /** * @brief 获取最后一次错误的描述信息 * @param handle 拉流句柄 * @param buf 输出缓冲区,存储错误信息 * @param buf_len 缓冲区长度 * @return 成功返回buf,失败返回NULL */ RTMP_PULL_API char* rtmp_pull_get_last_error(RTMPPullHandle handle, char* buf, int buf_len); /** * @brief 获取当前拉流的媒体信息(拉流成功后有效) * @param handle 拉流句柄 * @param info 输出参数,流信息 * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_get_stream_info(RTMPPullHandle handle, RTMPPullStreamInfo* info); /** * @brief 判断是否已成功连接到RTMP服务器 * @param handle 拉流句柄 * @return 已连接返回true,否则返回false */ RTMP_PULL_API bool rtmp_pull_is_connected(RTMPPullHandle handle); /** * @brief 刷新内部接收缓冲区(清空未读取的数据) * @param handle 拉流句柄 * @return 成功返回RTMP_PULL_ERR_OK,失败返回对应错误码 */ RTMP_PULL_API RTMPPullErrCode rtmp_pull_flush_buffer(RTMPPullHandle handle); #ifdef __cplusplus } #endif #endif // RTMP_PULL_H

2.rtmp_pull.cpp实现

#include "rtmp_pull.h" #include <librtmp/rtmp.h> #include <librtmp/amf.h> #include <string.h> #include <stdlib.h> #include <stdio.h> #include <stdarg.h> #include <errno.h> // 1. 新增包缓冲区大小宏(推荐4MB,兼容超大I帧) #define RTMP_PULL_PACKET_BUF_SIZE 4 * 1024 * 1024 // 4MB包缓冲区 /************************** 跨平台线程锁 **************************/ #ifdef _WIN32 #include <winsock2.h> #include <windows.h> #define MUTEX_HANDLE CRITICAL_SECTION #define MUTEX_INIT(m) InitializeCriticalSection(&m) #define MUTEX_LOCK(m) EnterCriticalSection(&m) #define MUTEX_UNLOCK(m) LeaveCriticalSection(&m) #define MUTEX_DESTROY(m) DeleteCriticalSection(&m) #pragma comment(lib, "ws2_32.lib") // 链接Windows Socket库 #else #include <pthread.h> #include <unistd.h> #define MUTEX_HANDLE pthread_mutex_t #define MUTEX_INIT(m) pthread_mutex_init(&m, NULL) #define MUTEX_LOCK(m) pthread_mutex_lock(&m) #define MUTEX_UNLOCK(m) pthread_mutex_unlock(&m) #define MUTEX_DESTROY(m) pthread_mutex_destroy(&m) #endif /************************** 内部核心结构体(完全封装) **************************/ typedef struct { RTMP* rtmp; // librtmp原生句柄 char url[256]; // 拉流URL int timeout_ms; // 超时时间(ms) uint32_t buffer_size; // 内部接收缓冲区大小 uint8_t* buffer; // 接收数据缓冲区 RTMPPullState state; // 当前拉流状态 RTMPPullErrCode last_err; // 最后一次错误码 char last_err_msg[512]; // 最后一次错误描述 MUTEX_HANDLE mutex; // 线程互斥锁(保护所有成员) RTMPPacket recv_pkt; // librtmp接收包缓冲区 RTMPPullStreamInfo stream_info; // 流信息 bool wsa_inited; // Windows WSA是否初始化 uint8_t* pkt_buf; // 【新增】对外包专用可写缓冲区 } RTMPPullContext; /************************** 内部工具函数 **************************/ // 设置错误信息(加锁保护) static void set_error(RTMPPullContext* ctx, RTMPPullErrCode err, const char* fmt, ...) { if (!ctx) return; va_list ap; va_start(ap, fmt); vsnprintf(ctx->last_err_msg, sizeof(ctx->last_err_msg)-1, fmt, ap); va_end(ap); ctx->last_err = err; // ctx->state = RTMP_PULL_STATE_ERROR; } // 重置上下文(保留基础配置,清空状态/错误/流信息) static void reset_context(RTMPPullContext* ctx) { if (!ctx) return; if (ctx->rtmp) { RTMP_Close(ctx->rtmp); RTMP_Free(ctx->rtmp); ctx->rtmp = NULL; } RTMPPacket_Reset(&ctx->recv_pkt); memset(&ctx->stream_info, 0, sizeof(ctx->stream_info)); strncpy(ctx->stream_info.url, ctx->url, sizeof(ctx->stream_info.url)-1); ctx->state = RTMP_PULL_STATE_CONFIGED; ctx->last_err = RTMP_PULL_ERR_OK; memset(ctx->last_err_msg, 0, sizeof(ctx->last_err_msg)); } /************************** 对外接口实现 **************************/ RTMP_PULL_API RTMPPullHandle rtmp_pull_create(void) { RTMPPullContext* ctx = (RTMPPullContext*)malloc(sizeof(RTMPPullContext)); if (!ctx) { return NULL; } memset(ctx, 0, sizeof(RTMPPullContext)); // 初始化线程锁 if (MUTEX_INIT(ctx->mutex) != 0) { free(ctx); return NULL; } // 初始化默认配置 ctx->timeout_ms = 3000; ctx->buffer_size = 4 * 1024 * 1024; // 4M默认缓冲区 ctx->buffer = (uint8_t*)malloc(ctx->buffer_size); if (!ctx->buffer) { MUTEX_DESTROY(ctx->mutex); free(ctx); return NULL; } // 【关键新增】初始化4MB对外包专用可写缓冲区 ctx->pkt_buf = (uint8_t*)malloc(RTMP_PULL_PACKET_BUF_SIZE); if (!ctx->pkt_buf) { // 内存分配失败,释放已分配资源 free(ctx->buffer); MUTEX_DESTROY(ctx->mutex); free(ctx); return NULL; } memset(ctx->pkt_buf, 0, RTMP_PULL_PACKET_BUF_SIZE); // 初始化缓冲区 // Windows WSA初始化(原有逻辑,不变) #ifdef _WIN32 WSADATA wsa_data; if (WSAStartup(MAKEWORD(2, 2), &wsa_data) != 0) { set_error(ctx, RTMP_PULL_ERR_WSA, "WSAStartup failed, err: %d", WSAGetLastError()); ctx->wsa_inited = false; } else { ctx->wsa_inited = true; } #endif // 初始化librtmp包(原有逻辑,不变) RTMPPacket_Alloc(&ctx->recv_pkt, ctx->buffer_size); RTMPPacket_Reset(&ctx->recv_pkt); ctx->state = RTMP_PULL_STATE_UNINIT; ctx->last_err = RTMP_PULL_ERR_OK; memset(ctx->last_err_msg, 0, sizeof(ctx->last_err_msg)); return (RTMPPullHandle)ctx; } RTMP_PULL_API void rtmp_pull_destroy(RTMPPullHandle handle) { if (!handle) return; RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); // 停止拉流并释放librtmp资源(原有逻辑,不变) reset_context(ctx); // 释放缓冲区(原有逻辑,不变) if (ctx->buffer) { free(ctx->buffer); ctx->buffer = NULL; } // 【关键新增】释放对外包专用缓冲区 if (ctx->pkt_buf) { free(ctx->pkt_buf); ctx->pkt_buf = NULL; } // 释放librtmp包(原有逻辑,不变) RTMPPacket_Free(&ctx->recv_pkt); // Windows WSA清理(原有逻辑,不变) #ifdef _WIN32 if (ctx->wsa_inited) { WSACleanup(); ctx->wsa_inited = false; } #endif MUTEX_UNLOCK(ctx->mutex); // 销毁锁并释放上下文(原有逻辑,不变) MUTEX_DESTROY(ctx->mutex); free(ctx); } RTMP_PULL_API RTMPPullErrCode rtmp_pull_set_url(RTMPPullHandle handle, const char* url) { if (!handle || !url || strlen(url) == 0 || strlen(url) >= 256) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); strncpy(ctx->url, url, sizeof(ctx->url)-1); ctx->state = RTMP_PULL_STATE_CONFIGED; strncpy(ctx->stream_info.url, url, sizeof(ctx->stream_info.url)-1); set_error(ctx, RTMP_PULL_ERR_OK, "set url success"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_set_timeout(RTMPPullHandle handle, int timeout_ms) { if (!handle || timeout_ms <= 0) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); ctx->timeout_ms = timeout_ms; set_error(ctx, RTMP_PULL_ERR_OK, "set timeout success: %dms", timeout_ms); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_set_buffer_size(RTMPPullHandle handle, uint32_t buffer_size) { if (!handle || buffer_size <= 1024) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); // 重新分配缓冲区 uint8_t* new_buf = (uint8_t*)realloc(ctx->buffer, buffer_size); if (!new_buf) { set_error(ctx, RTMP_PULL_ERR_MEM, "realloc buffer failed, size: %u", buffer_size); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_MEM; } ctx->buffer = new_buf; ctx->buffer_size = buffer_size; // 重新分配librtmp包缓冲区 RTMPPacket_Free(&ctx->recv_pkt); RTMPPacket_Alloc(&ctx->recv_pkt, buffer_size); set_error(ctx, RTMP_PULL_ERR_OK, "set buffer size success: %u bytes", buffer_size); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_start(RTMPPullHandle handle) { if (!handle) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); // 状态校验 if (ctx->state != RTMP_PULL_STATE_CONFIGED) { set_error(ctx, RTMP_PULL_ERR_STATE, "start failed, invalid state: %d", ctx->state); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_STATE; } // 重置之前的资源 reset_context(ctx); ctx->state = RTMP_PULL_STATE_CONNECTING; // 创建librtmp句柄并初始化 ctx->rtmp = RTMP_Alloc(); if (!ctx->rtmp) { set_error(ctx, RTMP_PULL_ERR_MEM, "RTMP_Alloc failed"); ctx->state = RTMP_PULL_STATE_ERROR; MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_MEM; } RTMP_Init(ctx->rtmp); ctx->rtmp->Link.timeout = ctx->timeout_ms / 1000; // librtmp超时为秒 RTMP_SetupURL(ctx->rtmp, ctx->url); #if 0 RTMP_SetStreamID(ctx->rtmp, 0); ctx->rtmp->Link.flags |= RTMP_FLAG_LIVE; // 拉取直播流(关键) #else ctx->rtmp->m_stream_id = 0; ctx->rtmp->Link.lFlags |= RTMP_LF_LIVE; #endif // RTMP_EnableWrite(ctx->rtmp); // 仅拉流,禁用写 // 建立连接 if (!RTMP_Connect(ctx->rtmp, NULL)) { set_error(ctx, RTMP_PULL_ERR_CONNECT, "RTMP_Connect failed, url: %s", ctx->url); ctx->state = RTMP_PULL_STATE_ERROR; MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_CONNECT; } // 连接流 if (!RTMP_ConnectStream(ctx->rtmp, 0)) { set_error(ctx, RTMP_PULL_ERR_STREAM, "RTMP_ConnectStream failed"); RTMP_Close(ctx->rtmp); ctx->rtmp = NULL; ctx->state = RTMP_PULL_STATE_ERROR; MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_STREAM; } // 拉流成功,更新状态 ctx->state = RTMP_PULL_STATE_STREAMING; set_error(ctx, RTMP_PULL_ERR_OK, "start pull success, url: %s", ctx->url); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_stop(RTMPPullHandle handle) { if (!handle) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); if (ctx->state != RTMP_PULL_STATE_STREAMING && ctx->state != RTMP_PULL_STATE_PAUSED) { set_error(ctx, RTMP_PULL_ERR_STATE, "stop failed, invalid state: %d", ctx->state); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_STATE; } // 重置上下文(断开连接,释放librtmp资源) reset_context(ctx); set_error(ctx, RTMP_PULL_ERR_OK, "stop pull success"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_pause(RTMPPullHandle handle) { if (!handle) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); if (ctx->state != RTMP_PULL_STATE_STREAMING) { set_error(ctx, RTMP_PULL_ERR_STATE, "pause failed, invalid state: %d", ctx->state); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_STATE; } ctx->state = RTMP_PULL_STATE_PAUSED; set_error(ctx, RTMP_PULL_ERR_OK, "pause pull success"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_resume(RTMPPullHandle handle) { if (!handle) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); if (ctx->state != RTMP_PULL_STATE_PAUSED) { set_error(ctx, RTMP_PULL_ERR_STATE, "resume failed, invalid state: %d", ctx->state); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_STATE; } ctx->state = RTMP_PULL_STATE_STREAMING; set_error(ctx, RTMP_PULL_ERR_OK, "resume pull success"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_read_packet(RTMPPullHandle handle, RTMPPullPacket* packet) { // 校验1:句柄和packet都不能为空(上层传参错误) if (!handle || !packet) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); // 校验2:拉流状态必须为STREAMING(非法状态调用) if (ctx->state != RTMP_PULL_STATE_STREAMING) { set_error(ctx, RTMP_PULL_ERR_STATE, "read failed, invalid state: %d", ctx->state); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_STATE; } // 校验3:librtmp句柄不能为空(内部资源异常) if (!ctx->rtmp) { set_error(ctx, RTMP_PULL_ERR_LIBRTMP, "read failed, rtmp handle is NULL"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_LIBRTMP; } // 阻塞读取RTMP包(librtmp原生接口) int ret = RTMP_ReadPacket(ctx->rtmp, &ctx->recv_pkt); if (ret == 0) { // 超时/无数据 set_error(ctx, RTMP_PULL_ERR_TIMEOUT, "read packet timeout"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_TIMEOUT; } else if (ret < 0) { // 读取失败 set_error(ctx, RTMP_PULL_ERR_READ, "read packet failed, ret: %d", ret); ctx->state = RTMP_PULL_STATE_ERROR; MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_READ; } // 校验4:librtmp包体不能为空、包长度必须大于0(异常包) if (!ctx->recv_pkt.m_body || ctx->recv_pkt.m_nBodySize == 0) { set_error(ctx, RTMP_PULL_ERR_LIBRTMP, "read empty packet, body is NULL or len 0"); RTMPPacket_Reset(&ctx->recv_pkt); // 重置包,避免影响下一次读取 MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_LIBRTMP; } // 校验5:包长度不能超过库内部预分配的缓冲区(避免越界) if (ctx->recv_pkt.m_nBodySize > RTMP_PULL_PACKET_BUF_SIZE) { set_error(ctx, RTMP_PULL_ERR_READ, "packet too large, len: %u > buf size: %u", ctx->recv_pkt.m_nBodySize, RTMP_PULL_PACKET_BUF_SIZE); RTMPPacket_Reset(&ctx->recv_pkt); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_READ; } // 【核心】安全拷贝:将librtmp包体拷贝到库内部可写的pkt_buf memcpy(ctx->pkt_buf, ctx->recv_pkt.m_body, ctx->recv_pkt.m_nBodySize); // 初始化对外packet结构体,指向库内部可写缓冲区 memset(packet, 0, sizeof(RTMPPullPacket)); switch (ctx->recv_pkt.m_packetType) { case RTMP_PACKET_TYPE_VIDEO: packet->type = RTMP_PULL_PACKET_VIDEO; break; case RTMP_PACKET_TYPE_AUDIO: packet->type = RTMP_PULL_PACKET_AUDIO; break; case RTMP_PACKET_TYPE_INFO: packet->type = RTMP_PULL_PACKET_SCRIPT; break; default: packet->type = RTMP_PULL_PACKET_OTHER; break; } packet->timestamp = ctx->recv_pkt.m_nTimeStamp; packet->data = ctx->pkt_buf; // 指向库内部可写缓冲区 packet->data_len = ctx->recv_pkt.m_nBodySize; // 实际数据长度 packet->buf_size = RTMP_PULL_PACKET_BUF_SIZE; // 缓冲区总大小 // 更新流统计信息 ctx->stream_info.recv_bytes += packet->data_len; ctx->stream_info.recv_packets += 1; // 重置librtmp包,准备下一次读取 RTMPPacket_Reset(&ctx->recv_pkt); set_error(ctx, RTMP_PULL_ERR_OK, "read packet success, type: %d, len: %u", packet->type, packet->data_len); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API RTMPPullState rtmp_pull_get_state(RTMPPullHandle handle) { if (!handle) { return RTMP_PULL_STATE_UNINIT; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); RTMPPullState state = ctx->state; MUTEX_UNLOCK(ctx->mutex); return state; } RTMP_PULL_API char* rtmp_pull_get_last_error(RTMPPullHandle handle, char* buf, int buf_len) { if (!handle || !buf || buf_len <= 0) { return NULL; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); strncpy(buf, ctx->last_err_msg, buf_len-1); buf[buf_len-1] = '\0'; MUTEX_UNLOCK(ctx->mutex); return buf; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_get_stream_info(RTMPPullHandle handle, RTMPPullStreamInfo* info) { if (!handle || !info) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); memcpy(info, &ctx->stream_info, sizeof(RTMPPullStreamInfo)); set_error(ctx, RTMP_PULL_ERR_OK, "get stream info success"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; } RTMP_PULL_API bool rtmp_pull_is_connected(RTMPPullHandle handle) { if (!handle) { return false; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); bool connected = (ctx->state == RTMP_PULL_STATE_STREAMING || ctx->state == RTMP_PULL_STATE_PAUSED) && ctx->rtmp != NULL; MUTEX_UNLOCK(ctx->mutex); return connected; } RTMP_PULL_API RTMPPullErrCode rtmp_pull_flush_buffer(RTMPPullHandle handle) { if (!handle) { return RTMP_PULL_ERR_PARAM; } RTMPPullContext* ctx = (RTMPPullContext*)handle; MUTEX_LOCK(ctx->mutex); // 重置librtmp包,清空缓冲区 RTMPPacket_Reset(&ctx->recv_pkt); memset(ctx->buffer, 0, ctx->buffer_size); set_error(ctx, RTMP_PULL_ERR_OK, "flush buffer success"); MUTEX_UNLOCK(ctx->mutex); return RTMP_PULL_ERR_OK; }

3.CMakeLists.txt实现

# 最小CMake版本(兼容大部分环境) cmake_minimum_required(VERSION 3.10) # 项目名称、版本、支持语言(C/C++) project(rtmp_pull LANGUAGES C CXX VERSION 1.0.0) # 定义编译选项 set(CMAKE_C_STANDARD 99) set(CMAKE_CXX_STANDARD 11) set(CMAKE_C_STANDARD_REQUIRED ON) set(CMAKE_CXX_STANDARD_REQUIRED ON) # 跨平台编译选项:开启警告、PIC(位置无关代码,动态库必须) add_compile_options( -Wall -Wextra -fPIC ) # 调试模式配置(核心:保留调试符号、关闭优化) if(CMAKE_BUILD_TYPE STREQUAL "Debug") message(STATUS "Build mode: Debug (enable -g -O0, debug symbols)") add_compile_options( -g -O0 -ggdb # 增强gdb调试信息 ) # 调试版本库名加d(如librtmppull-d.so、rtmppull-d.dll) set(CMAKE_DEBUG_POSTFIX d) else() message(STATUS "Build mode: Release (enable -O2, optimize)") add_compile_options(-O2) endif() # 定义导出宏(编译动态库时使用) add_definitions(-DRTMP_PULL_EXPORTS) # ************************** 查找依赖库 ************************** # 1. 查找zlib(librtmp底层依赖) find_package(ZLIB REQUIRED) if(ZLIB_FOUND) message(STATUS "Found ZLIB: ${ZLIB_INCLUDE_DIRS} ${ZLIB_LIBRARIES}") else() message(FATAL_ERROR "ZLIB not found! Please install zlib-dev/zlib-devel first.") endif() # 2. 查找OpenSSL(librtmp底层依赖,加密握手) find_package(OpenSSL REQUIRED) if(OPENSSL_FOUND) message(STATUS "Found OpenSSL: ${OPENSSL_INCLUDE_DIR} ${OPENSSL_LIBRARIES}") else() message(FATAL_ERROR "OpenSSL not found! Please install libssl-dev/openssl-devel first.") endif() # 3. 查找librtmp(核心依赖,手动查找因为无官方CMake配置) # 手动指定librtmp路径(如果未安装到系统,用户可手动设置:-DRTMP_INCLUDE_DIR=/path/to/rtmp/include -DRTMP_LIBRARY=/path/to/librtmp.so) find_path(RTMP_INCLUDE_DIR NAMES rtmp.h PATHS /usr/include/librtmp /usr/local/include/librtmp) find_library(RTMP_LIBRARY NAMES rtmp librtmp PATHS /usr/lib /usr/local/lib /usr/lib64 /usr/local/lib64) if(RTMP_INCLUDE_DIR AND RTMP_LIBRARY) message(STATUS "Found librtmp: ${RTMP_INCLUDE_DIR} ${RTMP_LIBRARY}") else() message(FATAL_ERROR "librtmp not found! \n" "Please install librtmp-dev/librtmp-devel first, or manually set: \n" "cmake .. -DRTMP_INCLUDE_DIR=/path/to/rtmp/include -DRTMP_LIBRARY=/path/to/librtmp.so") endif() # ************************** 收集源文件 ************************** # 头文件目录(对外头文件+依赖头文件) include_directories( ${PROJECT_SOURCE_DIR}/include ${RTMP_INCLUDE_DIR} ${ZLIB_INCLUDE_DIRS} ${OPENSSL_INCLUDE_DIR} ) # 源文件 file(GLOB SRC_FILES ${PROJECT_SOURCE_DIR}/src/*.cpp ${PROJECT_SOURCE_DIR}/src/*.c) # ************************** 生成动态库 ************************** # 生成跨平台动态库(SHARED) add_library(${PROJECT_NAME} SHARED ${SRC_FILES}) # 设置动态库版本(SOVERSION:主版本,VERSION:完整版本) set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION ${PROJECT_VERSION_MAJOR} DEBUG_POSTFIX ${CMAKE_DEBUG_POSTFIX} ) # 链接依赖库(跨平台兼容) target_link_libraries(${PROJECT_NAME} PRIVATE ${RTMP_LIBRARY} ${ZLIB_LIBRARIES} ${OPENSSL_LIBRARIES} ) # Windows额外链接(Socket库+线程库) if(WIN32) target_link_libraries(${PROJECT_NAME} PRIVATE ws2_32 kernel32 user32 gdi32 winspool comdlg32 advapi32 shell32 ole32 oleaut32 uuid odbc32 odbccp32) endif() # ************************** 安装配置(可选,便于系统级安装) ************************** # 安装动态库到系统库目录 install(TARGETS ${PROJECT_NAME} LIBRARY DESTINATION lib ARCHIVE DESTINATION lib RUNTIME DESTINATION bin ) # 安装对外头文件到系统头文件目录 install(FILES ${PROJECT_SOURCE_DIR}/include/rtmp_pull.h DESTINATION include) # 打印编译信息 message(STATUS "Project: ${PROJECT_NAME} ${PROJECT_VERSION}") message(STATUS "Build type: ${CMAKE_BUILD_TYPE}") message(STATUS "Output library: ${CMAKE_LIBRARY_OUTPUT_DIRECTORY}/${CMAKE_SHARED_LIBRARY_PREFIX}${PROJECT_NAME}${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX}")

三 封装库调用Demo

main函数实现

int main(int argc, char *argv[]) { // 1. 创建拉流句柄 RTMPPullHandle handle = rtmp_pull_create(); if (!handle) { printf("create handle failed\n"); return -1; }else{ RTMPPullState state = rtmp_pull_get_state(handle); printf("create handle success status:%d\n",state); } // 2. 设置拉流URL #if 1 const char* rtmp_url = "rtmp://192.168.22.53/live/livestream"; #else const char* rtmp_url = "rtmp://192.168.22.53/mytv/livestream"; #endif RTMPPullErrCode err = rtmp_pull_set_url(handle, rtmp_url); if (err != RTMP_PULL_ERR_OK) { printf("set url failed, err: %d\n", err); rtmp_pull_destroy(handle); return -1; }else{ char err_msg[512]; rtmp_pull_get_last_error(handle, err_msg, sizeof(err_msg)); RTMPPullState state = rtmp_pull_get_state(handle); printf("set url success state:%d msg:%s\n",state,err_msg); } // 3. 开始拉流 err = rtmp_pull_start(handle); if (err != RTMP_PULL_ERR_OK) { char err_msg[512]; rtmp_pull_get_last_error(handle, err_msg, sizeof(err_msg)); printf("start pull failed, err: %d, msg: %s\n", err, err_msg); rtmp_pull_destroy(handle); return -1; } printf("start pull success, url: %s\n", rtmp_url); RtmpPacketParseResult result; // 4. 循环读取RTMP包 RTMPPullPacket packet; while (rtmp_pull_is_connected(handle)) { err = rtmp_pull_read_packet(handle, &packet); if (err == RTMP_PULL_ERR_OK) { #if 1 // 处理包(视频/音频/脚本) const char* type_str = ""; switch (packet.type) { case RTMP_PULL_PACKET_VIDEO: type_str = "VIDEO"; break; case RTMP_PULL_PACKET_AUDIO: type_str = "AUDIO"; break; case RTMP_PULL_PACKET_SCRIPT: type_str = "SCRIPT"; break; default: type_str = "OTHER"; break; } #endif } else if (err == RTMP_PULL_ERR_TIMEOUT) { // 超时,继续读取 continue; }else if(err == RTMP_PULL_ERR_LIBRTMP){ continue; }else { // 读取错误 char err_msg[512]; rtmp_pull_get_last_error(handle, err_msg, sizeof(err_msg)); printf("read packet failed, err: %d, msg: %s\n", err, err_msg); break; } } // 5. 停止拉流并销毁句柄 rtmp_pull_stop(handle); rtmp_pull_destroy(handle); printf("pull stop success\n"); }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 12:13:21

mT5中文-base零样本增强模型企业应用:客服工单扩增与意图识别实战

mT5中文-base零样本增强模型企业应用&#xff1a;客服工单扩增与意图识别实战 1. 为什么企业需要“不教就会”的文本增强能力&#xff1f; 你有没有遇到过这样的情况&#xff1a;客服团队每天收到上千条工单&#xff0c;但其中80%都集中在“订单未发货”“物流信息不更新”“退…

作者头像 李华
网站建设 2026/4/16 15:31:34

Mac滚动控制深度指南:构建多设备滚动协同的高效工作流

Mac滚动控制深度指南&#xff1a;构建多设备滚动协同的高效工作流 【免费下载链接】Scroll-Reverser Per-device scrolling prefs on macOS. 项目地址: https://gitcode.com/gh_mirrors/sc/Scroll-Reverser 在现代数字工作环境中&#xff0c;多设备协同已成为专业人士的…

作者头像 李华
网站建设 2026/4/15 22:36:01

CTC语音唤醒模型的数据集构建与管理最佳实践

CTC语音唤醒模型的数据集构建与管理最佳实践 1. 为什么数据集质量直接决定唤醒效果 你有没有遇到过这样的情况&#xff1a;语音唤醒模型在实验室里表现完美&#xff0c;一放到真实设备上就频频失灵&#xff1f;用户喊"小云小云"十次有三次没反应&#xff0c;或者环…

作者头像 李华
网站建设 2026/4/16 12:28:44

2025全功能Linux平台B站客户端:无缝体验与跨平台方案指南

2025全功能Linux平台B站客户端&#xff1a;无缝体验与跨平台方案指南 【免费下载链接】bilibili-linux 基于哔哩哔哩官方客户端移植的Linux版本 支持漫游 项目地址: https://gitcode.com/gh_mirrors/bi/bilibili-linux Linux平台B站客户端作为一款基于官方客户端移植的开…

作者头像 李华
网站建设 2026/4/16 13:46:11

MusePublic数学公式处理引擎:LaTeX与MathType无缝转换

MusePublic数学公式处理引擎&#xff1a;LaTeX与MathType无缝转换效果实测 最近在整理一批高校数学教材的电子化工作&#xff0c;遇到个让人头疼的问题&#xff1a;老教授们习惯用MathType写公式&#xff0c;出版社却要求统一提交LaTeX源码&#xff1b;学生交来的作业里&#…

作者头像 李华
网站建设 2026/4/16 14:25:56

RMBG-2.0部署指南:镜像免配置一键启动透明Alpha抠图服务

RMBG-2.0部署指南&#xff1a;镜像免配置一键启动透明Alpha抠图服务 1. 项目概述 RMBG-2.0是一款基于BiRefNet架构开发的高精度图像背景去除工具。它能快速准确地分离图像主体与背景&#xff0c;生成带有透明通道的PNG图像。相比传统抠图工具&#xff0c;RMBG-2.0在处理复杂边…

作者头像 李华