212 lines
4.7 KiB
Go
212 lines
4.7 KiB
Go
package handlers
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"github.com/minio/minio-go/v7"
)
|
|
|
|
// --- Minimal in-file hub (per-GUID), fanout to viewers and upload to MinIO ---
|
|
|
|
// stream is the per-GUID live session. Frames written to it are offered to
// every joined viewer and forwarded to writer.
type stream struct {
	GUID   string         // identifier of the stream
	writer io.WriteCloser // sink that receives every written frame
	active bool           // Write is rejected while this is false
	object string         // object key associated with the stream

	mu   sync.RWMutex         // taken by Write (shared) and Close (exclusive)
	view map[*viewer]struct{} // set of joined viewers
}

// stream is handed out as an io.WriteCloser, so both methods must be usable.
var _ io.WriteCloser = (*stream)(nil)

// Close implements io.WriteCloser.
//
// It marks the stream inactive and closes the underlying writer. Closing a
// stream that is not active is a no-op that returns nil, so Close may be
// called more than once.
func (s *stream) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.active {
		return nil
	}
	s.active = false
	if s.writer == nil {
		return nil
	}
	return s.writer.Close()
}

// Write implements io.Writer.
//
// It offers a copy of p to every viewer without blocking (a viewer whose
// channel is full misses the frame) and then forwards p to the underlying
// writer, whose result is returned. It fails with "not active" when the
// stream is not active.
func (s *stream) Write(p []byte) (int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if !s.active {
		return 0, fmt.Errorf("not active")
	}

	if len(s.view) > 0 {
		// io.Writer implementations must not retain p: the caller is free to
		// reuse it as soon as Write returns, while viewers consume frames
		// later. One private copy is shared by all viewers, who only read it.
		frame := make([]byte, len(p))
		copy(frame, p)
		for v := range s.view {
			select {
			case v.out <- frame:
			default: // viewer is not keeping up; drop this frame for it
			}
		}
	}
	return s.writer.Write(p)
}

