package handlers

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gorilla/websocket"
	"github.com/minio/minio-go/v7"
)

// --- Minimal in-file hub (per-GUID), fanout to viewers and upload to MinIO ---

// stream is the per-GUID state: the sink of the current recording plus the set
// of viewers receiving live frames.
//
// mu guards writer, active, object and view. Every access to those fields,
// including the ones made by liveHub methods, must hold mu.
type stream struct {
	GUID   string
	writer io.WriteCloser
	active bool
	object string
	mu     sync.RWMutex
	view   map[*viewer]struct{}
}

// Close implements io.WriteCloser. It ends the current recording by closing
// the underlying sink. Closing a stream that is not active is a no-op.
func (s *stream) Close() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.closeLocked()
}

// closeLocked marks the stream inactive and closes its sink.
// The caller must hold s.mu for writing.
func (s *stream) closeLocked() error {
	if !s.active {
		return nil
	}
	s.active = false
	return s.writer.Close()
}

// Write implements io.Writer. It offers the frame to every viewer without
// blocking (a viewer whose queue is full misses the frame) and then appends
// the frame to the recording. It fails with "not active" when no recording
// is in progress.
func (s *stream) Write(p []byte) (int, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()

	if !s.active {
		return 0, fmt.Errorf("not active")
	}
	if len(s.view) > 0 {
		// io.Writer implementations must not retain p, so viewers get a
		// private copy. One copy is shared by all viewers, which only read it.
		frame := append([]byte(nil), p...)
		for v := range s.view {
			select {
			case v.out <- frame:
			default: // slow viewer: drop the frame rather than stall the device
			}
		}
	}
	return s.writer.Write(p)
}

// viewer is a single subscriber; out carries the frames queued for it.
type viewer struct{ out chan []byte }

// liveHub tracks one stream per GUID and owns the MinIO destination used for
// recordings. mu guards the streams map only; per-stream state is guarded by
// the stream's own mutex. Entries are never removed from streams, so a
// *stream obtained under mu stays valid after mu is released.
type liveHub struct {
	mu      sync.RWMutex
	streams map[string]*stream
	minio   *minio.Client
	bucket  string
}

// newLiveHub returns an empty hub that uploads recordings to bucket via mc.
func newLiveHub(mc *minio.Client, bucket string) *liveHub {
	return &liveHub{streams: map[string]*stream{}, minio: mc, bucket: bucket}
}

// lookup returns the stream registered for guid. When none exists it creates
// one if create is true and returns nil otherwise.
func (h *liveHub) lookup(guid string, create bool) *stream {
	h.mu.Lock()
	defer h.mu.Unlock()

	s, ok := h.streams[guid]
	if !ok && create {
		s = &stream{GUID: guid, view: map[*viewer]struct{}{}}
		h.streams[guid] = s
	}
	return s
}

// start begins a recording for guid and returns the writer to feed frames
// into together with the object key the recording will be stored under.
// It fails with "already active" if a recording for guid is in progress.
func (h *liveHub) start(guid string) (io.WriteCloser, string, error) {
	s := h.lookup(guid, true)

	s.mu.Lock()
	defer s.mu.Unlock()

	if s.active {
		return nil, "", fmt.Errorf("already active")
	}

	key := fmt.Sprintf("%s/live_%d.raw", guid, time.Now().Unix())
	pr, pw := io.Pipe()
	s.writer = pw
	s.object = key
	s.active = true

	// The uploader terminates once the pipe writer is closed (see stop/Close).
	go h.upload(pr, h.bucket, key)

	return s, key, nil
}

// upload drains reader until EOF and stores the collected bytes as a single
// object. The whole recording is buffered in memory first (naive buffering
// for simplicity), so memory use grows with the length of the stream.
func (h *liveHub) upload(reader io.Reader, bucket, object string) {
	var buf bytes.Buffer
	if _, err := io.Copy(&buf, reader); err != nil {
		// Still upload whatever was captured before the failure.
		log.Printf("livestream: buffering %s/%s: %v", bucket, object, err)
	}

	_, err := h.minio.PutObject(
		context.Background(),
		bucket,
		object,
		bytes.NewReader(buf.Bytes()),
		int64(buf.Len()),
		minio.PutObjectOptions{ContentType: "application/octet-stream"},
	)
	if err != nil {
		// There is no caller left to report to; make the loss visible.
		log.Printf("livestream: uploading %s/%s: %v", bucket, object, err)
	}
}

