From 35e59c487909031762e10e543830328e1c53dae0 Mon Sep 17 00:00:00 2001 From: dtv Date: Sat, 4 Oct 2025 22:13:53 +0300 Subject: [PATCH] created tasks endpoint for devices --- server/internal/db/db.go | 1 + server/internal/dto/task.go | 57 +++++++ server/internal/handlers/tasks.go | 251 ++++++++++++++++++++++++++++++ server/internal/models/task.go | 51 ++++++ server/internal/router/router.go | 9 ++ 5 files changed, 369 insertions(+) create mode 100644 server/internal/dto/task.go create mode 100644 server/internal/handlers/tasks.go create mode 100644 server/internal/models/task.go diff --git a/server/internal/db/db.go b/server/internal/db/db.go index 6050894..874ef89 100644 --- a/server/internal/db/db.go +++ b/server/internal/db/db.go @@ -19,5 +19,6 @@ func AutoMigrate(db *gorm.DB) error { &models.UserDevice{}, &models.Tracker{}, &models.UserTracker{}, + &models.DEviceTask{}, ) } diff --git a/server/internal/dto/task.go b/server/internal/dto/task.go new file mode 100644 index 0000000..6edd569 --- /dev/null +++ b/server/internal/dto/task.go @@ -0,0 +1,57 @@ +package dto + +import ( + "smoop-api/internal/models" + "time" +) + +type TaskDto struct { + ID uint `json:"id"` + DeviceGUID string `json:"deviceGuid"` + Type models.DeviceTaskType `json:"type"` + Payload string `json:"payload"` // raw JSON string + Status models.TaskStatus `json:"status"` + ErrorMsg string `json:"error,omitempty"` + Result string `json:"result,omitempty"` + CreatedAt time.Time `json:"createdAt"` + StartedAt *time.Time `json:"startedAt,omitempty"` + FinishedAt *time.Time `json:"finishedAt,omitempty"` +} + +func MapTask(t models.DEviceTask) TaskDto { + return TaskDto{ + ID: t.ID, + DeviceGUID: t.DeviceGUID, + Type: t.Type, + Payload: t.Payload, + Status: t.Status, + ErrorMsg: t.ErrorMsg, + Result: t.Result, + CreatedAt: t.CreatedAt, + StartedAt: t.StartedAt, + FinishedAt: t.FinishedAt, + } +} + +// Create a new task (server/user -> device) +type CreateTaskDto struct { + Type models.DeviceTaskType `json:"type" binding:"required,oneof=start_stream stop_stream start_recording stop_recording update_config set_deep_sleep"` + // Pass raw JSON string as payload (e.g. {"sleepTimeout":5,"jitterMs":50,"recordingDurationSec":60}) + // Keep it string to let device/server evolve freely. + Payload string `json:"payload" binding:"required"` +} + +// Device polls: single next task +type NextTaskResponseDto struct { + HasTask bool `json:"hasTask"` + Task *TaskDto `json:"task,omitempty"` +} + +// Device posts result +type TaskResultDto struct { + TaskID uint `json:"taskId" binding:"required"` + // success=true => Finished, success=false => Error + Success bool `json:"success"` + Result string `json:"result"` // raw JSON result from device + ErrorMsg string `json:"error"` // device-side reason if !Success +} diff --git a/server/internal/handlers/tasks.go b/server/internal/handlers/tasks.go new file mode 100644 index 0000000..f83f0e5 --- /dev/null +++ b/server/internal/handlers/tasks.go @@ -0,0 +1,251 @@ +package handlers + +import ( + "errors" + "net/http" + "strconv" + "time" + + "github.com/gin-gonic/gin" + "gorm.io/gorm" + + "smoop-api/internal/dto" + "smoop-api/internal/models" +) + +type TasksHandler struct { + db *gorm.DB +} + +func NewTasksHandler(db *gorm.DB) *TasksHandler { return &TasksHandler{db: db} } + +// ----------------------------------------------------------------------------- +// 1) Device heartbeat + fetch next task +// GET /tasks/:guid +// Returns 204 (no task) OR next pending task (and marks it Running) +// ----------------------------------------------------------------------------- +func (h *TasksHandler) DeviceNextTask(c *gin.Context) { + guid := c.Param("guid") + + // Optional: verify device exists to avoid orphan tasks + var dev models.Device + if err := h.db.First(&dev, "guid = ?", guid).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "device not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) + return + } + + var task models.DEviceTask + now := time.Now() + + // Atomically pick the oldest pending task for device and mark Running + err := h.db.Transaction(func(tx *gorm.DB) error { + // FOR UPDATE SKIP LOCKED emulation is not universal in GORM; + // simplest: select oldest pending, then optimistic update. + if err := tx. + // NOTE: if using Postgres, you can append `Clauses(clause.Locking{Strength: "UPDATE SKIP LOCKED"})` + // for better concurrency. Keeping generic here. + Where("device_guid = ? AND status = ?", guid, models.TaskStatusPending). + Order("created_at ASC"). + Take(&task).Error; err != nil { + return err + } + task.Status = models.TaskStatusRunning + task.StartedAt = &now + return tx.Save(&task).Error + }) + + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + // Heartbeat: no task right now + c.Status(http.StatusNoContent) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"}) + return + } + + c.JSON(http.StatusOK, dto.NextTaskResponseDto{ + HasTask: true, + Task: ptr(dto.MapTask(task)), + }) +} + +// ----------------------------------------------------------------------------- +// 2) Device posts result +// POST /tasks/:guid +// ----------------------------------------------------------------------------- +func (h *TasksHandler) DevicePostResult(c *gin.Context) { + guid := c.Param("guid") + var req dto.TaskResultDto + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + var task models.DEviceTask + if err := h.db.Where("id = ? AND device_guid = ?", req.TaskID, guid).First(&task).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "task not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) + return + } + + // Only Running tasks can be completed + if task.Status != models.TaskStatusRunning { + c.JSON(http.StatusConflict, gin.H{"error": "task not in running state"}) + return + } + + now := time.Now() + if req.Success { + task.Status = models.TaskStatusFinished + task.Result = req.Result + task.ErrorMsg = "" + } else { + task.Status = models.TaskStatusError + task.Result = req.Result + task.ErrorMsg = req.ErrorMsg + } + task.FinishedAt = &now + + if err := h.db.Save(&task).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "save failed"}) + return + } + + c.JSON(http.StatusOK, dto.MapTask(task)) +} + +// ----------------------------------------------------------------------------- +// 3) Create task for device (user/admin creates work item) +// POST /device/:guid/task +// ----------------------------------------------------------------------------- +func (h *TasksHandler) CreateTask(c *gin.Context) { + guid := c.Param("guid") + + // Permission check: device must be visible for user (DeviceAccessFilter already applied) + if err := h.requireDeviceVisible(c, guid); err != nil { + // err already wrote response + return + } + + var req dto.CreateTaskDto + if err := c.ShouldBindJSON(&req); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + + // Validate that device exists + var d models.Device + if err := h.db.Where("guid = ?", guid).First(&d).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + c.JSON(http.StatusNotFound, gin.H{"error": "device not found"}) + return + } + c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"}) + return + } + + task := models.DEviceTask{ + DeviceGUID: guid, + Type: req.Type, + Payload: req.Payload, // raw JSON string + Status: models.TaskStatusPending, + } + if err := h.db.Create(&task).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "create failed"}) + return + } + c.JSON(http.StatusCreated, dto.MapTask(task)) +} + +// ----------------------------------------------------------------------------- +// 4) List tasks for device (with status & pagination) +// GET /device/:guid/tasks?offset=0&limit=50&status=pending|running|finished|error +// ----------------------------------------------------------------------------- +func (h *TasksHandler) ListDeviceTasks(c *gin.Context) { + guid := c.Param("guid") + + // Permission check + if err := h.requireDeviceVisible(c, guid); err != nil { + return + } + + offset := atoiDefault(c.DefaultQuery("offset", "0"), 0) + limit := atoiDefault(c.DefaultQuery("limit", "50"), 50) + if limit <= 0 || limit > 200 { + limit = 50 + } + filterStatus := c.Query("status") // optional + + q := h.db.Model(&models.DEviceTask{}).Where("device_guid = ?", guid) + if filterStatus != "" { + q = q.Where("status = ?", filterStatus) + } + + var total int64 + if err := q.Count(&total).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "count failed"}) + return + } + + var tasks []models.DEviceTask + if err := q.Order("id DESC").Offset(offset).Limit(limit).Find(&tasks).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"}) + return + } + + out := make([]dto.TaskDto, 0, len(tasks)) + for _, t := range tasks { + out = append(out, dto.MapTask(t)) + } + c.JSON(http.StatusOK, gin.H{ + "tasks": out, + "offset": offset, + "limit": limit, + "total": total, + }) +} + +// --- helpers ----------------------------------------------------------------- + +func (h *TasksHandler) requireDeviceVisible(c *gin.Context, guid string) error { + // Admins can always see it; regular users must be assigned + uc, ok := GetUserContext(c) + if !ok { + c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"}) + return errors.New("unauthorized") + } + if uc.Role == models.RoleAdmin { + return nil + } + // check assignment + var cnt int64 + if err := h.db.Table("user_devices"). + Where("user_id = ? AND device_guid = ?", uc.ID, guid). + Count(&cnt).Error; err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": "permission check failed"}) + return err + } + if cnt == 0 { + c.JSON(http.StatusForbidden, gin.H{"error": "forbidden"}) + return errors.New("forbidden") + } + return nil +} + +func atoiDefault(s string, def int) int { + n, err := strconv.Atoi(s) + if err != nil { + return def + } + return n +} + +func ptr[T any](v T) *T { return &v } diff --git a/server/internal/models/task.go b/server/internal/models/task.go new file mode 100644 index 0000000..d8d033a --- /dev/null +++ b/server/internal/models/task.go @@ -0,0 +1,51 @@ +package models + +import "time" + +type TaskStatus string +type DeviceTaskType string + +const ( + TaskStatusPending TaskStatus = "pending" + TaskStatusRunning TaskStatus = "running" + TaskStatusFinished TaskStatus = "finished" + TaskStatusError TaskStatus = "error" +) + +const ( + // 1. start/stop audiostream + TaskTypeStartStream DeviceTaskType = "start_stream" + TaskTypeStopStream DeviceTaskType = "stop_stream" + + // 2. start/stop recording + TaskTypeStartRecording DeviceTaskType = "start_recording" + TaskTypeStopRecording DeviceTaskType = "stop_recording" + + // 3. change configuration (sleep timeout, jitter, recording duration) + TaskTypeUpdateConfig DeviceTaskType = "update_config" + + // 4. set deep sleep duration (minutes) + TaskTypeSetDeepSleep DeviceTaskType = "set_deep_sleep" +) + +type DEviceTask struct { + ID uint `gorm:"primaryKey"` + DeviceGUID string `gorm:"index;not null"` + Type DeviceTaskType `gorm:"type:varchar(64);not null"` + // JSON payload from server to device (parameters) + Payload string `gorm:"type:text;not null;default:'{}'"` + Status TaskStatus `gorm:"type:varchar(16);not null;index"` + // Optional error/reason from server or device + ErrorMsg string `gorm:"type:text"` + // Raw result JSON from device (success path) + Result string `gorm:"type:text"` + + // State timing + CreatedAt time.Time + UpdatedAt time.Time + StartedAt *time.Time // when device fetched/acknowledged (Running) + FinishedAt *time.Time // when device posted result (Finished/Error) + + // Optional: small attempt/lease system if you ever need retries/timeouts + // Attempts int `gorm:"not null;default:0"` +} diff --git a/server/internal/router/router.go b/server/internal/router/router.go index d833091..7d8eb91 100644 --- a/server/internal/router/router.go +++ b/server/internal/router/router.go @@ -32,6 +32,8 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine { /// --- GPS tracker handler trackersH := handlers.NewTrackersHandler(db) + tasksH := handlers.NewTasksHandler(db) + // --- Public auth r.POST("/auth/signup", authH.SignUp) r.POST("/auth/signin", authH.SignIn) @@ -55,6 +57,8 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine { r.POST("/devices/:guid/add_to_user", authMW, devH.AddToUser) r.POST("/devices/:guid/set_users", authMW, adminOnly, devH.SetUsers) r.POST("/devices/:guid/remove_from_user", authMW, devH.RemoveFromUser) + r.POST("/device/:guid/task", authMW, middleware.DeviceAccessFilter(), tasksH.CreateTask) + r.GET("/device/:guid/tasks", authMW, middleware.DeviceAccessFilter(), tasksH.ListDeviceTasks) r.POST("/records/upload", recH.Upload) r.GET("/records", authMW, recH.List) @@ -80,6 +84,11 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine { r.POST("/trackers/create", authMW, trackersH.Create) r.POST("/trackers/:guid/rename", authMW, trackersH.Rename) r.POST("/trackers/:guid/set_users", authMW, adminOnly, trackersH.SetUsers) + + // --- Device Job/Task API + r.GET("/tasks/:guid", tasksH.DeviceNextTask) // heartbeat + fetch next task + r.POST("/tasks/:guid", tasksH.DevicePostResult) // device posts result + // sensible defaults r.MaxMultipartMemory = 64 << 20 // 64 MiB _ = time.Now() // appease linters