diff --git a/src/Services/AudioStreamService.h b/src/Services/AudioStreamService.h index beacb8d..34b4672 100644 --- a/src/Services/AudioStreamService.h +++ b/src/Services/AudioStreamService.h @@ -27,23 +27,20 @@ class AudioStreamService { // WHIP std::unique_ptr m_whip; std::mutex m_whipMutex; - std::string m_tmpKeyPath; // temp key extracted from keyctl (deleted on Stop) public: explicit AudioStreamService(std::shared_ptr cfg) : m_cfg(std::move(cfg)) {} - ~AudioStreamService() { - StopWhip(); - } + ~AudioStreamService() { StopWhip(); } - // Feed raw PCM (float32 interleaved), frames = samples per channel + // Feed encoded Opus; frames = PCM samples per channel represented by this Opus frame void OnOpus(const unsigned char* opusData, size_t opusBytes, int pcmFramesPerChannel) { std::lock_guard lk(m_whipMutex); if (m_whip) m_whip->PushOpus(opusData, opusBytes, pcmFramesPerChannel); } - bool StartWhip(const std::string& whipUrl, int sampleRate=48000, int channels=1) { + bool StartWhip(const std::string& whipUrl, int sampleRate = 48000, int channels = 1) { std::lock_guard lk(m_whipMutex); if (m_whip) { spdlog::info("WHIP already started"); @@ -59,30 +56,21 @@ public: if (!std::filesystem::exists(ca)) ca = "/etc/iot/keys/ca_chain.pem"; std::filesystem::path crt = "/etc/iot/keys/device.crt.pem"; - // extract client key via keyctl - // auto tmpKey = snoop::device_sec::ExtractClientKeyFromKernelKeyring(); - // if (!tmpKey.string().empty()) { - // spdlog::error("Cannot extract client key for WHIP (keyctl user iot-client-key)"); - // return false; - // } - WhipClient::Params p{ - .whipUrl = whipUrl, - .caPath = ca.string(), - .crtPath = crt.string(), - // .keyPath = tmpKey, - .sampleRate= sampleRate, - .channels = channels + .whipUrl = whipUrl, // may include ?token=..., client will normalize path and set Bearer + .caPath = ca.string(), + .crtPath = crt.string(), + .sampleRate = sampleRate, + .channels = channels }; + m_whip = std::make_unique(p); try { m_whip->Start(); spdlog::info("WHIP started"); - // m_tmpKeyPath = tmpKey; return true; } catch (const std::exception& e) { spdlog::error("WHIP start failed: {}", e.what()); - // std::error_code ec; std::filesystem::remove(tmpKey, ec); m_whip.reset(); return false; } @@ -94,14 +82,7 @@ public: m_whip->Stop(); m_whip.reset(); } - if (!m_tmpKeyPath.empty()) { - std::error_code ec; std::filesystem::remove(m_tmpKeyPath, ec); - m_tmpKeyPath.clear(); - } } - -private: - }; } // namespace snoop diff --git a/src/Services/WhipClient.h b/src/Services/WhipClient.h index b6ceaae..b21a1c7 100644 --- a/src/Services/WhipClient.h +++ b/src/Services/WhipClient.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -24,6 +25,7 @@ #include #include #include "Security/TlsKeyUtil.h" +#include namespace snoop { @@ -33,16 +35,14 @@ namespace snoop public: struct Params { - std::string whipUrl; // full WHIP endpoint (may include ?token=...) + std::string whipUrl; // full WHIP endpoint, may include ?token=... std::string caPath; // CA chain std::string crtPath; // client cert - // std::string keyPath; // client key (temp extracted from keyctl) int sampleRate = 48000; - int channels = 1; // 1 or 2; timestamp advance uses PCM frames per channel + int channels = 1; // RTP timestamp increments by PCM frames/channel }; explicit WhipClient(Params p) : m_p(std::move(p)) {} - ~WhipClient() { Stop(); } void Start() @@ -51,6 +51,12 @@ namespace snoop if (m_started) return; + // Parse & normalize the provided URL now (extract token, add trailing /whip) + m_endpoint = ParseWhipUrl(m_p.whipUrl); + spdlog::info("WHIP normalized: scheme='{}' host='{}' port={} path='{}' auth='{}'", + m_endpoint.scheme, m_endpoint.host, m_endpoint.port, m_endpoint.path, + m_endpoint.bearer ? "yes" : "no"); + rtc::Configuration cfg; m_pc = std::make_shared(cfg); @@ -65,7 +71,7 @@ namespace snoop spdlog::info("Local SDP (first 8 lines):"); { std::istringstream is(offer); - for (int i=0; i<8 && is; ++i) { + for (int i = 0; i < 8 && is; ++i) { std::string line; std::getline(is, line); spdlog::info(" {}", line); } @@ -77,31 +83,44 @@ namespace snoop } catch (const std::exception& e) { spdlog::error("WHIP POST offer failed: {}", e.what()); } }); + m_pc->onLocalCandidate([this](rtc::Candidate c) { - if (!m_resourceUrl) return; - try { PatchCandidateWHIP(c); } - catch (const std::exception& e) { spdlog::warn("WHIP PATCH candidate failed: {}", e.what()); } }); + try { + std::string frag = "a=" + std::string(c); // libdatachannel gives correct candidate line + PatchSdpFrag(frag); // sends with CRLF + right content-type + } catch (const std::exception& e) { + spdlog::warn("WHIP PATCH candidate failed: {}", e.what()); + } }); + + // 2) When ICE gathering completes, send end-of-candidates + m_pc->onGatheringStateChange([this](rtc::PeerConnection::GatheringState s) + { + spdlog::info("WHIP gathering state: {}", (int)s); + if (s == rtc::PeerConnection::GatheringState::Complete) { + PatchSdpFrag("a=end-of-candidates"); + } }); // Create a sendonly audio track rtc::Description::Audio audioDesc("audio", rtc::Description::Direction::SendOnly); - // Opus PT typically negotiated to 111 by browsers; we'll use 111 in RTP header + audioDesc.addOpusCodec(111); m_track = m_pc->addTrack(audioDesc); - // IMPORTANT: wait for SRTP sender to be ready + // Open/close signals m_track->onOpen([this] { - spdlog::info("WHIP track opened"); - m_trackOpen = true; }); + spdlog::info("WHIP track opened"); + m_trackOpen = true; }); m_track->onClosed([this] { - spdlog::info("WHIP track closed"); - m_trackOpen = false; }); - // Initialize RTP state (random SSRC/seq) + spdlog::info("WHIP track closed"); + m_trackOpen = false; }); + + // RTP state std::mt19937 rng{std::random_device{}()}; m_ssrc = std::uniform_int_distribution()(rng); m_seq = std::uniform_int_distribution()(rng); - m_ts = 0; // RTP clock for Opus is 48kHz + m_ts = 0; // 48k clock for Opus // Create SDP offer (triggers onLocalDescription) m_pc->setLocalDescription(); @@ -114,28 +133,24 @@ namespace snoop std::lock_guard lk(m_mtx); if (!m_started) return; + m_trackOpen = false; m_track.reset(); m_pc.reset(); - m_resourceUrl.reset(); m_started = false; } - // Call this from your Opus encoder callback. - // opusData: encoded Opus frame bytes - // opusBytes: length - // pcmFramesPerChannel: number of PCM samples per channel represented by this Opus frame (e.g., 960 for 20ms @48k). + // Push encoded Opus frames as RTP over the libdatachannel track. void PushOpus(const unsigned char *opusData, size_t opusBytes, int pcmFramesPerChannel) { std::lock_guard lk(m_mtx); if (!m_track || !m_started || !m_trackOpen.load()) return; - // Build RTP header (12 bytes) std::array rtp{}; - rtp[0] = 0x80; // V=2, P=0, X=0, CC=0 - rtp[1] = 0x80 | (m_pt & 0x7F); // M=1 (end of frame), PT + rtp[0] = 0x80; // V=2 + rtp[1] = 0x80 | (m_pt & 0x7F); // M=1, PT rtp[2] = uint8_t(m_seq >> 8); rtp[3] = uint8_t(m_seq & 0xFF); rtp[4] = uint8_t(m_ts >> 24); @@ -147,23 +162,26 @@ namespace snoop rtp[10] = uint8_t((m_ssrc >> 8) & 0xFF); rtp[11] = uint8_t(m_ssrc & 0xFF); - // Concatenate header + payload rtc::binary packet; packet.resize(rtp.size() + opusBytes); std::memcpy(packet.data(), rtp.data(), rtp.size()); std::memcpy(packet.data() + rtp.size(), opusData, opusBytes); + m_track->send(packet); - // Send RTP on the media track - m_track->send(packet); // libdatachannel expects RTP bytes on tracks - - // Advance RTP state ++m_seq; - // For Opus @ 48k clock, timestamp increments by the PCM frame count per channel - // (do not multiply by channels) m_ts += static_cast(pcmFramesPerChannel); } private: + struct ParsedUrl + { + std::string scheme; + std::string host; + int port = 0; + std::string path; // normalized; includes leading '/', **ends with '/whip'** + std::optional bearer; // token from ?token=... + }; + Params m_p; std::shared_ptr m_pc; std::shared_ptr m_track; @@ -171,21 +189,84 @@ namespace snoop std::mutex m_mtx; std::atomic m_started{false}; std::atomic m_trackOpen{false}; + ParsedUrl m_endpoint; // RTP state uint32_t m_ssrc = 0; uint16_t m_seq = 0; uint32_t m_ts = 0; - uint8_t m_pt = 111; // dynamic payload type commonly used for Opus + uint8_t m_pt = 111; - // --- WHIP HTTP helpers (mTLS-capable) --- + // ---------- URL parsing & normalization ---------- + + static ParsedUrl ParseWhipUrl(const std::string &input) + { + // scheme://host[:port]/path[?query] + static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/[^?]*)?(?:\?(.+))?$)"); + std::smatch m; + if (!std::regex_match(input, m, re)) + { + throw std::runtime_error("Invalid WHIP URL: " + input); + } + ParsedUrl out; + out.scheme = m[1].str(); + if (out.scheme != "https") + { + throw std::runtime_error("Only HTTPS WHIP endpoints are supported"); + } + out.host = m[2].str(); + out.port = m[3].matched ? std::stoi(m[3].str()) : 443; + + std::string rawPath = m[4].matched ? m[4].str() : "/"; + std::string query = m[5].matched ? m[5].str() : ""; + + // Extract token=... from query (no decoding needed for your JWT) + if (!query.empty()) + { + // naive split, fine for `token=...` + size_t pos = 0; + while (pos < query.size()) + { + size_t amp = query.find('&', pos); + if (amp == std::string::npos) + amp = query.size(); + auto kv = query.substr(pos, amp - pos); + auto eq = kv.find('='); + if (eq != std::string::npos) + { + auto key = kv.substr(0, eq); + auto val = kv.substr(eq + 1); + if (key == "token" && !val.empty()) + { + out.bearer = val; + } + } + pos = amp + (amp < query.size() ? 1 : 0); + } + } + + // MediaMTX wants: /whip//whip + // Your incoming URLs already start with /whip/... — we just ensure they end with /whip. + if (rawPath.empty()) + rawPath = "/"; + if (rawPath.back() == '/') + rawPath.pop_back(); + if (rawPath.rfind("/whip", std::string::npos) != rawPath.size() - 5) + { + rawPath += "/whip"; + } + out.path = rawPath; + return out; + } static std::tuple ExtractAnswerAndLocation(const httplib::Result &r) { if (!r) throw std::runtime_error("No HTTP result"); if (r->status != 201 && r->status != 200) + { throw std::runtime_error("Unexpected WHIP status: " + std::to_string(r->status)); + } std::string answer = r->body; std::string resourceUrl; if (r->has_header("Location")) @@ -193,98 +274,146 @@ namespace snoop return {answer, resourceUrl}; } - // std::pair PostOfferWHIP(const std::string &sdpOffer) - // { - // auto [cli, path] = MakeClientForUrl(sdpOffer, m_p.whipUrl); - // auto res = cli->Post(path.c_str(), sdpOffer, "application/sdp"); - // auto [answer, resUrl] = ExtractAnswerAndLocation(res); - // return {answer, resUrl}; - // } - std::pair PostOfferWHIP(const std::string &sdpOffer) { - auto [cli, path] = MakeClientForUrl(sdpOffer, m_p.whipUrl); - + auto cli = MakeClient(m_endpoint); httplib::Headers hs{ {"Content-Type", "application/sdp"}, {"Accept", "application/sdp"}}; + if (m_endpoint.bearer) + { + hs.emplace("Authorization", "Bearer " + *m_endpoint.bearer); + } - spdlog::info("WHIP POST url='{}' path='{}' offer-bytes={}", m_p.whipUrl, path, sdpOffer.size()); - auto res = cli->Post(path.c_str(), hs, sdpOffer, "application/sdp"); + spdlog::info("WHIP POST url='{}://{}:{}' path='{}' offer-bytes={}", + m_endpoint.scheme, m_endpoint.host, m_endpoint.port, m_endpoint.path, sdpOffer.size()); + + auto res = cli->Post(m_endpoint.path.c_str(), hs, sdpOffer, "application/sdp"); if (!res) throw std::runtime_error("No HTTP result (network?)"); + const auto ctype = res->get_header_value("Content-Type"); + const bool has_loc = res->has_header("Location"); + spdlog::info("WHIP POST -> status={} len={} location='{}'", res->status, res->body.size(), - res->has_header("Location") ? res->get_header_value("Location") : ""); + res->has_header("Location") ? res->get_header_value("Location") : "", + ctype); if (res->status != 201 && res->status != 200) { + // Surface server body for debugging (404s, auth errors, etc.) spdlog::error("WHIP POST body:\n{}", res->body); throw std::runtime_error("Unexpected WHIP status: " + std::to_string(res->status)); } - std::string answer = res->body; std::string resourceUrl; if (res->has_header("Location")) resourceUrl = res->get_header_value("Location"); + + if (answer.find("a=ice-ufrag:") == std::string::npos) + { + spdlog::warn("Answer SDP has no top-level a=ice-ufrag; continuing. Body (first lines):"); + std::istringstream is(answer); + for (int i = 0; i < 12 && is; ++i) + { + std::string line; + std::getline(is, line); + spdlog::warn(" {}", line); + } + } return {answer, resourceUrl}; } void PatchCandidateWHIP(const rtc::Candidate &cand) { + // If MediaMTX returns an absolute Location for the resource, we parse it; + // otherwise we reuse the same host/port and use Location as path. if (!m_resourceUrl) return; - auto [cli, path] = MakeClientForUrl(std::string(""), *m_resourceUrl); + + ParsedUrl target; + try + { + target = ParseWhipUrl(*m_resourceUrl); + } + catch (...) + { + // Not a full URL; treat resourceUrl as path on the same host:port, still send auth header. + target = m_endpoint; + target.path = *m_resourceUrl; + } + + auto cli = MakeClient(target); std::string frag = "a=" + std::string(cand); - auto res = cli->Patch(path.c_str(), frag, "application/trickle-ice-sdpfrag"); + + httplib::Headers hs{ + {"Content-Type", "application/trickle-ice-sdpfrag"}}; + if (m_endpoint.bearer) + { + hs.emplace("Authorization", "Bearer " + *m_endpoint.bearer); + } + + auto res = cli->Patch(target.path.c_str(), hs, frag, "application/trickle-ice-sdpfrag"); if (!res || (res->status < 200 || res->status >= 300)) { - spdlog::warn("WHIP PATCH {} -> {}", path, res ? res->status : -1); + spdlog::warn("WHIP PATCH {} -> {}", target.path, res ? res->status : -1); } } - // Build client + path for URL; uses mTLS when https - std::pair, std::string> - MakeClientForUrl(const std::string &body, const std::string &url) + std::unique_ptr MakeClient(const ParsedUrl &u) { - (void)body; - static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)"); - std::smatch m; - if (!std::regex_match(url, m, re)) - throw std::runtime_error("Invalid URL: " + url); - std::string scheme = m[1].str(); - std::string host = m[2].str(); - int port = m[3].matched ? std::stoi(m[3].str()) : (scheme == "https" ? 443 : 80); - std::string path = m[4].matched ? m[4].str() : "/"; - - if (scheme == "https") + if (u.scheme == "https") { #ifndef CPPHTTPLIB_OPENSSL_SUPPORT throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled"); #else + // mTLS: write client key payload to a temp file so OpenSSL can read it auto payload = snoop::device_sec::ReadClientKeyPayloadFromKeyring(); - // pick a writable dir (use /tmp to avoid /run root perms surprises) snoop::device_sec::TempFile tf(std::filesystem::temp_directory_path()); tf.write_all(payload.data(), payload.size()); - auto cli = std::make_unique(host.c_str(), port, m_p.crtPath, tf.path, std::string()); + auto cli = std::make_unique(u.host.c_str(), u.port, + m_p.crtPath.c_str(), tf.path.c_str(), + std::string()); cli->enable_server_certificate_verification(false); cli->set_ca_cert_path(m_p.caPath.c_str()); cli->set_connection_timeout(10); cli->set_read_timeout(60); cli->set_write_timeout(60); - return {std::move(cli), path}; + return cli; #endif } - // else - // { - // auto cli = std::make_unique(host.c_str(), port); - // cli->set_connection_timeout(10); - // cli->set_read_timeout(60); - // cli->set_write_timeout(60); - // return {std::move(cli), path}; - // } + } + + void PatchSdpFrag(const std::string &sdpfrag) + { + if (!m_resourceUrl) + return; + + // Reuse the same host/port and just set path = Location + ParsedUrl target = m_endpoint; + target.path = *m_resourceUrl; + + auto cli = MakeClient(target); + + httplib::Headers hs{ + {"Content-Type", "application/trickle-ice-sdpfrag"}}; + if (m_endpoint.bearer) + { + hs.emplace("Authorization", "Bearer " + *m_endpoint.bearer); + } + + // MUST include CRLF at end of body + std::string body = sdpfrag; + if (body.empty() || body.back() != '\n') + body += "\r\n"; + + auto res = cli->Patch(target.path.c_str(), hs, body, "application/trickle-ice-sdpfrag"); + if (!res || !(res->status == 200 || res->status == 201 || res->status == 204)) + { + spdlog::warn("WHIP PATCH {} -> {}", target.path, res ? res->status : -1); + } } };