package handlers import ( "errors" "net/http" "strconv" "strings" "time" "github.com/gin-gonic/gin" "gorm.io/gorm" "smoop-api/internal/dto" "smoop-api/internal/models" ) type TasksHandler struct { db *gorm.DB mtxH *MediaMTXHandler } func NewTasksHandler(db *gorm.DB, mtxH *MediaMTXHandler) *TasksHandler { return &TasksHandler{ db: db, mtxH: mtxH, } } // ----------------------------------------------------------------------------- // 1) Device heartbeat + fetch next task // GET /tasks/:guid // Returns 204 (no task) OR next pending task (and marks it Running) // ----------------------------------------------------------------------------- func (h *TasksHandler) DeviceNextTask(c *gin.Context) { guid := c.Param("guid") // Optional: verify device exists to avoid orphan tasks var dev models.Device if err := h.db.First(&dev, "guid = ?", guid).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { c.JSON(http.StatusNotFound, gin.H{"error": "device not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } var task models.DEviceTask now := time.Now() // Atomically pick the oldest pending task for device and mark Running err := h.db.Transaction(func(tx *gorm.DB) error { // FOR UPDATE SKIP LOCKED emulation is not universal in GORM; // simplest: select oldest pending, then optimistic update. if err := tx. // NOTE: if using Postgres, you can append `Clauses(clause.Locking{Strength: "UPDATE SKIP LOCKED"})` // for better concurrency. Keeping generic here. Where("device_guid = ? AND status = ?", guid, models.TaskStatusPending). Order("created_at ASC"). Take(&task).Error; err != nil { return err } task.Status = models.TaskStatusRunning task.StartedAt = &now return tx.Save(&task).Error }) if err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { // Heartbeat: no task right now c.Status(http.StatusNoContent) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"}) return } c.JSON(http.StatusOK, dto.NextTaskResponseDto{ HasTask: true, Task: ptr(dto.MapTask(task)), }) } // ----------------------------------------------------------------------------- // 2) Device posts result // POST /tasks/:guid // ----------------------------------------------------------------------------- func (h *TasksHandler) DevicePostResult(c *gin.Context) { guid := c.Param("guid") var req dto.TaskResultDto if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } var task models.DEviceTask if err := h.db.Where("id = ? AND device_guid = ?", req.TaskID, guid).First(&task).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { c.JSON(http.StatusNotFound, gin.H{"error": "task not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } // Only Running tasks can be completed if task.Status != models.TaskStatusRunning { c.JSON(http.StatusConflict, gin.H{"error": "task not in running state"}) return } now := time.Now() if req.Success { task.Status = models.TaskStatusFinished task.Result = req.Result task.ErrorMsg = "" } else { task.Status = models.TaskStatusError task.Result = req.Result task.ErrorMsg = req.ErrorMsg } task.FinishedAt = &now if err := h.db.Save(&task).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "save failed"}) return } c.JSON(http.StatusOK, dto.MapTask(task)) } // ----------------------------------------------------------------------------- // 3) Create task for device (user/admin creates work item) // POST /device/:guid/task // ----------------------------------------------------------------------------- func (h *TasksHandler) CreateTask(c *gin.Context) { guid := c.Param("guid") // Permission check: device must be visible for user (DeviceAccessFilter already applied) if err := h.requireDeviceVisible(c, guid); err != nil { // err already wrote response return } var req dto.CreateTaskDto if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } // Validate that device exists var d models.Device if err := h.db.Where("guid = ?", guid).First(&d).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { c.JSON(http.StatusNotFound, gin.H{"error": "device not found"}) return } c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) return } switch req.Type { case models.TaskTypeStartStream: payload, err := h.mtxH.StartStreamPayload(guid) if err != nil { c.JSON(http.StatusBadGateway, gin.H{"error": "failed to build whip url"}) return } req.Payload = payload case models.TaskTypeStopStream: // best-effort server-side stop (kick publishers/readers on that path) _ = h.mtxH.KickWebRTCSessionsByPath("live/" + guid) if strings.TrimSpace(req.Payload) == "" { req.Payload = `{"reason":"server_stop"}` } } task := models.DEviceTask{ DeviceGUID: guid, Type: req.Type, Payload: req.Payload, // raw JSON string Status: models.TaskStatusPending, } if err := h.db.Create(&task).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "create failed"}) return } c.JSON(http.StatusCreated, dto.MapTask(task)) } // ----------------------------------------------------------------------------- // 4) List tasks for device (with status & pagination) // GET /device/:guid/tasks?offset=0&limit=50&status=pending|running|finished|error // ----------------------------------------------------------------------------- func (h *TasksHandler) ListDeviceTasks(c *gin.Context) { guid := c.Param("guid") // Permission check if err := h.requireDeviceVisible(c, guid); err != nil { return } offset := atoiDefault(c.DefaultQuery("offset", "0"), 0) limit := atoiDefault(c.DefaultQuery("limit", "50"), 50) if limit <= 0 || limit > 200 { limit = 50 } filterStatus := c.Query("status") // optional q := h.db.Model(&models.DEviceTask{}).Where("device_guid = ?", guid) if filterStatus != "" { q = q.Where("status = ?", filterStatus) } var total int64 if err := q.Count(&total).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "count failed"}) return } var tasks []models.DEviceTask if err := q.Order("id DESC").Offset(offset).Limit(limit).Find(&tasks).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) return } out := make([]dto.TaskDto, 0, len(tasks)) for _, t := range tasks { out = append(out, dto.MapTask(t)) } c.JSON(http.StatusOK, gin.H{ "tasks": out, "offset": offset, "limit": limit, "total": total, }) } // --- helpers ----------------------------------------------------------------- func (h *TasksHandler) requireDeviceVisible(c *gin.Context, guid string) error { // Admins can always see it; regular users must be assigned uc, ok := GetUserContext(c) if !ok { c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) return errors.New("unauthorized") } if uc.Role == models.RoleAdmin { return nil } // check assignment var cnt int64 if err := h.db.Table("user_devices"). Where("user_id = ? AND device_guid = ?", uc.ID, guid). Count(&cnt).Error; err != nil { c.JSON(http.StatusInternalServerError, gin.H{"error": "permission check failed"}) return err } if cnt == 0 { c.JSON(http.StatusForbidden, gin.H{"error": "forbidden"}) return errors.New("forbidden") } return nil } func atoiDefault(s string, def int) int { n, err := strconv.Atoi(s) if err != nil { return def } return n } func ptr[T any](v T) *T { return &v }