audio streams finally working on client side

This commit is contained in:
tdv
2025-10-28 16:52:24 +02:00
parent aec6a51ae6
commit 4d055c343a
2 changed files with 213 additions and 103 deletions

View File

@@ -27,23 +27,20 @@ class AudioStreamService {
// WHIP // WHIP
std::unique_ptr<WhipClient> m_whip; std::unique_ptr<WhipClient> m_whip;
std::mutex m_whipMutex; std::mutex m_whipMutex;
std::string m_tmpKeyPath; // temp key extracted from keyctl (deleted on Stop)
public: public:
explicit AudioStreamService(std::shared_ptr<ConfigService> cfg) explicit AudioStreamService(std::shared_ptr<ConfigService> cfg)
: m_cfg(std::move(cfg)) {} : m_cfg(std::move(cfg)) {}
~AudioStreamService() { ~AudioStreamService() { StopWhip(); }
StopWhip();
}
// Feed raw PCM (float32 interleaved), frames = samples per channel // Feed encoded Opus; frames = PCM samples per channel represented by this Opus frame
void OnOpus(const unsigned char* opusData, size_t opusBytes, int pcmFramesPerChannel) { void OnOpus(const unsigned char* opusData, size_t opusBytes, int pcmFramesPerChannel) {
std::lock_guard lk(m_whipMutex); std::lock_guard lk(m_whipMutex);
if (m_whip) m_whip->PushOpus(opusData, opusBytes, pcmFramesPerChannel); if (m_whip) m_whip->PushOpus(opusData, opusBytes, pcmFramesPerChannel);
} }
bool StartWhip(const std::string& whipUrl, int sampleRate=48000, int channels=1) { bool StartWhip(const std::string& whipUrl, int sampleRate = 48000, int channels = 1) {
std::lock_guard lk(m_whipMutex); std::lock_guard lk(m_whipMutex);
if (m_whip) { if (m_whip) {
spdlog::info("WHIP already started"); spdlog::info("WHIP already started");
@@ -59,30 +56,21 @@ public:
if (!std::filesystem::exists(ca)) ca = "/etc/iot/keys/ca_chain.pem"; if (!std::filesystem::exists(ca)) ca = "/etc/iot/keys/ca_chain.pem";
std::filesystem::path crt = "/etc/iot/keys/device.crt.pem"; std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
// extract client key via keyctl
// auto tmpKey = snoop::device_sec::ExtractClientKeyFromKernelKeyring();
// if (!tmpKey.string().empty()) {
// spdlog::error("Cannot extract client key for WHIP (keyctl user iot-client-key)");
// return false;
// }
WhipClient::Params p{ WhipClient::Params p{
.whipUrl = whipUrl, .whipUrl = whipUrl, // may include ?token=..., client will normalize path and set Bearer
.caPath = ca.string(), .caPath = ca.string(),
.crtPath = crt.string(), .crtPath = crt.string(),
// .keyPath = tmpKey, .sampleRate = sampleRate,
.sampleRate= sampleRate, .channels = channels
.channels = channels
}; };
m_whip = std::make_unique<WhipClient>(p); m_whip = std::make_unique<WhipClient>(p);
try { try {
m_whip->Start(); m_whip->Start();
spdlog::info("WHIP started"); spdlog::info("WHIP started");
// m_tmpKeyPath = tmpKey;
return true; return true;
} catch (const std::exception& e) { } catch (const std::exception& e) {
spdlog::error("WHIP start failed: {}", e.what()); spdlog::error("WHIP start failed: {}", e.what());
// std::error_code ec; std::filesystem::remove(tmpKey, ec);
m_whip.reset(); m_whip.reset();
return false; return false;
} }
@@ -94,14 +82,7 @@ public:
m_whip->Stop(); m_whip->Stop();
m_whip.reset(); m_whip.reset();
} }
if (!m_tmpKeyPath.empty()) {
std::error_code ec; std::filesystem::remove(m_tmpKeyPath, ec);
m_tmpKeyPath.clear();
}
} }
private:
}; };
} // namespace snoop } // namespace snoop

View File