// stop ends the active recording for guid, which lets the uploader finish,
// and returns the object key of that recording. It fails with "not active"
// when guid has no recording in progress.
func (h *liveHub) stop(guid string) (string, error) {
	s := h.lookup(guid, false)
	if s == nil {
		return "", fmt.Errorf("not active")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.active {
		return "", fmt.Errorf("not active")
	}
	// The sink is an io.PipeWriter, whose Close always returns nil.
	_ = s.closeLocked()
	return s.object, nil
}

// join registers a new viewer for guid, creating the stream entry if needed
// so that viewers may connect before the device starts.
func (h *liveHub) join(guid string) *viewer {
	s := h.lookup(guid, true)
	v := &viewer{out: make(chan []byte, 64)}

	s.mu.Lock()
	s.view[v] = struct{}{}
	s.mu.Unlock()

	return v
}

// leave unregisters v from guid and closes its channel. Calling it again for
// the same viewer is a no-op, so the channel is never closed twice.
func (h *liveHub) leave(guid string, v *viewer) {
	s := h.lookup(guid, false)
	if s == nil {
		return
	}

	// Holding the write lock excludes stream.Write, so no frame can be sent
	// on v.out after it has been closed.
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, ok := s.view[v]; !ok {
		return
	}
	delete(s.view, v)
	close(v.out)
}

// --- Gin handler ---

// LivestreamHandler exposes the live hub over a WebSocket endpoint.
type LivestreamHandler struct {
	hub *liveHub
}

// NewLivestreamHandler returns a handler whose recordings are uploaded to
// bucket using the given MinIO client.
func NewLivestreamHandler(minio *minio.Client, bucket string) *LivestreamHandler {
	return &LivestreamHandler{hub: newLiveHub(minio, bucket)}
}

// upgrader accepts WebSocket handshakes from any origin.
// NOTE(review): CheckOrigin always returns true, which disables the
// cross-origin check; confirm that this is intended for the deployment.
var upgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool { return true },
}

// wsMsg is the JSON control message exchanged with clients.
type wsMsg struct {
	Type   string `json:"type"`   // "start", "stop"
	Role   string `json:"role"`   // "device" or "viewer"
	GUID   string `json:"guid"`   // required
	Format string `json:"format"` // optional
}

// Upgrade upgrades the request to a WebSocket and serves control messages.
//
// A device "start" turns the connection into a frame source until the device
// stops or disconnects. A "viewer" message turns it into a frame sink until
// the viewer disconnects. A device "stop" ends the recording for the GUID and
// keeps the connection open for further control messages. Replies are sent
// best-effort: a broken connection is detected by the next read or write.
func (h *LivestreamHandler) Upgrade(c *gin.Context) {
	conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
	if err != nil {
		return
	}
	defer conn.Close()

	for {
		var m wsMsg
		if err := conn.ReadJSON(&m); err != nil {
			return
		}
		if m.GUID == "" {
			_ = conn.WriteJSON(gin.H{"error": "guid required"})
			continue
		}

		switch m.Role {
		case "device":
			switch m.Type {
			case "start":
				w, object, err := h.hub.start(m.GUID)
				if err != nil {
					_ = conn.WriteJSON(gin.H{"error": err.Error()})
					continue
				}
				_ = conn.WriteJSON(gin.H{"ok": true, "object": object})
				h.streamDevice(conn, m.GUID, w)
				return
			case "stop":
				// Reported as ok even when nothing was active, so a repeated
				// stop is harmless for the client.
				_, _ = h.hub.stop(m.GUID)
				_ = conn.WriteJSON(gin.H{"ok": true})
			default:
				_ = conn.WriteJSON(gin.H{"error": "unknown type"})
			}
		case "viewer":
			h.serveViewer(conn, m.GUID)
			return
		default:
			_ = conn.WriteJSON(gin.H{"error": "unknown role"})
		}
	}
}

// streamDevice copies binary frames from conn into w until the device sends a
// "stop" control message, the connection fails, or the recording is ended
// from elsewhere.
func (h *LivestreamHandler) streamDevice(conn *websocket.Conn, guid string, w io.Writer) {
	for {
		mt, data, err := conn.ReadMessage()
		if err != nil {
			// Device went away: finalize the recording so it gets uploaded.
			_, _ = h.hub.stop(guid)
			return
		}

		switch mt {
		case websocket.BinaryMessage:
			if _, err := w.Write(data); err != nil {
				// The recording is no longer active (stopped by another
				// connection); there is nothing left to write to.
				_ = conn.WriteJSON(gin.H{"error": err.Error()})
				return
			}
		case websocket.TextMessage:
			// Decode the message that was just read. Reading from the
			// connection again here would consume the next frame instead.
			var ctrl wsMsg
			if err := json.Unmarshal(data, &ctrl); err == nil && ctrl.Type == "stop" {
				_, _ = h.hub.stop(guid)
				return
			}
		}
	}
}

// serveViewer subscribes conn to guid and forwards frames to it until the
// viewer disconnects or a write fails.
func (h *LivestreamHandler) serveViewer(conn *websocket.Conn, guid string) {
	v := h.hub.join(guid)
	defer h.hub.leave(guid, v)

	_ = conn.WriteJSON(gin.H{"ok": true})

	// The connection must be read for close and ping/pong frames to be
	// processed. Without this reader a disconnect would go unnoticed while no
	// frames are flowing. The goroutine ends when the read fails, at the
	// latest when the caller closes conn.
	gone := make(chan struct{})
	go func() {
		defer close(gone)
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				return
			}
		}
	}()

	for {
		select {
		case <-gone:
			return
		case frame, ok := <-v.out:
			if !ok {
				return
			}
			if err := conn.WriteMessage(websocket.BinaryMessage, frame); err != nil {
				return
			}
		}
	}
}