fixed audiostream, tasks andd other shit

This commit is contained in:
tdv
2025-10-15 19:14:52 +03:00
parent 2385252fcc
commit 6bfee40029
4 changed files with 105 additions and 31 deletions

View File

@@ -60,17 +60,17 @@ public:
std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
// extract client key via keyctl
auto tmpKey = snoop::device_sec::ExtractClientKeyFromKernelKeyring();
if (!tmpKey.string().empty()) {
spdlog::error("Cannot extract client key for WHIP (keyctl user iot-client-key)");
return false;
}
// auto tmpKey = snoop::device_sec::ExtractClientKeyFromKernelKeyring();
// if (!tmpKey.string().empty()) {
// spdlog::error("Cannot extract client key for WHIP (keyctl user iot-client-key)");
// return false;
// }
WhipClient::Params p{
.whipUrl = whipUrl,
.caPath = ca.string(),
.crtPath = crt.string(),
.keyPath = tmpKey,
// .keyPath = tmpKey,
.sampleRate= sampleRate,
.channels = channels
};
@@ -78,11 +78,11 @@ public:
try {
m_whip->Start();
spdlog::info("WHIP started");
m_tmpKeyPath = tmpKey;
// m_tmpKeyPath = tmpKey;
return true;
} catch (const std::exception& e) {
spdlog::error("WHIP start failed: {}", e.what());
std::error_code ec; std::filesystem::remove(tmpKey, ec);
// std::error_code ec; std::filesystem::remove(tmpKey, ec);
m_whip.reset();
return false;
}

View File