@@ -14,6 +14,7 @@
#include <random> #include <random>
#include <cstdint> #include <cstdint>
#include <cstring> #include <cstring>
#include <sstream>
#include <spdlog/spdlog.h> #include <spdlog/spdlog.h>
#include <nlohmann/json.hpp> #include <nlohmann/json.hpp>
@@ -24,6 +25,7 @@
#include <rtc/description.hpp> #include <rtc/description.hpp>
#include <rtc/track.hpp> #include <rtc/track.hpp>
#include "Security/TlsKeyUtil.h" #include "Security/TlsKeyUtil.h"
#include <locale>
namespace snoop namespace snoop
{ {
@@ -33,16 +35,14 @@ namespace snoop
public: public:
struct Params struct Params
{ {
std::string whipUrl; // full WHIP endpoint (may include ?token=...) std::string whipUrl; // full WHIP endpoint, may include ?token=...
std::string caPath; // CA chain std::string caPath; // CA chain
std::string crtPath; // client cert std::string crtPath; // client cert
// std::string keyPath; // client key (temp extracted from keyctl)
int sampleRate = 48000; int sampleRate = 48000;
int channels = 1; // 1 or 2; timestamp advance uses PCM frames per channel int channels = 1; // RTP timestamp increments by PCM frames/channel
}; };
explicit WhipClient(Params p) : m_p(std::move(p)) {} explicit WhipClient(Params p) : m_p(std::move(p)) {}
~WhipClient() { Stop(); } ~WhipClient() { Stop(); }
void Start() void Start()
@@ -51,6 +51,12 @@ namespace snoop
if (m_started) if (m_started)
return; return;
// Parse & normalize the provided URL now (extract token, add trailing /whip)
m_endpoint = ParseWhipUrl(m_p.whipUrl);
spdlog::info("WHIP normalized: scheme='{}' host='{}' port={} path='{}' auth='{}'",
m_endpoint.scheme, m_endpoint.host, m_endpoint.port, m_endpoint.path,
m_endpoint.bearer ? "yes" : "no");
rtc::Configuration cfg; rtc::Configuration cfg;
m_pc = std::make_shared<rtc::PeerConnection>(cfg); m_pc = std::make_shared<rtc::PeerConnection>(cfg);
@@ -65,7 +71,7 @@ namespace snoop
spdlog::info("Local SDP (first 8 lines):"); spdlog::info("Local SDP (first 8 lines):");
{ {
std::istringstream is(offer); std::istringstream is(offer);
for (int i=0; i<8 && is; ++i) { for (int i = 0; i < 8 && is; ++i) {
std::string line; std::getline(is, line); std::string line; std::getline(is, line);
spdlog::info(" {}", line); spdlog::info(" {}", line);
} }
@@ -77,31 +83,44 @@ namespace snoop
} catch (const std::exception& e) { } catch (const std::exception& e) {
spdlog::error("WHIP POST offer failed: {}", e.what()); spdlog::error("WHIP POST offer failed: {}", e.what());
} }); } });
m_pc->onLocalCandidate([this](rtc::Candidate c) m_pc->onLocalCandidate([this](rtc::Candidate c)
{ {
if (!m_resourceUrl) return; try {
try { PatchCandidateWHIP(c); } std::string frag = "a=" + std::string(c); // libdatachannel gives correct candidate line
catch (const std::exception& e) { spdlog::warn("WHIP PATCH candidate failed: {}", e.what()); } }); PatchSdpFrag(frag); // sends with CRLF + right content-type
} catch (const std::exception& e) {
spdlog::warn("WHIP PATCH candidate failed: {}", e.what());
} });
// 2) When ICE gathering completes, send end-of-candidates
m_pc->onGatheringStateChange([this](rtc::PeerConnection::GatheringState s)
{
spdlog::info("WHIP gathering state: {}", (int)s);
if (s == rtc::PeerConnection::GatheringState::Complete) {
PatchSdpFrag("a=end-of-candidates");
} });
// Create a sendonly audio track // Create a sendonly audio track
rtc::Description::Audio audioDesc("audio", rtc::Description::Direction::SendOnly); rtc::Description::Audio audioDesc("audio", rtc::Description::Direction::SendOnly);
// Opus PT typically negotiated to 111 by browsers; we'll use 111 in RTP header audioDesc.addOpusCodec(111);
m_track = m_pc->addTrack(audioDesc); m_track = m_pc->addTrack(audioDesc);
// IMPORTANT: wait for SRTP sender to be ready // Open/close signals
m_track->onOpen([this] m_track->onOpen([this]
{ {
spdlog::info("WHIP track opened"); spdlog::info("WHIP track opened");
m_trackOpen = true; }); m_trackOpen = true; });
m_track->onClosed([this] m_track->onClosed([this]
{ {
spdlog::info("WHIP track closed"); spdlog::info("WHIP track closed");
m_trackOpen = false; }); m_trackOpen = false; });
// Initialize RTP state (random SSRC/seq)
// RTP state
std::mt19937 rng{std::random_device{}()}; std::mt19937 rng{std::random_device{}()};
m_ssrc = std::uniform_int_distribution<uint32_t>()(rng); m_ssrc = std::uniform_int_distribution<uint32_t>()(rng);
m_seq = std::uniform_int_distribution<uint16_t>()(rng); m_seq = std::uniform_int_distribution<uint16_t>()(rng);
m_ts = 0; // RTP clock for Opus is 48kHz m_ts = 0; // 48k clock for Opus
// Create SDP offer (triggers onLocalDescription) // Create SDP offer (triggers onLocalDescription)
m_pc->setLocalDescription(); m_pc->setLocalDescription();
@@ -114,28 +133,24 @@ namespace snoop
std::lock_guard lk(m_mtx); std::lock_guard lk(m_mtx);
if (!m_started) if (!m_started)
return; return;
m_trackOpen = false; m_trackOpen = false;
m_track.reset(); m_track.reset();
m_pc.reset(); m_pc.reset();
m_resourceUrl.reset(); m_resourceUrl.reset();
m_started = false; m_started = false;
} }
// Call this from your Opus encoder callback. // Push encoded Opus frames as RTP over the libdatachannel track.
// opusData: encoded Opus frame bytes
// opusBytes: length
// pcmFramesPerChannel: number of PCM samples per channel represented by this Opus frame (e.g., 960 for 20ms @48k).
void PushOpus(const unsigned char *opusData, size_t opusBytes, int pcmFramesPerChannel) void PushOpus(const unsigned char *opusData, size_t opusBytes, int pcmFramesPerChannel)
{ {
std::lock_guard lk(m_mtx); std::lock_guard lk(m_mtx);
if (!m_track || !m_started || !m_trackOpen.load()) if (!m_track || !m_started || !m_trackOpen.load())
return; return;
// Build RTP header (12 bytes)
std::array<uint8_t, 12> rtp{}; std::array<uint8_t, 12> rtp{};
rtp[0] = 0x80; // V=2, P=0, X=0, CC=0 rtp[0] = 0x80; // V=2
rtp[1] = 0x80 | (m_pt & 0x7F); // M=1 (end of frame), PT rtp[1] = 0x80 | (m_pt & 0x7F); // M=1, PT
rtp[2] = uint8_t(m_seq >> 8); rtp[2] = uint8_t(m_seq >> 8);
rtp[3] = uint8_t(m_seq & 0xFF); rtp[3] = uint8_t(m_seq & 0xFF);
rtp[4] = uint8_t(m_ts >> 24); rtp[4] = uint8_t(m_ts >> 24);
@@ -147,23 +162,26 @@ namespace snoop
rtp[10] = uint8_t((m_ssrc >> 8) & 0xFF); rtp[10] = uint8_t((m_ssrc >> 8) & 0xFF);
rtp[11] = uint8_t(m_ssrc & 0xFF); rtp[11] = uint8_t(m_ssrc & 0xFF);
// Concatenate header + payload
rtc::binary packet; rtc::binary packet;
packet.resize(rtp.size() + opusBytes); packet.resize(rtp.size() + opusBytes);
std::memcpy(packet.data(), rtp.data(), rtp.size()); std::memcpy(packet.data(), rtp.data(), rtp.size());
std::memcpy(packet.data() + rtp.size(), opusData, opusBytes); std::memcpy(packet.data() + rtp.size(), opusData, opusBytes);
m_track->send(packet);
// Send RTP on the media track
m_track->send(packet); // libdatachannel expects RTP bytes on tracks
// Advance RTP state
++m_seq; ++m_seq;
// For Opus @ 48k clock, timestamp increments by the PCM frame count per channel
// (do not multiply by channels)
m_ts += static_cast<uint32_t>(pcmFramesPerChannel); m_ts += static_cast<uint32_t>(pcmFramesPerChannel);
} }
private: private:
// Decomposed WHIP endpoint used to build HTTPS requests.
// ParseWhipUrl() fills every field; the PATCH helpers copy m_endpoint and
// overwrite `path` with the session's Location value, so the "/whip" suffix
// described below only holds for values straight out of ParseWhipUrl().
struct ParsedUrl
{
// URL scheme; ParseWhipUrl() rejects anything other than "https".
std::string scheme;
// Host part of the authority, without the port.
std::string host;
// TCP port; ParseWhipUrl() substitutes 443 when the URL names none.
int port = 0;
// Request path without the query string. As produced by ParseWhipUrl():
// has a leading '/' and ends with "/whip".
std::string path;
// Value of the `token` query parameter, if any; sent as "Authorization: Bearer <token>".
std::optional<std::string> bearer;
};
Params m_p; Params m_p;
std::shared_ptr<rtc::PeerConnection> m_pc; std::shared_ptr<rtc::PeerConnection> m_pc;
std::shared_ptr<rtc::Track> m_track; std::shared_ptr<rtc::Track> m_track;
@@ -171,21 +189,84 @@ namespace snoop
std::mutex m_mtx; std::mutex m_mtx;
std::atomic<bool> m_started{false}; std::atomic<bool> m_started{false};
std::atomic<bool> m_trackOpen{false}; std::atomic<bool> m_trackOpen{false};
ParsedUrl m_endpoint;
// RTP state // RTP state
uint32_t m_ssrc = 0; uint32_t m_ssrc = 0;
uint16_t m_seq = 0; uint16_t m_seq = 0;
uint32_t m_ts = 0; uint32_t m_ts = 0;
uint8_t m_pt = 111; // dynamic payload type commonly used for Opus uint8_t m_pt = 111;
// --- WHIP HTTP helpers (mTLS-capable) --- // ---------- URL parsing & normalization ----------
/// Parse and normalize a WHIP endpoint URL of the form
/// `https://host[:port][/path][?query]`.
///
/// - Only the `https` scheme is accepted.
/// - The port defaults to 443 and must lie in 1..65535.
/// - A `token=<value>` query parameter (last one wins, no percent-decoding)
///   is extracted into `bearer`; the rest of the query is discarded.
/// - The path has one trailing '/' removed and is guaranteed to end with
///   "/whip" (appended when missing).
///
/// Note: the host pattern `[^/:]+` cannot match bracketed IPv6 literals.
///
/// @param input  URL as configured; may carry `?token=...`.
/// @return       The normalized endpoint.
/// @throws std::runtime_error on malformed URL, non-https scheme or bad port.
static ParsedUrl ParseWhipUrl(const std::string &input)
{
    // scheme://host[:port]/path[?query]
    static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/[^?]*)?(?:\?(.+))?$)");
    std::smatch m;
    if (!std::regex_match(input, m, re))
    {
        // The query may carry the bearer token: keep it out of logs/exceptions.
        throw std::runtime_error("Invalid WHIP URL: " + input.substr(0, input.find('?')));
    }

    ParsedUrl out;
    out.scheme = m[1].str();
    if (out.scheme != "https")
    {
        throw std::runtime_error("Only HTTPS WHIP endpoints are supported");
    }
    out.host = m[2].str();

    out.port = 443;
    if (m[3].matched)
    {
        const std::string digits = m[3].str();
        // `\d+` guarantees digits only; capping the length keeps std::stoi
        // from throwing std::out_of_range on absurdly long port strings.
        if (digits.size() > 5)
        {
            throw std::runtime_error("Invalid WHIP port: " + digits);
        }
        out.port = std::stoi(digits);
        if (out.port < 1 || out.port > 65535)
        {
            throw std::runtime_error("Invalid WHIP port: " + digits);
        }
    }

    // Extract token=... from the query (naive '&' split, no decoding).
    if (m[5].matched)
    {
        const std::string query = m[5].str();
        std::size_t pos = 0;
        while (pos < query.size())
        {
            std::size_t amp = query.find('&', pos);
            if (amp == std::string::npos)
                amp = query.size();
            const std::string kv = query.substr(pos, amp - pos);
            const std::size_t eq = kv.find('=');
            if (eq != std::string::npos && kv.compare(0, eq, "token") == 0 && eq + 1 < kv.size())
            {
                out.bearer = kv.substr(eq + 1);
            }
            pos = amp + 1;
        }
    }

    // MediaMTX wants: /whip/<original...>/whip
    // Incoming URLs already start with /whip/...; just make sure they end with /whip.
    std::string path = m[4].matched ? m[4].str() : std::string("/");
    if (path.back() == '/') // group 4, when matched, always starts with '/', so never empty
        path.pop_back();

    // Explicit ends-with test. The previous `rfind(...) != size() - 5` form
    // underflowed for paths shorter than the suffix, so e.g. "/abc" compared
    // npos == size_t(-1) and silently skipped the append.
    const std::string suffix = "/whip";
    const bool hasSuffix =
        path.size() >= suffix.size() &&
        path.compare(path.size() - suffix.size(), suffix.size(), suffix) == 0;
    if (!hasSuffix)
    {
        path += suffix;
    }

    out.path = std::move(path);
    return out;
}
static std::tuple<std::string, std::string> ExtractAnswerAndLocation(const httplib::Result &r) static std::tuple<std::string, std::string> ExtractAnswerAndLocation(const httplib::Result &r)
{ {
if (!r) if (!r)
throw std::runtime_error("No HTTP result"); throw std::runtime_error("No HTTP result");
if (r->status != 201 && r->status != 200) if (r->status != 201 && r->status != 200)
{
throw std::runtime_error("Unexpected WHIP status: " + std::to_string(r->status)); throw std::runtime_error("Unexpected WHIP status: " + std::to_string(r->status));
}
std::string answer = r->body; std::string answer = r->body;
std::string resourceUrl; std::string resourceUrl;
if (r->has_header("Location")) if (r->has_header("Location"))
@@ -193,98 +274,146 @@ namespace snoop
return {answer, resourceUrl}; return {answer, resourceUrl};
} }
// std::pair<std::string, std::string> PostOfferWHIP(const std::string &sdpOffer)
// {
// auto [cli, path] = MakeClientForUrl(sdpOffer, m_p.whipUrl);
// auto res = cli->Post(path.c_str(), sdpOffer, "application/sdp");
// auto [answer, resUrl] = ExtractAnswerAndLocation(res);
// return {answer, resUrl};
// }
std::pair<std::string, std::string> PostOfferWHIP(const std::string &sdpOffer) std::pair<std::string, std::string> PostOfferWHIP(const std::string &sdpOffer)
{ {
auto [cli, path] = MakeClientForUrl(sdpOffer, m_p.whipUrl); auto cli = MakeClient(m_endpoint);
httplib::Headers hs{ httplib::Headers hs{
{"Content-Type", "application/sdp"}, {"Content-Type", "application/sdp"},
{"Accept", "application/sdp"}}; {"Accept", "application/sdp"}};
if (m_endpoint.bearer)
{
hs.emplace("Authorization", "Bearer " + *m_endpoint.bearer);
}
spdlog::info("WHIP POST url='{}' path='{}' offer-bytes={}", m_p.whipUrl, path, sdpOffer.size()); spdlog::info("WHIP POST url='{}://{}:{}' path='{}' offer-bytes={}",
auto res = cli->Post(path.c_str(), hs, sdpOffer, "application/sdp"); m_endpoint.scheme, m_endpoint.host, m_endpoint.port, m_endpoint.path, sdpOffer.size());
auto res = cli->Post(m_endpoint.path.c_str(), hs, sdpOffer, "application/sdp");
if (!res) if (!res)
throw std::runtime_error("No HTTP result (network?)"); throw std::runtime_error("No HTTP result (network?)");
const auto ctype = res->get_header_value("Content-Type");
const bool has_loc = res->has_header("Location");
spdlog::info("WHIP POST -> status={} len={} location='{}'", spdlog::info("WHIP POST -> status={} len={} location='{}'",
res->status, res->body.size(), res->status, res->body.size(),
res->has_header("Location") ? res->get_header_value("Location") : "<none>"); res->has_header("Location") ? res->get_header_value("Location") : "<none>",
ctype);
if (res->status != 201 && res->status != 200) if (res->status != 201 && res->status != 200)
{ {
// Surface server body for debugging (404s, auth errors, etc.)
spdlog::error("WHIP POST body:\n{}", res->body); spdlog::error("WHIP POST body:\n{}", res->body);
throw std::runtime_error("Unexpected WHIP status: " + std::to_string(res->status)); throw std::runtime_error("Unexpected WHIP status: " + std::to_string(res->status));
} }
std::string answer = res->body; std::string answer = res->body;
std::string resourceUrl; std::string resourceUrl;
if (res->has_header("Location")) if (res->has_header("Location"))
resourceUrl = res->get_header_value("Location"); resourceUrl = res->get_header_value("Location");
if (answer.find("a=ice-ufrag:") == std::string::npos)
{
spdlog::warn("Answer SDP has no top-level a=ice-ufrag; continuing. Body (first lines):");
std::istringstream is(answer);
for (int i = 0; i < 12 && is; ++i)
{
std::string line;
std::getline(is, line);
spdlog::warn(" {}", line);
}
}
return {answer, resourceUrl}; return {answer, resourceUrl};
} }
/// Trickle one local ICE candidate to the WHIP session resource.
///
/// Formats the candidate as an `a=` SDP attribute line and hands it to
/// PatchSdpFrag(), the same path the onLocalCandidate callback uses. The
/// earlier implementation ran the Location value through ParseWhipUrl(),
/// which appends "/whip" and strips the query, so an absolute Location was
/// PATCHed at the wrong path; it also sent the fragment without the trailing
/// CRLF and silently swallowed every parse error.
///
/// No-op until the POST answer has provided a resource URL.
///
/// @param cand  Local candidate reported by the peer connection.
void PatchCandidateWHIP(const rtc::Candidate &cand)
{
    if (!m_resourceUrl)
        return;

    PatchSdpFrag("a=" + std::string(cand));
}
// Build client + path for URL; uses mTLS when https std::unique_ptr<httplib::SSLClient> MakeClient(const ParsedUrl &u)
std::pair<std::unique_ptr<httplib::SSLClient>, std::string>
MakeClientForUrl(const std::string &body, const std::string &url)
{ {
(void)body; if (u.scheme == "https")
static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)");
std::smatch m;
if (!std::regex_match(url, m, re))
throw std::runtime_error("Invalid URL: " + url);
std::string scheme = m[1].str();
std::string host = m[2].str();
int port = m[3].matched ? std::stoi(m[3].str()) : (scheme == "https" ? 443 : 80);
std::string path = m[4].matched ? m[4].str() : "/";
if (scheme == "https")
{ {
#ifndef CPPHTTPLIB_OPENSSL_SUPPORT #ifndef CPPHTTPLIB_OPENSSL_SUPPORT
throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled"); throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled");
#else #else
// mTLS: write client key payload to a temp file so OpenSSL can read it
auto payload = snoop::device_sec::ReadClientKeyPayloadFromKeyring(); auto payload = snoop::device_sec::ReadClientKeyPayloadFromKeyring();
// pick a writable dir (use /tmp to avoid /run root perms surprises)
snoop::device_sec::TempFile tf(std::filesystem::temp_directory_path()); snoop::device_sec::TempFile tf(std::filesystem::temp_directory_path());
tf.write_all(payload.data(), payload.size()); tf.write_all(payload.data(), payload.size());
auto cli = std::make_unique<httplib::SSLClient>(host.c_str(), port, m_p.crtPath, tf.path, std::string()); auto cli = std::make_unique<httplib::SSLClient>(u.host.c_str(), u.port,
m_p.crtPath.c_str(), tf.path.c_str(),
std::string());
cli->enable_server_certificate_verification(false); cli->enable_server_certificate_verification(false);
cli->set_ca_cert_path(m_p.caPath.c_str()); cli->set_ca_cert_path(m_p.caPath.c_str());
cli->set_connection_timeout(10); cli->set_connection_timeout(10);
cli->set_read_timeout(60); cli->set_read_timeout(60);
cli->set_write_timeout(60); cli->set_write_timeout(60);
return {std::move(cli), path}; return cli;
#endif #endif
} }
// else }
// {
// auto cli = std::make_unique<httplib::Client>(host.c_str(), port); void PatchSdpFrag(const std::string &sdpfrag)
// cli->set_connection_timeout(10); {
// cli->set_read_timeout(60); if (!m_resourceUrl)
// cli->set_write_timeout(60); return;
// return {std::move(cli), path};
// } // Reuse the same host/port and just set path = Location
ParsedUrl target = m_endpoint;
target.path = *m_resourceUrl;
auto cli = MakeClient(target);
httplib::Headers hs{
{"Content-Type", "application/trickle-ice-sdpfrag"}};
if (m_endpoint.bearer)
{
hs.emplace("Authorization", "Bearer " + *m_endpoint.bearer);
}
// MUST include CRLF at end of body
std::string body = sdpfrag;
if (body.empty() || body.back() != '\n')
body += "\r\n";
auto res = cli->Patch(target.path.c_str(), hs, body, "application/trickle-ice-sdpfrag");
if (!res || !(res->status == 200 || res->status == 201 || res->status == 204))
{
spdlog::warn("WHIP PATCH {} -> {}", target.path, res ? res->status : -1);
}
} }
}; };