505 lines
13 KiB
Go
505 lines
13 KiB
Go
package handlers
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"smoop-api/internal/config"
|
|
"smoop-api/internal/crypto"
|
|
"smoop-api/internal/dto"
|
|
"smoop-api/internal/models"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gin-gonic/gin"
|
|
"gorm.io/gorm"
|
|
)
|
|
|
|
type MediaMTXHandler struct {
|
|
jwtMgr *crypto.JWTManager
|
|
db *gorm.DB
|
|
cfg config.MediaMTXConfig
|
|
bus *Broker
|
|
}
|
|
|
|
func NewMediaMTXHandler(db *gorm.DB, jwt *crypto.JWTManager, c config.MediaMTXConfig) *MediaMTXHandler {
|
|
return &MediaMTXHandler{db: db, jwtMgr: jwt, cfg: c, bus: NewBroker()}
|
|
}
|
|
|
|
// --- 3.1 External auth endpoint called by MediaMTX
|
|
// POST /mediamtx/auth
|
|
func (h *MediaMTXHandler) Auth(c *gin.Context) {
|
|
var req dto.MediaMTXAuthReq
|
|
body, _ := c.GetRawData()
|
|
c.Request.Body = io.NopCloser(bytes.NewReader(body))
|
|
|
|
c.Writer.WriteString(fmt.Sprintf("DEBUG BODY:\n%s\n", string(body)))
|
|
if err := c.ShouldBindJSON(&req); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "bad auth body"})
|
|
return
|
|
}
|
|
|
|
// token can come from Authorization: Bearer or from query (?token=)
|
|
tok := extractBearer(c.GetHeader("Authorization"))
|
|
|
|
if tok == "" && req.Query != "" {
|
|
tok = tokenFromQuery(req.Query) // Parse "token=" from the raw query string
|
|
}
|
|
|
|
if tok == "" {
|
|
tok = strings.TrimSpace(req.Token)
|
|
}
|
|
if tok == "" {
|
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
|
|
return
|
|
}
|
|
|
|
// parse & validate media token
|
|
mc, err := h.jwtMgr.ParseMedia(tok)
|
|
if err != nil {
|
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid token"})
|
|
return
|
|
}
|
|
|
|
// enforce act/path
|
|
if mc.Act != req.Action {
|
|
c.JSON(http.StatusForbidden, gin.H{"error": "action mismatch"})
|
|
return
|
|
}
|
|
if mc.Path != req.Path {
|
|
c.JSON(http.StatusForbidden, gin.H{"error": "path mismatch"})
|
|
return
|
|
}
|
|
|
|
sub := mc.Subject // from RegisteredClaims.Subject
|
|
switch req.Action {
|
|
case "read":
|
|
if !h.canRead(sub, req.Path) {
|
|
c.JSON(http.StatusForbidden, gin.H{"error": "no read permission"})
|
|
return
|
|
}
|
|
case "publish":
|
|
if !h.canPublish(sub, req.Path) {
|
|
c.JSON(http.StatusForbidden, gin.H{"error": "no publish permission"})
|
|
return
|
|
}
|
|
}
|
|
|
|
// allowed
|
|
if req.Action == "publish" {
|
|
if guid, ok := guidFromPath(req.Path); ok {
|
|
_ = guid // not used here, but available
|
|
// tell listeners this path is live (or at least authorized to start)
|
|
if h.bus != nil {
|
|
h.bus.Publish(req.Path)
|
|
}
|
|
}
|
|
}
|
|
|
|
c.Status(http.StatusOK)
|
|
}
|
|
|
|
// extractBearer returns the credential part of an Authorization header value
// that uses the "Bearer" scheme. The scheme is matched case-insensitively and
// the result has surrounding whitespace removed. Any other header value,
// including the empty string, yields "".
func extractBearer(h string) string {
	const scheme = "bearer "
	if !strings.HasPrefix(strings.ToLower(h), scheme) {
		return ""
	}
	return strings.TrimSpace(h[len(scheme):])
}
|
|
// tokenFromQuery returns the first "token" parameter found in a raw query
// string. A single leading "?" is tolerated. It returns "" when raw is empty
// or carries no token parameter.
func tokenFromQuery(raw string) string {
	if len(raw) == 0 {
		return ""
	}
	// The parse error is deliberately ignored: ParseQuery still returns every
	// pair it managed to decode, which is all that is needed here.
	values, _ := url.ParseQuery(strings.TrimPrefix(raw, "?"))
	if tokens := values["token"]; len(tokens) > 0 {
		return tokens[0]
	}
	return ""
}
|
|
|
|
// naive helpers: replace with real queries to users/devices tables
|
|
func (h *MediaMTXHandler) canRead(sub, path string) bool {
|
|
// path is "live/<guid>"
|
|
parts := strings.SplitN(path, "/", 2)
|
|
if len(parts) != 2 {
|
|
return false
|
|
}
|
|
guid := parts[1]
|
|
|
|
// Find the user; admins -> allow; else check user_devices join
|
|
var u models.User
|
|
if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin {
|
|
return true
|
|
}
|
|
// check assignment
|
|
var count int64
|
|
_ = h.db.Table("user_devices").
|
|
Where("user_id = ? AND device_guid = ?", sub, guid).
|
|
Count(&count).Error
|
|
return count > 0
|
|
}
|
|
func (h *MediaMTXHandler) canPublish(sub, path string) bool {
|
|
// For devices you may use sub=0 or map to a device row; here: allow admins only
|
|
if sub == "0" {
|
|
return true
|
|
}
|
|
var u models.User
|
|
if err := h.db.Where("id = ?", sub).First(&u).Error; err == nil && u.Role == models.RoleAdmin {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// --- 3.2 Mint publish token (device flow) -> returns WHIP URL
|
|
// POST /mediamtx/token/publish {guid}
|
|
func (h *MediaMTXHandler) MintPublish(c *gin.Context) {
|
|
user, ok := GetUserContext(c)
|
|
if !ok {
|
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
|
|
return
|
|
}
|
|
var req dto.PublishTokenReq
|
|
if err := c.ShouldBindJSON(&req); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"})
|
|
return
|
|
}
|
|
|
|
// Permission check (admin or assigned)
|
|
if user.Role != models.RoleAdmin {
|
|
var count int64
|
|
_ = h.db.Table("user_devices").
|
|
Where("user_id = ? AND device_guid = ?", user.ID, req.GUID).
|
|
Count(&count).Error
|
|
if count == 0 {
|
|
c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"})
|
|
return
|
|
}
|
|
}
|
|
|
|
path := "live/" + req.GUID
|
|
|
|
// We mint a *read* token for the browser to consume HLS.
|
|
tok, err := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL)
|
|
if err != nil {
|
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "token error"})
|
|
return
|
|
}
|
|
|
|
pub := strings.TrimRight(h.cfg.PublicBaseURL, "/")
|
|
hls := fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s",
|
|
pub,
|
|
path,
|
|
url.QueryEscape(tok),
|
|
)
|
|
|
|
c.JSON(http.StatusCreated, dto.PublishTokenResp{HLS: hls})
|
|
}
|
|
|
|
// --- 3.3 Mint read token (user flow) -> returns HLS + WHEP URLs
|
|
// POST /mediamtx/token/read {guid}
|
|
func (h *MediaMTXHandler) MintRead(c *gin.Context) {
|
|
user, ok := GetUserContext(c)
|
|
if !ok {
|
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
|
|
return
|
|
}
|
|
var req dto.ReadTokenReq
|
|
if err := c.ShouldBindJSON(&req); err != nil {
|
|
c.JSON(http.StatusBadRequest, gin.H{"error": "bad request"})
|
|
return
|
|
}
|
|
path := "live/" + req.GUID
|
|
|
|
// check permission before minting
|
|
if user.Role != models.RoleAdmin {
|
|
var count int64
|
|
_ = h.db.Table("user_devices").
|
|
Where("user_id = ? AND device_guid = ?", user.ID, req.GUID).
|
|
Count(&count).Error
|
|
if count == 0 {
|
|
c.JSON(http.StatusForbidden, gin.H{"error": "not allowed for this device"})
|
|
return
|
|
}
|
|
}
|
|
|
|
tok, _ := h.jwtMgr.GenerateMediaToken(user.ID, "read", path, h.cfg.TokenTTL)
|
|
|
|
pub := strings.TrimRight(h.cfg.PublicBaseURL, "/")
|
|
resp := dto.ReadTokenResp{
|
|
HLS: fmt.Sprintf("%s/hls/%s/index.m3u8?token=%s", pub, path, url.QueryEscape(tok)),
|
|
WHEP: fmt.Sprintf("%s/webrtc/play/%s?token=%s", pub, path, url.QueryEscape(tok)),
|
|
}
|
|
c.JSON(http.StatusCreated, resp)
|
|
}
|
|
|
|
// --- 3.4 Admin "controls" using MediaMTX Control API (v3)

