created medaimtx handlers for audio streams commands, server returns payloads that need some adjustments
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"smoop-api/internal/dto"
|
"smoop-api/internal/dto"
|
||||||
"smoop-api/internal/models"
|
"smoop-api/internal/models"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
"gorm.io/gorm"
|
"gorm.io/gorm"
|
||||||
@@ -44,44 +45,6 @@ func (h *MediaMTXHandler) Auth(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse & validate media token
|
|
||||||
// parsed, err := h.jwtMgr.Parse(tok)
|
|
||||||
// if err != nil || !parsed.Valid {
|
|
||||||
// c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// var mc dto.MediaClaims
|
|
||||||
// if err := jwt.MapClaims(parsed.Claims.(jwt.MapClaims)).Decode(&mc); err != nil {
|
|
||||||
// c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid claims"})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // enforce act/path
|
|
||||||
// if mc.Act != req.Action {
|
|
||||||
// c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// if mc.Path != req.Path {
|
|
||||||
// c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
|
|
||||||
// // Optional: permission checks by role/device
|
|
||||||
// // READ: admins can read anything; users only devices assigned
|
|
||||||
// // PUBLISH: allow devices (sub=0 or special) or admins
|
|
||||||
// switch req.Action {
|
|
||||||
// case "read":
|
|
||||||
// if !h.canRead(mc.Subject, req.Path) {
|
|
||||||
// c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// case "publish":
|
|
||||||
// if !h.canPublish(mc.Subject, req.Path) {
|
|
||||||
// c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"})
|
|
||||||
// return
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// parse & validate media token
|
// parse & validate media token
|
||||||
mc, err := h.jwtMgr.ParseMedia(tok)
|
mc, err := h.jwtMgr.ParseMedia(tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -254,3 +217,69 @@ func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) {
|
|||||||
}
|
}
|
||||||
c.Status(http.StatusNoContent)
|
c.Status(http.StatusNoContent)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *MediaMTXHandler) StartStreamPayload(guid string) (string, error) {
|
||||||
|
path := "live/" + guid
|
||||||
|
ttl := time.Duration(h.cfg.TokenTTL) * time.Second
|
||||||
|
|
||||||
|
tok, err := h.jwtMgr.GenerateMediaToken(0, "publish", path, ttl) // sub=0 = device
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
whip := fmt.Sprintf("%s/whip/%s?token=%s",
|
||||||
|
strings.TrimRight(h.cfg.WebRTCBaseURL, "/"),
|
||||||
|
path,
|
||||||
|
url.QueryEscape(tok),
|
||||||
|
)
|
||||||
|
|
||||||
|
payload := map[string]any{
|
||||||
|
"whipUrl": whip,
|
||||||
|
"path": path,
|
||||||
|
"tokenTTL_s": h.cfg.TokenTTL,
|
||||||
|
}
|
||||||
|
b, _ := json.Marshal(payload)
|
||||||
|
return string(b), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type webrtcSession struct {
|
||||||
|
ID string `json:"id"`
|
||||||
|
Path string `json:"path"`
|
||||||
|
}
|
||||||
|
type webrtcListRes struct {
|
||||||
|
Items []webrtcSession `json:"items"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// KickWebRTCSessionsByPath lists and kicks webrtc sessions for a given path.
|
||||||
|
func (h *MediaMTXHandler) KickWebRTCSessionsByPath(path string) error {
|
||||||
|
listURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/list"
|
||||||
|
|
||||||
|
resp, err := http.Get(listURL)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get list: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return fmt.Errorf("mtx list failed: %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
var l webrtcListRes
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&l); err != nil {
|
||||||
|
return fmt.Errorf("decode error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, it := range l.Items {
|
||||||
|
if it.Path == path && it.ID != "" {
|
||||||
|
kickURL := strings.TrimRight(h.cfg.APIBase, "/") +
|
||||||
|
"/v3/webrtcsessions/kick/" + url.PathEscape(it.ID)
|
||||||
|
|
||||||
|
kresp, err := http.Post(kickURL, "application/json", nil)
|
||||||
|
if err != nil {
|
||||||
|
// log and continue
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
kresp.Body.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/gin-gonic/gin"
|
"github.com/gin-gonic/gin"
|
||||||
@@ -14,10 +15,16 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TasksHandler struct {
|
type TasksHandler struct {
|
||||||
db *gorm.DB
|
db *gorm.DB
|
||||||
|
mtxH *MediaMTXHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTasksHandler(db *gorm.DB) *TasksHandler { return &TasksHandler{db: db} }
|
func NewTasksHandler(db *gorm.DB, mtxH *MediaMTXHandler) *TasksHandler {
|
||||||
|
return &TasksHandler{
|
||||||
|
db: db,
|
||||||
|
mtxH: mtxH,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// -----------------------------------------------------------------------------
|
// -----------------------------------------------------------------------------
|
||||||
// 1) Device heartbeat + fetch next task
|
// 1) Device heartbeat + fetch next task
|
||||||
@@ -151,6 +158,22 @@ func (h *TasksHandler) CreateTask(c *gin.Context) {
|
|||||||
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
switch req.Type {
|
||||||
|
case models.TaskTypeStartStream:
|
||||||
|
payload, err := h.mtxH.StartStreamPayload(guid)
|
||||||
|
if err != nil {
|
||||||
|
c.JSON(http.StatusBadGateway, gin.H{"error": "failed to build whip url"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
req.Payload = payload
|
||||||
|
|
||||||
|
case models.TaskTypeStopStream:
|
||||||
|
// best-effort server-side stop (kick publishers/readers on that path)
|
||||||
|
_ = h.mtxH.KickWebRTCSessionsByPath("live/" + guid)
|
||||||
|
if strings.TrimSpace(req.Payload) == "" {
|
||||||
|
req.Payload = `{"reason":"server_stop"}`
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
task := models.DEviceTask{
|
task := models.DEviceTask{
|
||||||
DeviceGUID: guid,
|
DeviceGUID: guid,
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
|
|||||||
/// --- GPS tracker handler
|
/// --- GPS tracker handler
|
||||||
trackersH := handlers.NewTrackersHandler(db)
|
trackersH := handlers.NewTrackersHandler(db)
|
||||||
|
|
||||||
tasksH := handlers.NewTasksHandler(db)
|
tasksH := handlers.NewTasksHandler(db, mediamtxH)
|
||||||
certsH := handlers.NewCertsHandler(db, &cfg.PkiIot, "720h")
|
certsH := handlers.NewCertsHandler(db, &cfg.PkiIot, "720h")
|
||||||
certsAdminH := handlers.NewCertsAdminHandler(db, &cfg.PkiIot)
|
certsAdminH := handlers.NewCertsAdminHandler(db, &cfg.PkiIot)
|
||||||
// --- Public auth
|
// --- Public auth
|
||||||
|
|||||||
Reference in New Issue
Block a user