// viewer is one subscriber of a stream; out carries the frames offered to it.
type viewer struct {
	out chan []byte
}
|
|
|
|
type liveHub struct {
|
|
mu sync.RWMutex
|
|
streams map[string]*stream
|
|
minio *minio.Client
|
|
bucket string
|
|
}
|
|
|
|
func newLiveHub(mc *minio.Client, bucket string) *liveHub {
|
|
return &liveHub{streams: map[string]*stream{}, minio: mc, bucket: bucket}
|
|
}
|
|
|
|
func (h *liveHub) start(guid string) (io.WriteCloser, string, error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
s, ok := h.streams[guid]
|
|
if !ok {
|
|
s = &stream{GUID: guid, view: map[*viewer]struct{}{}}
|
|
h.streams[guid] = s
|
|
}
|
|
if s.active {
|
|
return nil, "", fmt.Errorf("already active")
|
|
}
|
|
key := fmt.Sprintf("%s/live_%d.raw", guid, time.Now().Unix())
|
|
|
|
pr, pw := io.Pipe()
|
|
s.writer = pw
|
|
s.object = key
|
|
s.active = true
|
|
|
|
go func(reader io.Reader, bucket, object string) {
|
|
// naive buffering for simplicity
|
|
buf := new(bytes.Buffer)
|
|
_, _ = io.Copy(buf, reader)
|
|
_, _ = h.minio.PutObject(context.Background(), bucket, object, bytes.NewReader(buf.Bytes()), int64(buf.Len()), minio.PutObjectOptions{
|
|
ContentType: "application/octet-stream",
|
|
})
|
|
}(pr, h.bucket, key)
|
|
|
|
return s, key, nil
|
|
}
|
|
|
|
func (h *liveHub) stop(guid string) (string, error) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
s, ok := h.streams[guid]
|
|
if !ok || !s.active {
|
|
return "", fmt.Errorf("not active")
|
|
}
|
|
_ = s.writer.Close()
|
|
s.active = false
|
|
return s.object, nil
|
|
}
|
|
|
|
func (h *liveHub) join(guid string) *viewer {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
s, ok := h.streams[guid]
|
|
if !ok {
|
|
s = &stream{GUID: guid, view: map[*viewer]struct{}{}}
|
|
h.streams[guid] = s
|
|
}
|
|
v := &viewer{out: make(chan []byte, 64)}
|
|
s.view[v] = struct{}{}
|
|
return v
|
|
}
|
|
|
|
func (h *liveHub) leave(guid string, v *viewer) {
|
|
h.mu.Lock()
|
|
defer h.mu.Unlock()
|
|
|
|
if s, ok := h.streams[guid]; ok {
|
|
delete(s.view, v)
|
|
close(v.out)
|
|
}
|
|
}
|
|
|
|
// --- Gin handler ---
|
|
|
|
type LivestreamHandler struct {
|
|
hub *liveHub
|
|
}
|
|
|
|
func NewLivestreamHandler(minio *minio.Client, bucket string) *LivestreamHandler {
|
|
return &LivestreamHandler{hub: newLiveHub(minio, bucket)}
|
|
}
|
|
|
|
// upgrader performs the HTTP-to-WebSocket handshake.
//
// NOTE(review): CheckOrigin accepts every Origin, so any web page can open
// this socket from a visitor's browser. Confirm that is intended.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(*http.Request) bool {
		return true
	},
}
|
|
|
|
// wsMsg is the JSON control message read from the WebSocket.
type wsMsg struct {
	// Type is the requested action: "start" or "stop".
	Type string `json:"type"`

	// Role is the sender's role: "device" or "viewer".
	Role string `json:"role"`

	// GUID identifies the stream; required.
	GUID string `json:"guid"`

	// Format is optional.
	Format string `json:"format"`
}
|
|
|
|
func (h *LivestreamHandler) Upgrade(c *gin.Context) {
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
for {
|
|
var m wsMsg
|
|
if err := conn.ReadJSON(&m); err != nil {
|
|
return
|
|
}
|
|
if m.GUID == "" {
|
|
_ = conn.WriteJSON(gin.H{"error": "guid required"})
|
|
continue
|
|
}
|
|
switch m.Role {
|
|
case "device":
|
|
switch m.Type {
|
|
case "start":
|
|
w, object, err := h.hub.start(m.GUID)
|
|
if err != nil {
|
|
_ = conn.WriteJSON(gin.H{"error": err.Error()})
|
|
continue
|
|
}
|
|
_ = conn.WriteJSON(gin.H{"ok": true, "object": object})
|
|
// read binary frames until stop/close
|
|
for {
|
|
mt, data, err := conn.ReadMessage()
|
|
if err != nil {
|
|
h.hub.stop(m.GUID)
|
|
return
|
|
}
|
|
if mt == websocket.BinaryMessage {
|
|
_, _ = w.Write(data)
|
|
} else if mt == websocket.TextMessage {
|
|
// best-effort: if a text control announcing stop arrives
|
|
var ctrl wsMsg
|
|
if err := conn.ReadJSON(&ctrl); err == nil && ctrl.Type == "stop" {
|
|
h.hub.stop(m.GUID)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
case "stop":
|
|
_, _ = h.hub.stop(m.GUID)
|
|
_ = conn.WriteJSON(gin.H{"ok": true})
|
|
default:
|
|
_ = conn.WriteJSON(gin.H{"error": "unknown type"})
|
|
}
|
|
case "viewer":
|
|
v := h.hub.join(m.GUID)
|
|
defer h.hub.leave(m.GUID, v)
|
|
_ = conn.WriteJSON(gin.H{"ok": true})
|
|
for frame := range v.out {
|
|
if err := conn.WriteMessage(websocket.BinaryMessage, frame); err != nil {
|
|
return
|
|
}
|
|
}
|
|
default:
|
|
_ = conn.WriteJSON(gin.H{"error": "unknown role"})
|
|
}
|
|
}
|
|
}
|