added some changes to mediamtx and audiostreams

This commit is contained in:
tdv
2025-10-21 19:31:55 +03:00
parent 89c44d2979
commit 6d2719faeb
8 changed files with 357 additions and 69 deletions

View File

@@ -0,0 +1,48 @@
package handlers
import (
"sync"
)
type Broker struct {
mu sync.Mutex
subs map[string]map[chan struct{}]struct{} // key = path, val = set of channels
}
func NewBroker() *Broker {
return &Broker{subs: make(map[string]map[chan struct{}]struct{})}
}
func (b *Broker) Subscribe(path string) chan struct{} {
ch := make(chan struct{}, 1)
b.mu.Lock()
defer b.mu.Unlock()
if b.subs[path] == nil {
b.subs[path] = make(map[chan struct{}]struct{})
}
b.subs[path][ch] = struct{}{}
return ch
}
func (b *Broker) Unsubscribe(path string, ch chan struct{}) {
b.mu.Lock()
defer b.mu.Unlock()
if set, ok := b.subs[path]; ok {
delete(set, ch)
if len(set) == 0 {
delete(b.subs, path)
}
}
close(ch)
}
func (b *Broker) Publish(path string) {
b.mu.Lock()
defer b.mu.Unlock()
for ch := range b.subs[path] {
select {
case ch <- struct{}{}:
default:
}
}
}

View File

@@ -2,6 +2,7 @@ package handlers
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
@@ -22,10 +23,11 @@ type MediaMTXHandler struct {
jwtMgr *crypto.JWTManager
db *gorm.DB
cfg config.MediaMTXConfig
bus *Broker
}
func NewMediaMTXHandler(db *gorm.DB, jwt *crypto.JWTManager, c config.MediaMTXConfig) *MediaMTXHandler {
return &MediaMTXHandler{db: db, jwtMgr: jwt, cfg: c}
return &MediaMTXHandler{db: db, jwtMgr: jwt, cfg: c, bus: NewBroker()}
}
// --- 3.1 External auth endpoint called by MediaMTX
@@ -86,7 +88,18 @@ func (h *MediaMTXHandler) Auth(c *gin.Context) {
return
}
}
// allowed
if req.Action == "publish" {
if guid, ok := guidFromPath(req.Path); ok {
_ = guid // not used here, but available
// tell listeners this path is live (or at least authorized to start)
if h.bus != nil {
h.bus.Publish(req.Path)
}
}
}
c.Status(http.StatusOK)
}
@@ -340,3 +353,152 @@ func BodyLogger() gin.HandlerFunc {
c.Next()
}
}
// --- poll MediaMTX API until path is live ------------------------------------
func (h *MediaMTXHandler) WaitUntilLive(path string, timeout time.Duration) bool {
api := strings.TrimRight(h.cfg.APIBase, "/")
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
resp, err := http.Get(api + "/v3/paths/list")
if err == nil && resp.StatusCode == 200 {
var pl pathsListRes
_ = json.NewDecoder(resp.Body).Decode(&pl)
resp.Body.Close()
for _, it := range pl.Items {
if it.Name == path {
return true
}
}
} else if resp != nil {
resp.Body.Close()
}
time.Sleep(500 * time.Millisecond)
}
return false
}
func (h *MediaMTXHandler) expectedStartWait(guid string) time.Duration {
var cfg models.DeviceConfig
if err := h.db.Where("device_guid = ?", guid).First(&cfg).Error; err != nil {
return 60 * time.Second // fallback if no config row
}
poll := cfg.MPolling
jit := cfg.MJitter
if poll <= 0 {
poll = 60
}
if jit < 0 {
jit = 10
}
safety := 5 // seconds
return time.Duration(poll+jit+safety) * time.Second
}
// GET /streams/:guid/wait
func (h *MediaMTXHandler) WaitLiveSSE(c *gin.Context) {
guid := c.Param("guid")
if guid == "" {
c.Status(http.StatusBadRequest)
return
}
path := "live/" + guid
// Per-device max wait = MPolling + MJitter + safety
timeout := h.expectedStartWait(guid)
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
flush := func() {
if f, ok := c.Writer.(http.Flusher); ok {
f.Flush()
}
}
// If already live, notify immediately and exit.
if h.IsLive(path) {
fmt.Fprintf(c.Writer, "event: live\ndata: %s\n\n", path)
flush()
return
}
// Subscribe to bus for publish-auth events on this path
ch := h.bus.Subscribe(path)
defer h.bus.Unsubscribe(path, ch)
// Background short poller (safety net in case bus event is missed)
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
defer cancel()
pollDone := make(chan struct{})
go func() {
t := time.NewTicker(500 * time.Millisecond)
defer t.Stop()
defer close(pollDone)
for {
select {
case <-ctx.Done():
return
case <-t.C:
if h.IsLive(path) {
// Normalize through bus so waiter below handles it uniformly
h.bus.Publish(path)
return
}
}
}
}()
// Wait for either: bus event, timeout, or client disconnect.
select {
case <-ch:
fmt.Fprintf(c.Writer, "event: live\ndata: %s\n\n", path)
flush()
return
case <-ctx.Done():
// Optional: tell client we timed out so it can keep a gentle retry loop
fmt.Fprintf(c.Writer, "event: timeout\ndata: {\"path\":\"%s\"}\n\n", path)
flush()
return
case <-c.Request.Context().Done():
return
}
}
// --- helpers: path/guid ------------------------------------------------------
func guidFromPath(path string) (string, bool) {
parts := strings.SplitN(path, "/", 2)
if len(parts) != 2 || parts[0] != "live" || parts[1] == "" {
return "", false
}
return parts[1], true
}
// IsLive performs a single check against MTX API to see if the path exists now.
func (h *MediaMTXHandler) IsLive(path string) bool {
api := strings.TrimRight(h.cfg.APIBase, "/")
resp, err := http.Get(api + "/v3/paths/list")
if err != nil {
return false
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
return false
}
var pl pathsListRes
if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil {
return false
}
for _, it := range pl.Items {
if it.Name == path {
return true
}
}
return false
}

View File

@@ -88,6 +88,8 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
// Admin controls
r.GET("/mediamtx/paths", authMW, adminOnly, mediamtxH.ListPaths)
r.POST("/mediamtx/webrtc/kick/:id", authMW, adminOnly, mediamtxH.KickWebRTC)
// SSE endpoint for audio stream UI
r.GET("/mediamtx/:guid/wait", authMW, middleware.DeviceAccessFilter(), mediamtxH.WaitLiveSSE)
r.GET("/trackers", authMW, middleware.TrackerAccessFilter(), trackersH.List)
r.POST("/trackers/create", authMW, trackersH.Create)