Files
NewSmoop/server/internal/handlers/mediamtx.go

317 lines
8.5 KiB
Go

package handlers
import (
"encoding/json"
"fmt"
"net/http"
"net/url"
"smoop-api/internal/config"
"smoop-api/internal/crypto"
"smoop-api/internal/dto"
"smoop-api/internal/models"
"strings"
"time"
"github.com/gin-gonic/gin"
"gorm.io/gorm"
)
// MediaMTXHandler groups the HTTP handlers and helpers that integrate the API
// with MediaMTX: the external auth hook, token minting, and calls to the
// MediaMTX control API.
type MediaMTXHandler struct {
// jwtMgr parses media tokens (ParseMedia) and mints them (GenerateMediaToken).
jwtMgr *crypto.JWTManager
// db is the gorm handle used for the users and user_devices lookups.
db *gorm.DB
// cfg supplies APIBase, PublicBaseURL and TokenTTL.
cfg config.MediaMTXConfig
}
// NewMediaMTXHandler returns a handler wired to the given database handle,
// JWT manager and MediaMTX configuration. None of the arguments are validated.
func NewMediaMTXHandler(db *gorm.DB, jwt *crypto.JWTManager, c config.MediaMTXConfig) *MediaMTXHandler {
	handler := new(MediaMTXHandler)
	handler.cfg = c
	handler.jwtMgr = jwt
	handler.db = db
	return handler
}
// --- 3.1 External auth endpoint called by MediaMTX
// POST /mediamtx/auth
//
// Auth decides whether the request described in the JSON body may proceed.
//
// The media token is read from the Authorization header ("Bearer <jwt>") or,
// when that is absent, from the "token" parameter of the query string carried
// in the body. The token must parse via the JWT manager, its act and path
// claims must equal the requested action and path, and the token subject must
// pass the database permission check for that action.
//
// Responses: 200 with no body when access is granted, 400 when the body cannot
// be decoded, 401 when the token is missing or invalid, 403 when the claims do
// not match, the subject lacks permission, or the action is not supported.
func (h *MediaMTXHandler) Auth(c *gin.Context) {
	var req dto.MediaMTXAuthReq
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "bad auth body"})
		return
	}

	// token can come from Authorization: Bearer or from query (?token=)
	tok := extractBearer(c.GetHeader("Authorization"))
	if tok == "" {
		tok = tokenFromQuery(req.Query)
	}
	if tok == "" {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
		return
	}

	// parse & validate media token
	mc, err := h.jwtMgr.ParseMedia(tok)
	if err != nil {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
		return
	}

	// The token is only valid for the exact action and path it was minted for.
	if mc.Act != req.Action {
		c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"})
		return
	}
	if mc.Path != req.Path {
		c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"})
		return
	}

	sub := mc.Subject // from RegisteredClaims.Subject
	switch req.Action {
	case "read":
		if !h.canRead(sub, req.Path) {
			c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"})
			return
		}
	case "publish":
		if !h.canPublish(sub, req.Path) {
			c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"})
			return
		}
	default:
		// Fail closed: an action without a permission rule must never be
		// granted just because the token happens to name the same action.
		c.JSON(http.StatusForbidden, gin.H{"error": "unsupported action"})
		return
	}

	// allowed
	c.Status(http.StatusOK)
}
// extractBearer returns the credentials from an Authorization header value
// that uses the "Bearer" scheme. The scheme name is matched case-insensitively
// and must be followed by a single space; surrounding whitespace is trimmed
// from the credentials. Any other header value yields "".
func extractBearer(h string) string {
	const scheme = "bearer "
	if len(h) < len(scheme) || !strings.EqualFold(h[:len(scheme)], scheme) {
		return ""
	}
	return strings.TrimSpace(h[len(scheme):])
}
// tokenFromQuery returns the first "token" value found in a raw URL query
// string (without the leading "?"), or "" when there is none.
func tokenFromQuery(raw string) string {
	if len(raw) == 0 {
		return ""
	}
	// ParseQuery still returns every pair it managed to decode when it reports
	// an error, so a malformed neighbouring parameter does not hide the token;
	// the error is dropped on purpose.
	params, _ := url.ParseQuery(raw)
	return params.Get("token")
}
// naive helpers: replace with real queries to users/devices tables

