first commit, i i have no idea what i have done
This commit is contained in:
211
server/internal/handlers/livestream.go
Normal file
211
server/internal/handlers/livestream.go
Normal file
@@ -0,0 +1,211 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/minio/minio-go/v7"
|
||||
)
|
||||
|
||||
// --- Minimal in-file hub (per-GUID), fanout to viewers and upload to MinIO ---
|
||||
|
||||
type stream struct {
|
||||
GUID string
|
||||
writer io.WriteCloser
|
||||
active bool
|
||||
object string
|
||||
mu sync.RWMutex
|
||||
view map[*viewer]struct{}
|
||||
}
|
||||
|
||||
// Close implements io.WriteCloser.
|
||||
func (s *stream) Close() error {
|
||||
panic("unimplemented")
|
||||
}
|
||||
|
||||
func (s *stream) Write(p []byte) (int, error) {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if !s.active {
|
||||
return 0, fmt.Errorf("not active")
|
||||
}
|
||||
for v := range s.view {
|
||||
select {
|
||||
case v.out <- p:
|
||||
default:
|
||||
}
|
||||
}
|
||||
return s.writer.Write(p)
|
||||
}
|
||||
|
||||
type viewer struct{ out chan []byte }
|
||||
|
||||
type liveHub struct {
|
||||
mu sync.RWMutex
|
||||
streams map[string]*stream
|
||||
minio *minio.Client
|
||||
bucket string
|
||||
}
|
||||
|
||||
func newLiveHub(mc *minio.Client, bucket string) *liveHub {
|
||||
return &liveHub{streams: map[string]*stream{}, minio: mc, bucket: bucket}
|
||||
}
|
||||
|
||||
func (h *liveHub) start(guid string) (io.WriteCloser, string, error) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
s, ok := h.streams[guid]
|
||||
if !ok {
|
||||
s = &stream{GUID: guid, view: map[*viewer]struct{}{}}
|
||||
h.streams[guid] = s
|
||||
}
|
||||
if s.active {
|
||||
return nil, "", fmt.Errorf("already active")
|
||||
}
|
||||
key := fmt.Sprintf("%s/live_%d.raw", guid, time.Now().Unix())
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
s.writer = pw
|
||||
s.object = key
|
||||
s.active = true
|
||||
|
||||
go func(reader io.Reader, bucket, object string) {
|
||||
// naive buffering for simplicity
|
||||
buf := new(bytes.Buffer)
|
||||
_, _ = io.Copy(buf, reader)
|
||||
_, _ = h.minio.PutObject(context.Background(), bucket, object, bytes.NewReader(buf.Bytes()), int64(buf.Len()), minio.PutObjectOptions{
|
||||
ContentType: "application/octet-stream",
|
||||
})
|
||||
}(pr, h.bucket, key)
|
||||
|
||||
return s, key, nil
|
||||
}
|
||||
|
||||
func (h *liveHub) stop(guid string) (string, error) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
s, ok := h.streams[guid]
|
||||
if !ok || !s.active {
|
||||
return "", fmt.Errorf("not active")
|
||||
}
|
||||
_ = s.writer.Close()
|
||||
s.active = false
|
||||
return s.object, nil
|
||||
}
|
||||
|
||||
func (h *liveHub) join(guid string) *viewer {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
s, ok := h.streams[guid]
|
||||
if !ok {
|
||||
s = &stream{GUID: guid, view: map[*viewer]struct{}{}}
|
||||
h.streams[guid] = s
|
||||
}
|
||||
v := &viewer{out: make(chan []byte, 64)}
|
||||
s.view[v] = struct{}{}
|
||||
return v
|
||||
}
|
||||
|
||||
func (h *liveHub) leave(guid string, v *viewer) {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if s, ok := h.streams[guid]; ok {
|
||||
delete(s.view, v)
|
||||
close(v.out)
|
||||
}
|
||||
}
|
||||
|
||||
// --- Gin handler ---
|
||||
|
||||
type LivestreamHandler struct {
|
||||
hub *liveHub
|
||||
}
|
||||
|
||||
func NewLivestreamHandler(minio *minio.Client, bucket string) *LivestreamHandler {
|
||||
return &LivestreamHandler{hub: newLiveHub(minio, bucket)}
|
||||
}
|
||||
|
||||
var upgrader = websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}
|
||||
|
||||
type wsMsg struct {
|
||||
Type string `json:"type"` // "start", "stop"
|
||||
Role string `json:"role"` // "device" or "viewer"
|
||||
GUID string `json:"guid"` // required
|
||||
Format string `json:"format"` // optional
|
||||
}
|
||||
|
||||
func (h *LivestreamHandler) Upgrade(c *gin.Context) {
|
||||
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
for {
|
||||
var m wsMsg
|
||||
if err := conn.ReadJSON(&m); err != nil {
|
||||
return
|
||||
}
|
||||
if m.GUID == "" {
|
||||
_ = conn.WriteJSON(gin.H{"error": "guid required"})
|
||||
continue
|
||||
}
|
||||
switch m.Role {
|
||||
case "device":
|
||||
switch m.Type {
|
||||
case "start":
|
||||
w, object, err := h.hub.start(m.GUID)
|
||||
if err != nil {
|
||||
_ = conn.WriteJSON(gin.H{"error": err.Error()})
|
||||
continue
|
||||
}
|
||||
_ = conn.WriteJSON(gin.H{"ok": true, "object": object})
|
||||
// read binary frames until stop/close
|
||||
for {
|
||||
mt, data, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
h.hub.stop(m.GUID)
|
||||
return
|
||||
}
|
||||
if mt == websocket.BinaryMessage {
|
||||
_, _ = w.Write(data)
|
||||
} else if mt == websocket.TextMessage {
|
||||
// best-effort: if a text control announcing stop arrives
|
||||
var ctrl wsMsg
|
||||
if err := conn.ReadJSON(&ctrl); err == nil && ctrl.Type == "stop" {
|
||||
h.hub.stop(m.GUID)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
case "stop":
|
||||
_, _ = h.hub.stop(m.GUID)
|
||||
_ = conn.WriteJSON(gin.H{"ok": true})
|
||||
default:
|
||||
_ = conn.WriteJSON(gin.H{"error": "unknown type"})
|
||||
}
|
||||
case "viewer":
|
||||
v := h.hub.join(m.GUID)
|
||||
defer h.hub.leave(m.GUID, v)
|
||||
_ = conn.WriteJSON(gin.H{"ok": true})
|
||||
for frame := range v.out {
|
||||
if err := conn.WriteMessage(websocket.BinaryMessage, frame); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
default:
|
||||
_ = conn.WriteJSON(gin.H{"error": "unknown role"})
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user