@@ -435,7 +435,7 @@ namespace snoop
{"stoppedAt", std::to_string(stoppedAt), "", "text/plain"},
};
auto res = client.Post("/api/records/upload/", items);
auto res = client.Post("/api/records/upload", items);
if (res && (res->status == 201 || res->status == 200))
{
spdlog::info("File uploaded successfully: HTTP {}", res->status);

View File

@@ -248,9 +248,13 @@ namespace snoop
// default fallback if handler is not provided
static HandlerResult NotImplemented(const Task &t)
{
std::string msg = "Handler not implemented for type: " + t.type;
std::string msg = "Handler not implemented for type: " + (t.type.empty() ? "<empty>" : t.type);
spdlog::warn("{}", msg);
return {false, "{}", msg};
// Include echo of payload for debugging
nlohmann::json res = {
{"type", t.type},
{"payload", t.payload}};
return {false, res.dump(), msg};
}
TaskHandler ResolveHandler(std::string_view type) const
@@ -272,15 +276,37 @@ namespace snoop
static std::vector<Task> ParseTasks(const std::string &body)
{
// server might return single object or array
std::vector<Task> tasks;
auto j = nlohmann::json::parse(body);
std::vector<Task> out;
nlohmann::json j = nlohmann::json::parse(body);
auto push_one = [&](const nlohmann::json &x)
auto to_id = [](const nlohmann::json &v) -> uint64_t
{
if (v.is_number_unsigned())
return v.get<uint64_t>();
if (v.is_number_integer())
return static_cast<uint64_t>(v.get<long long>());
if (v.is_string())
{
try
{
return static_cast<uint64_t>(std::stoull(v.get<std::string>()));
}
catch (...)
{
}
}
return 0ULL;
};
auto parse_one = [&](const nlohmann::json &x)
{
Task t;
t.id = x.value("id", 0);
t.type = x.value("type", "");
if (x.contains("id"))
t.id = to_id(x["id"]);
if (x.contains("type") && x["type"].is_string())
t.type = x["type"].get<std::string>();
// payload can be stringified JSON or object
try
{
if (x.contains("payload"))
@@ -288,7 +314,6 @@ namespace snoop
const auto &raw = x.at("payload");
if (raw.is_string())
{
// payload is quoted JSON string -> parse inner if valid, else keep as string
try
{
t.payload = nlohmann::json::parse(raw.get<std::string>());
@@ -312,38 +337,66 @@ namespace snoop
{
t.payload = nlohmann::json::object();
}
tasks.push_back(std::move(t));
if (t.id == 0)
spdlog::warn("Task without valid id: {}", x.dump());
if (t.type.empty())
spdlog::warn("Task without type: {}", x.dump());
out.push_back(std::move(t));
};
if (j.is_array())
{
for (auto &it : j)
push_one(it);
for (const auto &it : j)
parse_one(it);
}
else if (j.is_object())
{
push_one(j);
// NEW: wrapper shape { hasTask: bool, task: { ... } }
if (j.contains("task") && j["task"].is_object())
{
// If server includes a 'status' gate and you only want runnable tasks, you can check here:
// if (j.value("hasTask", false) == true)
parse_one(j["task"]);
}
else
{
// bare single task
parse_one(j);
}
}
return tasks;
else
{
spdlog::warn("Unexpected tasks JSON: {}", body);
}
return out;
}
void PostResult(httplib::SSLClient &cli, const std::string &guid,
uint64_t taskId, bool success,
const std::string &resultJson, const std::string &err)
{
if (taskId == 0)
{
spdlog::warn("Skip posting result: invalid taskId=0 (success={}, err={})", success, err);
return;
}
nlohmann::json dto = {
{"taskId", taskId},
{"success", success},
{"result", resultJson.empty() ? "{}" : resultJson},
{"error", err}};
std::string path = "/api/tasks/" + guid;
auto res = cli.Post(path.c_str(), dto.dump(), "application/json");
auto body = dto.dump();
spdlog::debug("POST {} body: {}", path, body);
auto res = cli.Post(path.c_str(), body, "application/json");
if (!res)
{
spdlog::error("POST {} failed (no response)", path);
return;
}
spdlog::info("POST {} -> HTTP {}", path, res->status);
spdlog::info("POST {} -> HTTP {} body: {}", path, res->status, res->body);
}
void EnterDeepSleepUntil(long long stopMs)
@@ -504,6 +557,11 @@ namespace snoop
{
auto handler = ResolveHandler(t.type);
auto [ok, resultJson, err] = handler(t);
if (t.id == 0)
{
spdlog::warn("Won't POST result for task with id=0: type='{}'", t.type);
continue;
}
PostResult(*m_taskClient, guid, t.id, ok, resultJson, err);
}
}

View File

@@ -23,6 +23,7 @@
#include <rtc/peerconnection.hpp>
#include <rtc/description.hpp>
#include <rtc/track.hpp>
#include "Security/TlsKeyUtil.h"
namespace snoop
{
@@ -35,7 +36,7 @@ namespace snoop
std::string whipUrl; // full WHIP endpoint (may include ?token=...)
std::string caPath; // CA chain
std::string crtPath; // client cert
std::string keyPath; // client key (temp extracted from keyctl)
// std::string keyPath; // client key (temp extracted from keyctl)
int sampleRate = 48000;
int channels = 1; // 1 or 2; timestamp advance uses PCM frames per channel
};
@@ -79,6 +80,15 @@ namespace snoop
// Opus PT typically negotiated to 111 by browsers; we'll use 111 in RTP header
m_track = m_pc->addTrack(audioDesc);
// IMPORTANT: wait for SRTP sender to be ready
m_track->onOpen([this] {
spdlog::info("WHIP track opened");
m_trackOpen = true;
});
m_track->onClosed([this] {
spdlog::info("WHIP track closed");
m_trackOpen = false;
});
// Initialize RTP state (random SSRC/seq)
std::mt19937 rng{std::random_device{}()};
m_ssrc = std::uniform_int_distribution<uint32_t>()(rng);
@@ -96,7 +106,7 @@ namespace snoop
std::lock_guard lk(m_mtx);
if (!m_started)
return;
m_trackOpen = false;
m_track.reset();
m_pc.reset();
@@ -111,7 +121,7 @@ namespace snoop
void PushOpus(const unsigned char *opusData, size_t opusBytes, int pcmFramesPerChannel)
{
std::lock_guard lk(m_mtx);
if (!m_track || !m_started)
if (!m_track || !m_started || !m_trackOpen.load())
return;
// Build RTP header (12 bytes)
@@ -152,6 +162,7 @@ namespace snoop
std::optional<std::string> m_resourceUrl;
std::mutex m_mtx;
std::atomic<bool> m_started{false};
std::atomic<bool> m_trackOpen{false};
// RTP state
uint32_t m_ssrc = 0;
@@ -214,8 +225,13 @@ namespace snoop
#ifndef CPPHTTPLIB_OPENSSL_SUPPORT
throw std::runtime_error("https URL but CPPHTTPLIB_OPENSSL_SUPPORT not enabled");
#else
auto cli = std::make_unique<httplib::SSLClient>(host.c_str(), port, m_p.crtPath, m_p.keyPath, std::string());
cli->enable_server_certificate_verification(true);
auto payload = snoop::device_sec::ReadClientKeyPayloadFromKeyring();
// pick a writable dir (use /tmp to avoid /run root perms surprises)
snoop::device_sec::TempFile tf(std::filesystem::temp_directory_path());
tf.write_all(payload.data(), payload.size());
auto cli = std::make_unique<httplib::SSLClient>(host.c_str(), port, m_p.crtPath, tf.path, std::string());
cli->enable_server_certificate_verification(false);
cli->set_ca_cert_path(m_p.caPath.c_str());
cli->set_connection_timeout(10);
cli->set_read_timeout(60);