// canRead reports whether the token subject may read the stream at path.
//
// path is expected to be "live/<guid>"; only the part after the first "/" is
// used, and a path without any "/" is rejected. A subject whose users row has
// the admin role is always allowed. Otherwise at least one user_devices row
// must link the subject to the device GUID.
func (h *MediaMTXHandler) canRead(sub, path string) bool {
	_, guid, hasSep := strings.Cut(path, "/")
	if !hasSep {
		return false
	}

	// Admins may read every stream.
	var user models.User
	lookupErr := h.db.Where("id = ?", sub).First(&user).Error
	if lookupErr == nil && user.Role == models.RoleAdmin {
		return true
	}

	// Everyone else needs an explicit device assignment. The query error is
	// not inspected: the counter is expected to stay at zero on failure, which
	// denies access.
	var assigned int64
	_ = h.db.Table("user_devices").
		Where("user_id = ? AND device_guid = ?", sub, guid).
		Count(&assigned).Error
	return assigned > 0
}
// canPublish reports whether the token subject may publish. Only a subject
// whose users row exists and has the admin role is allowed; path is currently
// not consulted.
//
// NOTE(review): StartStreamPayload mints publish tokens with subject 0 for
// devices. Unless a users row matches that subject with the admin role, this
// check rejects those tokens - confirm how device publishing is meant to pass.
func (h *MediaMTXHandler) canPublish(sub, path string) bool {
	var user models.User
	if lookupErr := h.db.Where("id = ?", sub).First(&user).Error; lookupErr != nil {
		return false
	}
	return user.Role == models.RoleAdmin
}
// --- 3.2 Mint token via the "publish" endpoint -> returns an HLS URL
// POST /mediamtx/token/publish {guid}
//
// MintPublish issues a media token for the device named by the request GUID
// and answers 201 with a dto.PublishTokenResp holding an HLS playlist URL.
//
// NOTE(review): despite the name, the token carries the "read" action and the
// response is an HLS URL, not a WHIP publish URL; publish tokens are minted by
// StartStreamPayload. Confirm this is the intended contract.
//
// Admins may request any GUID; other users need a user_devices row for it.
// Responses: 401 without a user context, 400 for an undecodable body, 403 when
// the device is not assigned to the user, 500 when the permission lookup or
// the token generation fails.
func (h *MediaMTXHandler) MintPublish(c *gin.Context) {
	user, ok := GetUserContext(c)
	if !ok {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
		return
	}

	var req dto.PublishTokenReq
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"})
		return
	}

	// Permission check (admin or assigned)
	if user.Role != models.RoleAdmin {
		var count int64
		err := h.db.Table("user_devices").
			Where("user_id = ? AND device_guid = ?", user.ID, req.GUID).
			Count(&count).Error
		if err != nil {
			// A database failure is a server problem, not a permission denial.
			c.JSON(http.StatusInternalServerError, gin.H{"error": "permission check failed"})
			return
		}
		if count == 0 {
			c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"})
			return
		}
	}

	path := "live/" + req.GUID

	// We mint a *read* token for the browser to consume HLS.
	tok, err := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "token error"})
		return
	}

	pub := strings.TrimRight(h.cfg.PublicBaseURL, "/")
	hls := fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s",
		pub,
		path,
		url.QueryEscape(tok),
	)
	c.JSON(http.StatusCreated, dto.PublishTokenResp{HLS: hls})
}
// --- 3.3 Mint read token (user flow) -> returns HLS + WHEP URLs
// POST /mediamtx/token/read {guid}
//
// MintRead issues a "read" media token for the stream "live/<guid>" and
// answers 201 with a dto.ReadTokenResp holding the HLS and WHEP URLs, each
// carrying the token as a query parameter.
//
// Admins may request any GUID; other users need a user_devices row for it.
// Responses: 401 without a user context, 400 for an undecodable body, 403 when
// the device is not assigned to the user, 500 when the permission lookup or
// the token generation fails.
func (h *MediaMTXHandler) MintRead(c *gin.Context) {
	user, ok := GetUserContext(c)
	if !ok {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
		return
	}

	var req dto.ReadTokenReq
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"})
		return
	}

	path := "live/" + req.GUID

	// check permission before minting
	if user.Role != models.RoleAdmin {
		var count int64
		err := h.db.Table("user_devices").
			Where("user_id = ? AND device_guid = ?", user.ID, req.GUID).
			Count(&count).Error
		if err != nil {
			// A database failure is a server problem, not a permission denial.
			c.JSON(http.StatusInternalServerError, gin.H{"error": "permission check failed"})
			return
		}
		if count == 0 {
			c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"})
			return
		}
	}

	// Never hand out URLs with an empty token: report the failure instead,
	// in the same way MintPublish does.
	tok, err := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "token error"})
		return
	}

	pub := strings.TrimRight(h.cfg.PublicBaseURL, "/")
	escaped := url.QueryEscape(tok)
	resp := dto.ReadTokenResp{
		HLS:  fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s", pub, path, escaped),
		WHEP: fmt.Sprintf("%s/webrtc/play/%s?token=%s", pub, path, escaped),
	}
	c.JSON(http.StatusCreated, resp)
}
// --- 3.4 Admin "controls" using MediaMTX Control API (v3)

