跳到主要内容使用 C++ 构建简单的 RTSP 流媒体转发服务器 | 极客日志C++
使用 C++ 构建简单的 RTSP 流媒体转发服务器
本文介绍如何使用 C++ 和 wepoll 库构建一个简单的 RTSP 流媒体转发服务器。项目包含 EventPoller(并发封装)、RtspServer(服务接入)、RtspSession(会话管理)、SdpParser(SDP 解析)和 MediaPusherManager(推拉流管理)五个核心类。支持桌面推流及播放器拉流播放,采用 TCP 传输协议,原封不动转发 RTP 包。代码结构清晰,适合学习 RTSP 协议交互流程。
念念不忘1 浏览 使用效果举例
编译并运行本程序后,打开 cmd,执行以下命令进行桌面推流:
ffmpeg -f gdigrab -i desktop -c:v libx264 -f rtsp -rtsp_transport tcp rtsp://127.0.0.1/live/test
该命令会把桌面画面推送到流媒体服务器,并在服务器上创建一条地址为 rtsp://127.0.0.1/live/test 的转发流。
随后使用 PotPlayer 或者 VLC Player 拉取该流,即可成功播放转发流。
注意:这个流媒体是简单的 RTSP 流媒体转发程序,不做 RTP 和 RTCP 包的分析和重构处理,仅原封不动地将发送至流媒体的 RTSP 流转发出去。
微信扫一扫,关注极客日志
微信公众号「极客日志」,在微信中扫描左侧二维码关注。展示文案:极客日志 zeeklog
相关免费在线工具
- Base64 字符串编码/解码
将字符串编码和解码为其 Base64 格式表示形式即可。 在线工具,Base64 字符串编码/解码在线工具,online
- Base64 文件转换器
将字符串、文件或图像转换为其 Base64 表示形式。 在线工具,Base64 文件转换器在线工具,online
- Markdown 转 HTML
将 Markdown(GFM)转为 HTML 片段,浏览器内 marked 解析;与 HTML 转 Markdown 互为补充。 在线工具,Markdown 转 HTML在线工具,online
- HTML 转 Markdown
将 HTML 片段转为 GitHub Flavored Markdown,支持标题、列表、链接、代码块与表格等;浏览器内处理,可链接预填。 在线工具,HTML 转 Markdown在线工具,online
- JSON 压缩
通过删除不必要的空白来缩小和压缩JSON。 在线工具,JSON 压缩在线工具,online
- JSON美化和格式化
将JSON字符串修饰为友好的可读格式。 在线工具,JSON美化和格式化在线工具,online
开篇
作为流媒体,其并发能力很重要,因此从搭建之初就引入了 wepoll 库(很小,仅两个文件),以其为核心做开发。
- main.cpp (主程序)
- wepoll.h (三方库)
- wepoll.c (三方库)
过程
- 对 wepoll 进行封装,做套接字的并发处理,从而产生一个新类 EventPoller。
- 建立 RtspServer 类,接入 EventPoller 的并发循环,开始 accept 客户端。
- 将推流到 RtspServer 的客户端连接封装为 RtspSession 类实例。
- 新建 SdpParser 类来解析推流的 SDP 交互报文,确定流的通道参数,给 RtspSession 分析流通道的数据。
- 得到向 RtspServer 申请拉流的客户端连接,同样是 RtspSession 类实例。
- 新建 MediaPusherManager 类,用于追踪管理和区分推/拉流的 RtspSession。
总项目结构完成,5 个类完成一个简单的流媒体转发服务器。
代码
1. main.cpp
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <winsock2.h>
#include <ws2tcpip.h>
#include <iphlpapi.h>
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Iphlpapi.lib")
#include <string>
#include <map>
#include <vector>
#include <iostream>
#include "EventPoller.h"
#include "RtspServer.h"
int main() {
bool isRet;
WORD wVersionRequested = MAKEWORD(2, 2);
WSADATA wsaData;
isRet = (0 == WSAStartup(wVersionRequested, &wsaData));
if (false == isRet) return 1;
EventPoller eventPoller;
auto rtspServer = std::make_shared<RtspServer>(&eventPoller);
if (rtspServer->Start("127.0.0.1", 554)) {
std::cout << "已就绪,请试着用 ffmpeg 运行如下命令进行推流:ffmpeg -f gdigrab -i desktop -c:v libx264 -f rtsp -rtsp_transport tcp rtsp://127.0.0.1/live/test\n";
std::cout << "推流成功后,可以使用某个播放器 (比如 VlcPlayer/PotPlayer) 对 rtsp://127.0.0.1/live/test 进行拉流以查看转发效果\n";
eventPoller.Exec();
rtspServer->Stop();
}
rtspServer.reset();
WSACleanup();
return 0;
}
2. EventPoller.h
#pragma once
#include <string>
#include <functional>
#include <mutex>
typedef void* HANDLE;

/// Thin wrapper around an epoll-style handle: maps socket descriptors to
/// callbacks and dispatches readiness events from Exec().
class EventPoller {
public:
    /// Callback invoked with the event bit mask reported for a descriptor.
    using PollEventCB = std::function<void(int event)>;

    /// Event bits passed to AddEvent() and handed back to callbacks.
    enum Poll_Event {
        Event_Read = 0x1,   // 1 << 0
        Event_Write = 0x4,  // 1 << 2
        Event_Error = 0x8,  // 1 << 3
    };

    EventPoller();
    ~EventPoller();

