lots of changes in code for ssl client
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
// src/Services/WhipClient.h
|
||||
#pragma once
|
||||
|
||||
#define CPPHTTPLIB_OPENSSL_SUPPORT
|
||||
#include <memory>
|
||||
#include <string>
|
||||
#include <string_view>
|
||||
@@ -11,53 +11,54 @@
|
||||
#include <vector>
|
||||
#include <regex>
|
||||
#include <array>
|
||||
#include <random>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
|
||||
#include <spdlog/spdlog.h>
|
||||
#include <nlohmann/json.hpp>
|
||||
#include <httplib.h> // build with CPPHTTPLIB_OPENSSL_SUPPORT
|
||||
|
||||
#include <rtc/rtc.hpp> // libdatachannel
|
||||
#include <rtc/rtc.hpp>
|
||||
#include <rtc/peerconnection.hpp>
|
||||
#include <rtc/description.hpp>
|
||||
#include <rtc/track.hpp>
|
||||
|
||||
namespace snoop {
|
||||
namespace snoop
|
||||
{
|
||||
|
||||
class WhipClient {
|
||||
public:
|
||||
struct Params {
|
||||
std::string whipUrl; // full WHIP endpoint (may already include ?token=...)
|
||||
std::string caPath; // CA chain
|
||||
std::string crtPath; // client cert
|
||||
std::string keyPath; // client key (temp extracted from keyctl)
|
||||
int sampleRate = 48000;
|
||||
int channels = 1;
|
||||
};
|
||||
class WhipClient
|
||||
{
|
||||
public:
|
||||
struct Params
|
||||
{
|
||||
std::string whipUrl; // full WHIP endpoint (may include ?token=...)
|
||||
std::string caPath; // CA chain
|
||||
std::string crtPath; // client cert
|
||||
std::string keyPath; // client key (temp extracted from keyctl)
|
||||
int sampleRate = 48000;
|
||||
int channels = 1; // 1 or 2; timestamp advance uses PCM frames per channel
|
||||
};
|
||||
|
||||
explicit WhipClient(Params p)
|
||||
: m_p(std::move(p)) {}
|
||||
explicit WhipClient(Params p) : m_p(std::move(p)) {}
|
||||
|
||||
~WhipClient() {
|
||||
Stop();
|
||||
}
|
||||
~WhipClient() { Stop(); }
|
||||
|
||||
void Start() {
|
||||
std::lock_guard lk(m_mtx);
|
||||
if (m_started) return;
|
||||
void Start()
|
||||
{
|
||||
std::lock_guard lk(m_mtx);
|
||||
if (m_started)
|
||||
return;
|
||||
|
||||
// ----- PeerConnection -----
|
||||
rtc::Configuration cfg;
|
||||
// No STUN/TURN required for MediaMTX WHIP (server is public/ICE-lite).
|
||||
// cfg.iceServers = { }; // default
|
||||
rtc::Configuration cfg;
|
||||
m_pc = std::make_shared<rtc::PeerConnection>(cfg);
|
||||
|
||||
m_pc = std::make_shared<rtc::PeerConnection>(cfg);
|
||||
|
||||
// Optional: observe connection state / ICE
|
||||
m_pc->onStateChange([this](rtc::PeerConnection::State s){
|
||||
spdlog::info("WHIP pc state: {}", (int)s);
|
||||
});
|
||||
m_pc->onGatheringStateChange([this](rtc::PeerConnection::GatheringState s){
|
||||
spdlog::info("WHIP gathering state: {}", (int)s);
|
||||
});
|
||||
m_pc->onLocalDescription([this](rtc::Description desc){
|
||||
// When we have the local SDP, POST (WHIP) to get the answer
|
||||
m_pc->onStateChange([this](rtc::PeerConnection::State s)
|
||||
{ spdlog::info("WHIP pc state: {}", (int)s); });
|
||||
m_pc->onGatheringStateChange([this](rtc::PeerConnection::GatheringState s)
|
||||
{ spdlog::info("WHIP gathering state: {}", (int)s); });
|
||||
m_pc->onLocalDescription([this](rtc::Description desc)
|
||||
{
|
||||
try {
|
||||
const std::string offer = std::string(desc);
|
||||
auto [answer, resourceUrl] = PostOfferWHIP(offer);
|
||||
@@ -66,158 +67,171 @@ public:
|
||||
spdlog::info("WHIP remote description set");
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::error("WHIP POST offer failed: {}", e.what());
|
||||
} });
|
||||
m_pc->onLocalCandidate([this](rtc::Candidate c)
|
||||
{
|
||||
if (!m_resourceUrl) return;
|
||||
try { PatchCandidateWHIP(c); }
|
||||
catch (const std::exception& e) { spdlog::warn("WHIP PATCH candidate failed: {}", e.what()); } });
|
||||
|
||||
// Create a sendonly audio track
|
||||
rtc::Description::Audio audioDesc("audio", rtc::Description::Direction::SendOnly);
|
||||
// Opus PT typically negotiated to 111 by browsers; we'll use 111 in RTP header
|
||||
m_track = m_pc->addTrack(audioDesc);
|
||||
|
||||
// Initialize RTP state (random SSRC/seq)
|
||||
std::mt19937 rng{std::random_device{}()};
|
||||
m_ssrc = std::uniform_int_distribution<uint32_t>()(rng);
|
||||
m_seq = std::uniform_int_distribution<uint16_t>()(rng);
|
||||
m_ts = 0; // RTP clock for Opus is 48kHz
|
||||
|
||||
// Create SDP offer (triggers onLocalDescription)
|
||||
m_pc->setLocalDescription();
|
||||
|
||||
m_started = true;
|
||||
}
|
||||
|
||||
void Stop()
|
||||
{
|
||||
std::lock_guard lk(m_mtx);
|
||||
if (!m_started)
|
||||
return;
|
||||
|
||||
m_track.reset();
|
||||
m_pc.reset();
|
||||
|
||||
m_resourceUrl.reset();
|
||||
m_started = false;
|
||||
}
|
||||
|
||||
// Call this from your Opus encoder callback.
|
||||
// opusData: encoded Opus frame bytes
|
||||
// opusBytes: length
|
||||
// pcmFramesPerChannel: number of PCM samples per channel represented by this Opus frame (e.g., 960 for 20ms @48k).
|
||||
void PushOpus(const unsigned char *opusData, size_t opusBytes, int pcmFramesPerChannel)
|
||||
{
|
||||
std::lock_guard lk(m_mtx);
|
||||
if (!m_track || !m_started)
|
||||
return;
|
||||
|
||||
// Build RTP header (12 bytes)
|
||||
std::array<uint8_t, 12> rtp{};
|
||||
rtp[0] = 0x80; // V=2, P=0, X=0, CC=0
|
||||
rtp[1] = 0x80 | (m_pt & 0x7F); // M=1 (end of frame), PT
|
||||
rtp[2] = uint8_t(m_seq >> 8);
|
||||
rtp[3] = uint8_t(m_seq & 0xFF);
|
||||
rtp[4] = uint8_t(m_ts >> 24);
|
||||
rtp[5] = uint8_t((m_ts >> 16) & 0xFF);
|
||||
rtp[6] = uint8_t((m_ts >> 8) & 0xFF);
|
||||
rtp[7] = uint8_t(m_ts & 0xFF);
|
||||
rtp[8] = uint8_t(m_ssrc >> 24);
|
||||
rtp[9] = uint8_t((m_ssrc >> 16) & 0xFF);
|
||||
rtp[10] = uint8_t((m_ssrc >> 8) & 0xFF);
|
||||
rtp[11] = uint8_t(m_ssrc & 0xFF);
|
||||
|
||||
// Concatenate header + payload
|
||||
rtc::binary packet;
|
||||
packet.resize(rtp.size() + opusBytes);
|
||||
std::memcpy(packet.data(), rtp.data(), rtp.size());
|
||||
std::memcpy(packet.data() + rtp.size(), opusData, opusBytes);
|
||||
|
||||
// Send RTP on the media track
|
||||
m_track->send(packet); // libdatachannel expects RTP bytes on tracks
|
||||
|
||||
// Advance RTP state
|
||||
++m_seq;
|
||||
// For Opus @ 48k clock, timestamp increments by the PCM frame count per channel
|
||||
// (do not multiply by channels)
|
||||
m_ts += static_cast<uint32_t>(pcmFramesPerChannel);
|
||||
}
|
||||
|
||||
private:
|
||||
Params m_p;
|
||||
std::shared_ptr<rtc::PeerConnection> m_pc;
|
||||
std::shared_ptr<rtc::Track> m_track;
|
||||
std::optional<std::string> m_resourceUrl;
|
||||
std::mutex m_mtx;
|
||||
std::atomic<bool> m_started{false};
|
||||
|
||||
// RTP state
|
||||
uint32_t m_ssrc = 0;
|
||||
uint16_t m_seq = 0;
|
||||
uint32_t m_ts = 0;
|
||||
uint8_t m_pt = 111; // dynamic payload type commonly used for Opus
|
||||
|
||||
// --- WHIP HTTP helpers (mTLS-capable) ---
|
||||
|
||||
static std::tuple<std::string, std::string> ExtractAnswerAndLocation(const httplib::Result &r)
|
||||
{
|
||||
if (!r)
|
||||
throw std::runtime_error("No HTTP result");
|
||||
if (r->status != 201 && r->status != 200)
|
||||
throw std::runtime_error("Unexpected WHIP status: " + std::to_string(r->status));
|
||||
std::string answer = r->body;
|
||||
std::string resourceUrl;
|
||||
if (r->has_header("Location"))
|
||||
resourceUrl = r->get_header_value("Location");
|
||||
return {answer, resourceUrl};
|
||||
}
|
||||
|
||||
std::pair<std::string, std::string> PostOfferWHIP(const std::string &sdpOffer)
|
||||
{
|
||||
auto [cli, path] = MakeClientForUrl(sdpOffer, m_p.whipUrl);
|
||||
auto res = cli->Post(path.c_str(), sdpOffer, "application/sdp");
|
||||
auto [answer, resUrl] = ExtractAnswerAndLocation(res);
|
||||
return {answer, resUrl};
|
||||
}
|
||||
|
||||
void PatchCandidateWHIP(const rtc::Candidate &cand)
|
||||
{
|
||||
if (!m_resourceUrl)
|
||||
return;
|
||||
auto [cli, path] = MakeClientForUrl(std::string(""), *m_resourceUrl);
|
||||
std::string frag = "a=" + std::string(cand);
|
||||
auto res = cli->Patch(path.c_str(), frag, "application/trickle-ice-sdpfrag");
|
||||
if (!res || (res->status < 200 || res->status >= 300))
|
||||
{
|
||||
spdlog::warn("WHIP PATCH {} -> {}", path, res ? res->status : -1);
|
||||
}
|
||||
});
|
||||
m_pc->onLocalCandidate([this](rtc::Candidate c) {
|
||||
// Trickle via PATCH to WHIP resource (if server requires)
|
||||
if (!m_resourceUrl.has_value()) return;
|
||||
try {
|
||||
PatchCandidateWHIP(c);
|
||||
} catch (const std::exception& e) {
|
||||
spdlog::warn("WHIP PATCH candidate failed: {}", e.what());
|
||||
}
|
||||
});
|
||||
|
||||
// ----- Audio track / source -----
|
||||
m_audioSource = rtc::CreateAudioSource(m_p.sampleRate, m_p.channels);
|
||||
m_track = m_pc->addTrack(m_audioSource);
|
||||
|
||||
// ----- Create offer -----
|
||||
m_pc->setLocalDescription(); // triggers onLocalDescription callback
|
||||
|
||||
m_started = true;
|
||||
}
|
||||
|
||||
void Stop() {
|
||||
std::lock_guard lk(m_mtx);
|
||||
if (!m_started) return;
|
||||
|
||||
if (m_track) m_track = nullptr;
|
||||
if (m_audioSource) m_audioSource = nullptr;
|
||||
if (m_pc) m_pc = nullptr;
|
||||
|
||||
// No explicit WHIP DELETE here, but you can add it if server supports deleting the resource.
|
||||
|
||||
// cleanup resource url
|
||||
m_resourceUrl.reset();
|
||||
m_started = false;
|
||||
}
|
||||
|
||||
// Feed PCM (float32) in interleaved format. frames = samples per channel.
|
||||
void PushPCM(const float* interleaved, size_t frames) {
|
||||
std::lock_guard lk(m_mtx);
|
||||
if (!m_audioSource) return;
|
||||
// libdatachannel expects planar/int16? Actually CreateAudioSource accepts float32
|
||||
// and uses (frames, channels) to encode Opus internally.
|
||||
// Push signature is: audioSource->pushFloat(interleaved, frames, channels, sampleRate)
|
||||
m_audioSource->pushFloat(interleaved, (int)frames, m_p.channels, m_p.sampleRate);
|
||||
}
|
||||
|
||||
private:
|
||||
Params m_p;
|
||||
std::shared_ptr<rtc::PeerConnection> m_pc;
|
||||
std::shared_ptr<rtc::AudioSource> m_audioSource;
|
||||
std::shared_ptr<rtc::Track> m_track;
|
||||
std::optional<std::string> m_resourceUrl;
|
||||
std::mutex m_mtx;
|
||||
std::atomic<bool> m_started{false};
|
||||
|
||||
// --- WHIP HTTP helpers (mTLS-capable) ---
|
||||
|
||||
static std::tuple<std::string,std::string> ExtractAnswerAndLocation(const httplib::Result& r) {
|
||||
// WHIP server responds 201 Created, body = SDP answer (text/plain),
|
||||
// Location header = resource URL for PATCH candidates.
|
||||
if (!r) throw std::runtime_error("No HTTP result");
|
||||
if (r->status != 201 && r->status != 200)
|
||||
throw std::runtime_error("Unexpected WHIP status: " + std::to_string(r->status));
|
||||
std::string answer = r->body;
|
||||
std::string resourceUrl;
|
||||
if (r->has_header("Location")) resourceUrl = r->get_header_value("Location");
|
||||
return {answer, resourceUrl};
|
||||
}
|
||||
|
||||
std::pair<std::string,std::string> PostOfferWHIP(const std::string& sdpOffer) {
|
||||
auto [cli, path] = MakeClientForUrl(m_p.whipUrl);
|
||||
// WHIP requires:
|
||||
// POST <endpoint> Content-Type: application/sdp Body: offer SDP
|
||||
// Response: 201 Created, body: answer SDP, Location: resource URL
|
||||
auto res = cli->Post(path.c_str(), sdpOffer, "application/sdp");
|
||||
auto [answer, resUrl] = ExtractAnswerAndLocation(res);
|
||||
return {answer, resUrl};
|
||||
}
|
||||
|
||||
void PatchCandidateWHIP(const rtc::Candidate& cand) {
|
||||
if (!m_resourceUrl) return;
|
||||
auto [cli, path] = MakeClientForUrl(*m_resourceUrl);
|
||||
// WHIP trickle: PATCH resource with Content-Type: application/trickle-ice-sdpfrag
|
||||
// Body is an SDP fragment with "a=candidate:..."
|
||||
std::string frag = "a=" + std::string(cand);
|
||||
auto res = cli->Patch(path.c_str(), frag, "application/trickle-ice-sdpfrag");
|
||||
if (!res || (res->status < 200 || res->status >= 300)) {
|
||||
spdlog::warn("WHIP PATCH {} -> {}", path, res ? res->status : -1);
|
||||
}
|
||||
}
|
||||
|
||||
// Parse URL, return httplib client + path, configured with mTLS if https
|
||||
static std::pair<std::unique_ptr<httplib::Client>, std::string>
|
||||
MakeClientForUrl(const std::string& url) {
|
||||
// scheme://host[:port]/path...
|
||||
static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)");
|
||||
std::smatch m;
|
||||
if (!std::regex_match(url, m, re)) {
|
||||
throw std::runtime_error("Invalid URL: " + url);
|
||||
}
|
||||
std::string scheme = m[1].str();
|
||||
std::string host = m[2].str();
|
||||
int port = m[3].matched ? std::stoi(m[3].str()) : (scheme=="https"?443:80);
|
||||
std::string path = m[4].matched ? m[4].str() : "/";
|
||||
// Build client + path for URL; uses mTLS when https
|
||||
std::pair<std::unique_ptr<httplib::SSLClient>, std::string>
|
||||
MakeClientForUrl(const std::string &body, const std::string &url)
|
||||
{
|
||||
(void)body;
|
||||
static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)");
|
||||
std::smatch m;
|
||||
if (!std::regex_match(url, m, re))
|
||||
throw std::runtime_error("Invalid URL: " + url);
|
||||
std::string scheme = m[1].str();
|
||||
std::string host = m[2].str();
|
||||
int port = m[3].matched ? std::stoi(m[3].str()) : (scheme == "https" ? 443 : 80);
|
||||
std::string path = m[4].matched ? m[4].str() : "/";
|
||||
|
||||
// We need the mTLS params; capture via thread-local? We’ll pass via this pointer.
|
||||
// To access instance fields here, make this non-static or pass params through a lambda.
|
||||
// Simpler: look up global thread-local; or we restructure to capture this.
|
||||
// For cleanliness, we’ll make a non-static wrapper below.
|
||||
throw std::logic_error("Use MakeClientForUrl_inst instead");
|
||||
}
|
||||
|
||||
std::pair<std::unique_ptr<httplib::Client>, std::string>
|
||||
MakeClientForUrl_inst(const std::string& url) {
|
||||
static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)");
|
||||
std::smatch m;
|
||||
if (!std::regex_match(url, m, re)) {
|
||||
throw std::runtime_error("Invalid URL: " + url);
|
||||
}
|
||||
std::string scheme = m[1].str();
|
||||
std::string host = m[2].str();
|
||||
int port = m[3].matched ? std::stoi(m[3].str()) : (scheme=="https"?443:80);
|
||||
std::string path = m[4].matched ? m[4].str() : "/";
|
||||
|
||||
if (scheme == "https") {
|
||||
if (scheme == "https")
|
||||
{
|
||||
#ifndef CPPHTTPLIB_OPENSSL_SUPPORT
|
||||
throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled");
|
||||
throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled");
|
||||
#else
|
||||
auto cli = std::make_unique<httplib::SSLClient>(host.c_str(), port);
|
||||
cli->enable_server_certificate_verification(true);
|
||||
cli->set_ca_cert_path(m_p.caPath.c_str());
|
||||
cli->set_client_cert_file(m_p.crtPath.c_str(), m_p.keyPath.c_str(), nullptr);
|
||||
cli->set_connection_timeout(10);
|
||||
cli->set_read_timeout(60);
|
||||
cli->set_write_timeout(60);
|
||||
return {std::move(cli), path};
|
||||
auto cli = std::make_unique<httplib::SSLClient>(host.c_str(), port, m_p.crtPath, m_p.keyPath, std::string());
|
||||
cli->enable_server_certificate_verification(true);
|
||||
cli->set_ca_cert_path(m_p.caPath.c_str());
|
||||
cli->set_connection_timeout(10);
|
||||
cli->set_read_timeout(60);
|
||||
cli->set_write_timeout(60);
|
||||
return {std::move(cli), path};
|
||||
#endif
|
||||
} else {
|
||||
auto cli = std::make_unique<httplib::Client>(host.c_str(), port);
|
||||
cli->set_connection_timeout(10);
|
||||
cli->set_read_timeout(60);
|
||||
cli->set_write_timeout(60);
|
||||
return {std::move(cli), path};
|
||||
}
|
||||
// else
|
||||
// {
|
||||
// auto cli = std::make_unique<httplib::Client>(host.c_str(), port);
|
||||
// cli->set_connection_timeout(10);
|
||||
// cli->set_read_timeout(60);
|
||||
// cli->set_write_timeout(60);
|
||||
// return {std::move(cli), path};
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
// Overload to call instance version
|
||||
std::pair<std::unique_ptr<httplib::Client>, std::string>
|
||||
MakeClientForUrl(const std::string& url) {
|
||||
return MakeClientForUrl_inst(url);
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
} // namespace snoop
|
||||
|
||||
Reference in New Issue
Block a user