// pathsListRes is the part of the /v3/paths/list response body that ListPaths
// decodes and sends back to the caller: only the name of each item is kept.
type pathsListRes struct {
Items []struct {
Name string `json:"name"`
} `json:"items"`
}
// ListPaths calls GET {apiBase}/v3/paths/list on the MediaMTX control API and
// returns the decoded item names as JSON with status 200.
//
// Responses on failure: 502 when the API cannot be reached (including a
// timeout or a cancelled client request) or its body cannot be decoded; the
// upstream status code is relayed when it is anything other than 200.
func (h *MediaMTXHandler) ListPaths(c *gin.Context) {
	// Bound the upstream call so a stalled control API cannot hang the handler.
	const upstreamTimeout = 10 * time.Second

	endpoint := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/paths/list"
	// Tie the call to the incoming request so it is abandoned when the client
	// goes away.
	req, err := http.NewRequestWithContext(c.Request.Context(), http.MethodGet, endpoint, nil)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
		return
	}

	// The zero Transport means http.DefaultTransport, so connections are still
	// pooled across calls even though the client value is created here.
	client := &http.Client{Timeout: upstreamTimeout}
	resp, err := client.Do(req)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		c.JSON(resp.StatusCode, gin.H{"error": "mtx api error"})
		return
	}

	var pl pathsListRes
	if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "decode error"})
		return
	}
	c.JSON(http.StatusOK, pl)
}
// KickWebRTC calls POST {apiBase}/v3/webrtcsessions/kick/{id} on the MediaMTX
// control API, where id is the "id" route parameter.
//
// Responses: 204 when the API answers with a 2xx status, the upstream status
// code otherwise, and 502 when the API cannot be reached (including a timeout
// or a cancelled client request).
func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) {
	// Bound the upstream call so a stalled control API cannot hang the handler.
	const upstreamTimeout = 10 * time.Second

	id := c.Param("id")
	reqURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/kick/" + url.PathEscape(id)

	// Tie the call to the incoming request so it is abandoned when the client
	// goes away.
	req, err := http.NewRequestWithContext(c.Request.Context(), http.MethodPost, reqURL, http.NoBody)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
		return
	}
	req.Header.Set("Content-Type", "application/json")

	// The zero Transport means http.DefaultTransport, so connections are still
	// pooled across calls even though the client value is created here.
	client := &http.Client{Timeout: upstreamTimeout}
	httpResp, err := client.Do(req)
	if err != nil {
		c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
		return
	}
	defer httpResp.Body.Close()

	if httpResp.StatusCode/100 != 2 {
		c.JSON(httpResp.StatusCode, gin.H{"error": "kick failed"})
		return
	}
	c.Status(http.StatusNoContent)
}
// StartStreamPayload builds the JSON payload that tells a device where to
// publish its stream. It mints a "publish" media token for "live/<guid>" with
// subject 0 and returns a JSON object with the keys "whipUrl" (the WHIP URL
// carrying the token), "path" and "tokenTTL_s".
//
// An error is returned when the token cannot be generated or the payload
// cannot be encoded; the returned string is empty in that case.
//
// NOTE(review): the TTL is converted with time.Duration(TokenTTL)*time.Second
// here, while MintPublish and MintRead pass cfg.TokenTTL to GenerateMediaToken
// unchanged, and "tokenTTL_s" emits the raw config value. Confirm the unit of
// cfg.TokenTTL so all three call sites agree.
func (h *MediaMTXHandler) StartStreamPayload(guid string) (string, error) {
	path := "live/" + guid
	ttl := time.Duration(h.cfg.TokenTTL) * time.Second

	tok, err := h.jwtMgr.GenerateMediaToken(0, "publish", path, ttl) // sub=0 = device
	if err != nil {
		return "", err
	}

	whip := fmt.Sprintf("%s/whip/%s?token=%s",
		strings.TrimRight(h.cfg.PublicBaseURL, "/"),
		path,
		url.QueryEscape(tok),
	)
	payload := map[string]any{
		"whipUrl":    whip,
		"path":       path,
		"tokenTTL_s": h.cfg.TokenTTL,
	}

	b, err := json.Marshal(payload)
	if err != nil {
		return "", fmt.Errorf("encoding start-stream payload: %w", err)
	}
	return string(b), nil
}
// webrtcSession holds the two fields of a MediaMTX WebRTC session entry that
// KickWebRTCSessionsByPath needs: the session id and the path it is bound to.
type webrtcSession struct {
ID string `json:"id"`
Path string `json:"path"`
}
// webrtcListRes is the part of the /v3/webrtcsessions/list response body that
// is decoded: the list of sessions under the "items" key.
type webrtcListRes struct {
Items []webrtcSession `json:"items"`
}
// KickWebRTCSessionsByPath lists the WebRTC sessions known to the MediaMTX
// control API and kicks every session whose path equals path.
//
// Kicking is best effort: a failed kick does not stop the remaining sessions
// from being attempted. An error is returned when the list call fails, its
// status is not 200, its body cannot be decoded, or at least one kick failed;
// in the last case the error reports how many kicks failed and wraps the first
// failure.
//
// NOTE(review): only the sessions contained in the single list response are
// considered; if the API paginates this list, later pages are not visited.
func (h *MediaMTXHandler) KickWebRTCSessionsByPath(path string) error {
	// Bound every upstream call so a stalled control API cannot block the caller.
	const upstreamTimeout = 10 * time.Second

	// The zero Transport means http.DefaultTransport, so connections are pooled.
	client := &http.Client{Timeout: upstreamTimeout}
	apiBase := strings.TrimRight(h.cfg.APIBase, "/")

	resp, err := client.Get(apiBase + "/v3/webrtcsessions/list")
	if err != nil {
		return fmt.Errorf("failed to get list: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("mtx list failed: %s", resp.Status)
	}

	var l webrtcListRes
	if err := json.NewDecoder(resp.Body).Decode(&l); err != nil {
		return fmt.Errorf("decode error: %w", err)
	}

	var firstErr error
	failed := 0
	recordFailure := func(err error) {
		failed++
		if firstErr == nil {
			firstErr = err
		}
	}

	for _, it := range l.Items {
		if it.Path != path || it.ID == "" {
			continue
		}
		kickURL := apiBase + "/v3/webrtcsessions/kick/" + url.PathEscape(it.ID)
		kresp, err := client.Post(kickURL, "application/json", http.NoBody)
		if err != nil {
			recordFailure(fmt.Errorf("kicking session %q: %w", it.ID, err))
			continue
		}
		kresp.Body.Close()

		// A 404 presumably means the session ended between the list and the
		// kick, which is the desired end state - TODO confirm against the
		// MediaMTX API.
		if kresp.StatusCode/100 != 2 && kresp.StatusCode != http.StatusNotFound {
			recordFailure(fmt.Errorf("kicking session %q: unexpected status %s", it.ID, kresp.Status))
		}
	}

	if failed > 0 {
		return fmt.Errorf("kicking webrtc sessions on path %q: %d failed: %w", path, failed, firstErr)
	}
	return nil
}