    /// Registers `fd` for the given event mask; `cb` is stored and called from Exec().
    bool AddEvent(int fd, int event, PollEventCB cb);
    /// Unregisters `fd` and drops its callback.
    bool DelEvent(int fd);
    /// Runs the dispatch loop while the run flag is set.
    void Exec();
    /// Scratch buffer shared by all callers of this poller.
    std::vector<char>& GetSharedBuffer();

private:
    bool m_isRun = false;       // set to true by Exec()
    HANDLE m_hEpoll = nullptr;  // handle returned by epoll_create()
    std::unordered_map<int, std::shared_ptr<PollEventCB>> m_mapEventCB;  // fd -> callback
    std::vector<char> m_vecSharedBuffer;
};
3. EventPoller.cpp
#include "EventPoller.h"
#include "wepoll.h"
#include "Windows.h"
#define EPOLL_SIZE 1024
/// Sizes the shared scratch buffer (32 * 4 KiB) and creates the epoll handle.
EventPoller::EventPoller() {
    const int kSharedBufferBytes = 32 * 4 * 1024;
    m_vecSharedBuffer.resize(kSharedBufferBytes);
    m_hEpoll = epoll_create(EPOLL_SIZE);
}
/// Releases the epoll handle created in the constructor.
/// The original destructor was empty, so the handle leaked for every poller.
EventPoller::~EventPoller() {
    if (NULL != m_hEpoll) {
        epoll_close(m_hEpoll);
        m_hEpoll = NULL;
    }
}
bool EventPoller::AddEvent(int fd, int event, PollEventCB cb) {
struct epoll_event ev = { 0 };
ev.events = event;
ev.data.fd = fd;
const int iRet = epoll_ctl(m_hEpoll, EPOLL_CTL_ADD, fd, &ev);
if (-1 == iRet) {
return false;
}
m_mapEventCB.emplace(fd, std::make_shared<PollEventCB>(std::move(cb)));
return true;
}
/// Drops the callback for `fd` and removes the descriptor from the epoll set.
/// @return false when `fd` had no callback or epoll_ctl fails, true otherwise.
bool EventPoller::DelEvent(int fd) {
    const bool wasRegistered = m_mapEventCB.erase(fd) != 0;
    if (!wasRegistered) {
        return false;
    }
    return epoll_ctl(m_hEpoll, EPOLL_CTL_DEL, fd, nullptr) != -1;
}
void EventPoller::Exec() {
m_isRun = true;
int iMinDelay = 5000;
struct epoll_event* events = new struct epoll_event[EPOLL_SIZE];
while (m_isRun) {
int ret = epoll_wait(m_hEpoll, events, EPOLL_SIZE, iMinDelay);
if (ret <= 0) continue;
for (int i = 0; i < ret; ++i) {
struct epoll_event& ev = events[i];
int fd = ev.data.fd;
auto it = m_mapEventCB.find(fd);
if (it == m_mapEventCB.end()) {
epoll_ctl(m_hEpoll, EPOLL_CTL_DEL, fd, nullptr);
continue;
}
(*(it->second))(ev.events);
}
}
delete[] events;
}
// Returns a mutable reference to the poller-owned scratch buffer
// (sized in the constructor); callers share the same storage.
std::vector<char>& EventPoller::GetSharedBuffer() {
return m_vecSharedBuffer;
}
4. RtspServer.h
#pragma once
#include <memory>
#include <mutex>
#include <unordered_map>
class EventPoller;
class RtspSession;
// Listening side of the RTSP relay: accepts client connections and keeps the
// resulting RtspSession objects in a map keyed by socket descriptor.
// Inherits enable_shared_from_this because Start() calls shared_from_this(),
// so instances must be owned by a std::shared_ptr.
class RtspServer : public std::enable_shared_from_this<RtspServer> {
public:
// Stores the poller pointer; the poller is not owned by the server.
explicit RtspServer(EventPoller* pEventPoller);
// Creates the listening socket on uPort and registers it with the poller.
// Returns the result of that registration.
bool Start(const std::string& szHost, uint16_t uPort);
// Unregisters and closes the listening socket if one is open.
void Stop();
// Removes the session stored under fd from the session map.
void CloseSession(int fd);
private:
EventPoller* m_pEventPoller = nullptr; // non-owning
std::string m_szHost;                  // host string passed to Start()
uint16_t m_uPort = 0;                  // port passed to Start()
int m_fd = -1;                         // listening socket, -1 when closed
std::unordered_map<int, std::shared_ptr<RtspSession>> m_mapSession; // fd -> session
std::mutex m_mtxSession;               // guards m_mapSession
};
5. RtspServer.cpp
#pragma once
#include "RtspServer.h"
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <winsock2.h>
#include <ws2tcpip.h>
#include <iphlpapi.h>
#pragma comment(lib, "Ws2_32.lib")
#pragma comment(lib, "Iphlpapi.lib")
#include <string>
#include <map>
#include <vector>
#include <thread>
#include "EventPoller.h"
#include "RtspSession.h"
using namespace std;
/// Remembers the poller used to register the listening and client sockets.
RtspServer::RtspServer(EventPoller* pEventPoller)
    : m_pEventPoller(pEventPoller) {
}
bool RtspServer::Start(const std::string& szHost, uint16_t uPort) {
m_szHost = szHost;
m_uPort = uPort;
m_fd = (int)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
int opt = 1;
setsockopt(m_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&opt, static_cast<socklen_t>(sizeof(opt)));
unsigned long ul = 1;
ioctlsocket(m_fd, FIONBIO, &ul);
struct sockaddr_in addr;
ZeroMemory(&addr, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = htons(uPort);
addr.sin_addr.s_addr = INADDR_ANY;
if (::bind(m_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) {
DebugBreak();
}
if (::listen(m_fd, 1024) == -1) {
DebugBreak();
}
std::weak_ptr<RtspServer> weak_self = shared_from_this();
bool isRet = m_pEventPoller->AddEvent(m_fd, EventPoller::Event_Read | EventPoller::Event_Error, [weak_self](int event) {
auto strong_self = weak_self.lock();
if (nullptr == strong_self) return;
const int fdServer = strong_self->m_fd;
int fd;
struct sockaddr_storage peer_addr;
socklen_t addr_len = sizeof(peer_addr);
while (true) {
if (event & EventPoller::Event_Read) {
fd = (int)accept(fdServer, (struct sockaddr*)&peer_addr, &addr_len);
if (fd == -1) {
int iRet = WSAGetLastError();
if (iRet == WSAEWOULDBLOCK) {
return;
}
DebugBreak();
return;
}
auto session = std::make_shared<RtspSession>(strong_self, strong_self->m_pEventPoller, fd);
if (session->StartSession()) {
strong_self->m_mtxSession.lock();
strong_self->m_mapSession[fd] = session;
strong_self->m_mtxSession.unlock();
}
}
if (event & EventPoller::Event_Error) {
return;
}
}
});
if (false == isRet) Stop();
return isRet;
}
/// Unregisters and closes the listening socket; does nothing when it is
/// already closed.
void RtspServer::Stop() {
    if (-1 == m_fd) {
        return;
    }
    m_pEventPoller->DelEvent(m_fd);
    closesocket(m_fd);
    m_fd = -1;
}
void RtspServer::CloseSession(int fd) {
thread task([this, fd]() {
m_mtxSession.lock();
m_mapSession.erase(fd);
m_mtxSession.unlock();
});
task.detach();
}
6. RtspSession.h
#pragma once
#include <vector>
#include <string>
#include <unordered_map>
#include <memory>
#include "SdpParser.h"
class EventPoller;
class RtspServer;
// One RTSP client connection (either a pusher or a player).
// Reads bytes from its socket, splits them into RTSP requests and
// '$'-prefixed interleaved frames, and answers the requests.
// Inherits enable_shared_from_this because StartSession() calls
// shared_from_this(), so instances must be owned by a std::shared_ptr.
class RtspSession : public std::enable_shared_from_this<RtspSession> {
public:
// Configures socket options on fd and stores the server/poller references.
RtspSession(std::weak_ptr<RtspServer> pServer, EventPoller* pEventPoller, int fd);
// Unregisters this session from MediaPusherManager.
virtual ~RtspSession();
// Registers the socket with the poller; returns the registration result.
bool StartSession();
// Unregisters the socket from the poller and asks the server to drop the session.
void CloseSession();
// Sends uSize bytes from buffer on the socket; true when send() reports > 0.
bool SendData(const char* buffer, size_t uSize);
protected:
// Drains the socket into buffer and feeds each chunk to onRecv();
// returns the total number of bytes read.
size_t onRead(std::vector<char>& buffer);
// Incremental parser for received bytes (requests and interleaved frames).
void onRecv(char* buffer, size_t uSize);
private:
// Parses m_vecHeader into m_mapHeader.
bool parseHeader();
// Dispatches the parsed request to the matching handleReq_* method.
void handleReq();
// Forwards the frame held in m_vecRecv through MediaPusherManager.
void handleRtpPacket();
private:
// Per-method request handlers; parser is the parsed header map.
void handleReq_Options(const std::unordered_map<std::string, std::string>& parser);
void handleReq_Describe(const std::unordered_map<std::string, std::string>& parser);
void handleReq_ANNOUNCE(const std::unordered_map<std::string, std::string>& parser);
void handleReq_RECORD(const std::unordered_map<std::string, std::string>& parser);
void handleReq_SETUP(const std::unordered_map<std::string, std::string>& parser);
void handleReq_PLAY(const std::unordered_map<std::string, std::string>& parser);
void handleReq_TEARDOWN(const std::unordered_map<std::string, std::string>& parser);
void handleReq_SET_PARAMETER(const std::unordered_map<std::string, std::string>& parser);
// Builds a response (status line, headers, optional body) and sends it.
bool sendRtspResponse(const std::string& res_code, std::multimap<std::string, std::string> header = {}, const std::string& sdp = "", const char* protocol = "RTSP/1.0");
// Maps a SETUP URL to an index into m_vecSdpTrack.
size_t getTrackIndexByControlUrl(const std::string& control_url);
private:
std::weak_ptr<RtspServer> m_pServer;      // owning server, held weakly
EventPoller* m_pEventPoller = nullptr;    // non-owning
int m_fd = -1;                            // client socket
std::vector<char> m_vecRecv;              // bytes of the message currently being assembled
std::vector<char> m_vecHeader;            // completed request header text
std::vector<char> m_vecContent;           // completed request body
bool m_isHeader = true;                   // true while reading a header, false while reading a body
int m_iContentLength = 0;                 // Content-Length of the current request
size_t m_uRtpPacketLength = 0;            // total size of the interleaved frame in progress, 0 when unknown
std::unordered_map<std::string, std::string> m_mapHeader; // parsed request line and header fields
int m_iCseq = 0;                          // CSeq of the current request
std::string m_szSessionId;                // random id created by DESCRIBE / ANNOUNCE
// Allocated with new in handleReq_Describe / handleReq_ANNOUNCE.
// NOTE(review): no matching delete is visible in this file - confirm ownership.
SdpParser* m_pSdpParser = nullptr;
std::vector<SdpTrack*> m_vecSdpTrack;     // tracks returned by m_pSdpParser->GetAvailableTrack()
std::string m_szFullUrl;                  // URL used as the key into MediaPusherManager
bool m_isPusher = false;                  // set to true by a successful ANNOUNCE
Rtsp::ERtpType m_eRtpType = Rtsp::ERtpType::INVALID; // transport chosen in SETUP
RtspSession* m_pPusherRtspSession = nullptr; // pusher returned by AddRecvier in DESCRIBE (non-owning)
};
7. RtspSession.cpp
#include "RtspSession.h"
#define _WINSOCK_DEPRECATED_NO_WARNINGS
#include <winsock2.h>
#include <ws2tcpip.h>
#pragma comment(lib, "Ws2_32.lib")
#include <iomanip>
#include <map>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <vector>
#include "EventPoller.h"
#include "MediaPusherManager.h"
#include "RtspServer.h"
using namespace std;
/// Returns a random string of `uLen` characters drawn from [0-9a-zA-Z].
///
/// The original indexed the alphabet with `rng % N`, which applies `%` to the
/// engine object itself and does not compile. The engine is now sampled
/// through a uniform distribution, which also avoids modulo bias.
///
/// @param uLen number of characters to generate (0 yields an empty string)
std::string makeRandStr(size_t uLen) {
    static constexpr char CCH[] = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
    static constexpr size_t CCH_LEN = sizeof(CCH) - 1;  // exclude the terminating '\0'

    // One engine per thread, seeded once from the system entropy source.
    thread_local std::mt19937 rng(std::random_device{}());
    std::uniform_int_distribution<size_t> pick(0, CCH_LEN - 1);

    std::string ret(uLen, '\0');
    for (char& ch : ret) {
        ch = CCH[pick(rng)];
    }
    return ret;
}
/// Formats the current UTC time for the RTSP "Date" header,
/// using the pattern "%a, %b %d %Y %H:%M:%S GMT".
static std::string dateStr() {
    const time_t now = time(NULL);
    tm utc;
    gmtime_s(&utc, &now);

    char text[64];
    strftime(text, sizeof text, "%a, %b %d %Y %H:%M:%S GMT", &utc);
    return text;
}
/// Stores the server/poller references and configures the accepted socket:
/// non-blocking mode, TCP_NODELAY, 256 KiB send/receive buffers, linger off.
RtspSession::RtspSession(std::weak_ptr<RtspServer> pServer, EventPoller* pEventPoller, int fd)
    : m_pServer(pServer), m_pEventPoller(pEventPoller), m_fd(fd) {
    unsigned long nonBlocking = 1;
    ioctlsocket(fd, FIONBIO, &nonBlocking);

    int noDelay = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&noDelay, static_cast<socklen_t>(sizeof(noDelay)));

    int bufferBytes = 262144;
    setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char*)&bufferBytes, sizeof(bufferBytes));
    setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char*)&bufferBytes, sizeof(bufferBytes));

    linger lingerOpt;
    lingerOpt.l_onoff = false;
    lingerOpt.l_linger = 0;
    setsockopt(fd, SOL_SOCKET, SO_LINGER, (char*)&lingerOpt, sizeof(linger));
}
/// Unregisters the session from MediaPusherManager and releases what it owns.
///
/// Changes from the original, which only unregistered the session:
///  - The SDP parser allocated with `new` in DESCRIBE/ANNOUNCE is deleted.
///  - The client socket is closed; previously it was never closed, so the
///    handle leaked for every connection.
RtspSession::~RtspSession() {
    // Unregister first so the manager stops handing out this pointer.
    if (m_isPusher) MediaPusherManager::Instance().DelPusher(m_szFullUrl);
    else MediaPusherManager::Instance().DelRecvier(m_szFullUrl, this);

    // m_vecSdpTrack points into the parser, so drop those pointers first.
    m_vecSdpTrack.clear();
    delete m_pSdpParser;
    m_pSdpParser = nullptr;

    if (-1 != m_fd) {
        closesocket(m_fd);
        m_fd = -1;
    }
}
bool RtspSession::StartSession() {
std::weak_ptr<RtspSession> weak_self = shared_from_this();
bool isRet = m_pEventPoller->AddEvent(m_fd, EventPoller::Event_Read | EventPoller::Event_Error, [weak_self](int event) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
if (event & EventPoller::Event_Read) {
strong_self->onRead(strong_self->m_pEventPoller->GetSharedBuffer());
}
if (event & EventPoller::Event_Error) {
strong_self->CloseSession();
}
});
return isRet;
}
/// Stops polling the socket and, if the server still exists, asks it to
/// remove this session.
void RtspSession::CloseSession() {
    m_pEventPoller->DelEvent(m_fd);
    if (const auto pServer = m_pServer.lock()) {
        pServer->CloseSession(m_fd);
    }
}
/// Writes `uSize` bytes from `buffer` to the client socket with a single send().
/// @return true when send() reports more than zero bytes written.
bool RtspSession::SendData(const char* buffer, size_t uSize) {
    return ::send(m_fd, buffer, uSize, 0) > 0;
}
/// Drains the socket: repeatedly recv()s into `buffer` and passes each chunk
/// to onRecv() until the socket would block, is closed by the peer, or fails.
/// @param buffer scratch storage; one byte is reserved for a trailing '\0'
/// @return total number of bytes read during this call.
size_t RtspSession::onRead(std::vector<char>& buffer) {
    size_t uTotal = 0;
    for (;;) {
        const int iGot = recv(m_fd, buffer.data(), buffer.size() - 1, 0);
        if (iGot > 0) {
            buffer[iGot] = '\0';
            uTotal += iGot;
            onRecv(buffer.data(), iGot);
            continue;
        }

        if (iGot == 0) {
            // Peer closed the connection.
            CloseSession();
        }
        else if (WSAGetLastError() != WSAEWOULDBLOCK) {
            // Real socket error; WSAEWOULDBLOCK only means "no more data".
            CloseSession();
        }
        return uTotal;
    }
}
/// Incremental parser for bytes received from the client.
///
/// The first four bytes of every message are collected in m_vecRecv. If the
/// message starts with '$' it is an interleaved frame whose total length is
/// 4 + the big-endian 16-bit value in bytes 2..3; otherwise it is an RTSP
/// request whose header ends at "\r\n\r\n", optionally followed by
/// Content-Length bytes of body.
///
/// Changes from the original: the function called itself once per parsed
/// message, so a single read containing many tiny '$' frames could recurse
/// tens of thousands of frames deep. The same state machine now runs in a
/// loop. The Content-Length comparison is also done on unsigned values.
///
/// @param buffer start of the newly received bytes
/// @param uSize  number of bytes available at `buffer`
void RtspSession::onRecv(char* buffer, size_t uSize) {
    for (;;) {
        size_t uIndex = 0;

        // Step 1: make sure the 4-byte prefix is available.
        if (m_vecRecv.size() < 4) {
            const size_t uMissing = 4 - m_vecRecv.size();
            const size_t uLen = uMissing < uSize ? uMissing : uSize;
            m_vecRecv.insert(m_vecRecv.end(), buffer, buffer + uLen);
            if (m_vecRecv.size() < 4) return;
            uSize -= uLen;
            buffer += uLen;
        }

        // Step 2: interleaved frame.
        if (m_vecRecv[0] == '$') {
            if (0 == m_uRtpPacketLength) {
                const uint8_t* prefix = (const uint8_t*)m_vecRecv.data();
                m_uRtpPacketLength = ((prefix[2] << 8) | prefix[3]) + 4;
            }
            const size_t uMissing = m_uRtpPacketLength - m_vecRecv.size();
            const size_t uLen = uMissing < uSize ? uMissing : uSize;
            m_vecRecv.insert(m_vecRecv.end(), buffer, buffer + uLen);
            uSize -= uLen;
            buffer += uLen;

            if (m_vecRecv.size() != m_uRtpPacketLength) return;  // frame still incomplete
            handleRtpPacket();
            if (0 == uSize) return;
            continue;  // parse the next message in the same chunk
        }

        // Step 3: RTSP request header.
        if (m_isHeader) {
            while (uIndex < uSize) {
                m_vecRecv.push_back(buffer[uIndex]);
                if (buffer[uIndex] == '\n') {
                    const size_t vec_size = m_vecRecv.size();
                    if (m_vecRecv[vec_size - 1] == '\n' && m_vecRecv[vec_size - 2] == '\r' && m_vecRecv[vec_size - 3] == '\n' && m_vecRecv[vec_size - 4] == '\r') {
                        m_vecRecv.push_back('\0');
                        m_vecHeader.swap(m_vecRecv);
                        m_vecRecv.clear();
                        ++uIndex;
                        if (parseHeader()) {
                            if (m_iContentLength > 0) {
                                m_isHeader = false;  // a body follows
                            }
                            else {
                                handleReq();
                            }
                        }
                        else {
                            m_vecHeader.clear();
                        }
                        break;
                    }
                }
                ++uIndex;
            }
        }

        // Step 4: RTSP request body.
        if (false == m_isHeader) {
            while (uIndex < uSize) {
                m_vecRecv.push_back(buffer[uIndex]);
                if (m_vecRecv.size() == static_cast<size_t>(m_iContentLength)) {
                    m_vecRecv.push_back('\0');
                    m_vecContent.swap(m_vecRecv);
                    m_vecRecv.clear();
                    m_mapHeader["reqContent"] = m_vecContent.data();
                    m_isHeader = true;
                    ++uIndex;
                    handleReq();
                    break;
                }
                ++uIndex;
            }
        }

        // Step 5: continue with whatever is left of this chunk.
        if (uSize == uIndex) return;
        buffer += uIndex;
        uSize -= uIndex;
    }
}
/// Parses the request text in m_vecHeader into m_mapHeader.
///
/// Keys written: "reqMethod", "reqUrl", "reqProtocol", "reqUrl_Short",
/// optionally "reqUrl_Args", plus one entry per "Name: value" header line.
/// m_iContentLength is set from the Content-Length header (0 when absent).
///
/// Changes from the original:
///  - The failure check tested the map size after the request-line keys had
///    already been inserted, so it could never fail. The request is now
///    rejected when the method or URL is missing.
///  - Header values lost their last character unconditionally, on the
///    assumption that it was '\r'. Keys and values are now trimmed of
///    spaces, tabs and '\r', so a line ending in a bare '\n' keeps its value.
///
/// @return false when the request line has no method or URL.
bool RtspSession::parseHeader() {
    m_mapHeader.clear();
    m_iContentLength = 0;

    std::istringstream stream(m_vecHeader.data());
    std::string szMethod, szUrl, szProtocol;
    stream >> szMethod >> szUrl >> szProtocol;
    if (szMethod.empty() || szUrl.empty()) {
        return false;
    }
    m_mapHeader["reqMethod"] = szMethod;
    m_mapHeader["reqUrl"] = szUrl;
    m_mapHeader["reqProtocol"] = szProtocol;

    // Split "path?args" into its two halves.
    const size_t uSplitIndex = szUrl.find('?');
    if (std::string::npos != uSplitIndex) {
        m_mapHeader["reqUrl_Short"] = szUrl.substr(0, uSplitIndex);
        m_mapHeader["reqUrl_Args"] = szUrl.substr(uSplitIndex + 1);
    }
    else {
        m_mapHeader["reqUrl_Short"] = szUrl;
    }

    // Removes leading/trailing blanks and the '\r' left behind by getline.
    const auto strip = [](std::string& text) {
        static const char* const kBlank = " \t\r";
        text.erase(0, text.find_first_not_of(kBlank));
        text.erase(text.find_last_not_of(kBlank) + 1);
    };

    std::string line;
    while (std::getline(stream, line)) {
        const size_t pos = line.find(':');
        if (pos == std::string::npos) {
            continue;
        }
        std::string key = line.substr(0, pos);
        std::string value = line.substr(pos + 1);
        strip(key);
        strip(value);
        m_mapHeader[key] = value;
    }

    const auto itLength = m_mapHeader.find("Content-Length");
    if (itLength != m_mapHeader.end()) {
        m_iContentLength = atoi(itLength->second.c_str());
    }
    return true;
}
void RtspSession::handleReq() {
auto method = m_mapHeader["reqMethod"];
m_iCseq = atoi(m_mapHeader["CSeq"].data());
if (m_szFullUrl.empty()) {
m_szFullUrl = m_mapHeader.at("reqUrl");
}
using rtsp_request_handler = void(RtspSession::*)(const std::unordered_map<std::string, std::string>& parser);
static unordered_map<string, rtsp_request_handler> s_cmd_functions{
{"OPTIONS", &RtspSession::handleReq_Options},
{"DESCRIBE", &RtspSession::handleReq_Describe},
{"ANNOUNCE", &RtspSession::handleReq_ANNOUNCE},
{"RECORD", &RtspSession::handleReq_RECORD},
{"SETUP", &RtspSession::handleReq_SETUP},
{"PLAY", &RtspSession::handleReq_PLAY},
{"TEARDOWN", &RtspSession::handleReq_TEARDOWN},
{"GET_PARAMETER", &RtspSession::handleReq_SET_PARAMETER},
};
auto it = s_cmd_functions.find(method);
if (it == s_cmd_functions.end()) {
sendRtspResponse("403 Forbidden");
}
else {
(this->*(it->second))(m_mapHeader);
}
m_vecHeader.clear();
m_vecContent.clear();
m_iContentLength = 0;
}
void RtspSession::handleRtpPacket() {
char* data = m_vecRecv.data();
size_t uDataLen = m_vecRecv.size();
MediaPusherManager::Instance().SendDataToRecvier(m_szFullUrl, data, uDataLen);
m_vecRecv.clear();
m_uRtpPacketLength = 0;
}
/// Answers OPTIONS with the list of methods in the "Public" header.
void RtspSession::handleReq_Options(const std::unordered_map<std::string, std::string>& parser) {
    sendRtspResponse("200 OK", {
        { "Public", "OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY, PAUSE, ANNOUNCE, RECORD, SET_PARAMETER, GET_PARAMETER" },
    });
}
void RtspSession::handleReq_Describe(const std::unordered_map<std::string, std::string>& parser) {
m_pPusherRtspSession = dynamic_cast<RtspSession*>(MediaPusherManager::Instance().AddRecvier(m_szFullUrl, this));
if (nullptr == m_pPusherRtspSession) {
static constexpr auto err = "该流在服务器上不存在";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("406 Not Acceptable", header, err);
return;
}
m_isPusher = false;
m_pSdpParser = new SdpParser(m_pPusherRtspSession->m_pSdpParser->GetSdp());
m_vecSdpTrack = m_pSdpParser->GetAvailableTrack();
if (m_vecSdpTrack.empty()) {
static constexpr auto err = "sdp 中无有效 track,该流无效";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("406 Not Acceptable", header, err);
return;
}
m_szSessionId = makeRandStr(12);
std::multimap<std::string, std::string> header;
header.emplace("Content-Base", m_szFullUrl + "/");
header.emplace("x-Accept-Retransmit", "our-retransmit");
header.emplace("x-Accept-Dynamic-Rate", "1");
sendRtspResponse("200 OK", header, m_pSdpParser->GetSdp());
}
void RtspSession::handleReq_ANNOUNCE(const std::unordered_map<std::string, std::string>& parser) {
m_szFullUrl = parser.at("reqUrl");
if (false == MediaPusherManager::Instance().AddPusher(m_szFullUrl, this)) {
static constexpr auto err = "该流在服务器上已存在";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("406 Not Acceptable", header, err);
return;
}
m_isPusher = true;
m_pSdpParser = new SdpParser(parser.at("reqContent"));
m_szSessionId = makeRandStr(12);
m_vecSdpTrack = m_pSdpParser->GetAvailableTrack();
if (m_vecSdpTrack.empty()) {
static constexpr auto err = "sdp 中无有效 track";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("403 Forbidden", header, err);
return;
}
sendRtspResponse("200 OK");
}
void RtspSession::handleReq_RECORD(const std::unordered_map<std::string, std::string>& parser) {
if (m_vecSdpTrack.empty() || parser.at("Session") != m_szSessionId) {
std::multimap<std::string, std::string> header;
header.emplace("Connection", "Close");
sendRtspResponse("454 Session Not Found", header);
return;
}
stringstream rtp_info;
for (auto& track : m_vecSdpTrack) {
if (track->isInited == false) {
static constexpr auto err = "track not setuped";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("403 Forbidden", header, err);
return;
}
rtp_info << "url=" << track->GetControlUrl(m_szFullUrl) << ",";
}
auto rtpStr = rtp_info.str();
rtpStr.pop_back();
std::multimap<std::string, std::string> header;
header.emplace("RTP-Info", rtpStr);
sendRtspResponse("200 OK", header);
}
void RtspSession::handleReq_SETUP(const std::unordered_map<std::string, std::string>& parser) {
m_szFullUrl = parser.at("reqUrl");
int trackIdx = getTrackIndexByControlUrl(parser.at("reqUrl"));
SdpTrack* trackRef = m_vecSdpTrack[trackIdx];
if (trackRef->isInited) {
static constexpr auto err = "不允许对同一个 track 进行两次 setup";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("403 Forbidden", header, err);
return;
}
static auto getRtpTypeStr = [](const Rtsp::ERtpType type) {
switch (type) {
case Rtsp::ERtpType::TCP: return "TCP";
case Rtsp::ERtpType::UDP: return "UDP";
case Rtsp::ERtpType::MULTICAST: return "MULTICAST";
default: return "Invalid";
}
};
if (m_eRtpType == Rtsp::ERtpType::INVALID) {
auto& strTransport = parser.at("Transport");
auto rtpType = Rtsp::ERtpType::INVALID;
if (strTransport.find("TCP") != string::npos) {
rtpType = Rtsp::ERtpType::TCP;
}
else if (strTransport.find("multicast") != string::npos) {
}
else {
}
if (Rtsp::ERtpType::INVALID == rtpType) {
sendRtspResponse("461 Unsupported transport");
return;
}
m_eRtpType = rtpType;
}
trackRef->isInited = true;
{
auto key_values = SdpParser::ParseArgs(parser.at("Transport"), ";", "=");
int interleaved_rtp = -1, interleaved_rtcp = -1;
if (2 == sscanf_s(key_values["interleaved"].data(), "%d-%d", &interleaved_rtp, &interleaved_rtcp)) {
trackRef->uInterleaved = interleaved_rtp;
}
else {
static constexpr auto err = "can not find interleaved when setup of rtp over tcp";
std::multimap<std::string, std::string> header;
header.emplace("Content-Type", "text/plain");
sendRtspResponse("403 Forbidden", header, err);
return;
}
stringstream sdpResponse;
sdpResponse << "RTP/AVP/TCP;unicast;" << "interleaved=" << (int)trackRef->uInterleaved << "-" << (int)trackRef->uInterleaved + 1 << ";" << "ssrc=00000000";
std::multimap<std::string, std::string> header;
header.emplace("Transport", sdpResponse.str());
header.emplace("x-Transport-Options", "late-tolerance=1.400000");
header.emplace("x-Dynamic-Rate", "1");
sendRtspResponse("200 OK", header);
}
}
void RtspSession::handleReq_PLAY(const std::unordered_map<std::string, std::string>& parser) {
if (m_vecSdpTrack.empty() || parser.at("Session") != m_szSessionId) {
std::multimap<std::string, std::string> header;
header.emplace("Connection", "Close");
sendRtspResponse("454 Session Not Found", header);
return;
}
std::multimap<std::string, std::string> res_header;
vector<ETrackType> inited_tracks;
stringstream rtp_info;
for (auto& track : m_vecSdpTrack) {
if (track->isInited == false) {
continue;
}
inited_tracks.emplace_back(track->type);
rtp_info << "url=" << track->GetControlUrl(m_szFullUrl) << ";" << "seq=0;" << "rtptime=0" << ",";
}
auto rtpStr = rtp_info.str();
rtpStr.pop_back();
res_header.emplace("RTP-Info", rtpStr);
stringstream szPlayRange;
szPlayRange << "npt=" << setiosflags(ios::fixed) << setprecision(2) << 0;
res_header.emplace("Range", szPlayRange.str());
sendRtspResponse("200 OK", res_header);
}
// Handles TEARDOWN: acknowledges with "200 OK" first, then closes the session.
// The parser argument is not used.
void RtspSession::handleReq_TEARDOWN(const std::unordered_map<std::string, std::string>& parser) {
sendRtspResponse("200 OK");
CloseSession();
}
// Acknowledges the request with "200 OK" without inspecting it.
// handleReq() maps the "GET_PARAMETER" method to this handler.
void RtspSession::handleReq_SET_PARAMETER(const std::unordered_map<std::string, std::string>& parser) {
sendRtspResponse("200 OK");
}
/// Serialises an RTSP response and sends it on the client socket.
///
/// CSeq, Session (when a session id exists), Server and Date are always
/// added. When `sdp` is non-empty it is sent as the body with a
/// Content-Length header.
///
/// Changes from the original:
///  - "Content-Type: application/sdp" is only added when the caller did not
///    provide a Content-Type. Error responses pass "text/plain" together
///    with a body and used to go out with two Content-Type headers.
///  - The serialised text is built once; the original called
///    printer.str() twice, copying the whole response each time.
///
/// @param res_code status code and reason, e.g. "200 OK"
/// @param header   extra headers supplied by the caller (taken by value)
/// @param sdp      response body; empty for no body
/// @param protocol protocol token for the status line
/// @return true when send() reports more than zero bytes written.
bool RtspSession::sendRtspResponse(const std::string& res_code, std::multimap<std::string, std::string> header, const std::string& sdp, const char* protocol) {
    header.emplace("CSeq", std::to_string(m_iCseq));
    if (!m_szSessionId.empty()) {
        header.emplace("Session", m_szSessionId);
    }
    header.emplace("Server", "服务器名字");
    header.emplace("Date", dateStr());
    if (!sdp.empty()) {
        header.emplace("Content-Length", std::to_string(sdp.size()));
        if (header.find("Content-Type") == header.end()) {
            header.emplace("Content-Type", "application/sdp");
        }
    }

    std::stringstream printer;
    printer << protocol << " " << res_code << "\r\n";
    for (const auto& pr : header) {
        printer << pr.first << ": " << pr.second << "\r\n";
    }
    printer << "\r\n";
    if (!sdp.empty()) {
        printer << sdp;
    }

    const std::string payload = printer.str();
    const int iSendRet = ::send(m_fd, payload.data(), static_cast<int>(payload.size()), 0);
    return iSendRet > 0;
}
/// Finds the track whose control URL is a prefix of `control_url`.
///
/// Changes from the original: when nothing matched and there was not exactly
/// one track, the function called DebugBreak() and then ran off the end
/// without returning a value, which is undefined behaviour. It now returns
/// an out-of-range index that callers can test.
///
/// @param control_url URL taken from a SETUP request
/// @return index into m_vecSdpTrack; 0 when there is exactly one track and no
///         URL matched; m_vecSdpTrack.size() (not a valid index) when no
///         track can be selected.
size_t RtspSession::getTrackIndexByControlUrl(const std::string& control_url) {
    for (size_t i = 0; i < m_vecSdpTrack.size(); ++i) {
        if (control_url.find(m_vecSdpTrack[i]->GetControlUrl(m_szFullUrl)) == 0) {
            return i;
        }
    }
    // With a single track there is nothing to disambiguate.
    if (m_vecSdpTrack.size() == 1) {
        return 0;
    }
    return m_vecSdpTrack.size();
}
8. SdpParser.h
#pragma once
#include <string>
#include <vector>
#include <map>
// Kind of an SDP track. TITLE is the type given to the first track that
// SdpParser creates before any "m=" line is seen.
enum class ETrackType {
INVALID = -1,
VIDEO = 0,
AUDIO,
TITLE,
MIN = INVALID, // lowest enumerator value
MAX = TITLE,   // highest enumerator value
};
namespace Rtsp {
// Transport used to carry RTP for a session; INVALID means "not chosen yet".
enum class ERtpType {
INVALID = -1,
TCP = 0,
UDP = 1,
MULTICAST = 2,
};
}
// One media section of an SDP document together with per-session setup state.
struct SdpTrack {
std::multimap<std::string, std::string> map_attr; // "a=" attributes: name -> value ("" when there is no ':')
ETrackType type = ETrackType::INVALID;            // taken from the "m=" line
std::string szControl;                            // value of the "control" attribute, if present
uint8_t uInterleaved = 0;                         // interleaved channel recorded during SETUP
bool isInited = false;                            // set during SETUP
// Returns szControl when it contains "://", otherwise szUrl + "/" + szControl.
std::string GetControlUrl(const std::string& szUrl) const;
};
/// Parses an SDP document into SdpTrack objects and keeps the original text.
///
/// The parser owns the tracks it allocates and deletes them in its
/// destructor. Copying is disabled: an implicit copy would share the same
/// raw track pointers and both copies would delete them.
class SdpParser {
public:
    /// Parses `szSdp`; the text is kept and returned by GetSdp().
    SdpParser(const std::string& szSdp);
    ~SdpParser();

