package handlers import ( "bytes" "context" "encoding/json" "fmt" "io" "net/http" "net/url" "smoop-api/internal/config" "smoop-api/internal/crypto" "smoop-api/internal/dto" "smoop-api/internal/models" "strings" "time" "github.com/gin-gonic/gin" "gorm.io/gorm" ) type MediaMTXHandler struct { jwtMgr *crypto.JWTManager db *gorm.DB cfg config.MediaMTXConfig bus *Broker } func NewMediaMTXHandler(db *gorm.DB, jwt *crypto.JWTManager, c config.MediaMTXConfig) *MediaMTXHandler { return &MediaMTXHandler{db: db, jwtMgr: jwt, cfg: c, bus: NewBroker()} } // --- 3.1 External auth endpoint called by MediaMTX // POST /mediamtx/auth func (h *MediaMTXHandler) Auth(c *gin.Context) { var req dto.MediaMTXAuthReq body, _ := c.GetRawData() c.Request.Body = io.NopCloser(bytes.NewReader(body)) c.Writer.WriteString(fmt.Sprintf("DEBUG BODY:\n%s\n", string(body))) if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "bad auth body"}) return } // token can come from Authorization: Bearer or from query (?token=) tok := extractBearer(c.GetHeader("Authorization")) if tok == "" && req.Query != "" { tok = tokenFromQuery(req.Query) // Parse "token=" from the raw query string } if tok == "" { tok = strings.TrimSpace(req.Token) } if tok == "" { c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"}) return } // parse & validate media token mc, err := h.jwtMgr.ParseMedia(tok) if err != nil { c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) return } // enforce act/path if mc.Act != req.Action { c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"}) return } if mc.Path != req.Path { c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"}) return } sub := mc.Subject // from RegisteredClaims.Subject switch req.Action { case "read": if !h.canRead(sub, req.Path) { c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"}) return } case "publish": if !h.canPublish(sub, req.Path) { c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"}) return } } // allowed if req.Action == "publish" { if guid, ok := guidFromPath(req.Path); ok { _ = guid // not used here, but available // tell listeners this path is live (or at least authorized to start) if h.bus != nil { h.bus.Publish(req.Path) } } } c.Status(http.StatusOK) } func extractBearer(h string) string { if strings.HasPrefix(strings.ToLower(h), "bearer ") { return strings.TrimSpace(h[7:]) } return "" } func tokenFromQuery(raw string) string { if raw == "" { return "" } s := strings.TrimPrefix(raw, "?") q, _ := url.ParseQuery(s) return q.Get("token") } // naive helpers: replace with real queries to users/devices tables func (h *MediaMTXHandler) canRead(sub, path string) bool { // path is "live/" parts := strings.SplitN(path, "/", 2) if len(parts) != 2 { return false } guid := parts[1] // Find the user; admins -> allow; else check user_devices join var u models.User if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin { return true } // check assignment var count int64 _ = h.db.Table("user_devices"). Where("user_id = ? AND device_guid = ?", sub, guid). Count(&count).Error return count > 0 } func (h *MediaMTXHandler) canPublish(sub, path string) bool { // For devices you may use sub=0 or map to a device row; here: allow admins only if sub == "0" { return true } var u models.User if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin { return true } return false } // --- 3.2 Mint publish token (device flow) -> returns WHIP URL // POST /mediamtx/token/publish {guid} func (h *MediaMTXHandler) MintPublish(c *gin.Context) { user, ok := GetUserContext(c) if !ok { c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) return } var req dto.PublishTokenReq if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"}) return } // Permission check (admin or assigned) if user.Role != models.RoleAdmin { var count int64 _ = h.db.Table("user_devices"). Where("user_id = ? AND device_guid = ?", user.ID, req.GUID). Count(&count).Error if count == 0 { c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"}) return } } path := "live/" + req.GUID // We mint a *read* token for the browser to consume HLS. tok, err := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "token error"}) return } pub := strings.TrimRight(h.cfg.PublicBaseURL, "/") hls := fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s", pub, path, url.QueryEscape(tok), ) c.JSON(http.StatusCreated, dto.PublishTokenResp{HLS: hls}) } // --- 3.3 Mint read token (user flow) -> returns HLS + WHEP URLs // POST /mediamtx/token/read {guid} func (h *MediaMTXHandler) MintRead(c *gin.Context) { user, ok := GetUserContext(c) if !ok { c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) return } var req dto.ReadTokenReq if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"}) return } path := "live/" + req.GUID // check permission before minting if user.Role != models.RoleAdmin { var count int64 _ = h.db.Table("user_devices"). Where("user_id = ? AND device_guid = ?", user.ID, req.GUID). Count(&count).Error if count == 0 { c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"}) return } } tok, _ := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL) pub := strings.TrimRight(h.cfg.PublicBaseURL, "/") resp := dto.ReadTokenResp{ HLS: fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s", pub, path, url.QueryEscape(tok)), WHEP: fmt.Sprintf("%s/webrtc/play/%s?token=%s", pub, path, url.QueryEscape(tok)), } c.JSON(http.StatusCreated, resp) } // --- 3.4 Admin "controls" using MediaMTX Control API (v3) type pathsListRes struct { Items []struct { Name string `json:"name"` } `json:"items"` } func (h *MediaMTXHandler) ListPaths(c *gin.Context) { // GET {apiBase}/v3/paths/list resp, err := http.Get(strings.TrimRight(h.cfg.APIBase, "/") + "/v3/paths/list") if err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"}) return } defer resp.Body.Close() if resp.StatusCode != 200 { c.JSON(resp.StatusCode, gin.H{"error": "mtx api error"}) return } var pl pathsListRes if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": "decode error"}) return } c.JSON(200, pl) } func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) { // POST {apiBase}/v3/webrtcsessions/kick/{id} id := c.Param("id") reqURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/kick/" + url.PathEscape(id) httpResp, err := http.Post(reqURL, "application/json", http.NoBody) if err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"}) return } defer httpResp.Body.Close() if httpResp.StatusCode/100 != 2 { c.JSON(httpResp.StatusCode, gin.H{"error": "kick failed"}) return } c.Status(http.StatusNoContent) } func (h *MediaMTXHandler) StartStreamPayload(guid string) (string, error) { path := "live/" + guid ttl := time.Duration(h.cfg.TokenTTL) * time.Second tok, err := h.jwtMgr.GenerateMediaToken(0, "publish", path, ttl) // sub=0 = device if err != nil { return "", err } whip := fmt.Sprintf("%s/whip/%s?token=%s", strings.TrimRight(h.cfg.PublicBaseURL, "/"), path, url.QueryEscape(tok), ) payload := map[string]any{ "whipUrl": whip, "path": path, "tokenTTL_s": h.cfg.TokenTTL, } b, _ := json.Marshal(payload) return string(b), nil } type webrtcSession struct { ID string `json:"id"` Path string `json:"path"` } type webrtcListRes struct { Items []webrtcSession `json:"items"` } // KickWebRTCSessionsByPath lists and kicks webrtc sessions for a given path. func (h *MediaMTXHandler) KickWebRTCSessionsByPath(path string) error { listURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/list" resp, err := http.Get(listURL) if err != nil { return fmt.Errorf("failed to get list: %w", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return fmt.Errorf("mtx list failed: %s", resp.Status) } var l webrtcListRes if err := json.NewDecoder(resp.Body).Decode(&l); err != nil { return fmt.Errorf("decode error: %w", err) } for _, it := range l.Items { if it.Path == path && it.ID != "" { kickURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/kick/" + url.PathEscape(it.ID) kresp, err := http.Post(kickURL, "application/json", nil) if err != nil { // log and continue continue } kresp.Body.Close() } } return nil } func BodyLogger() gin.HandlerFunc { return func(c *gin.Context) { if c.Request.Method == "POST" && strings.Contains(c.Request.URL.Path, "/mediamtx/auth") { body, _ := c.GetRawData() fmt.Fprintf(gin.DefaultWriter, "[MTX-AUTH] %s\n", string(body)) c.Request.Body = io.NopCloser(bytes.NewBuffer(body)) } c.Next() } } // --- poll MediaMTX API until path is live ------------------------------------ func (h *MediaMTXHandler) WaitUntilLive(path string, timeout time.Duration) bool { api := strings.TrimRight(h.cfg.APIBase, "/") deadline := time.Now().Add(timeout) for time.Now().Before(deadline) { resp, err := http.Get(api + "/v3/paths/list") if err == nil && resp.StatusCode == 200 { var pl pathsListRes _ = json.NewDecoder(resp.Body).Decode(&pl) resp.Body.Close() for _, it := range pl.Items { if it.Name == path { return true } } } else if resp != nil { resp.Body.Close() } time.Sleep(500 * time.Millisecond) } return false } func (h *MediaMTXHandler) expectedStartWait(guid string) time.Duration { var cfg models.DeviceConfig if err := h.db.Where("device_guid = ?", guid).First(&cfg).Error; err != nil { return 60 * time.Second // fallback if no config row } poll := cfg.MPolling jit := cfg.MJitter if poll <= 0 { poll = 60 } if jit < 0 { jit = 10 } safety := 5 // seconds return time.Duration(poll+jit+safety) * time.Second } // GET /streams/:guid/wait func (h *MediaMTXHandler) WaitLiveSSE(c *gin.Context) { guid := c.Param("guid") if guid == "" { c.Status(http.StatusBadRequest) return } path := "live/" + guid // Per-device max wait = MPolling + MJitter + safety timeout := h.expectedStartWait(guid) c.Header("Content-Type", "text/event-stream") c.Header("Cache-Control", "no-cache") c.Header("Connection", "keep-alive") flush := func() { if f, ok := c.Writer.(http.Flusher); ok { f.Flush() } } // If already live, notify immediately and exit. if h.IsLive(path) { fmt.Fprintf(c.Writer, "event: live\ndata: %s\n\n", path) flush() return } // Subscribe to bus for publish-auth events on this path ch := h.bus.Subscribe(path) defer h.bus.Unsubscribe(path, ch) // Background short poller (safety net in case bus event is missed) ctx, cancel := context.WithTimeout(c.Request.Context(), timeout) defer cancel() pollDone := make(chan struct{}) go func() { t := time.NewTicker(500 * time.Millisecond) defer t.Stop() defer close(pollDone) for { select { case <-ctx.Done(): return case <-t.C: if h.IsLive(path) { // Normalize through bus so waiter below handles it uniformly h.bus.Publish(path) return } } } }() // Wait for either: bus event, timeout, or client disconnect. select { case <-ch: fmt.Fprintf(c.Writer, "event: live\ndata: %s\n\n", path) flush() return case <-ctx.Done(): // Optional: tell client we timed out so it can keep a gentle retry loop fmt.Fprintf(c.Writer, "event: timeout\ndata: {\"path\":\"%s\"}\n\n", path) flush() return case <-c.Request.Context().Done(): return } } // --- helpers: path/guid ------------------------------------------------------ func guidFromPath(path string) (string, bool) { parts := strings.SplitN(path, "/", 2) if len(parts) != 2 || parts[0] != "live" || parts[1] == "" { return "", false } return parts[1], true } // IsLive performs a single check against MTX API to see if the path exists now. func (h *MediaMTXHandler) IsLive(path string) bool { api := strings.TrimRight(h.cfg.APIBase, "/") resp, err := http.Get(api + "/v3/paths/list") if err != nil { return false } defer resp.Body.Close() if resp.StatusCode != 200 { return false } var pl pathsListRes if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil { return false } for _, it := range pl.Items { if it.Name == path { return true } } return false }