diff --git a/server/internal/handlers/mediamtx.go b/server/internal/handlers/mediamtx.go index d0b2b70..e751c9b 100644 --- a/server/internal/handlers/mediamtx.go +++ b/server/internal/handlers/mediamtx.go @@ -10,6 +10,7 @@ import ( "smoop-api/internal/dto" "smoop-api/internal/models" "strings" + "time" "github.com/gin-gonic/gin" "gorm.io/gorm" @@ -44,44 +45,6 @@ func (h *MediaMTXHandler) Auth(c *gin.Context) { return } - // parse & validate media token - // parsed, err := h.jwtMgr.Parse(tok) - // if err != nil || !parsed.Valid { - // c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) - // return - // } - // var mc dto.MediaClaims - // if err := jwt.MapClaims(parsed.Claims.(jwt.MapClaims)).Decode(&mc); err != nil { - // c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid claims"}) - // return - // } - - // // enforce act/path - // if mc.Act != req.Action { - // c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"}) - // return - // } - // if mc.Path != req.Path { - // c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"}) - // return - // } - - // // Optional: permission checks by role/device - // // READ: admins can read anything; users only devices assigned - // // PUBLISH: allow devices (sub=0 or special) or admins - // switch req.Action { - // case "read": - // if !h.canRead(mc.Subject, req.Path) { - // c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"}) - // return - // } - // case "publish": - // if !h.canPublish(mc.Subject, req.Path) { - // c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"}) - // return - // } - // } - // parse & validate media token mc, err := h.jwtMgr.ParseMedia(tok) if err != nil { @@ -254,3 +217,69 @@ func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) { } c.Status(http.StatusNoContent) } + +func (h *MediaMTXHandler) StartStreamPayload(guid string) (string, error) { + path := "live/" + guid + ttl := time.Duration(h.cfg.TokenTTL) * time.Second + + tok, err := h.jwtMgr.GenerateMediaToken(0, "publish", path, ttl) // sub=0 = device + if err != nil { + return "", err + } + whip := fmt.Sprintf("%s/whip/%s?token=%s", + strings.TrimRight(h.cfg.WebRTCBaseURL, "/"), + path, + url.QueryEscape(tok), + ) + + payload := map[string]any{ + "whipUrl": whip, + "path": path, + "tokenTTL_s": h.cfg.TokenTTL, + } + b, _ := json.Marshal(payload) + return string(b), nil +} + +type webrtcSession struct { + ID string `json:"id"` + Path string `json:"path"` +} +type webrtcListRes struct { + Items []webrtcSession `json:"items"` +} + +// KickWebRTCSessionsByPath lists and kicks webrtc sessions for a given path. +func (h *MediaMTXHandler) KickWebRTCSessionsByPath(path string) error { + listURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/list" + + resp, err := http.Get(listURL) + if err != nil { + return fmt.Errorf("failed to get list: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("mtx list failed: %s", resp.Status) + } + + var l webrtcListRes + if err := json.NewDecoder(resp.Body).Decode(&l); err != nil { + return fmt.Errorf("decode error: %w", err) + } + + for _, it := range l.Items { + if it.Path == path && it.ID != "" { + kickURL := strings.TrimRight(h.cfg.APIBase, "/") + + "/v3/webrtcsessions/kick/" + url.PathEscape(it.ID) + + kresp, err := http.Post(kickURL, "application/json", nil) + if err != nil { + // log and continue + continue + } + kresp.Body.Close() + } + } + return nil +} diff --git a/server/internal/handlers/tasks.go b/server/internal/handlers/tasks.go index f83f0e5..18c587d 100644 --- a/server/internal/handlers/tasks.go +++ b/server/internal/handlers/tasks.go @@ -4,6 +4,7 @@ import ( "errors" "net/http" "strconv" + "strings" "time" "github.com/gin-gonic/gin" @@ -14,10 +15,16 @@ import ( ) type TasksHandler struct { - db *gorm.DB + db *gorm.DB + mtxH *MediaMTXHandler } -func NewTasksHandler(db *gorm.DB) *TasksHandler { return &TasksHandler{db: db} } +func NewTasksHandler(db *gorm.DB, mtxH *MediaMTXHandler) *TasksHandler { + return &TasksHandler{ + db: db, + mtxH: mtxH, + } +} // ----------------------------------------------------------------------------- // 1) Device heartbeat + fetch next task @@ -151,6 +158,22 @@ func (h *TasksHandler) CreateTask(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } + switch req.Type { + case models.TaskTypeStartStream: + payload, err := h.mtxH.StartStreamPayload(guid) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": "failed to build whip url"}) + return + } + req.Payload = payload + + case models.TaskTypeStopStream: + // best-effort server-side stop (kick publishers/readers on that path) + _ = h.mtxH.KickWebRTCSessionsByPath("live/" + guid) + if strings.TrimSpace(req.Payload) == "" { + req.Payload = `{"reason":"server_stop"}` + } + } task := models.DEviceTask{ DeviceGUID: guid, diff --git a/server/internal/router/router.go b/server/internal/router/router.go index c9bfce0..bdd16f9 100644 --- a/server/internal/router/router.go +++ b/server/internal/router/router.go @@ -32,7 +32,7 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine { /// --- GPS tracker handler trackersH := handlers.NewTrackersHandler(db) - tasksH := handlers.NewTasksHandler(db) + tasksH := handlers.NewTasksHandler(db, mediamtxH) certsH := handlers.NewCertsHandler(db, &cfg.PkiIot, "720h") certsAdminH := handlers.NewCertsAdminHandler(db, &cfg.PkiIot) // --- Public auth