diff --git a/mediamtx/mediamtx.yml b/mediamtx/mediamtx.yml index bf3168d..5691373 100644 --- a/mediamtx/mediamtx.yml +++ b/mediamtx/mediamtx.yml @@ -27,6 +27,10 @@ srtAddress: :8890 authMethod: http authHTTPAddress: http://snoop-api:8080/mediamtx/auth +authHTTPExclude: + - action: api + - action: metrics + - action: pprof # Recording (optional) pathDefaults: @@ -57,6 +61,15 @@ pathDefaults: \"dstFs\":\"minio:livestream\", \"dstRemote\":\"$MTX_PATH/$f\"}"' +authInternalUsers: + - user: any + pass: + ips: ['127.0.0.1','::1'] + permissions: + - action: api + - action: metrics + - action: pprof + # Allow all paths by default paths: all: diff --git a/server/internal/config/config.go b/server/internal/config/config.go index fa0be6c..15cf27a 100644 --- a/server/internal/config/config.go +++ b/server/internal/config/config.go @@ -10,6 +10,13 @@ import ( "smoop-api/internal/vault" ) +type MediaMTXConfig struct { + APIBase string // e.g. "http://mediamtx:9997" + WebRTCBaseURL string // e.g. "http://mediamtx:8889" + PublicBaseURL string // e.g. "https://your-host" (for HLS/WHEP URLs returned to SPA) + TokenTTL time.Duration // default ~180s +} + type Config struct { DB struct { DSN string @@ -23,6 +30,7 @@ type Config struct { LivestreamBucket string PresignTTL time.Duration } + MediaMTX MediaMTXConfig JWTSecret []byte } @@ -60,6 +68,12 @@ func Load() (*Config, error) { } return v, nil } + // getStrOpt := func(k, def string) string { + // if v, ok := raw[k].(string); ok && v != "" { + // return v + // } + // return def + // } getBool := func(k string) (bool, error) { v, ok := raw[k] if !ok { @@ -77,6 +91,32 @@ func Load() (*Config, error) { return false, fmt.Errorf("invalid bool for key %s", k) } } + getTTL := func(k string, def time.Duration) time.Duration { + if v, ok := raw[k].(string); ok && strings.TrimSpace(v) != "" { + if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil && n > 0 { + return time.Duration(n) * time.Second + } + } + return def + } + + // --- NEW: MediaMTX config FROM ENV (NOT from Vault) + getRequiredEnv := func(k string) (string, error) { + v := strings.TrimSpace(os.Getenv(k)) + if v == "" { + return "", fmt.Errorf("missing required env %s", k) + } + return v, nil + } + + getIntEnv := func(k string, def int) int { + if v := strings.TrimSpace(os.Getenv(k)); v != "" { + if n, err := strconv.Atoi(v); err == nil { + return n + } + } + return def + } dbDSN, err := getStr("db_dsn") if err != nil { @@ -111,14 +151,29 @@ func Load() (*Config, error) { if v, ok := raw["minio_livestream_bucket"].(string); ok && v != "" { liveBucket = v } - presignTTL := 15 * time.Minute - if v, ok := raw["minio_presign_ttl_seconds"].(string); ok && v != "" { - var sec int - fmt.Sscanf(v, "%d", &sec) - if sec > 0 { - presignTTL = time.Duration(sec) * time.Second - } + // presignTTL := 15 * time.Minute + // if v, ok := raw["minio_presign_ttl_seconds"].(string); ok && v != "" { + // var sec int + // fmt.Sscanf(v, "%d", &sec) + // if sec > 0 { + // presignTTL = time.Duration(sec) * time.Second + // } + // } + presignTTL := getTTL("minio_presign_ttl_seconds", 15*time.Minute) + + apiBase, err := getRequiredEnv("MEDIAMTX_API_BASE") + if err != nil { + return nil, err } + webrtcBase, err := getRequiredEnv("MEDIAMTX_WEBRTC_BASE_URL") + if err != nil { + return nil, err + } + publicBase, err := getRequiredEnv("PUBLIC_BASE_URL") + if err != nil { + return nil, err + } + tokenTTL := getIntEnv("MEDIAMTX_TOKEN_TTL_SECONDS", 180) cfg := &Config{} cfg.DB.DSN = dbDSN @@ -130,6 +185,14 @@ func Load() (*Config, error) { cfg.MinIO.LivestreamBucket = liveBucket cfg.MinIO.PresignTTL = presignTTL cfg.JWTSecret = []byte(jwt) + + cfg.MediaMTX = MediaMTXConfig{ + APIBase: apiBase, + WebRTCBaseURL: webrtcBase, + PublicBaseURL: publicBase, + TokenTTL: time.Duration(tokenTTL), + } + return cfg, nil } @@ -191,6 +254,21 @@ func LoadDev() (*Config, error) { } presignTTL := time.Duration(getIntEnv("MINIO_PRESIGN_TTL_SECONDS", 900)) * time.Second + // NEW: MediaMTX envs + apiBase, err := getRequired("MEDIAMTX_API_BASE") + if err != nil { + return nil, err + } + webrtcBase, err := getRequired("MEDIAMTX_WEBRTC_BASE_URL") + if err != nil { + return nil, err + } + publicBase, err := getRequired("PUBLIC_BASE_URL") + if err != nil { + return nil, err + } + tokenTTL := getIntEnv("MEDIAMTX_TOKEN_TTL_SECONDS", 180) + cfg := &Config{} cfg.DB.DSN = dbDSN cfg.MinIO.Endpoint = endpoint @@ -201,5 +279,12 @@ func LoadDev() (*Config, error) { cfg.MinIO.LivestreamBucket = liveBucket cfg.MinIO.PresignTTL = presignTTL cfg.JWTSecret = []byte(jwt) + + cfg.MediaMTX = MediaMTXConfig{ + APIBase: apiBase, + WebRTCBaseURL: webrtcBase, + PublicBaseURL: publicBase, + TokenTTL: time.Duration(tokenTTL), + } return cfg, nil } diff --git a/server/internal/crypto/jwt.go b/server/internal/crypto/jwt.go index 04e822b..949187f 100644 --- a/server/internal/crypto/jwt.go +++ b/server/internal/crypto/jwt.go @@ -1,8 +1,11 @@ package crypto import ( + "fmt" "time" + "smoop-api/internal/dto" + "github.com/golang-jwt/jwt/v5" ) @@ -28,3 +31,33 @@ func (j *JWTManager) Generate(userID uint, username, role string) (string, error func (j *JWTManager) Parse(tok string) (*jwt.Token, error) { return jwt.Parse(tok, func(t *jwt.Token) (interface{}, error) { return j.secret, nil }) } + +func (j *JWTManager) GenerateMediaToken(sub uint, act, path string, ttl time.Duration) (string, error) { + now := time.Now() + claims := dto.MediaClaims{ + Act: act, + Path: path, + RegisteredClaims: jwt.RegisteredClaims{ + Subject: fmt.Sprintf("%d", sub), + IssuedAt: jwt.NewNumericDate(now), + ExpiresAt: jwt.NewNumericDate(now.Add(ttl)), + }, + } + t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) + return t.SignedString([]byte(j.secret)) +} + +// NEW: parse a media token into typed claims +func (j *JWTManager) ParseMedia(tokenStr string) (*dto.MediaClaims, error) { + tok, err := jwt.ParseWithClaims(tokenStr, &dto.MediaClaims{}, func(t *jwt.Token) (interface{}, error) { + return []byte(j.secret), nil + }) + if err != nil { + return nil, err + } + mc, ok := tok.Claims.(*dto.MediaClaims) + if !ok || !tok.Valid { + return nil, fmt.Errorf("invalid media token") + } + return mc, nil +} diff --git a/server/internal/dto/mediamtx.go b/server/internal/dto/mediamtx.go new file mode 100644 index 0000000..f322c87 --- /dev/null +++ b/server/internal/dto/mediamtx.go @@ -0,0 +1,45 @@ +package dto + +import "github.com/golang-jwt/jwt/v5" + +type MediaClaims struct { + Act string `json:"act"` // "publish" or "read" + Path string `json:"path"` // e.g. "live/" + jwt.RegisteredClaims +} + +// MediaMTX external-auth POST body (exact keys per mediamtx.yml sample) +type MediaMTXAuthReq struct { + User string `json:"user"` // optional + Password string `json:"password"` // optional + Token string `json:"token"` // from Authorization: Bearer or query (?token=) + IP string `json:"ip"` + Action string `json:"action"` // publish|read|playback|api|metrics|pprof + Path string `json:"path"` // e.g. "live/" + Protocol string `json:"protocol"` // rtsp|rtmp|hls|webrtc|srt + ID string `json:"id"` // session id + Query string `json:"query"` // raw query string +} + +type MediaMTXAuthResp struct { + // empty 200 means allowed; add fields if you want to return JSON + // to mediamtx (not required) +} + +// Token minting +type PublishTokenReq struct { + GUID string `json:"guid" binding:"required,uuid4"` +} + +type PublishTokenResp struct { + WHIP string `json:"whipUrl"` // http://mediamtx:8889/whip/live/?token=... +} + +type ReadTokenReq struct { + GUID string `json:"guid" binding:"required,uuid4"` +} + +type ReadTokenResp struct { + HLS string `json:"hlsUrl"` // http:///hls/live//index.m3u8?token=... + WHEP string `json:"whepUrl"` // http:///webrtc/play/live/?token=... +} diff --git a/server/internal/handlers/mediamtx.go b/server/internal/handlers/mediamtx.go new file mode 100644 index 0000000..d0b2b70 --- /dev/null +++ b/server/internal/handlers/mediamtx.go @@ -0,0 +1,256 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "net/http" + "net/url" + "smoop-api/internal/config" + "smoop-api/internal/crypto" + "smoop-api/internal/dto" + "smoop-api/internal/models" + "strings" + + "github.com/gin-gonic/gin" + "gorm.io/gorm" +) + +type MediaMTXHandler struct { + jwtMgr *crypto.JWTManager + db *gorm.DB + cfg config.MediaMTXConfig +} + +func NewMediaMTXHandler(db *gorm.DB, jwt *crypto.JWTManager, c config.MediaMTXConfig) *MediaMTXHandler { + return &MediaMTXHandler{db: db, jwtMgr: jwt, cfg: c} +} + +// --- 3.1 External auth endpoint called by MediaMTX +// POST /mediamtx/auth +func (h *MediaMTXHandler) Auth(c *gin.Context) { + var req dto.MediaMTXAuthReq + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "bad auth body"}) + return + } + + // token can come from Authorization: Bearer or from query (?token=) + tok := extractBearer(c.GetHeader("Authorization")) + if tok == "" { + tok = tokenFromQuery(req.Query) + } + if tok == "" { + c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"}) + return + } + + // parse & validate media token + // parsed, err := h.jwtMgr.Parse(tok) + // if err != nil || !parsed.Valid { + // c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) + // return + // } + // var mc dto.MediaClaims + // if err := jwt.MapClaims(parsed.Claims.(jwt.MapClaims)).Decode(&mc); err != nil { + // c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid claims"}) + // return + // } + + // // enforce act/path + // if mc.Act != req.Action { + // c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"}) + // return + // } + // if mc.Path != req.Path { + // c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"}) + // return + // } + + // // Optional: permission checks by role/device + // // READ: admins can read anything; users only devices assigned + // // PUBLISH: allow devices (sub=0 or special) or admins + // switch req.Action { + // case "read": + // if !h.canRead(mc.Subject, req.Path) { + // c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"}) + // return + // } + // case "publish": + // if !h.canPublish(mc.Subject, req.Path) { + // c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"}) + // return + // } + // } + + // parse & validate media token + mc, err := h.jwtMgr.ParseMedia(tok) + if err != nil { + c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"}) + return + } + + // enforce act/path + if mc.Act != req.Action { + c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"}) + return + } + if mc.Path != req.Path { + c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"}) + return + } + + sub := mc.Subject // from RegisteredClaims.Subject + switch req.Action { + case "read": + if !h.canRead(sub, req.Path) { + c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"}) + return + } + case "publish": + if !h.canPublish(sub, req.Path) { + c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"}) + return + } + } + // allowed + c.Status(http.StatusOK) +} + +func extractBearer(h string) string { + if strings.HasPrefix(strings.ToLower(h), "bearer ") { + return strings.TrimSpace(h[7:]) + } + return "" +} +func tokenFromQuery(raw string) string { + if raw == "" { + return "" + } + q, _ := url.ParseQuery(raw) + return q.Get("token") +} + +// naive helpers: replace with real queries to users/devices tables +func (h *MediaMTXHandler) canRead(sub, path string) bool { + // path is "live/" + parts := strings.SplitN(path, "/", 2) + if len(parts) != 2 { + return false + } + guid := parts[1] + + // Find the user; admins -> allow; else check user_devices join + var u models.User + if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin { + return true + } + // check assignment + var count int64 + _ = h.db.Table("user_devices"). + Where("user_id = ? AND device_guid = ?", sub, guid). + Count(&count).Error + return count > 0 +} +func (h *MediaMTXHandler) canPublish(sub, path string) bool { + // For devices you may use sub=0 or map to a device row; here: allow admins only + var u models.User + if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin { + return true + } + return false +} + +// --- 3.2 Mint publish token (device flow) -> returns WHIP URL +// POST /mediamtx/token/publish {guid} +func (h *MediaMTXHandler) MintPublish(c *gin.Context) { + var req dto.PublishTokenReq + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"}) + return + } + path := "live/" + req.GUID + tok, _ := h.jwtMgr.GenerateMediaToken(0, "publish", path, h.cfg.TokenTTL) // sub=0 (device) + whip := fmt.Sprintf("%s/whip/%s?token=%s", strings.TrimRight(h.cfg.WebRTCBaseURL, "/"), path, url.QueryEscape(tok)) + c.JSON(http.StatusCreated, dto.PublishTokenResp{WHIP: whip}) +} + +// --- 3.3 Mint read token (user flow) -> returns HLS + WHEP URLs +// POST /mediamtx/token/read {guid} +func (h *MediaMTXHandler) MintRead(c *gin.Context) { + user, ok := GetUserContext(c) + if !ok { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return + } + var req dto.ReadTokenReq + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"}) + return + } + path := "live/" + req.GUID + + // check permission before minting + if user.Role != models.RoleAdmin { + var count int64 + _ = h.db.Table("user_devices"). + Where("user_id = ? AND device_guid = ?", user.ID, req.GUID). + Count(&count).Error + if count == 0 { + c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"}) + return + } + } + + tok, _ := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL) + + pub := strings.TrimRight(h.cfg.PublicBaseURL, "/") + resp := dto.ReadTokenResp{ + HLS: fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s", pub, path, url.QueryEscape(tok)), + WHEP: fmt.Sprintf("%s/webrtc/play/%s?token=%s", pub, path, url.QueryEscape(tok)), + } + c.JSON(http.StatusCreated, resp) +} + +// --- 3.4 Admin "controls" using MediaMTX Control API (v3) +type pathsListRes struct { + Items []struct { + Name string `json:"name"` + } `json:"items"` +} + +func (h *MediaMTXHandler) ListPaths(c *gin.Context) { + // GET {apiBase}/v3/paths/list + resp, err := http.Get(strings.TrimRight(h.cfg.APIBase, "/") + "/v3/paths/list") + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"}) + return + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + c.JSON(resp.StatusCode, gin.H{"error": "mtx api error"}) + return + } + var pl pathsListRes + if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": "decode error"}) + return + } + c.JSON(200, pl) +} + +func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) { + // POST {apiBase}/v3/webrtcsessions/kick/{id} + id := c.Param("id") + reqURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/kick/" + url.PathEscape(id) + httpResp, err := http.Post(reqURL, "application/json", http.NoBody) + if err != nil { + c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"}) + return + } + defer httpResp.Body.Close() + if httpResp.StatusCode/100 != 2 { + c.JSON(httpResp.StatusCode, gin.H{"error": "kick failed"}) + return + } + c.Status(http.StatusNoContent) +} diff --git a/server/internal/router/router.go b/server/internal/router/router.go index 3bb05ac..f0ab723 100644 --- a/server/internal/router/router.go +++ b/server/internal/router/router.go @@ -26,6 +26,9 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine { recH := handlers.NewRecordsHandler(db, minio, cfg.MinIO.RecordsBucket, cfg.MinIO.PresignTTL) liveH := handlers.NewLivestreamHandler(minio, cfg.MinIO.LivestreamBucket) + // --- MediaMTX handler + mediamtxH := handlers.NewMediaMTXHandler(db, jwtMgr, cfg.MediaMTX) + // --- Public auth r.POST("/auth/signup", authH.SignUp) r.POST("/auth/signin", authH.SignIn) @@ -60,6 +63,16 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine { // health r.GET("/healthz", func(c *gin.Context) { c.String(http.StatusOK, "ok") }) + // --- NEW: MediaMTX integration routes + // External auth (called by MediaMTX) + r.POST("/mediamtx/auth", mediamtxH.Auth) + // Token minting for device/user flows + r.POST("/mediamtx/token/publish", mediamtxH.MintPublish) + r.POST("/mediamtx/token/read", authMW, mediamtxH.MintRead) + // Admin controls + r.GET("/mediamtx/paths", authMW, adminOnly, mediamtxH.ListPaths) + r.POST("/mediamtx/webrtc/kick/:id", authMW, adminOnly, mediamtxH.KickWebRTC) + // sensible defaults r.MaxMultipartMemory = 64 << 20 // 64 MiB _ = time.Now() // appease linters