// pathsListRes is the subset of the {apiBase}/v3/paths/list response decoded
// by this package: only the name of each listed path is kept, every other
// field of the response is ignored.
type pathsListRes struct {
	// Items holds one entry per path in the decoded response.
	Items []struct {
		// Name is the path name, compared against "live/<guid>" by callers.
		Name string `json:"name"`
	} `json:"items"`
}
|
|
|
|
func (h *MediaMTXHandler) ListPaths(c *gin.Context) {
|
|
// GET {apiBase}/v3/paths/list
|
|
resp, err := http.Get(strings.TrimRight(h.cfg.APIBase, "/") + "/v3/paths/list")
|
|
if err != nil {
|
|
c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
c.JSON(resp.StatusCode, gin.H{"error": "mtx api error"})
|
|
return
|
|
}
|
|
var pl pathsListRes
|
|
if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil {
|
|
c.JSON(http.StatusBadGateway, gin.H{"error": "decode error"})
|
|
return
|
|
}
|
|
c.JSON(200, pl)
|
|
}
|
|
|
|
func (h *MediaMTXHandler) KickWebRTC(c *gin.Context) {
|
|
// POST {apiBase}/v3/webrtcsessions/kick/{id}
|
|
id := c.Param("id")
|
|
reqURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/kick/" + url.PathEscape(id)
|
|
httpResp, err := http.Post(reqURL, "application/json", http.NoBody)
|
|
if err != nil {
|
|
c.JSON(http.StatusBadGateway, gin.H{"error": "mtx api unreachable"})
|
|
return
|
|
}
|
|
defer httpResp.Body.Close()
|
|
if httpResp.StatusCode/100 != 2 {
|
|
c.JSON(httpResp.StatusCode, gin.H{"error": "kick failed"})
|
|
return
|
|
}
|
|
c.Status(http.StatusNoContent)
|
|
}
|
|
|
|
func (h *MediaMTXHandler) StartStreamPayload(guid string) (string, error) {
|
|
path := "live/" + guid
|
|
ttl := time.Duration(h.cfg.TokenTTL) * time.Second
|
|
|
|
tok, err := h.jwtMgr.GenerateMediaToken(0, "publish", path, ttl) // sub=0 = device
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
whip := fmt.Sprintf("%s/whip/%s?token=%s",
|
|
strings.TrimRight(h.cfg.PublicBaseURL, "/"),
|
|
path,
|
|
url.QueryEscape(tok),
|
|
)
|
|
|
|
payload := map[string]any{
|
|
"whipUrl": whip,
|
|
"path": path,
|
|
"tokenTTL_s": h.cfg.TokenTTL,
|
|
}
|
|
b, _ := json.Marshal(payload)
|
|
return string(b), nil
|
|
}
|
|
|
|
// Shapes decoded from {apiBase}/v3/webrtcsessions/list. Only the fields used
// by KickWebRTCSessionsByPath are declared; the rest of the response is
// ignored.
type (
	// webrtcSession is one entry of the session list.
	webrtcSession struct {
		// ID is the session identifier passed to the kick endpoint.
		ID string `json:"id"`
		// Path is the stream path the session is attached to.
		Path string `json:"path"`
	}

	// webrtcListRes is the envelope of the session list response.
	webrtcListRes struct {
		Items []webrtcSession `json:"items"`
	}
)
|
|
|
|
// KickWebRTCSessionsByPath lists and kicks webrtc sessions for a given path.
|
|
func (h *MediaMTXHandler) KickWebRTCSessionsByPath(path string) error {
|
|
listURL := strings.TrimRight(h.cfg.APIBase, "/") + "/v3/webrtcsessions/list"
|
|
|
|
resp, err := http.Get(listURL)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get list: %w", err)
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return fmt.Errorf("mtx list failed: %s", resp.Status)
|
|
}
|
|
|
|
var l webrtcListRes
|
|
if err := json.NewDecoder(resp.Body).Decode(&l); err != nil {
|
|
return fmt.Errorf("decode error: %w", err)
|
|
}
|
|
|
|
for _, it := range l.Items {
|
|
if it.Path == path && it.ID != "" {
|
|
kickURL := strings.TrimRight(h.cfg.APIBase, "/") +
|
|
"/v3/webrtcsessions/kick/" + url.PathEscape(it.ID)
|
|
|
|
kresp, err := http.Post(kickURL, "application/json", nil)
|
|
if err != nil {
|
|
// log and continue
|
|
continue
|
|
}
|
|
kresp.Body.Close()
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func BodyLogger() gin.HandlerFunc {
|
|
return func(c *gin.Context) {
|
|
if c.Request.Method == "POST" && strings.Contains(c.Request.URL.Path, "/mediamtx/auth") {
|
|
body, _ := c.GetRawData()
|
|
fmt.Fprintf(gin.DefaultWriter, "[MTX-AUTH] %s\n", string(body))
|
|
c.Request.Body = io.NopCloser(bytes.NewBuffer(body))
|
|
}
|
|
c.Next()
|
|
}
|
|
}
|
|
|
|
// --- poll MediaMTX API until path is live ------------------------------------
|
|
|
|
func (h *MediaMTXHandler) WaitUntilLive(path string, timeout time.Duration) bool {
|
|
api := strings.TrimRight(h.cfg.APIBase, "/")
|
|
deadline := time.Now().Add(timeout)
|
|
|
|
for time.Now().Before(deadline) {
|
|
resp, err := http.Get(api + "/v3/paths/list")
|
|
if err == nil && resp.StatusCode == 200 {
|
|
var pl pathsListRes
|
|
_ = json.NewDecoder(resp.Body).Decode(&pl)
|
|
resp.Body.Close()
|
|
for _, it := range pl.Items {
|
|
if it.Name == path {
|
|
return true
|
|
}
|
|
}
|
|
} else if resp != nil {
|
|
resp.Body.Close()
|
|
}
|
|
time.Sleep(500 * time.Millisecond)
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (h *MediaMTXHandler) expectedStartWait(guid string) time.Duration {
|
|
var cfg models.DeviceConfig
|
|
if err := h.db.Where("device_guid = ?", guid).First(&cfg).Error; err != nil {
|
|
return 60 * time.Second // fallback if no config row
|
|
}
|
|
|
|
poll := cfg.MPolling
|
|
jit := cfg.MJitter
|
|
if poll <= 0 {
|
|
poll = 60
|
|
}
|
|
if jit < 0 {
|
|
jit = 10
|
|
}
|
|
|
|
safety := 5 // seconds
|
|
return time.Duration(poll+jit+safety) * time.Second
|
|
}
|
|
|
|
// GET /streams/:guid/wait
|
|
func (h *MediaMTXHandler) WaitLiveSSE(c *gin.Context) {
|
|
guid := c.Param("guid")
|
|
if guid == "" {
|
|
c.Status(http.StatusBadRequest)
|
|
return
|
|
}
|
|
path := "live/" + guid
|
|
|
|
// Per-device max wait = MPolling + MJitter + safety
|
|
timeout := h.expectedStartWait(guid)
|
|
|
|
c.Header("Content-Type", "text/event-stream")
|
|
c.Header("Cache-Control", "no-cache")
|
|
c.Header("Connection", "keep-alive")
|
|
|
|
flush := func() {
|
|
if f, ok := c.Writer.(http.Flusher); ok {
|
|
f.Flush()
|
|
}
|
|
}
|
|
|
|
// If already live, notify immediately and exit.
|
|
if h.IsLive(path) {
|
|
fmt.Fprintf(c.Writer, "event: live\ndata: %s\n\n", path)
|
|
flush()
|
|
return
|
|
}
|
|
|
|
// Subscribe to bus for publish-auth events on this path
|
|
ch := h.bus.Subscribe(path)
|
|
defer h.bus.Unsubscribe(path, ch)
|
|
|
|
// Background short poller (safety net in case bus event is missed)
|
|
ctx, cancel := context.WithTimeout(c.Request.Context(), timeout)
|
|
defer cancel()
|
|
|
|
pollDone := make(chan struct{})
|
|
go func() {
|
|
t := time.NewTicker(500 * time.Millisecond)
|
|
defer t.Stop()
|
|
defer close(pollDone)
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case <-t.C:
|
|
if h.IsLive(path) {
|
|
// Normalize through bus so waiter below handles it uniformly
|
|
h.bus.Publish(path)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Wait for either: bus event, timeout, or client disconnect.
|
|
select {
|
|
case <-ch:
|
|
fmt.Fprintf(c.Writer, "event: live\ndata: %s\n\n", path)
|
|
flush()
|
|
return
|
|
case <-ctx.Done():
|
|
// Optional: tell client we timed out so it can keep a gentle retry loop
|
|
fmt.Fprintf(c.Writer, "event: timeout\ndata: {\"path\":\"%s\"}\n\n", path)
|
|
flush()
|
|
return
|
|
case <-c.Request.Context().Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
// --- helpers: path/guid ------------------------------------------------------

// guidFromPath extracts the guid from a stream path of the form "live/<guid>".
// Everything after the first "/" is treated as the guid. It returns ("", false)
// when the path has no "/", does not start with the "live" segment, or has an
// empty guid.
func guidFromPath(path string) (string, bool) {
	prefix, guid, found := strings.Cut(path, "/")
	if !found || prefix != "live" || guid == "" {
		return "", false
	}
	return guid, true
}
|
|
|
|
// IsLive performs a single check against MTX API to see if the path exists now.
|
|
func (h *MediaMTXHandler) IsLive(path string) bool {
|
|
api := strings.TrimRight(h.cfg.APIBase, "/")
|
|
resp, err := http.Get(api + "/v3/paths/list")
|
|
if err != nil {
|
|
return false
|
|
}
|
|
defer resp.Body.Close()
|
|
if resp.StatusCode != 200 {
|
|
return false
|
|
}
|
|
var pl pathsListRes
|
|
if err := json.NewDecoder(resp.Body).Decode(&pl); err != nil {
|
|
return false
|
|
}
|
|
for _, it := range pl.Items {
|
|
if it.Name == path {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|