removed socket.io, added webrtc audio stream support and deep sleep. for webrtc we will use libdatachannel

This commit is contained in:
tdv
2025-10-09 16:27:32 +03:00
parent 5af104acf5
commit 78b7d495d4
2024 changed files with 1332 additions and 581855 deletions

View File

@@ -1,102 +1,135 @@
// src/Services/AudioStreamService.h
#pragma once
#include <sio_client.h>
#include <atomic>
#include <spdlog/spdlog.h>
#include <chrono>
#include <filesystem>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include <cstdio>
#include <array>
#include <spdlog/spdlog.h>
#include "WhipClient.h"
#include "ConfigService.h"
namespace snoop {
class AudioStreamService {
std::shared_ptr<sio::client> m_client;
std::string m_guid;
std::atomic<bool> m_isConnected = false;
std::atomic<bool> m_isInStreaming = false;
std::vector<char> m_audioBuffer;
std::mutex m_bufferMutex;
const unsigned long long int FLUSH_PERIOD = 5000;
const std::vector<char> PACKET_DELIMITER = {
static_cast<char>(0xFF),
static_cast<char>(0xFE),
static_cast<char>(0xFD),
static_cast<char>(0xFC)
};
unsigned long long int m_flushedAt = 0;
std::shared_ptr<ConfigService> m_cfg;
// WHIP
std::unique_ptr<WhipClient> m_whip;
std::mutex m_whipMutex;
std::string m_tmpKeyPath; // temp key extracted from keyctl (deleted on Stop)
public:
explicit AudioStreamService( std::shared_ptr<sio::client> client, std::string guid ) :
m_client( std::move( client ) ),
m_guid( std::move( guid ) ) {
SetupEventListeners();
}
explicit AudioStreamService(std::shared_ptr<ConfigService> cfg)
: m_cfg(std::move(cfg)) {}
~AudioStreamService() {
this->m_isConnected = false;
this->m_isInStreaming = false;
StopWhip();
}
void SendAudioData( const char* input, size_t size ) {
if( !this->m_isConnected || !this->m_isInStreaming ) {
return;
// Feed raw PCM (float32 interleaved), frames = samples per channel
void OnPCM(const float* interleaved, size_t frames) {
std::lock_guard lk(m_whipMutex);
if (m_whip) m_whip->PushPCM(interleaved, frames);
}
bool StartWhip(const std::string& whipUrl, int sampleRate=48000, int channels=1) {
std::lock_guard lk(m_whipMutex);
if (m_whip) {
spdlog::info("WHIP already started");
return true;
}
if (!m_cfg) {
spdlog::error("StartWhip requires ConfigService");
return false;
}
std::lock_guard lock( m_bufferMutex );
this->m_audioBuffer.insert(m_audioBuffer.end(), PACKET_DELIMITER.begin(), PACKET_DELIMITER.end());
this->m_audioBuffer.insert( m_audioBuffer.end(), input, input + size );
// certs from enrollment
std::filesystem::path ca = "/etc/iot/keys/issuing_ca.pem";
if (!std::filesystem::exists(ca)) ca = "/etc/iot/keys/ca_chain.pem";
std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
auto now = std::chrono::system_clock::now();
auto currentTime = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count();
if( currentTime >= this->m_flushedAt + FLUSH_PERIOD ) {
FlushBuffer();
// extract client key via keyctl
auto tmpKey = ExtractClientKeyTemp();
if (!tmpKey) {
spdlog::error("Cannot extract client key for WHIP (keyctl user iot-client-key)");
return false;
}
WhipClient::Params p{
.whipUrl = whipUrl,
.caPath = ca.string(),
.crtPath = crt.string(),
.keyPath = tmpKey->string(),
.sampleRate= sampleRate,
.channels = channels
};
m_whip = std::make_unique<WhipClient>(p);
try {
m_whip->Start();
spdlog::info("WHIP started");
m_tmpKeyPath = *tmpKey;
return true;
} catch (const std::exception& e) {
spdlog::error("WHIP start failed: {}", e.what());
std::error_code ec; std::filesystem::remove(*tmpKey, ec);
m_whip.reset();
return false;
}
}
void StopWhip() {
std::lock_guard lk(m_whipMutex);
if (m_whip) {
m_whip->Stop();
m_whip.reset();
}
if (!m_tmpKeyPath.empty()) {
std::error_code ec; std::filesystem::remove(m_tmpKeyPath, ec);
m_tmpKeyPath.clear();
}
}
private:
void SetupEventListeners() {
this->m_client->set_open_listener( [this]() {
spdlog::info( "Connected to server" );
this->m_client->socket( "/livestream" )->emit( "register_device", m_guid );
this->m_isConnected = true;
} );
static std::optional<std::filesystem::path> ExtractClientKeyTemp() {
auto exec = [](const std::string& cmd) {
std::array<char, 4096> buf{};
std::string out;
FILE* pipe = popen((cmd + " 2>&1").c_str(), "r");
if (!pipe) return std::string{};
while (fgets(buf.data(), (int)buf.size(), pipe) != nullptr) out.append(buf.data());
pclose(pipe);
return out;
};
auto trim = [](std::string s){
auto b=s.find_first_not_of(" \t\r\n"), e=s.find_last_not_of(" \t\r\n");
return (b==std::string::npos) ? std::string{} : s.substr(b, e-b+1);
};
this->m_client->set_close_listener( [this]( sio::client::close_reason const& reason ) {
this->m_isConnected = false;
this->m_isInStreaming = false;
spdlog::info( "Disconnected from server" );
} );
std::string id = trim(exec("keyctl search @s user iot-client-key | tail -n1"));
if (id.empty()) return std::nullopt;
this->m_client->set_fail_listener( []() {
spdlog::info( "Failed to connect to server" );
} );
this->m_client->socket( "/livestream" )->on( "start_streaming", [this]( sio::event& ev ) {
spdlog::info( "Start streaming command received" );
this->m_isInStreaming = true;
auto now = std::chrono::system_clock::now();
this->m_flushedAt = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count();
} );
this->m_client->socket( "/livestream" )->on( "stop_streaming", [this]( sio::event& ev ) {
spdlog::info( "Stop streaming command received" );
this->m_isInStreaming = false;
std::lock_guard lock( this->m_bufferMutex );
this->m_audioBuffer.clear();
} );
}
void FlushBuffer() {
if( this->m_audioBuffer.empty() ) {
return;
char tmpl[] = "/run/iot-whip-keyXXXXXX";
int fd = mkstemp(tmpl);
if (fd < 0) return std::nullopt;
close(fd);
std::filesystem::path p(tmpl);
exec("keyctl pipe " + id + " > " + p.string());
if (!std::filesystem::exists(p) || std::filesystem::file_size(p) == 0) {
std::error_code ec; std::filesystem::remove(p, ec);
return std::nullopt;
}
this->m_client->socket( "/livestream" )->emit( "audio_data",
std::make_shared<std::string>( this->m_audioBuffer.data(), this->m_audioBuffer.size() )
);
this->m_audioBuffer.clear();
auto now = std::chrono::system_clock::now();
this->m_flushedAt = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count();
return p;
}
};
}
} // namespace snoop

View File

@@ -5,177 +5,459 @@
#include <thread>
#include <atomic>
#include <chrono>
#include <httplib.h>
#include <mutex>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <array>
#include <optional>
#include <spdlog/spdlog.h>
#include <httplib.h> // build with CPPHTTPLIB_OPENSSL_SUPPORT
#include <sys/wait.h>
#include "AudioWriters/OggAudioWriter.h"
#include "ConfigService.h"
namespace snoop
{
namespace snoop {
class AudioWriterService
{
std::shared_ptr<ConfigService> m_configService;
std::shared_ptr<OggAudioWriter> m_oggWriter;
std::string m_destinationDirectoryPath;
std::string m_queueDirectoryPath;
class AudioWriterService {
std::shared_ptr<ConfigService> m_configService;
std::shared_ptr<OggAudioWriter> m_oggWriter;
std::string m_destinationDirectoryPath;
std::string m_queueDirectoryPath;
std::thread m_writingThread;
std::thread m_uploadThread;
std::mutex m_fetchFilePathsMutex;
unsigned long long int m_currentRecordStartedAt = 0;
std::string m_currentRecordFilePath;
std::atomic<bool> m_isIntermission = false;
std::thread m_writingThread;
std::thread m_uploadThread;
public:
explicit AudioWriterService( std::shared_ptr<ConfigService> configService, std::string destinationDirectoryPath ) :
m_configService( std::move( configService ) ),
m_destinationDirectoryPath( std::move( destinationDirectoryPath ) ),
m_oggWriter( std::make_shared<OggAudioWriter>( 48000, 1 ) ) {
if( !this->m_destinationDirectoryPath.empty() && !this->m_destinationDirectoryPath.ends_with( "/" ) ) {
this->m_destinationDirectoryPath.append( "/" );
}
this->m_queueDirectoryPath = this->m_destinationDirectoryPath + "queue/";
std::filesystem::create_directories( this->m_queueDirectoryPath );
this->MoveToQueueUncompletedRecords();
this->m_writingThread = std::thread( [this]() {
this->WritingThread();
} );
this->m_uploadThread = std::thread( [this]() {
this->UploadThread();
} );
spdlog::info( "AudioWriterService::AudioWriterService()" );
}
std::mutex m_fetchFilePathsMutex;
std::mutex m_stateMutex;
void WriteAudioData( const char* data, size_t size, size_t frames ) {
this->m_oggWriter->Write( data, size, frames );
}
unsigned long long int m_currentRecordStartedAt = 0;
std::string m_currentRecordFilePath;
~AudioWriterService() {
this->m_isIntermission = true;
this->m_writingThread.join();
this->m_uploadThread.join();
}
std::atomic<bool> m_isIntermission = false;
private:
void MoveToQueueUncompletedRecords() {
std::vector<std::filesystem::path> files;
for( const auto& entry: std::filesystem::directory_iterator( this->m_destinationDirectoryPath ) ) {
files.push_back( entry.path() );
}
for( const auto& file: files ) {
if( file.filename().string() != "queue" ) {
spdlog::info( "Move uncompleted record {} to queue", file.filename().string() );
this->MoveToUploadQueue( file.string() );
}
}
}
// New recording control flags
std::atomic<bool> m_recordingEnabled{false};
std::atomic<bool> m_stopAfterCurrentSegment{false};
void WritingThread() {
auto now = std::chrono::system_clock::now();
this->m_currentRecordStartedAt = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count();
this->m_currentRecordFilePath = this->m_destinationDirectoryPath + std::to_string( this->m_currentRecordStartedAt );
this->m_oggWriter->StartWriting( this->m_currentRecordFilePath );
public:
explicit AudioWriterService(std::shared_ptr<ConfigService> configService, std::string destinationDirectoryPath)
: m_configService(std::move(configService)),
m_destinationDirectoryPath(std::move(destinationDirectoryPath)),
m_oggWriter(std::make_shared<OggAudioWriter>(48000, 1))
{
while( !m_isIntermission ) {
now = std::chrono::system_clock::now();
auto currentRecordDuration = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count() -
this->m_currentRecordStartedAt;
if( currentRecordDuration >= this->m_configService->GetRecordingDuration() ) {
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue( this->m_currentRecordFilePath );
this->m_currentRecordStartedAt = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count();
this->m_currentRecordFilePath = this->m_destinationDirectoryPath + std::to_string( this->m_currentRecordStartedAt );
this->m_oggWriter->StartWriting( this->m_currentRecordFilePath );
}
std::this_thread::sleep_for( std::chrono::milliseconds( 1000 ) );
}
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue( this->m_currentRecordFilePath );
// TODO: Move to upload queue
}
void MoveToUploadQueue( const std::string& filePath ) {
spdlog::info( "AudioWriterService::MoveToUploadQueue( {} )", filePath );
std::lock_guard lock( this->m_fetchFilePathsMutex );
auto now = std::chrono::system_clock::now();
auto recordStoppedAt = std::chrono::duration_cast<std::chrono::milliseconds>( now.time_since_epoch() ).count();
if( std::filesystem::exists( filePath ) ) {
auto fileName = std::filesystem::path( filePath ).filename().string() + "-" + std::to_string( recordStoppedAt );
std::filesystem::rename( filePath, m_queueDirectoryPath + fileName );
}
}
void UploadThread() {
while (!m_isIntermission) {
std::vector<std::filesystem::path> files;
if (!this->m_destinationDirectoryPath.empty() && !this->m_destinationDirectoryPath.ends_with("/"))
{
std::lock_guard l(this->m_fetchFilePathsMutex);
try {
for (const auto& entry : std::filesystem::directory_iterator(this->m_queueDirectoryPath)) {
files.push_back(entry.path());
}
} catch (const std::exception& e) {
spdlog::error("Error reading queue directory: {}", e.what());
this->m_destinationDirectoryPath.append("/");
}
this->m_queueDirectoryPath = this->m_destinationDirectoryPath + "queue/";
std::filesystem::create_directories(this->m_queueDirectoryPath);
this->MoveToQueueUncompletedRecords();
// Start background threads
this->m_writingThread = std::thread([this]()
{ this->WritingThread(); });
this->m_uploadThread = std::thread([this]()
{ this->UploadThread(); });
spdlog::info("AudioWriterService constructed; initial recordingEnabled=false");
}
~AudioWriterService()
{
this->m_isIntermission = true;
if (this->m_writingThread.joinable())
this->m_writingThread.join();
if (this->m_uploadThread.joinable())
this->m_uploadThread.join();
}
// -------- Public control API (called from DeviceControlService handlers) --------
// Begin a new recording cycle immediately (creates a fresh segment and starts writing).
void StartRecording()
{
std::lock_guard<std::mutex> lk(this->m_stateMutex);
if (m_recordingEnabled.load())
{
spdlog::info("StartRecording ignored: already recording");
return;
}
m_stopAfterCurrentSegment = false;
m_recordingEnabled = true;
spdlog::info("Recording enabled");
}
// Graceful stop: finish the current segment at the next rotation boundary, then stop.
void StopRecordingGracefully()
{
std::lock_guard<std::mutex> lk(this->m_stateMutex);
if (!m_recordingEnabled.load())
{
spdlog::info("StopRecordingGracefully ignored: not recording");
return;
}
m_stopAfterCurrentSegment = true;
spdlog::info("Recording will stop after current segment completes");
}
bool IsRecording() const
{
return m_recordingEnabled.load();
}
// Called from the encoder callback — only writes when recording is enabled
void WriteAudioData(const char *data, size_t size, size_t frames)
{
if (!m_recordingEnabled.load())
return;
this->m_oggWriter->Write(data, size, frames);
}
// stops recording immidiately, when deep_sleep received
void StopRecordingNow()
{
std::lock_guard<std::mutex> lk(this->m_stateMutex);
if (!m_recordingEnabled.load())
{
spdlog::info("StopRecordingNow ignored: not recording");
return;
}
// Force-close current segment right away, enqueue, and disable recording.
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue(this->m_currentRecordFilePath);
m_recordingEnabled = false;
m_stopAfterCurrentSegment = false;
spdlog::info("Recording stopped immediately (deep sleep)");
}
private:
// ----------------------- Helpers (exec, keyctl, HTTPS mTLS) -----------------------
static std::string Trim(const std::string &s)
{
auto b = s.find_first_not_of(" \t\r\n");
auto e = s.find_last_not_of(" \t\r\n");
if (b == std::string::npos)
return "";
return s.substr(b, e - b + 1);
}
static std::string Exec(const std::string &cmd)
{
std::array<char, 4096> buf{};
std::string out;
FILE *pipe = popen((cmd + " 2>&1").c_str(), "r");
if (!pipe)
throw std::runtime_error("popen failed: " + cmd);
while (fgets(buf.data(), (int)buf.size(), pipe) != nullptr)
out.append(buf.data());
int rc = pclose(pipe);
int exitCode = WIFEXITED(rc) ? WEXITSTATUS(rc) : rc;
if (exitCode != 0)
spdlog::warn("Command '{}' exited with code {}", cmd, exitCode);
return out;
}
static std::filesystem::path ExtractClientKeyFromKernelKeyring()
{
std::string id = Trim(Exec("keyctl search @s user iot-client-key | tail -n1"));
if (id.empty())
throw std::runtime_error("iot-client-key not found in keyring");
char tmpl[] = "/run/iot-keyXXXXXX";
int fd = mkstemp(tmpl);
if (fd < 0)
throw std::runtime_error("mkstemp failed for client key");
close(fd);
std::filesystem::path p(tmpl);
Exec("keyctl pipe " + id + " > " + p.string());
if (std::filesystem::file_size(p) == 0)
{
std::error_code ec;
std::filesystem::remove(p, ec);
throw std::runtime_error("keyctl pipe produced empty client key");
}
return p;
}
struct Url
{
std::string scheme;
std::string host;
int port = 0;
};
static Url ParseBase(const std::string &base)
{
std::regex re(R"(^\s*(https?)://([^/:]+)(?::(\d+))?\s*$)");
std::smatch m;
if (!std::regex_match(base, m, re))
{
throw std::runtime_error("Invalid base URL: " + base);
}
Url u;
u.scheme = m[1].str();
u.host = m[2].str();
u.port = m[3].matched ? std::stoi(m[3].str()) : (u.scheme == "https" ? 443 : 80);
return u;
}
std::unique_ptr<httplib::Client> MakeClientMTLS(const Url &u,
const std::filesystem::path &ca,
const std::filesystem::path &crt,
const std::filesystem::path &key)
{
if (u.scheme == "https")
{
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
auto cli = std::make_unique<httplib::SSLClient>(u.host.c_str(), u.port);
cli->enable_server_certificate_verification(true);
cli->set_ca_cert_path(ca.string().c_str());
cli->set_client_cert_file(crt.string().c_str(), key.string().c_str(), nullptr);
cli->set_connection_timeout(10);
cli->set_read_timeout(120);
cli->set_write_timeout(120);
return cli;
#else
throw std::runtime_error("HTTPS baseUrl but CPPHTTPLIB_OPENSSL_SUPPORT is not enabled");
#endif
}
else
{
auto cli = std::make_unique<httplib::Client>(u.host.c_str(), u.port);
cli->set_connection_timeout(10);
cli->set_read_timeout(120);
cli->set_write_timeout(120);
return cli;
}
}
// ----------------------------- Existing logic (adjusted) -----------------------------
void MoveToQueueUncompletedRecords()
{
std::vector<std::filesystem::path> files;
for (const auto &entry : std::filesystem::directory_iterator(this->m_destinationDirectoryPath))
{
files.push_back(entry.path());
}
for (const auto &file : files)
{
if (file.filename().string() != "queue")
{
spdlog::info("Move uncompleted record {} to queue", file.filename().string());
this->MoveToUploadQueue(file.string());
}
}
}
void WritingThread()
{
// recording starts ONLY when StartRecording() is called
while (!m_isIntermission)
{
if (!m_recordingEnabled.load())
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
continue;
}
// Start a fresh segment
auto now = std::chrono::system_clock::now();
this->m_currentRecordStartedAt =
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
this->m_currentRecordFilePath = this->m_destinationDirectoryPath + std::to_string(this->m_currentRecordStartedAt);
this->m_oggWriter->StartWriting(this->m_currentRecordFilePath);
spdlog::info("Recording segment started: {}", this->m_currentRecordFilePath);
// Write until duration elapses
const auto segDurationMs = this->m_configService->GetRecordingDuration();
while (!m_isIntermission && m_recordingEnabled.load())
{
now = std::chrono::system_clock::now();
auto currentRecordDuration =
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() - this->m_currentRecordStartedAt;
if (currentRecordDuration >= segDurationMs)
break;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
// Close current segment and enqueue
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue(this->m_currentRecordFilePath);
spdlog::info("Recording segment finished: {}", this->m_currentRecordFilePath);
// If graceful stop requested, stop after finishing this segment
if (m_stopAfterCurrentSegment.load())
{
m_recordingEnabled = false;
m_stopAfterCurrentSegment = false;
spdlog::info("Recording disabled after graceful stop");
}
}
for (const auto& filePath : files) {
auto fileName = filePath.filename().string();
spdlog::info("Processing file: {}", fileName);
// If exiting service while in a middle of a segment, ensure clean close
if (m_recordingEnabled.load())
{
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue(this->m_currentRecordFilePath);
m_recordingEnabled = false;
}
}
size_t delimiterPos = fileName.find('-');
if (delimiterPos != std::string::npos) {
std::string startedAt = fileName.substr(0, delimiterPos);
std::string stoppedAt = fileName.substr(delimiterPos + 1);
void MoveToUploadQueue(const std::string &filePath)
{
spdlog::info("MoveToUploadQueue( {} )", filePath);
std::lock_guard lock(this->m_fetchFilePathsMutex);
auto now = std::chrono::system_clock::now();
auto recordStoppedAt = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
if (std::filesystem::exists(filePath))
{
auto fileName = std::filesystem::path(filePath).filename().string() + "-" + std::to_string(recordStoppedAt);
std::filesystem::rename(filePath, m_queueDirectoryPath + fileName);
}
}
try {
spdlog::info("Attempting to upload file...");
if (SendRecordedFile(filePath.string(), stoull(startedAt), stoull(stoppedAt))) {
spdlog::info("File uploaded, deleting: {}", filePath.string());
std::filesystem::remove(filePath);
} else {
spdlog::warn("Failed to upload file: {}", filePath.string());
void UploadThread()
{
const auto baseUrl = this->m_configService->GetBaseUrl();
Url url = ParseBase(baseUrl);
// certs from enrollment
std::filesystem::path ca = "/etc/iot/keys/issuing_ca.pem";
if (!std::filesystem::exists(ca))
ca = "/etc/iot/keys/ca_chain.pem";
const std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
while (!m_isIntermission)
{
std::vector<std::filesystem::path> files;
{
std::lock_guard l(this->m_fetchFilePathsMutex);
try
{
for (const auto &entry : std::filesystem::directory_iterator(this->m_queueDirectoryPath))
{
if (entry.is_regular_file())
files.push_back(entry.path());
}
} catch (const std::exception& e) {
spdlog::error("Exception during file upload: {}", e.what());
}
catch (const std::exception &e)
{
spdlog::error("Error reading queue directory: {}", e.what());
}
}
// Prepare client key (temp file) for this upload pass
std::optional<std::filesystem::path> tmpKey;
try
{
tmpKey = ExtractClientKeyFromKernelKeyring();
}
catch (const std::exception &e)
{
spdlog::error("Cannot extract client key for mTLS: {}", e.what());
// Wait a bit and retry later
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
try
{
auto client = MakeClientMTLS(url, ca, crt, *tmpKey);
for (const auto &filePath : files)
{
auto fileName = filePath.filename().string();
spdlog::info("Uploading file: {}", fileName);
size_t delimiterPos = fileName.find('-');
if (delimiterPos == std::string::npos)
{
spdlog::warn("Unexpected filename format, skipping: {}", fileName);
continue;
}
std::string startedAt = fileName.substr(0, delimiterPos);
std::string stoppedAt = fileName.substr(delimiterPos + 1);
try
{
if (SendRecordedFileMTLS(*client, filePath.string(),
std::stoull(startedAt),
std::stoull(stoppedAt)))
{
spdlog::info("File uploaded, deleting: {}", filePath.string());
std::filesystem::remove(filePath);
}
else
{
spdlog::warn("Failed to upload file: {}", filePath.string());
}
}
catch (const std::exception &e)
{
spdlog::error("Exception during file upload: {}", e.what());
}
}
}
catch (const std::exception &e)
{
spdlog::error("mTLS client setup failed: {}", e.what());
}
// cleanup temp key asap
if (tmpKey)
{
std::error_code ec;
std::filesystem::remove(*tmpKey, ec);
}
std::this_thread::sleep_for(std::chrono::milliseconds(800));
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
bool SendRecordedFile( const std::string& filepath, unsigned long long int startedAt, unsigned long long stoppedAt ) {
spdlog::info( "SendRecordedFile: {}", filepath );
httplib::Client client( this->m_configService->GetBaseUrl() );
std::ifstream ifs( filepath, std::ios::binary );
if( !ifs ) {
throw std::runtime_error( "Failed to open file" );
}
std::vector<char> buffer( ( std::istreambuf_iterator<char>( ifs ) ), ( std::istreambuf_iterator<char>() ) );
auto res = client.Post(
std::string( "/records/upload/" ),
httplib::MultipartFormDataItems{
{ "file", std::string( buffer.begin(), buffer.end() ), "file.ogg", "audio/ogg" },
{ "guid", this->m_configService->GetGuid() },
{ "startedAt", std::to_string( startedAt ) },
{ "stoppedAt", std::to_string( stoppedAt ) },
} );
bool SendRecordedFileMTLS(httplib::Client &client,
const std::string &filepath,
unsigned long long int startedAt,
unsigned long long int stoppedAt)
{
spdlog::info("SendRecordedFile (mTLS): {}", filepath);
std::ifstream ifs(filepath, std::ios::binary);
if (!ifs)
throw std::runtime_error("Failed to open file: " + filepath);
if( res && res->status == 201 ) {
spdlog::info( "File uploaded successfully" );
return true;
std::vector<char> buffer((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>());
// Multipart form: file + guid + times (same fields as before)
const std::string guid = this->m_configService->GetGuid();
httplib::MultipartFormDataItems items = {
{"file", std::string(buffer.begin(), buffer.end()), "file.ogg", "audio/ogg"},
{"guid", guid, "", "text/plain"},
{"startedAt", std::to_string(startedAt), "", "text/plain"},
{"stoppedAt", std::to_string(stoppedAt), "", "text/plain"},
};
auto res = client.Post("/api/records/upload/", items);
if (res && (res->status == 201 || res->status == 200))
{
spdlog::info("File uploaded successfully: HTTP {}", res->status);
return true;
}
if (res)
{
spdlog::error("Upload failed: HTTP {}, body: {}", res->status, res->body);
}
else
{
spdlog::error("Upload failed: no response");
}
return false;
}
};
spdlog::error( "Failed to upload file" );
return false;
}
};
}
} // namespace snoop

View File

@@ -17,279 +17,505 @@
#include <spdlog/spdlog.h>
#include <nlohmann/json.hpp>
#include <httplib.h> // build with CPPHTTPLIB_OPENSSL_SUPPORT
#include <sys/wait.h> // for WEXITSTATUS
#include <httplib.h> // build with CPPHTTPLIB_OPENSSL_SUPPORT
#include <sys/wait.h> // for WEXITSTATUS
#include "ConfigService.h"
namespace snoop {
namespace snoop
{
class DeviceControlService {
public:
struct Task {
uint64_t id{};
std::string type;
nlohmann::json payload = nlohmann::json::object();
};
// Handler returns: {success, result_json_string, error_message}
using HandlerResult = std::tuple<bool, std::string, std::string>;
using TaskHandler = std::function<HandlerResult(const Task&)>;
struct Handlers {
// Fill any you implement. Unset => default “not implemented”.
TaskHandler onStartStream;
TaskHandler onStopStream;
TaskHandler onStartRecording;
TaskHandler onStopRecording;
TaskHandler onUpdateConfig;
TaskHandler onSetDeepSleep;
};
DeviceControlService(std::shared_ptr<ConfigService> cfg, Handlers handlers)
: m_cfg(std::move(cfg)), m_handlers(std::move(handlers)) {
m_stop = false;
m_thread = std::thread(&DeviceControlService::RunLoop, this);
}
~DeviceControlService() {
m_stop = true;
if (m_thread.joinable()) m_thread.join();
}
private:
std::shared_ptr<ConfigService> m_cfg;
Handlers m_handlers;
std::thread m_thread;
std::atomic<bool> m_stop{false};
// --- helpers ---
static std::string Trim(const std::string& s) {
auto b = s.find_first_not_of(" \t\r\n");
auto e = s.find_last_not_of(" \t\r\n");
if (b == std::string::npos) return "";
return s.substr(b, e - b + 1);
}
static std::string Exec(const std::string& cmd) {
std::array<char, 4096> buf{};
std::string out;
FILE* pipe = popen((cmd + " 2>&1").c_str(), "r");
if (!pipe) throw std::runtime_error("popen failed: " + cmd);
while (fgets(buf.data(), (int)buf.size(), pipe) != nullptr) out.append(buf.data());
int rc = pclose(pipe);
int exitCode = WIFEXITED(rc) ? WEXITSTATUS(rc) : rc;
if (exitCode != 0) spdlog::warn("Command '{}' exited with code {}", cmd, exitCode);
return out;
}
struct Url {
std::string scheme; // http/https
std::string host; // hostname or ip
int port = 0; // default if 0
};
static Url ParseBase(const std::string& base) {
// very small parser: scheme://host[:port]
std::regex re(R"(^\s*(https?)://([^/:]+)(?::(\d+))?\s*$)");
std::smatch m;
if (!std::regex_match(base, m, re)) {
throw std::runtime_error("Invalid base URL for DeviceControlService: " + base);
}
Url u;
u.scheme = m[1].str();
u.host = m[2].str();
u.port = m[3].matched ? std::stoi(m[3].str()) : (u.scheme == "https" ? 443 : 80);
return u;
}
// dumps the client key from keyring to a temp file and returns its path
static std::filesystem::path ExtractClientKeyFromKernelKeyring() {
std::string id = Trim(Exec("keyctl search @s user iot-client-key | tail -n1"));
if (id.empty()) throw std::runtime_error("iot-client-key not found in keyring");
// Create a secure temp file
char tmpl[] = "/run/iot-keyXXXXXX";
int fd = mkstemp(tmpl);
if (fd < 0) throw std::runtime_error("mkstemp failed for client key");
close(fd);
std::filesystem::path p(tmpl);
// Pipe the key payload into the temp file
std::string cmd = "keyctl pipe " + id + " > " + p.string();
Exec(cmd);
// quick sanity
if (std::filesystem::file_size(p) == 0) {
std::error_code ec; std::filesystem::remove(p, ec);
throw std::runtime_error("keyctl pipe produced empty client key");
}
return p;
}
// Create HTTPS client configured for mTLS (or HTTP if base is http)
std::unique_ptr<httplib::Client> MakeClient(const Url& u,
const std::filesystem::path& ca,
const std::filesystem::path& crt,
const std::filesystem::path& key) {
if (u.scheme == "https") {
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
auto cli = std::make_unique<httplib::SSLClient>(u.host.c_str(), u.port);
cli->enable_server_certificate_verification(true);
cli->set_ca_cert_path(ca.string().c_str());
cli->set_client_cert_file(crt.string().c_str(), key.string().c_str(), nullptr);
// Recommended timeouts for long polling-ish flows
cli->set_connection_timeout(10);
cli->set_read_timeout(60);
cli->set_write_timeout(60);
return cli;
#else
throw std::runtime_error("CPPHTTPLIB_OPENSSL_SUPPORT not enabled but https URL provided");
#endif
} else {
auto cli = std::make_unique<httplib::Client>(u.host.c_str(), u.port);
cli->set_connection_timeout(10);
cli->set_read_timeout(60);
cli->set_write_timeout(60);
return cli;
}
}
// simple jittered sleep
void SleepWithJitterOnce(std::mt19937& rng, int baseSec, int jitterSec) {
if (baseSec < 0) baseSec = 0;
if (jitterSec < 0) jitterSec = 0;
std::uniform_int_distribution<int> dist(-jitterSec, +jitterSec);
int delay = baseSec + dist(rng);
if (delay < 0) delay = 0;
std::this_thread::sleep_for(std::chrono::seconds(delay));
}
// default fallback if handler is not provided
static HandlerResult NotImplemented(const Task& t) {
std::string msg = "Handler not implemented for type: " + t.type;
spdlog::warn("{}", msg);
return {false, "{}", msg};
}
TaskHandler ResolveHandler(std::string_view type) const {
if (type == "start_stream") return m_handlers.onStartStream ? m_handlers.onStartStream : NotImplemented;
if (type == "stop_stream") return m_handlers.onStopStream ? m_handlers.onStopStream : NotImplemented;
if (type == "start_recording") return m_handlers.onStartRecording ? m_handlers.onStartRecording : NotImplemented;
if (type == "stop_recording") return m_handlers.onStopRecording ? m_handlers.onStopRecording : NotImplemented;
if (type == "update_config") return m_handlers.onUpdateConfig ? m_handlers.onUpdateConfig : NotImplemented;
if (type == "set_deep_sleep") return m_handlers.onSetDeepSleep ? m_handlers.onSetDeepSleep : NotImplemented;
return NotImplemented;
}
static std::vector<Task> ParseTasks(const std::string& body) {
// server might return single object or array
std::vector<Task> tasks;
auto j = nlohmann::json::parse(body);
auto push_one = [&](const nlohmann::json& x){
Task t;
t.id = x.value("id", 0);
t.type = x.value("type", "");
try {
if (x.contains("payload")) {
const auto& raw = x.at("payload");
if (raw.is_string()) {
// payload is quoted JSON string -> parse inner if valid, else keep as string
try { t.payload = nlohmann::json::parse(raw.get<std::string>()); }
catch (...) { t.payload = raw.get<std::string>(); }
} else if (raw.is_object() || raw.is_array()) {
t.payload = raw;
} else {
t.payload = nlohmann::json::object();
}
}
} catch (...) { t.payload = nlohmann::json::object(); }
tasks.push_back(std::move(t));
class DeviceControlService
{
public:
struct Task
{
uint64_t id{};
std::string type;
nlohmann::json payload = nlohmann::json::object();
};
if (j.is_array()) {
for (auto& it : j) push_one(it);
} else if (j.is_object()) {
push_one(j);
}
return tasks;
}
// Handler returns: {success, result_json_string, error_message}
using HandlerResult = std::tuple<bool, std::string, std::string>;
using TaskHandler = std::function<HandlerResult(const Task &)>;
void PostResult(httplib::Client& cli, const std::string& guid,
uint64_t taskId, bool success,
const std::string& resultJson, const std::string& err) {
nlohmann::json dto = {
{"taskId", taskId},
{"success", success},
{"result", resultJson.empty() ? "{}" : resultJson},
{"error", err}
struct Handlers
{
// Fill any you implement. Unset => default “not implemented”.
TaskHandler onStartStream;
TaskHandler onStopStream;
TaskHandler onStartRecording;
TaskHandler onStopRecording;
TaskHandler onUpdateConfig;
TaskHandler onSetDeepSleep;
};
std::string path = "/api/tasks/" + guid;
auto res = cli.Post(path.c_str(), dto.dump(), "application/json");
if (!res) {
spdlog::error("POST {} failed (no response)", path);
return;
struct Controls
{
std::function<void()> stopStreamNow; // e.g., streamer.StopWhip()
std::function<void()> stopRecordingNow; // e.g., writerService->StopRecordingNow()
};
// DeviceControlService(std::shared_ptr<ConfigService> cfg, Handlers handlers)
// : m_cfg(std::move(cfg)), m_handlers(std::move(handlers))
// {
// m_stop = false;
// m_thread = std::thread(&DeviceControlService::RunLoop, this);
// }
DeviceControlService(std::shared_ptr<ConfigService> cfg, Handlers handlers, Controls controls)
: m_cfg(std::move(cfg)), m_handlers(std::move(handlers)), m_controls(std::move(controls))
{
m_stop = false;
m_thread = std::thread(&DeviceControlService::RunLoop, this);
}
spdlog::info("POST {} -> HTTP {}", path, res->status);
}
void RunLoop() {
const std::string guid = m_cfg->GetGuid();
const auto base = m_cfg->GetBaseUrl();
Url url = ParseBase(base);
~DeviceControlService()
{
m_stop = true;
if (m_thread.joinable())
m_thread.join();
}
// Cert paths from enrollment step
std::filesystem::path ca = "/etc/iot/keys/issuing_ca.pem"; // or ca_chain.pem if you prefer
if (!std::filesystem::exists(ca)) ca = "/etc/iot/keys/ca_chain.pem";
std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
std::mt19937 rng{std::random_device{}()};
while (!m_stop) {
// Extract client key from kernel keyring to a temp file each cycle (kept minimal on disk)
std::optional<std::filesystem::path> tmpKey;
try {
tmpKey = ExtractClientKeyFromKernelKeyring();
} catch (const std::exception& e) {
spdlog::error("Key extraction failed: {}", e.what());
SleepWithJitterOnce(rng, m_cfg->GetPollingSeconds(), m_cfg->GetJitterSeconds());
continue;
void ArmDeepSleep(long long startMs, long long stopMs)
{
// Record sleep-until, and if were already inside the window, act now.
const auto now = NowMs();
if (stopMs <= startMs)
{
spdlog::warn("ArmDeepSleep: stop<=start");
return;
}
try {
auto cli = MakeClient(url, ca, crt, *tmpKey);
// --- GET /tasks/:guid
const std::string getPath = "/api/tasks/" + guid;
spdlog::info("GET {}", getPath);
auto res = cli->Get(getPath.c_str());
if (!res) {
spdlog::warn("GET {} failed (no response)", getPath);
} else if (res->status == 204) {
spdlog::debug("No tasks (204).");
} else if (res->status >= 200 && res->status < 300) {
auto tasks = ParseTasks(res->body);
for (const auto& t : tasks) {
auto handler = ResolveHandler(t.type);
auto [ok, resultJson, err] = handler(t);
PostResult(*cli, guid, t.id, ok, resultJson, err);
}
} else {
spdlog::warn("GET {} -> HTTP {}, body: {}", getPath, res->status, res->body);
}
} catch (const std::exception& e) {
spdlog::error("Task loop error: {}", e.what());
// If already inside window -> enter immediately
if (now >= startMs && now < stopMs)
{
EnterDeepSleepUntil(stopMs);
}
else
{
// Otherwise, set a future "until" and the loop will enter at startMs.
// Well keep both values:
m_sleepStartMs = startMs;
m_sleepUntilMs = stopMs;
}
}
// cleanup temp key ASAP
if (tmpKey) {
private:
std::shared_ptr<ConfigService> m_cfg;
Handlers m_handlers;
std::thread m_thread;
std::atomic<bool> m_stop{false};
std::atomic<long long> m_sleepUntilMs{0}; // epoch ms; 0 = not sleeping
std::atomic<long long> m_sleepStartMs{0};
Controls m_controls;
// --- helpers ---
static std::string Trim(const std::string &s)
{
auto b = s.find_first_not_of(" \t\r\n");
auto e = s.find_last_not_of(" \t\r\n");
if (b == std::string::npos)
return "";
return s.substr(b, e - b + 1);
}
static std::string Exec(const std::string &cmd)
{
std::array<char, 4096> buf{};
std::string out;
FILE *pipe = popen((cmd + " 2>&1").c_str(), "r");
if (!pipe)
throw std::runtime_error("popen failed: " + cmd);
while (fgets(buf.data(), (int)buf.size(), pipe) != nullptr)
out.append(buf.data());
int rc = pclose(pipe);
int exitCode = WIFEXITED(rc) ? WEXITSTATUS(rc) : rc;
if (exitCode != 0)
spdlog::warn("Command '{}' exited with code {}", cmd, exitCode);
return out;
}
struct Url
{
std::string scheme; // http/https
std::string host; // hostname or ip
int port = 0; // default if 0
};
static Url ParseBase(const std::string &base)
{
// very small parser: scheme://host[:port]
std::regex re(R"(^\s*(https?)://([^/:]+)(?::(\d+))?\s*$)");
std::smatch m;
if (!std::regex_match(base, m, re))
{
throw std::runtime_error("Invalid base URL for DeviceControlService: " + base);
}
Url u;
u.scheme = m[1].str();
u.host = m[2].str();
u.port = m[3].matched ? std::stoi(m[3].str()) : (u.scheme == "https" ? 443 : 80);
return u;
}
static long long NowMs()
{
using namespace std::chrono;
return duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
}
static std::optional<long long> ParseRfc3339UtcToMs(const std::string &s)
{
// Minimal, robust-ish parser for "YYYY-MM-DDTHH:MM:SS[.frac]Z"
// For production, consider date::parse or a small RFC3339 lib.
std::tm tm{};
char z = 'Z';
double frac = 0.0;
int n = 0;
// accept with optional fractional seconds
if (sscanf(s.c_str(), "%d-%d-%dT%d:%d:%d%n", &tm.tm_year, &tm.tm_mon, &tm.tm_mday,
&tm.tm_hour, &tm.tm_min, &tm.tm_sec, &n) != 6)
return std::nullopt;
tm.tm_year -= 1900;
tm.tm_mon -= 1;
const char *p = s.c_str() + n;
if (*p == '.')
{
// consume fractional
++p;
while (isdigit(*p))
++p; // ignore exact fraction
}
if (*p != 'Z')
return std::nullopt;
time_t tt = timegm(&tm);
if (tt < 0)
return std::nullopt;
return (long long)tt * 1000LL;
}
static std::optional<long long> JsonTimeToMs(const nlohmann::json &j)
{
if (j.contains("start_ms") && j.contains("stop_ms"))
{
return j.at("stop_ms").get<long long>(); // caller will use start_ms separately
}
if (j.contains("start") && j.contains("stop") && j["start"].is_string() && j["stop"].is_string())
{
auto sMs = ParseRfc3339UtcToMs(j["start"].get<std::string>());
auto eMs = ParseRfc3339UtcToMs(j["stop"].get<std::string>());
if (sMs && eMs)
return *eMs; // caller will use start separately
}
if (j.contains("start") && j["start"].is_string() &&
j["start"].get<std::string>() == "now" && j.contains("for_s"))
{
long long forS = j["for_s"].get<long long>();
return NowMs() + forS * 1000LL;
}
return std::nullopt;
}
// dumps the client key from keyring to a temp file and returns its path
static std::filesystem::path ExtractClientKeyFromKernelKeyring()
{
std::string id = Trim(Exec("keyctl search @s user iot-client-key | tail -n1"));
if (id.empty())
throw std::runtime_error("iot-client-key not found in keyring");
// Create a secure temp file
char tmpl[] = "/run/iot-keyXXXXXX";
int fd = mkstemp(tmpl);
if (fd < 0)
throw std::runtime_error("mkstemp failed for client key");
close(fd);
std::filesystem::path p(tmpl);
// Pipe the key payload into the temp file
std::string cmd = "keyctl pipe " + id + " > " + p.string();
Exec(cmd);
// quick sanity
if (std::filesystem::file_size(p) == 0)
{
std::error_code ec;
std::filesystem::remove(*tmpKey, ec);
std::filesystem::remove(p, ec);
throw std::runtime_error("keyctl pipe produced empty client key");
}
SleepWithJitterOnce(rng, m_cfg->GetPollingSeconds(), m_cfg->GetJitterSeconds());
return p;
}
}
};
// Create HTTPS client configured for mTLS (or HTTP if base is http)
std::unique_ptr<httplib::Client> MakeClient(const Url &u,
const std::filesystem::path &ca,
const std::filesystem::path &crt,
const std::filesystem::path &key)
{
if (u.scheme == "https")
{
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
auto cli = std::make_unique<httplib::SSLClient>(u.host.c_str(), u.port);
cli->enable_server_certificate_verification(true);
cli->set_ca_cert_path(ca.string().c_str());
cli->set_client_cert_file(crt.string().c_str(), key.string().c_str(), nullptr);
// Recommended timeouts for long polling-ish flows
cli->set_connection_timeout(10);
cli->set_read_timeout(60);
cli->set_write_timeout(60);
return cli;
#else
throw std::runtime_error("CPPHTTPLIB_OPENSSL_SUPPORT not enabled but https URL provided");
#endif
}
else
{
auto cli = std::make_unique<httplib::Client>(u.host.c_str(), u.port);
cli->set_connection_timeout(10);
cli->set_read_timeout(60);
cli->set_write_timeout(60);
return cli;
}
}
// simple jittered sleep
void SleepWithJitterOnce(std::mt19937 &rng, int baseSec, int jitterSec)
{
if (baseSec < 0)
baseSec = 0;
if (jitterSec < 0)
jitterSec = 0;
std::uniform_int_distribution<int> dist(-jitterSec, +jitterSec);
int delay = baseSec + dist(rng);
if (delay < 0)
delay = 0;
std::this_thread::sleep_for(std::chrono::seconds(delay));
}
// default fallback if handler is not provided
static HandlerResult NotImplemented(const Task &t)
{
std::string msg = "Handler not implemented for type: " + t.type;
spdlog::warn("{}", msg);
return {false, "{}", msg};
}
TaskHandler ResolveHandler(std::string_view type) const
{
if (type == "start_stream")
return m_handlers.onStartStream ? m_handlers.onStartStream : NotImplemented;
if (type == "stop_stream")
return m_handlers.onStopStream ? m_handlers.onStopStream : NotImplemented;
if (type == "start_recording")
return m_handlers.onStartRecording ? m_handlers.onStartRecording : NotImplemented;
if (type == "stop_recording")
return m_handlers.onStopRecording ? m_handlers.onStopRecording : NotImplemented;
if (type == "update_config")
return m_handlers.onUpdateConfig ? m_handlers.onUpdateConfig : NotImplemented;
if (type == "set_deep_sleep")
return m_handlers.onSetDeepSleep ? m_handlers.onSetDeepSleep : NotImplemented;
return NotImplemented;
}
static std::vector<Task> ParseTasks(const std::string &body)
{
// server might return single object or array
std::vector<Task> tasks;
auto j = nlohmann::json::parse(body);
auto push_one = [&](const nlohmann::json &x)
{
Task t;
t.id = x.value("id", 0);
t.type = x.value("type", "");
try
{
if (x.contains("payload"))
{
const auto &raw = x.at("payload");
if (raw.is_string())
{
// payload is quoted JSON string -> parse inner if valid, else keep as string
try
{
t.payload = nlohmann::json::parse(raw.get<std::string>());
}
catch (...)
{
t.payload = raw.get<std::string>();
}
}
else if (raw.is_object() || raw.is_array())
{
t.payload = raw;
}
else
{
t.payload = nlohmann::json::object();
}
}
}
catch (...)
{
t.payload = nlohmann::json::object();
}
tasks.push_back(std::move(t));
};
if (j.is_array())
{
for (auto &it : j)
push_one(it);
}
else if (j.is_object())
{
push_one(j);
}
return tasks;
}
void PostResult(httplib::Client &cli, const std::string &guid,
uint64_t taskId, bool success,
const std::string &resultJson, const std::string &err)
{
nlohmann::json dto = {
{"taskId", taskId},
{"success", success},
{"result", resultJson.empty() ? "{}" : resultJson},
{"error", err}};
std::string path = "/api/tasks/" + guid;
auto res = cli.Post(path.c_str(), dto.dump(), "application/json");
if (!res)
{
spdlog::error("POST {} failed (no response)", path);
return;
}
spdlog::info("POST {} -> HTTP {}", path, res->status);
}
void EnterDeepSleepUntil(long long stopMs)
{
m_sleepStartMs = NowMs();
m_sleepUntilMs = stopMs;
spdlog::info("Entering deep sleep until {} ms", stopMs);
// Stop stream & recording immediately
if (m_controls.stopStreamNow)
m_controls.stopStreamNow();
if (m_controls.stopRecordingNow)
m_controls.stopRecordingNow();
}
void RunLoop()
{
const std::string guid = m_cfg->GetGuid();
const auto base = m_cfg->GetBaseUrl();
Url url = ParseBase(base);
// Cert paths from enrollment step
std::filesystem::path ca = "/etc/iot/keys/issuing_ca.pem"; // or ca_chain.pem if you prefer
if (!std::filesystem::exists(ca))
ca = "/etc/iot/keys/ca_chain.pem";
std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
std::mt19937 rng{std::random_device{}()};
while (!m_stop)
{
// Extract client key from kernel keyring to a temp file each cycle (kept minimal on disk)
std::optional<std::filesystem::path> tmpKey;
try
{
tmpKey = ExtractClientKeyFromKernelKeyring();
}
catch (const std::exception &e)
{
spdlog::error("Key extraction failed: {}", e.what());
SleepWithJitterOnce(rng, m_cfg->GetPollingSeconds(), m_cfg->GetJitterSeconds());
continue;
}
const auto now = NowMs();
const auto startMs = m_sleepStartMs.load();
const auto untilMs = m_sleepUntilMs.load();
if (untilMs > 0)
{
if (now < startMs)
{
// Not yet time to sleep: wait until start (no network calls)
auto waitMs = std::min<long long>(startMs - now, 60'000);
std::this_thread::sleep_for(std::chrono::milliseconds(waitMs));
continue;
}
if (now >= startMs && now < untilMs)
{
// In deep sleep window: do not send any requests; just wait
auto waitMs = std::min<long long>(untilMs - now, 60'000);
std::this_thread::sleep_for(std::chrono::milliseconds(waitMs));
// On first entry, ensure we stopped immediately
if (now - startMs < 1200)
{ // within ~1.2s window
if (m_controls.stopStreamNow)
m_controls.stopStreamNow();
if (m_controls.stopRecordingNow)
m_controls.stopRecordingNow();
}
continue;
}
if (now >= untilMs)
{
// Sleep period over; clear flags and resume normal polling
m_sleepStartMs = 0;
m_sleepUntilMs = 0;
spdlog::info("Deep sleep finished; resuming operation");
}
try
{
auto cli = MakeClient(url, ca, crt, *tmpKey);
// --- GET /tasks/:guid
const std::string getPath = "/api/tasks/" + guid;
spdlog::info("GET {}", getPath);
auto res = cli->Get(getPath.c_str());
if (!res)
{
spdlog::warn("GET {} failed (no response)", getPath);
}
else if (res->status == 204)
{
spdlog::debug("No tasks (204).");
}
else if (res->status >= 200 && res->status < 300)
{
auto tasks = ParseTasks(res->body);
for (const auto &t : tasks)
{
auto handler = ResolveHandler(t.type);
auto [ok, resultJson, err] = handler(t);
PostResult(*cli, guid, t.id, ok, resultJson, err);
}
}
else
{
spdlog::warn("GET {} -> HTTP {}, body: {}", getPath, res->status, res->body);
}
}
catch (const std::exception &e)
{
spdlog::error("Task loop error: {}", e.what());
}
// cleanup temp key ASAP
if (tmpKey)
{
std::error_code ec;
std::filesystem::remove(*tmpKey, ec);
}
SleepWithJitterOnce(rng, m_cfg->GetPollingSeconds(), m_cfg->GetJitterSeconds());
}
}
}
};
} // namespace snoop

223
src/Services/WhipClient.h Normal file
View File

@@ -0,0 +1,223 @@
// src/Services/WhipClient.h
#pragma once
#include <memory>
#include <string>
#include <string_view>
#include <atomic>
#include <mutex>
#include <optional>
#include <filesystem>
#include <vector>
#include <regex>
#include <array>
#include <spdlog/spdlog.h>
#include <nlohmann/json.hpp>
#include <httplib.h> // build with CPPHTTPLIB_OPENSSL_SUPPORT
#include <rtc/rtc.hpp> // libdatachannel
namespace snoop {
class WhipClient {
public:
struct Params {
std::string whipUrl; // full WHIP endpoint (may already include ?token=...)
std::string caPath; // CA chain
std::string crtPath; // client cert
std::string keyPath; // client key (temp extracted from keyctl)
int sampleRate = 48000;
int channels = 1;
};
explicit WhipClient(Params p)
: m_p(std::move(p)) {}
~WhipClient() {
Stop();
}
void Start() {
std::lock_guard lk(m_mtx);
if (m_started) return;
// ----- PeerConnection -----
rtc::Configuration cfg;
// No STUN/TURN required for MediaMTX WHIP (server is public/ICE-lite).
// cfg.iceServers = { }; // default
m_pc = std::make_shared<rtc::PeerConnection>(cfg);
// Optional: observe connection state / ICE
m_pc->onStateChange([this](rtc::PeerConnection::State s){
spdlog::info("WHIP pc state: {}", (int)s);
});
m_pc->onGatheringStateChange([this](rtc::PeerConnection::GatheringState s){
spdlog::info("WHIP gathering state: {}", (int)s);
});
m_pc->onLocalDescription([this](rtc::Description desc){
// When we have the local SDP, POST (WHIP) to get the answer
try {
const std::string offer = std::string(desc);
auto [answer, resourceUrl] = PostOfferWHIP(offer);
m_resourceUrl = resourceUrl;
m_pc->setRemoteDescription(rtc::Description(answer, "answer"));
spdlog::info("WHIP remote description set");
} catch (const std::exception& e) {
spdlog::error("WHIP POST offer failed: {}", e.what());
}
});
m_pc->onLocalCandidate([this](rtc::Candidate c) {
// Trickle via PATCH to WHIP resource (if server requires)
if (!m_resourceUrl.has_value()) return;
try {
PatchCandidateWHIP(c);
} catch (const std::exception& e) {
spdlog::warn("WHIP PATCH candidate failed: {}", e.what());
}
});
// ----- Audio track / source -----
m_audioSource = rtc::CreateAudioSource(m_p.sampleRate, m_p.channels);
m_track = m_pc->addTrack(m_audioSource);
// ----- Create offer -----
m_pc->setLocalDescription(); // triggers onLocalDescription callback
m_started = true;
}
void Stop() {
std::lock_guard lk(m_mtx);
if (!m_started) return;
if (m_track) m_track = nullptr;
if (m_audioSource) m_audioSource = nullptr;
if (m_pc) m_pc = nullptr;
// No explicit WHIP DELETE here, but you can add it if server supports deleting the resource.
// cleanup resource url
m_resourceUrl.reset();
m_started = false;
}
// Feed PCM (float32) in interleaved format. frames = samples per channel.
void PushPCM(const float* interleaved, size_t frames) {
std::lock_guard lk(m_mtx);
if (!m_audioSource) return;
// libdatachannel expects planar/int16? Actually CreateAudioSource accepts float32
// and uses (frames, channels) to encode Opus internally.
// Push signature is: audioSource->pushFloat(interleaved, frames, channels, sampleRate)
m_audioSource->pushFloat(interleaved, (int)frames, m_p.channels, m_p.sampleRate);
}
private:
Params m_p;
std::shared_ptr<rtc::PeerConnection> m_pc;
std::shared_ptr<rtc::AudioSource> m_audioSource;
std::shared_ptr<rtc::Track> m_track;
std::optional<std::string> m_resourceUrl;
std::mutex m_mtx;
std::atomic<bool> m_started{false};
// --- WHIP HTTP helpers (mTLS-capable) ---
static std::tuple<std::string,std::string> ExtractAnswerAndLocation(const httplib::Result& r) {
// WHIP server responds 201 Created, body = SDP answer (text/plain),
// Location header = resource URL for PATCH candidates.
if (!r) throw std::runtime_error("No HTTP result");
if (r->status != 201 && r->status != 200)
throw std::runtime_error("Unexpected WHIP status: " + std::to_string(r->status));
std::string answer = r->body;
std::string resourceUrl;
if (r->has_header("Location")) resourceUrl = r->get_header_value("Location");
return {answer, resourceUrl};
}
std::pair<std::string,std::string> PostOfferWHIP(const std::string& sdpOffer) {
auto [cli, path] = MakeClientForUrl(m_p.whipUrl);
// WHIP requires:
// POST <endpoint> Content-Type: application/sdp Body: offer SDP
// Response: 201 Created, body: answer SDP, Location: resource URL
auto res = cli->Post(path.c_str(), sdpOffer, "application/sdp");
auto [answer, resUrl] = ExtractAnswerAndLocation(res);
return {answer, resUrl};
}
void PatchCandidateWHIP(const rtc::Candidate& cand) {
if (!m_resourceUrl) return;
auto [cli, path] = MakeClientForUrl(*m_resourceUrl);
// WHIP trickle: PATCH resource with Content-Type: application/trickle-ice-sdpfrag
// Body is an SDP fragment with "a=candidate:..."
std::string frag = "a=" + std::string(cand);
auto res = cli->Patch(path.c_str(), frag, "application/trickle-ice-sdpfrag");
if (!res || (res->status < 200 || res->status >= 300)) {
spdlog::warn("WHIP PATCH {} -> {}", path, res ? res->status : -1);
}
}
// Parse URL, return httplib client + path, configured with mTLS if https
static std::pair<std::unique_ptr<httplib::Client>, std::string>
MakeClientForUrl(const std::string& url) {
// scheme://host[:port]/path...
static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)");
std::smatch m;
if (!std::regex_match(url, m, re)) {
throw std::runtime_error("Invalid URL: " + url);
}
std::string scheme = m[1].str();
std::string host = m[2].str();
int port = m[3].matched ? std::stoi(m[3].str()) : (scheme=="https"?443:80);
std::string path = m[4].matched ? m[4].str() : "/";
// We need the mTLS params; capture via thread-local? Well pass via this pointer.
// To access instance fields here, make this non-static or pass params through a lambda.
// Simpler: look up global thread-local; or we restructure to capture this.
// For cleanliness, well make a non-static wrapper below.
throw std::logic_error("Use MakeClientForUrl_inst instead");
}
std::pair<std::unique_ptr<httplib::Client>, std::string>
MakeClientForUrl_inst(const std::string& url) {
static const std::regex re(R"(^(https?)://([^/:]+)(?::(\d+))?(/.*)?$)");
std::smatch m;
if (!std::regex_match(url, m, re)) {
throw std::runtime_error("Invalid URL: " + url);
}
std::string scheme = m[1].str();
std::string host = m[2].str();
int port = m[3].matched ? std::stoi(m[3].str()) : (scheme=="https"?443:80);
std::string path = m[4].matched ? m[4].str() : "/";
if (scheme == "https") {
#ifndef CPPHTTPLIB_OPENSSL_SUPPORT
throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled");
#else
auto cli = std::make_unique<httplib::SSLClient>(host.c_str(), port);
cli->enable_server_certificate_verification(true);
cli->set_ca_cert_path(m_p.caPath.c_str());
cli->set_client_cert_file(m_p.crtPath.c_str(), m_p.keyPath.c_str(), nullptr);
cli->set_connection_timeout(10);
cli->set_read_timeout(60);
cli->set_write_timeout(60);
return {std::move(cli), path};
#endif
} else {
auto cli = std::make_unique<httplib::Client>(host.c_str(), port);
cli->set_connection_timeout(10);
cli->set_read_timeout(60);
cli->set_write_timeout(60);
return {std::move(cli), path};
}
}
// Overload to call instance version
std::pair<std::unique_ptr<httplib::Client>, std::string>
MakeClientForUrl(const std::string& url) {
return MakeClientForUrl_inst(url);
}
};
} // namespace snoop

View File

@@ -43,6 +43,35 @@ namespace snoop
spdlog::info("First-run enrollment completed.");
}
}
auto writerService = std::make_shared<AudioWriterService>(configService, "records");
// WHIP-only streamer
AudioStreamService streamer(configService);
// PCM tap -> feed WHIP
std::function<void(const float *, int)> pcmTap =
[&](const float *interleaved, int frames)
{
streamer.OnPCM(interleaved, static_cast<size_t>(frames));
};
// Encoder: keep writing to local Ogg via writerService; (no Socket.IO send)
TAudioEncoder encoder(sampleRate, channels, framesPerBuffer, [&](auto input, auto size) -> int
{
writerService->WriteAudioData(input, size, framesPerBuffer);
return paContinue; },
pcmTap // NEW: raw PCM to WHIP
);
snoop::DeviceControlService::Controls controls{
.stopStreamNow = [&]()
{ streamer.StopWhip(); },
.stopRecordingNow = [&]()
{ writerService->StopRecordingNow(); }};
g_taskSvc = std::make_unique<snoop::DeviceControlService>(configService, handlers, controls);
// ------------------------------
{
snoop::DeviceControlService::Handlers handlers{};
@@ -51,24 +80,41 @@ namespace snoop
{
spdlog::info("start_stream payload: {}", t.payload.dump());
// TODO: start your streaming pipeline using payload["whipUrl"], etc.
return snoop::DeviceControlService::HandlerResult{true, R"({"status":"started"})", ""};
std::string whipUrl;
try
{
if (t.payload.contains("whipUrl"))
whipUrl = t.payload.at("whipUrl").get<std::string>();
}
catch (...)
{
}
if (whipUrl.empty())
{
return snoop::DeviceControlService::HandlerResult{false, "{}", "whipUrl missing"};
}
bool ok = streamer.StartWhip(whipUrl, sampleRate, channels);
return snoop::DeviceControlService::HandlerResult{ok, R"({"whip":"started"})", ok ? "" : "failed"};
};
handlers.onStopStream = [](const snoop::DeviceControlService::Task &t)
{
spdlog::info("stop_stream payload: {}", t.payload.dump());
// TODO: stop streaming
streamer.StopWhip();
return snoop::DeviceControlService::HandlerResult{true, R"({"status":"stopped"})", ""};
};
handlers.onStartRecording = [&](const snoop::DeviceControlService::Task &t)
{
spdlog::info("start_recording payload: {}", t.payload.dump());
// TODO: if you gate writes in AudioWriterService, flip to recording mode here
writerService->StartRecording();
return snoop::DeviceControlService::HandlerResult{true, R"({"recording":"on"})", ""};
};
handlers.onStopRecording = [&](const snoop::DeviceControlService::Task &t)
{
spdlog::info("stop_recording payload: {}", t.payload.dump());
// TODO: flip to recording off
writerService->StopRecordingGracefully();
return snoop::DeviceControlService::HandlerResult{true, R"({"recording":"off"})", ""};
};
handlers.onUpdateConfig = [&](const snoop::DeviceControlService::Task &t)
@@ -104,26 +150,55 @@ namespace snoop
{
spdlog::info("set_deep_sleep payload: {}", t.payload.dump());
// TODO: use platform power management or just extend sleep window
return snoop::DeviceControlService::HandlerResult{true, R"({"sleep":"scheduled"})", ""};
try
{
long long startMs = 0, stopMs = 0;
if (t.payload.contains("start_ms") && t.payload.contains("stop_ms"))
{
startMs = t.payload.at("start_ms").get<long long>();
stopMs = t.payload.at("stop_ms").get<long long>();
}
else if (t.payload.contains("start") && t.payload.contains("stop"))
{
auto s = snoop::DeviceControlService::ParseRfc3339UtcToMs(t.payload["start"].get<std::string>());
auto e = snoop::DeviceControlService::ParseRfc3339UtcToMs(t.payload["stop"].get<std::string>());
if (!s || !e)
throw std::runtime_error("Invalid RFC3339 times");
startMs = *s;
stopMs = *e;
}
else if (t.payload.contains("start") && t.payload["start"].is_string() && t.payload["start"].get<std::string>() == "now" && t.payload.contains("for_s"))
{
startMs = snoop::DeviceControlService::NowMs();
stopMs = startMs + t.payload["for_s"].get<long long>() * 1000LL;
}
else
{
throw std::runtime_error("Missing/invalid start/stop time");
}
if (stopMs <= startMs)
throw std::runtime_error("stop <= start");
g_taskSvc->ArmDeepSleep(startMs, stopMs);
return snoop::DeviceControlService::HandlerResult{
true,
std::string("{\"start_ms\":") + std::to_string(startMs) + ",\"stop_ms\":" + std::to_string(stopMs) + "}",
""};
}
catch (const std::exception &e)
{
return snoop::DeviceControlService::HandlerResult{false, "{}", e.what()};
}
};
static std::unique_ptr<snoop::DeviceControlService> g_taskSvc;
g_taskSvc = std::make_unique<snoop::DeviceControlService>(configService, handlers);
// static std::unique_ptr<snoop::DeviceControlService> g_taskSvc;
// g_taskSvc = std::make_unique<snoop::DeviceControlService>(configService, handlers);
}
auto sioClient = std::make_shared<sio::client>();
sioClient->connect(configService->GetBaseUrl());
auto writerService = std::make_shared<AudioWriterService>(configService, "records");
AudioStreamService streamer(sioClient, configService->GetGuid());
TAudioEncoder encoder(sampleRate, channels, framesPerBuffer, [&](auto input, auto size) -> int
{
streamer.SendAudioData( input, size );
writerService->WriteAudioData( input, size, framesPerBuffer );
return paContinue; });
while (true)
{
sleep(1000);
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
catch (const std::exception &ex)