当前位置: 首页 > news >正文

基于发布-订阅模型的音视频流分发框架

有时需要同时网络推流和把流封装为某格式,或做一些其它操作。这就需要一个分发流的机制,把同一路流分发给多个使用者去操作,下面实现了一个简易的音视频流分发框架。代码如下:

avStreamHub.h

#ifndef STREAMHUB_H
#define STREAMHUB_H#include <stddef.h>
#include <pthread.h>typedef void (*AVStreamCallback)(void *data, size_t size, void *arg);typedef struct Subscriber
{AVStreamCallback callback;void *arg;
} Subscriber;typedef struct AVStreamHub
{Subscriber *subscribers; // 存储订阅者的数组size_t size;             // 当前订阅者数量size_t capacity;         // 动态数组容量pthread_mutex_t lock;    // 互斥锁
} AVStreamHub;/*** @brief 初始化AVStreamHub* @param hub 指向AVStreamHub结构体的指针* @param initial_capacity 初始分配的订阅者容量* @return 0 成功,-1 失败*/
int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity);/*** @brief 添加订阅者* @param hub 指向AVStreamHub结构体的指针* @param callback 订阅者的回调函数* @param arg 回调携带的参数* @return 0 成功,< 0 失败*/
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg);/*** @brief 删除订阅者* @param hub 指向AVStreamHub结构体的指针* @param callback 要移除的订阅者的回调函数* @return 无*/
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback);/*** @brief 将数据流分发给所有已注册的订阅者* @param hub 指向AVStreamHub结构体的指针* @param data 指向要分发的数据的指针* @param size 数据的大小* @return 无*/
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size);/*** @brief 销毁AVStreamHub* @param hub 指向AVStreamHub结构体的指针* @return 无*/
void avStreamHub_destroy(AVStreamHub *hub);#endif // STREAMHUB_H

 

avStreamHub.c

#include "avStreamHub.h"
#include <stdlib.h>
#include <string.h>#define INITIAL_CAPACITY 2int avStreamHub_init(AVStreamHub *hub, size_t initial_capacity)
{hub->size = 0;hub->capacity = 0;if (initial_capacity == 0){initial_capacity = INITIAL_CAPACITY;}hub->subscribers = (Subscriber *)malloc(initial_capacity * sizeof(Subscriber));if (!hub->subscribers){return -1;}memset(hub->subscribers, 0, initial_capacity * sizeof(Subscriber));hub->capacity = initial_capacity;pthread_mutex_init(&hub->lock, NULL);return 0;
}// 添加订阅者
int avStreamHub_addSub(AVStreamHub *hub, AVStreamCallback callback, void *arg)
{pthread_mutex_lock(&hub->lock);// 检查该回调是否已存在for (size_t i = 0; i < hub->size; i++){if (hub->subscribers[i].callback == callback){pthread_mutex_unlock(&hub->lock);return -1; // 回调已存在}}// 如果容量不足则扩展容量if (hub->size == hub->capacity){hub->subscribers = (Subscriber *)realloc(hub->subscribers,hub->capacity * 2 * sizeof(Subscriber));if (!hub->subscribers){pthread_mutex_unlock(&hub->lock);return -2;}hub->capacity *= 2;}// 添加新订阅者hub->subscribers[hub->size].callback = callback;hub->subscribers[hub->size].arg = arg;hub->size++;pthread_mutex_unlock(&hub->lock);return 0;
}// 删除订阅者
void avStreamHub_removeSub(AVStreamHub *hub, AVStreamCallback callback)
{pthread_mutex_lock(&hub->lock);for (size_t i = 0; i < hub->size; i++){if (hub->subscribers[i].callback == callback){size_t j = i;for (; j < hub->size - 1; j++){hub->subscribers[j] = hub->subscribers[j + 1];}hub->subscribers[j].callback = NULL;hub->subscribers[j].arg = NULL;hub->size--;break;}}pthread_mutex_unlock(&hub->lock);
}// 分发数据
void avStreamHub_publish(AVStreamHub *hub, void *data, size_t size)
{pthread_mutex_lock(&hub->lock);for (size_t i = 0; i < hub->size; i++){hub->subscribers[i].callback(data, size, hub->subscribers[i].arg);}pthread_mutex_unlock(&hub->lock);
}// 销毁AVStreamHub
void avStreamHub_destroy(AVStreamHub *hub)
{free(hub->subscribers);hub->subscribers = NULL;hub->size = 0;hub->capacity = 0;pthread_mutex_destroy(&hub->lock);
}

http://www.mrgr.cn/news/18950.html

相关文章:

  • Mybatis分页查询主从表
  • 国内怎么进行海外直播?需要什么工具?
  • 快专利与慢专利:速度与质量的天平
  • 文心一言 VS 讯飞星火 VS chatgpt (339)-- 算法导论23.1 8题
  • Spring Cloud Gateway整合基于STOMP协议的WebSocket实战及遇到问题解决
  • JavaScript网页设计案例
  • 使用vscode debug cpp/python混合编程的程序(从python调用的C++编译的dll)
  • 深度学习(六)-循环神经网络
  • 【微信小程序入门】3、微信小程序开发基础及微信开发者工具的使用
  • python读取excel数据详细讲解
  • ListBox等控件的SelectedItem,SelectedValue,SelectedValuePath属性详解
  • pr瘦脸怎么操作?
  • CSS学习10
  • 内存卡乱码问题解析恢复方案
  • 渠道招商有哪些工作内容?可以通过什么途径获客!
  • Allure报告下载不同格式的文件
  • redis缓存和数据库通过延迟双删除实现数据一致性
  • 基于Bert-base-chinese训练多分类文本模型(代码详解)
  • 为什么说2025年是国自然申请最佳时机?
  • Centos根目录扩容Docker分区扩容最佳实践