    SdpParser(const SdpParser&) = delete;
    SdpParser& operator=(const SdpParser&) = delete;

    /// Returns the first track of the given type, or nullptr when there is none.
    SdpTrack* GetTrack(ETrackType type) const;
    /// Returns at most one audio and one video track, in document order.
    /// The pointers stay owned by the parser.
    std::vector<SdpTrack*> GetAvailableTrack() const;
    /// Returns a copy of the SDP text passed to the constructor.
    const std::string GetSdp() const {
        return m_szSdp;
    }
    /// Splits `str` on `pair_delim` and each item on `key_delim` into a
    /// key/value map; items without `key_delim` map to an empty value.
    static std::map<std::string, std::string> ParseArgs(const std::string& str, const char* pair_delim, const char* key_delim);

private:
    std::string m_szSdp;               // original SDP text
    std::vector<SdpTrack*> m_vecTrack; // owned tracks, deleted in the destructor
};
9. SdpParser.cpp
#include "SdpParser.h"
#include <algorithm>
using namespace std;
/// Returns the clock rate in Hz associated with an RTP payload type number.
/// Payload types without an entry of their own map to 90000.
static int GetClockRate(int pt) {
    switch (pt) {
    case 0: case 3: case 4: case 5: case 7:
    case 8: case 12: case 13: case 15: case 18:
        return 8000;
    case 6: case 9:
        return 16000;
    case 10: case 11: case 14:
        return 44100;
    case 16:
        return 11025;
    case 17:
        return 22050;
    default:
        // Includes 25, 26, 28 and 31-34, which the table lists as 90000.
        return 90000;
    }
}
/// Splits `s` on every occurrence of `delim`, dropping empty pieces.
/// An empty input yields a single empty string.
static std::vector<std::string> split(const std::string& s, const char* delim) {
    std::vector<std::string> pieces;
    size_t begin = 0;

    for (size_t hit = s.find(delim, begin); hit != std::string::npos; hit = s.find(delim, begin)) {
        if (hit != begin) {
            pieces.push_back(s.substr(begin, hit - begin));
        }
        begin = hit + strlen(delim);
    }

    // Trailing piece: kept when non-empty, or when the whole input was empty.
    if (s.empty() || begin != s.size()) {
        pieces.push_back(s.substr(begin));
    }
    return pieces;
}
/// Removes every leading and trailing character of `s` that appears in `chars`.
///
/// Changes from the original: the lookup table was built with 0xFF (255)
/// slots, so byte value 0xFF had no slot. A string ending in that byte made
/// map.at() throw std::out_of_range, and 0xFF inside `chars` wrote past the
/// table. The front was also trimmed with repeated erase(0, 1), which is
/// quadratic. The standard search functions handle every byte value.
///
/// @param s     string to trim in place
/// @param chars set of characters to strip
/// @return reference to `s`
static std::string& trim(std::string& s, const std::string& chars = " \r\n\t") {
    // find_last_not_of returns npos when everything is trimmed; npos + 1 == 0
    // then erases the whole string.
    s.erase(s.find_last_not_of(chars) + 1);
    s.erase(0, s.find_first_not_of(chars));
    return s;
}
/// Extracts the text between two markers in a C string.
/// @param buf      text to search
/// @param start    marker before the result; nullptr means "from the beginning"
/// @param end      marker after the result; nullptr means "to buf + buf_size"
/// @param buf_size length of `buf`; 0 means strlen(buf)
/// @return the text between the markers, or "" when a marker is not found.
static std::string findSubString(const char* buf, const char* start, const char* end, size_t buf_size = 0) {
    if (buf_size == 0) {
        buf_size = strlen(buf);
    }

    const char* first = buf;
    if (start != nullptr) {
        first = strstr(buf, start);
        if (first == nullptr) {
            return "";
        }
        first += strlen(start);
    }

    const char* last = buf + buf_size;
    if (end != nullptr) {
        last = strstr(first, end);
        if (last == nullptr) {
            return "";
        }
    }
    return std::string(first, last);
}
/// Maps the media name of an "m=" line to a track type:
/// "" -> TITLE, "video" -> VIDEO, "audio" -> AUDIO, anything else -> INVALID.
static ETrackType toTrackType(const std::string& str) {
    if (str.empty()) return ETrackType::TITLE;
    return str == "video" ? ETrackType::VIDEO
         : str == "audio" ? ETrackType::AUDIO
         : ETrackType::INVALID;
}
/// Parses the SDP text line by line.
/// A TITLE track is created first and receives the attributes seen before any
/// "m=" line; every "m=" line starts a new track, and each "a=" line is stored
/// in the attribute map of the track currently being filled. Finally the
/// "control" attribute of every track is copied into szControl.
SdpParser::SdpParser(const std::string& szSdp) : m_szSdp(szSdp) {
    SdpTrack* current = new SdpTrack;
    current->type = ETrackType::TITLE;
    m_vecTrack.emplace_back(current);

    auto rows = split(m_szSdp, "\n");
    for (auto& row : rows) {
        trim(row);
        // Every usable line has the form "<letter>=<value>".
        const bool isWellFormed = row.size() >= 2 && row[1] == '=';
        if (!isWellFormed) {
            continue;
        }

        const char kind = row[0];
        const std::string payload = row.substr(2);

        if (kind == 'm') {
            current = new SdpTrack;
            m_vecTrack.emplace_back(current);
            int pt, port, port_count;
            char rtp[16] = { 0 }, type[16] = { 0 };
            // Accepts "<type> <port> <proto> <pt>" and "<type> <port>/<count> <proto> <pt>".
            const bool isParsed =
                4 == sscanf_s(payload.data(), " %15[^ ] %d %15[^ ] %d", type, (unsigned)_countof(type), &port, rtp, (unsigned)_countof(rtp), &pt) ||
                5 == sscanf_s(payload.data(), " %15[^ ] %d/%d %15[^ ] %d", type, (unsigned)_countof(type), &port, &port_count, rtp, (unsigned)_countof(rtp), &pt);
            if (isParsed) {
                current->type = toTrackType(type);
            }
        }
        else if (kind == 'a') {
            const std::string name = findSubString(payload.data(), nullptr, ":");
            if (name.empty()) {
                // Attribute without ':' - stored with an empty value.
                current->map_attr.emplace(payload, "");
            }
            else {
                current->map_attr.emplace(name, findSubString(payload.data(), ":", nullptr));
            }
        }
    }

    for (SdpTrack* one : m_vecTrack) {
        const auto found = one->map_attr.find("control");
        if (found != one->map_attr.end()) {
            one->szControl = found->second;
        }
    }
}
/// Deletes every track allocated by the constructor and empties the list.
SdpParser::~SdpParser() {
    while (!m_vecTrack.empty()) {
        delete m_vecTrack.back();
        m_vecTrack.pop_back();
    }
}
/// Returns the first track whose type equals `type`, or nullptr if none does.
SdpTrack* SdpParser::GetTrack(ETrackType type) const {
    const auto found = std::find_if(m_vecTrack.begin(), m_vecTrack.end(),
        [type](const SdpTrack* candidate) { return candidate->type == type; });
    return found != m_vecTrack.end() ? *found : nullptr;
}
std::vector<SdpTrack*> SdpParser::GetAvailableTrack() const {
vector<SdpTrack*> ret;
bool audio_added = false;
bool video_added = false;
for (auto& track : m_vecTrack) {
if (track->type == ETrackType::AUDIO) {
if (!audio_added) {
ret.emplace_back(track);
audio_added = true;
}
continue;
}
if (track->type == ETrackType::VIDEO) {
if (!video_added) {
ret.emplace_back(track);
video_added = true;
}
continue;
}
}
return ret;
}
/// Parses "k1=v1;k2=v2;flag"-style text into a map.
/// @param str        text to parse
/// @param pair_delim separator between items
/// @param key_delim  separator between key and value inside an item
/// @return trimmed keys mapped to trimmed values; an item without `key_delim`
///         maps to "". The first occurrence of a key wins.
std::map<std::string, std::string> SdpParser::ParseArgs(const std::string& str, const char* pair_delim, const char* key_delim) {
    std::map<std::string, std::string> result;

    for (auto& item : split(str, pair_delim)) {
        if (item.empty()) {
            continue;
        }

        const auto cut = item.find(key_delim);
        if (cut == std::string::npos) {
            // Bare flag without a value.
            trim(item);
            if (!item.empty()) {
                result.emplace(std::move(item), "");
            }
            continue;
        }

        std::string key(item, 0, cut);
        std::string val(item.substr(cut + strlen(key_delim)));
        trim(key);
        trim(val);
        result.emplace(std::move(key), std::move(val));
    }
    return result;
}
/// Returns the URL that addresses this track: szControl itself when it
/// contains "://", otherwise szUrl + "/" + szControl.
std::string SdpTrack::GetControlUrl(const std::string& szUrl) const {
    const bool hasScheme = szControl.find("://") != std::string::npos;
    if (!hasScheme) {
        return szUrl + "/" + szControl;
    }
    return szControl;
}
10. MediaPusherManager.h
#pragma once
#include <string>
#include <map>
#include <list>
#include <mutex>
class RtspSession;
// One published stream: the pushing session plus the sessions receiving it.
// All session pointers are raw and non-owning.
class MediaPusher {
public:
// Stores the stream URL and the pushing session.
MediaPusher(const std::string& szUrl, RtspSession* pRtspSession);
~MediaPusher();
// Appends a receiver to the list (no duplicate check).
void AddRecvier(RtspSession* pRtspSession);
// Removes every list entry equal to pRtspSession.
void DelRecvier(RtspSession* pRtspSession);
// Sends the data to every receiver; receivers whose send fails are removed and closed.
bool SendDataToRecvier(const char* pData, size_t uSize);
// Returns the pushing session.
RtspSession* GetRtspSession() const {
return m_pRtspSession;
}
private:
std::string m_szUrl;                       // stream URL
RtspSession* m_pRtspSession = nullptr;     // pushing session
std::list<RtspSession*> m_listRtspSession; // receiving sessions
};
// Process-wide registry of published streams, keyed by URL.
// Every public method takes m_mtxMediaPusher before touching the map.
class MediaPusherManager {
public:
// Returns the single instance (function-local static).
static MediaPusherManager& Instance() {
static MediaPusherManager instance;
return instance;
}
// Registers a pusher for szUrl; false when the URL is already registered.
bool AddPusher(const std::string& szUrl, RtspSession* pRtspSession);
// Deletes the pusher registered for szUrl; false when there is none.
bool DelPusher(const std::string& szUrl);
// True when a pusher is registered for szUrl.
bool IsExist(const std::string& szUrl);
// Adds a receiver to the pusher of szUrl and returns the pushing session,
// or nullptr when the URL is not registered.
RtspSession* AddRecvier(const std::string& szUrl, RtspSession* pRtspSession);
// Removes a receiver from the pusher of szUrl; false when the URL is not registered.
bool DelRecvier(const std::string& szUrl, RtspSession* pRtspSession);
// Relays data to all receivers of szUrl; false when the URL is not registered.
bool SendDataToRecvier(const std::string& szUrl, const char* pData, size_t uSize);
private:
std::map<const std::string, MediaPusher*> m_mapMediaPusher; // URL -> pusher, allocated in AddPusher, deleted in DelPusher
std::mutex m_mtxMediaPusher;                                // guards m_mapMediaPusher
};
11. MediaPusherManager.cpp
#include "MediaPusherManager.h"
#include "RtspSession.h"
/// Registers `pRtspSession` as the pusher of `szUrl`.
/// @return false when a pusher already exists for that URL, true otherwise.
bool MediaPusherManager::AddPusher(const std::string& szUrl, RtspSession* pRtspSession) {
    std::lock_guard<std::mutex> guard(m_mtxMediaPusher);
    if (m_mapMediaPusher.count(szUrl) != 0) {
        return false;
    }
    m_mapMediaPusher[szUrl] = new MediaPusher(szUrl, pRtspSession);
    return true;
}
/// Deletes and unregisters the pusher of `szUrl`.
/// @return false when no pusher is registered for that URL, true otherwise.
bool MediaPusherManager::DelPusher(const std::string& szUrl) {
    std::lock_guard<std::mutex> guard(m_mtxMediaPusher);
    const auto found = m_mapMediaPusher.find(szUrl);
    if (found == m_mapMediaPusher.end()) {
        return false;
    }
    delete found->second;
    m_mapMediaPusher.erase(found);
    return true;
}
/// Reports whether a pusher is registered for `szUrl`.
bool MediaPusherManager::IsExist(const std::string& szUrl) {
    std::lock_guard<std::mutex> guard(m_mtxMediaPusher);
    const bool isKnown = m_mapMediaPusher.count(szUrl) != 0;
    return isKnown;
}
/// Adds `pRtspSession` to the receivers of the stream at `szUrl`.
/// @return the pushing session of that stream, or nullptr when the URL is
///         not registered.
RtspSession* MediaPusherManager::AddRecvier(const std::string& szUrl, RtspSession* pRtspSession) {
    std::lock_guard<std::mutex> guard(m_mtxMediaPusher);
    const auto found = m_mapMediaPusher.find(szUrl);
    if (found == m_mapMediaPusher.end()) {
        return nullptr;
    }
    MediaPusher* const pusher = found->second;
    pusher->AddRecvier(pRtspSession);
    return pusher->GetRtspSession();
}
/// Removes `pRtspSession` from the receivers of the stream at `szUrl`.
/// @return false when the URL is not registered, true otherwise.
bool MediaPusherManager::DelRecvier(const std::string& szUrl, RtspSession* pRtspSession) {
    std::lock_guard<std::mutex> guard(m_mtxMediaPusher);
    const auto found = m_mapMediaPusher.find(szUrl);
    if (found == m_mapMediaPusher.end()) {
        return false;
    }
    found->second->DelRecvier(pRtspSession);
    return true;
}
/// Relays `uSize` bytes at `pData` to every receiver of the stream at `szUrl`.
/// The registry lock is held while the data is sent.
/// @return false when the URL is not registered, true otherwise.
bool MediaPusherManager::SendDataToRecvier(const std::string& szUrl, const char* pData, size_t uSize) {
    std::lock_guard<std::mutex> guard(m_mtxMediaPusher);
    const auto found = m_mapMediaPusher.find(szUrl);
    if (found == m_mapMediaPusher.end()) {
        return false;
    }
    found->second->SendDataToRecvier(pData, uSize);
    return true;
}
// Stores the stream URL and the (non-owning) pushing session.
MediaPusher::MediaPusher(const std::string& szUrl, RtspSession* pRtspSession) : m_szUrl(szUrl), m_pRtspSession(pRtspSession) {}
// Nothing to release: the session pointers held by this object are not owned.
MediaPusher::~MediaPusher() {}
// Appends the session to the receiver list; duplicates are not filtered.
void MediaPusher::AddRecvier(RtspSession* pRtspSession) {
m_listRtspSession.push_back(pRtspSession);
}
// Removes every list entry equal to pRtspSession (std::list::remove).
void MediaPusher::DelRecvier(RtspSession* pRtspSession) {
m_listRtspSession.remove(pRtspSession);
}
bool MediaPusher::SendDataToRecvier(const char* pData, size_t uSize) {
decltype(m_listRtspSession) removeList;
for (auto one : m_listRtspSession) {
if (false == one->SendData(pData, uSize)) {
removeList.push_back(one);
}
}
for (auto one : removeList) {
this->DelRecvier(one);
one->CloseSession();
}
return true;
}