Files
snoop_device/src/Services/AudioWriterService.h

418 lines
16 KiB
C++

#pragma once
#include <memory>
#include <utility>
#include <thread>
#include <atomic>
#include <chrono>
#include <mutex>
#include <filesystem>
#include <fstream>
#include <sstream>
#include <array>
#include <optional>
#include <spdlog/spdlog.h>
#include <httplib.h> // build with CPPHTTPLIB_OPENSSL_SUPPORT
#include <sys/wait.h>
#include "AudioWriters/OggAudioWriter.h"
#include "ConfigService.h"
#include "Security/TlsKeyUtil.h"
namespace snoop
{
class AudioWriterService
{
std::shared_ptr<ConfigService> m_configService;
std::shared_ptr<OggAudioWriter> m_oggWriter;
std::string m_destinationDirectoryPath;
std::string m_queueDirectoryPath;
std::thread m_writingThread;
std::thread m_uploadThread;
std::mutex m_fetchFilePathsMutex;
std::mutex m_stateMutex;
unsigned long long int m_currentRecordStartedAt = 0;
std::string m_currentRecordFilePath;
std::atomic<bool> m_isIntermission = false;
// New recording control flags
std::atomic<bool> m_recordingEnabled{false};
std::atomic<bool> m_stopAfterCurrentSegment{false};
public:
explicit AudioWriterService(std::shared_ptr<ConfigService> configService, std::string destinationDirectoryPath)
: m_configService(std::move(configService)),
m_destinationDirectoryPath(std::move(destinationDirectoryPath)),
m_oggWriter(std::make_shared<OggAudioWriter>(48000, 1))
{
if (!this->m_destinationDirectoryPath.empty() && !this->m_destinationDirectoryPath.ends_with("/"))
{
this->m_destinationDirectoryPath.append("/");
}
this->m_queueDirectoryPath = this->m_destinationDirectoryPath + "queue/";
std::filesystem::create_directories(this->m_queueDirectoryPath);
this->MoveToQueueUncompletedRecords();
// Start background threads
this->m_writingThread = std::thread([this]()
{ this->WritingThread(); });
this->m_uploadThread = std::thread([this]()
{ this->UploadThread(); });
spdlog::info("AudioWriterService constructed; initial recordingEnabled=false");
}
~AudioWriterService()
{
this->m_isIntermission = true;
if (this->m_writingThread.joinable())
this->m_writingThread.join();
if (this->m_uploadThread.joinable())
this->m_uploadThread.join();
}
// -------- Public control API (called from DeviceControlService handlers) --------
// Begin a new recording cycle immediately (creates a fresh segment and starts writing).
void StartRecording()
{
std::lock_guard<std::mutex> lk(this->m_stateMutex);
if (m_recordingEnabled.load())
{
spdlog::info("StartRecording ignored: already recording");
return;
}
m_stopAfterCurrentSegment = false;
m_recordingEnabled = true;
spdlog::info("Recording enabled");
}
// Graceful stop: finish the current segment at the next rotation boundary, then stop.
void StopRecordingGracefully()
{
std::lock_guard<std::mutex> lk(this->m_stateMutex);
if (!m_recordingEnabled.load())
{
spdlog::info("StopRecordingGracefully ignored: not recording");
return;
}
m_stopAfterCurrentSegment = true;
spdlog::info("Recording will stop after current segment completes");
}
bool IsRecording() const
{
return m_recordingEnabled.load();
}
// Called from the encoder callback — only writes when recording is enabled
void WriteAudioData(const char *data, size_t size, size_t frames)
{
if (!m_recordingEnabled.load())
return;
this->m_oggWriter->Write(data, size, frames);
}
// stops recording immidiately, when deep_sleep received
void StopRecordingNow()
{
std::lock_guard<std::mutex> lk(this->m_stateMutex);
if (!m_recordingEnabled.load())
{
spdlog::info("StopRecordingNow ignored: not recording");
return;
}
// Force-close current segment right away, enqueue, and disable recording.
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue(this->m_currentRecordFilePath);
m_recordingEnabled = false;
m_stopAfterCurrentSegment = false;
spdlog::info("Recording stopped immediately (deep sleep)");
}
private:
// ----------------------- Helpers (HTTPS mTLS) -----------------------
struct Url
{
std::string scheme;
std::string host;
int port = 0;
};
static Url ParseBase(const std::string &base)
{
std::regex re(R"(^\s*(https?)://([^/:]+)(?::(\d+))?\s*$)");
std::smatch m;
if (!std::regex_match(base, m, re))
{
throw std::runtime_error("Invalid base URL: " + base);
}
Url u;
u.scheme = m[1].str();
u.host = m[2].str();
u.port = m[3].matched ? std::stoi(m[3].str()) : (u.scheme == "https" ? 443 : 80);
return u;
}
std::unique_ptr<httplib::SSLClient> MakeClientMTLS(const Url &u,
const std::filesystem::path &ca,
const std::filesystem::path &crt,
const std::filesystem::path &key)
{
if (u.scheme == "https")
{
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
auto cli = std::make_unique<httplib::SSLClient>(u.host.c_str(), u.port, crt.string().c_str(), key.string().c_str(), std::string());
cli->enable_server_certificate_verification(false);
cli->set_ca_cert_path(ca.string().c_str());
cli->set_connection_timeout(10);
cli->set_read_timeout(120);
cli->set_write_timeout(120);
return cli;
#else
throw std::runtime_error("HTTPS baseUrl but CPPHTTPLIB_OPENSSL_SUPPORT is not enabled");
#endif
}
// else
// {
// auto cli = std::make_unique<httplib::Client>(u.host.c_str(), u.port);
// cli->set_connection_timeout(10);
// cli->set_read_timeout(120);
// cli->set_write_timeout(120);
// return cli;
// }
}
// ----------------------------- Existing logic (adjusted) -----------------------------
void MoveToQueueUncompletedRecords()
{
std::vector<std::filesystem::path> files;
for (const auto &entry : std::filesystem::directory_iterator(this->m_destinationDirectoryPath))
{
files.push_back(entry.path());
}
for (const auto &file : files)
{
if (file.filename().string() != "queue")
{
spdlog::info("Move uncompleted record {} to queue", file.filename().string());
this->MoveToUploadQueue(file.string());
}
}
}
void WritingThread()
{
// recording starts ONLY when StartRecording() is called
while (!m_isIntermission)
{
if (!m_recordingEnabled.load())
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
continue;
}
// Start a fresh segment
auto now = std::chrono::system_clock::now();
this->m_currentRecordStartedAt =
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
this->m_currentRecordFilePath = this->m_destinationDirectoryPath + std::to_string(this->m_currentRecordStartedAt);
this->m_oggWriter->StartWriting(this->m_currentRecordFilePath);
spdlog::info("Recording segment started: {}", this->m_currentRecordFilePath);
// Write until duration elapses
const auto segDurationMs = this->m_configService->GetRecordingDuration();
while (!m_isIntermission && m_recordingEnabled.load())
{
now = std::chrono::system_clock::now();
auto currentRecordDuration =
std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count() - this->m_currentRecordStartedAt;
if (currentRecordDuration >= segDurationMs)
break;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
// Close current segment and enqueue
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue(this->m_currentRecordFilePath);
spdlog::info("Recording segment finished: {}", this->m_currentRecordFilePath);
// If graceful stop requested, stop after finishing this segment
if (m_stopAfterCurrentSegment.load())
{
m_recordingEnabled = false;
m_stopAfterCurrentSegment = false;
spdlog::info("Recording disabled after graceful stop");
}
}
// If exiting service while in a middle of a segment, ensure clean close
if (m_recordingEnabled.load())
{
this->m_oggWriter->StopWriting();
this->MoveToUploadQueue(this->m_currentRecordFilePath);
m_recordingEnabled = false;
}
}
void MoveToUploadQueue(const std::string &filePath)
{
spdlog::info("MoveToUploadQueue( {} )", filePath);
std::lock_guard lock(this->m_fetchFilePathsMutex);
auto now = std::chrono::system_clock::now();
auto recordStoppedAt = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
if (std::filesystem::exists(filePath))
{
auto fileName = std::filesystem::path(filePath).filename().string() + "-" + std::to_string(recordStoppedAt);
std::filesystem::rename(filePath, m_queueDirectoryPath + fileName);
}
}
void UploadThread()
{
const auto baseUrl = this->m_configService->GetBaseUrl();
Url url = ParseBase(baseUrl);
// certs from enrollment
std::filesystem::path ca = "/etc/iot/keys/issuing_ca.pem";
if (!std::filesystem::exists(ca))
ca = "/etc/iot/keys/ca_chain.pem";
const std::filesystem::path crt = "/etc/iot/keys/device.crt.pem";
while (!m_isIntermission)
{
std::vector<std::filesystem::path> files;
{
std::lock_guard l(this->m_fetchFilePathsMutex);
try
{
for (const auto &entry : std::filesystem::directory_iterator(this->m_queueDirectoryPath))
{
if (entry.is_regular_file())
files.push_back(entry.path());
}
}
catch (const std::exception &e)
{
spdlog::error("Error reading queue directory: {}", e.what());
}
}
// Prepare client key (temp file) for this upload pass
std::optional<std::filesystem::path> tmpKey;
try
{
tmpKey = snoop::device_sec::ExtractClientKeyFromKernelKeyring();
}
catch (const std::exception &e)
{
spdlog::error("Cannot extract client key for mTLS: {}", e.what());
// Wait a bit and retry later
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
try
{
auto client = MakeClientMTLS(url, ca, crt, *tmpKey);
for (const auto &filePath : files)
{
auto fileName = filePath.filename().string();
spdlog::info("Uploading file: {}", fileName);
size_t delimiterPos = fileName.find('-');
if (delimiterPos == std::string::npos)
{
spdlog::warn("Unexpected filename format, skipping: {}", fileName);
continue;
}
std::string startedAt = fileName.substr(0, delimiterPos);
std::string stoppedAt = fileName.substr(delimiterPos + 1);
try
{
if (SendRecordedFileMTLS(*client, filePath.string(),
std::stoull(startedAt),
std::stoull(stoppedAt)))
{
spdlog::info("File uploaded, deleting: {}", filePath.string());
std::filesystem::remove(filePath);
}
else
{
spdlog::warn("Failed to upload file: {}", filePath.string());
}
}
catch (const std::exception &e)
{
spdlog::error("Exception during file upload: {}", e.what());
}
}
}
catch (const std::exception &e)
{
spdlog::error("mTLS client setup failed: {}", e.what());
}
// cleanup temp key asap
if (tmpKey)
{
std::error_code ec;
std::filesystem::remove(*tmpKey, ec);
}
std::this_thread::sleep_for(std::chrono::milliseconds(800));
}
}
bool SendRecordedFileMTLS(httplib::SSLClient &client,
const std::string &filepath,
unsigned long long int startedAt,
unsigned long long int stoppedAt)
{
spdlog::info("SendRecordedFile (mTLS): {}", filepath);
std::ifstream ifs(filepath, std::ios::binary);
if (!ifs)
throw std::runtime_error("Failed to open file: " + filepath);
std::vector<char> buffer((std::istreambuf_iterator<char>(ifs)), std::istreambuf_iterator<char>());
// Multipart form: file + guid + times (same fields as before)
const std::string guid = this->m_configService->GetGuid();
httplib::MultipartFormDataItems items = {
{"file", std::string(buffer.begin(), buffer.end()), "file.ogg", "audio/ogg"},
{"guid", guid, "", "text/plain"},
{"startedAt", std::to_string(startedAt), "", "text/plain"},
{"stoppedAt", std::to_string(stoppedAt), "", "text/plain"},
};
auto res = client.Post("/api/records/upload/", items);
if (res && (res->status == 201 || res->status == 200))
{
spdlog::info("File uploaded successfully: HTTP {}", res->status);
return true;
}
if (res)
{
spdlog::error("Upload failed: HTTP {}, body: {}", res->status, res->body);
}
else
{
spdlog::error("Upload failed: no response");
}
return false;
}
};
} // namespace snoop