added routes and auth for devices and users for MediaMTX server

This commit is contained in:
tdv
2025-09-25 18:47:25 +03:00
parent 4c4d254852
commit e0490e42c5
6 changed files with 452 additions and 7 deletions

View File

@@ -10,6 +10,13 @@ import (
"smoop-api/internal/vault"
)
type MediaMTXConfig struct {
APIBase string // e.g. "http://mediamtx:9997"
WebRTCBaseURL string // e.g. "http://mediamtx:8889"
PublicBaseURL string // e.g. "https://your-host" (for HLS/WHEP URLs returned to SPA)
TokenTTL time.Duration // default ~180s
}
type Config struct {
DB struct {
DSN string
@@ -23,6 +30,7 @@ type Config struct {
LivestreamBucket string
PresignTTL time.Duration
}
MediaMTX MediaMTXConfig
JWTSecret []byte
}
@@ -60,6 +68,12 @@ func Load() (*Config, error) {
}
return v, nil
}
// getStrOpt := func(k, def string) string {
// if v, ok := raw[k].(string); ok && v != "" {
// return v
// }
// return def
// }
getBool := func(k string) (bool, error) {
v, ok := raw[k]
if !ok {
@@ -77,6 +91,32 @@ func Load() (*Config, error) {
return false, fmt.Errorf("invalid bool for key %s", k)
}
}
getTTL := func(k string, def time.Duration) time.Duration {
if v, ok := raw[k].(string); ok && strings.TrimSpace(v) != "" {
if n, err := strconv.Atoi(strings.TrimSpace(v)); err == nil && n > 0 {
return time.Duration(n) * time.Second
}
}
return def
}
// --- NEW: MediaMTX config FROM ENV (NOT from Vault)
getRequiredEnv := func(k string) (string, error) {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return "", fmt.Errorf("missing required env %s", k)
}
return v, nil
}
getIntEnv := func(k string, def int) int {
if v := strings.TrimSpace(os.Getenv(k)); v != "" {
if n, err := strconv.Atoi(v); err == nil {
return n
}
}
return def
}
dbDSN, err := getStr("db_dsn")
if err != nil {
@@ -111,14 +151,29 @@ func Load() (*Config, error) {
if v, ok := raw["minio_livestream_bucket"].(string); ok && v != "" {
liveBucket = v
}
presignTTL := 15 * time.Minute
if v, ok := raw["minio_presign_ttl_seconds"].(string); ok && v != "" {
var sec int
fmt.Sscanf(v, "%d", &sec)
if sec > 0 {
presignTTL = time.Duration(sec) * time.Second
}
// presignTTL := 15 * time.Minute
// if v, ok := raw["minio_presign_ttl_seconds"].(string); ok && v != "" {
// var sec int
// fmt.Sscanf(v, "%d", &sec)
// if sec > 0 {
// presignTTL = time.Duration(sec) * time.Second
// }
// }
presignTTL := getTTL("minio_presign_ttl_seconds", 15*time.Minute)
apiBase, err := getRequiredEnv("MEDIAMTX_API_BASE")
if err != nil {
return nil, err
}
webrtcBase, err := getRequiredEnv("MEDIAMTX_WEBRTC_BASE_URL")
if err != nil {
return nil, err
}
publicBase, err := getRequiredEnv("PUBLIC_BASE_URL")
if err != nil {
return nil, err
}
tokenTTL := getIntEnv("MEDIAMTX_TOKEN_TTL_SECONDS", 180)
cfg := &Config{}
cfg.DB.DSN = dbDSN
@@ -130,6 +185,14 @@ func Load() (*Config, error) {
cfg.MinIO.LivestreamBucket = liveBucket
cfg.MinIO.PresignTTL = presignTTL
cfg.JWTSecret = []byte(jwt)
cfg.MediaMTX = MediaMTXConfig{
APIBase: apiBase,
WebRTCBaseURL: webrtcBase,
PublicBaseURL: publicBase,
TokenTTL: time.Duration(tokenTTL),
}
return cfg, nil
}
@@ -191,6 +254,21 @@ func LoadDev() (*Config, error) {
}
presignTTL := time.Duration(getIntEnv("MINIO_PRESIGN_TTL_SECONDS", 900)) * time.Second
// NEW: MediaMTX envs
apiBase, err := getRequired("MEDIAMTX_API_BASE")
if err != nil {
return nil, err
}
webrtcBase, err := getRequired("MEDIAMTX_WEBRTC_BASE_URL")
if err != nil {
return nil, err
}
publicBase, err := getRequired("PUBLIC_BASE_URL")
if err != nil {
return nil, err
}
tokenTTL := getIntEnv("MEDIAMTX_TOKEN_TTL_SECONDS", 180)
cfg := &Config{}
cfg.DB.DSN = dbDSN
cfg.MinIO.Endpoint = endpoint
@@ -201,5 +279,12 @@ func LoadDev() (*Config, error) {
cfg.MinIO.LivestreamBucket = liveBucket
cfg.MinIO.PresignTTL = presignTTL
cfg.JWTSecret = []byte(jwt)
cfg.MediaMTX = MediaMTXConfig{
APIBase: apiBase,
WebRTCBaseURL: webrtcBase,
PublicBaseURL: publicBase,
TokenTTL: time.Duration(tokenTTL),
}
return cfg, nil
}

View File

@@ -1,8 +1,11 @@
package crypto
import (
"fmt"
"time"
"smoop-api/internal/dto"
"github.com/golang-jwt/jwt/v5"
)
@@ -28,3 +31,33 @@ func (j *JWTManager) Generate(userID uint, username, role string) (string, error
func (j *JWTManager) Parse(tok string) (*jwt.Token, error) {
return jwt.Parse(tok, func(t *jwt.Token) (interface{}, error) { return j.secret, nil })
}
func (j *JWTManager) GenerateMediaToken(sub uint, act, path string, ttl time.Duration) (string, error) {
now := time.Now()
claims := dto.MediaClaims{
Act: act,
Path: path,
RegisteredClaims: jwt.RegisteredClaims{
Subject: fmt.Sprintf("%d", sub),
IssuedAt: jwt.NewNumericDate(now),
ExpiresAt: jwt.NewNumericDate(now.Add(ttl)),
},
}
t := jwt.NewWithClaims(jwt.SigningMethodHS256, claims)
return t.SignedString([]byte(j.secret))
}
// NEW: parse a media token into typed claims
func (j *JWTManager) ParseMedia(tokenStr string) (*dto.MediaClaims, error) {
tok, err := jwt.ParseWithClaims(tokenStr, &dto.MediaClaims{}, func(t *jwt.Token) (interface{}, error) {
return []byte(j.secret), nil
})
if err != nil {
return nil, err
}
mc, ok := tok.Claims.(*dto.MediaClaims)
if !ok || !tok.Valid {
return nil, fmt.Errorf("invalid media token")
}
return mc, nil
}

View File

@@ -0,0 +1,45 @@
package dto
import "github.com/golang-jwt/jwt/v5"
type MediaClaims struct {
Act string `json:"act"` // "publish" or "read"
Path string `json:"path"` // e.g. "live/<guid>"
jwt.RegisteredClaims
}
// MediaMTX external-auth POST body (exact keys per mediamtx.yml sample)
type MediaMTXAuthReq struct {
User string `json:"user"` // optional
Password string `json:"password"` // optional
Token string `json:"token"` // from Authorization: Bearer or query (?token=)
IP string `json:"ip"`
Action string `json:"action"` // publish|read|playback|api|metrics|pprof
Path string `json:"path"` // e.g. "live/<guid>"
Protocol string `json:"protocol"` // rtsp|rtmp|hls|webrtc|srt
ID string `json:"id"` // session id
Query string `json:"query"` // raw query string
}
type MediaMTXAuthResp struct {
// empty 200 means allowed; add fields if you want to return JSON
// to mediamtx (not required)
}
// Token minting
type PublishTokenReq struct {
GUID string `json:"guid" binding:"required,uuid4"`
}
type PublishTokenResp struct {
WHIP string `json:"whipUrl"` // http://mediamtx:8889/whip/live/<guid>?token=...
}
type ReadTokenReq struct {
GUID string `json:"guid" binding:"required,uuid4"`
}
type ReadTokenResp struct {
HLS string `json:"hlsUrl"` // http://<host>/hls/live/<guid>/index.m3u8?token=...
WHEP string `json:"whepUrl"` // http://<host>/webrtc/play/live/<guid>?token=...
}

View File

@@ -0,0 +1,256 @@
package handlers
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"smoop-api/internal/config"
"smoop-api/internal/crypto"
"smoop-api/internal/dto"
"smoop-api/internal/models"
"strings"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
type MediaMTXHandler struct {
jwtMgr *crypto.JWTManager
db *gorm.DB
cfg config.MediaMTXConfig
}
func NewMediaMTXHandler(db *gorm.DB, jwt *crypto.JWTManager, c config.MediaMTXConfig) *MediaMTXHandler {
return &MediaMTXHandler{db: db, jwtMgr: jwt, cfg: c}
}
// --- 3.1 External auth endpoint called by MediaMTX
// POST /mediamtx/auth
func (h *MediaMTXHandler) Auth(c *gin.Context) {
var req dto.MediaMTXAuthReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "bad auth body"})
return
}
// token can come from Authorization: Bearer or from query (?token=)
tok := extractBearer(c.GetHeader("Authorization"))
if tok == "" {
tok = tokenFromQuery(req.Query)
}
if tok == "" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
return
}
// parse & validate media token
// parsed, err := h.jwtMgr.Parse(tok)
// if err != nil || !parsed.Valid {
// c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
// return
// }
// var mc dto.MediaClaims
// if err := jwt.MapClaims(parsed.Claims.(jwt.MapClaims)).Decode(&mc); err != nil {
// c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid claims"})
// return
// }
// // enforce act/path
// if mc.Act != req.Action {
// c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"})
// return
// }
// if mc.Path != req.Path {
// c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"})
// return
// }
// // Optional: permission checks by role/device
// // READ: admins can read anything; users only devices assigned
// // PUBLISH: allow devices (sub=0 or special) or admins
// switch req.Action {
// case "read":
// if !h.canRead(mc.Subject, req.Path) {
// c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"})
// return
// }
// case "publish":
// if !h.canPublish(mc.Subject, req.Path) {
// c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"})
// return
// }
// }
// parse & validate media token
mc, err := h.jwtMgr.ParseMedia(tok)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
return
}
// enforce act/path
if mc.Act != req.Action {
c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"})
return
}
if mc.Path != req.Path {
c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"})
return
}
sub := mc.Subject // from RegisteredClaims.Subject
switch req.Action {
case "read":
if !h.canRead(sub, req.Path) {
c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"})
return
}
case "publish":
if !h.canPublish(sub, req.Path) {
c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"})
return
}
}
// allowed
c.Status(http.StatusOK)
}
func extractBearer(h string) string {
if strings.HasPrefix(strings.ToLower(h), "bearer ") {
return strings.TrimSpace(h[7:])
}
return ""
}
func tokenFromQuery(raw string) string {
if raw == "" {
return ""
}
q, _ := url.ParseQuery(raw)
return q.Get("token")
}
// naive helpers: replace with real queries to users/devices tables
func (h *MediaMTXHandler) canRead(sub, path string) bool {
// path is "live/<guid>"
parts := strings.SplitN(path, "/", 2)
if len(parts) != 2 {
return false
}
guid := parts[1]
// Find the user; admins -> allow; else check user_devices join
var u models.User
if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin {
return true
}
// check assignment
var count int64
_ = h.db.Table("user_devices").
Where("user_id = ? AND device_guid = ?", sub, guid).
Count(&count).Error
return count > 0
}
func (h *MediaMTXHandler) canPublish(sub, path string) bool {
// For devices you may use sub=0 or map to a device row; here: allow admins only
var u models.User
if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin {
return true
}
return false
}
// --- 3.2 Mint publish token (device flow) -> returns WHIP URL
// POST /mediamtx/token/publish {guid}
func (h *MediaMTXHandler) MintPublish(c *gin.Context) {
var req dto.PublishTokenReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"})
return
}
path := "live/" + req.GUID
tok, _ := h.jwtMgr.GenerateMediaToken(0, "publish", path, h.cfg.TokenTTL) // sub=0 (device)
whip := fmt.Sprintf("%s/whip/%s?token=%s", strings.TrimRight(h.cfg.WebRTCBaseURL, "/"), path, url.QueryEscape(tok))
c.JSON(http.StatusCreated, dto.PublishTokenResp{WHIP: whip})
}
// --- 3.3 Mint read token (user flow) -> returns HLS + WHEP URLs
// POST /mediamtx/token/read {guid}
func (h *MediaMTXHandler) MintRead(c *gin.Context) {
user, ok := GetUserContext(c)
if !ok {
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
return
}
var req dto.ReadTokenReq
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"})
return
}
path := "live/" + req.GUID
// check permission before minting
if user.Role != models.RoleAdmin {
var count int64
_ = h.db.Table("user_devices").
Where("user_id = ? AND device_guid = ?", user.ID, req.GUID).
Count(&count).Error
if count == 0 {
c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"})
return
}
}
tok, _ := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL)
pub := strings.TrimRight(h.cfg.PublicBaseURL, "/")
resp := dto.ReadTokenResp{
HLS: fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s", pub, path, url.QueryEscape(tok)),
WHEP: fmt.Sprintf("%s/webrtc/play/%s?token=%s", pub, path, url.QueryEscape(tok)),
}
c.JSON(http.StatusCreated, resp)
}
// --- 3.4 Admin "controls" using MediaMTX Control API (v3)
type pathsListRes struct {
Items []struct {
Name string `json:"name"`
} `json:"items"`
}
func (h *MediaMTXHandler) ListPaths(c *gin.Context) {
// GET {apiBase}/v3/paths/list
resp, err := http.Get(strings.TrimRight(h.cfg.APIBase, "/") + "/v3/paths/list")
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
return
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
c.JSON(resp.StatusCode, gin.H{"error": "mtx api error"})
return
}
var pl pathsListRes
if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": "decode error"})
return
}
c.JSON(200, pl)
}
func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) {
// POST {apiBase}/v3/webrtcsessions/kick/{id}
id := c.Param("id")
reqURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/kick/" + url.PathEscape(id)
httpResp, err := http.Post(reqURL, "application/json", http.NoBody)
if err != nil {
c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
return
}
defer httpResp.Body.Close()
if httpResp.StatusCode/100 != 2 {
c.JSON(httpResp.StatusCode, gin.H{"error": "kick failed"})
return
}
c.Status(http.StatusNoContent)
}

View File

@@ -26,6 +26,9 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
recH := handlers.NewRecordsHandler(db, minio, cfg.MinIO.RecordsBucket, cfg.MinIO.PresignTTL)
liveH := handlers.NewLivestreamHandler(minio, cfg.MinIO.LivestreamBucket)
// --- MediaMTX handler
mediamtxH := handlers.NewMediaMTXHandler(db, jwtMgr, cfg.MediaMTX)
// --- Public auth
r.POST("/auth/signup", authH.SignUp)
r.POST("/auth/signin", authH.SignIn)
@@ -60,6 +63,16 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
// health
r.GET("/healthz", func(c *gin.Context) { c.String(http.StatusOK, "ok") })
// --- NEW: MediaMTX integration routes
// External auth (called by MediaMTX)
r.POST("/mediamtx/auth", mediamtxH.Auth)
// Token minting for device/user flows
r.POST("/mediamtx/token/publish", mediamtxH.MintPublish)
r.POST("/mediamtx/token/read", authMW, mediamtxH.MintRead)
// Admin controls
r.GET("/mediamtx/paths", authMW, adminOnly, mediamtxH.ListPaths)
r.POST("/mediamtx/webrtc/kick/:id", authMW, adminOnly, mediamtxH.KickWebRTC)
// sensible defaults
r.MaxMultipartMemory = 64 << 20 // 64 MiB
_ = time.Now() // appease linters