created tasks endpoint for devices
This commit is contained in:
@@ -19,5 +19,6 @@ func AutoMigrate(db *gorm.DB) error {
|
|||||||
&models.UserDevice{},
|
&models.UserDevice{},
|
||||||
&models.Tracker{},
|
&models.Tracker{},
|
||||||
&models.UserTracker{},
|
&models.UserTracker{},
|
||||||
|
&models.DEviceTask{},
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
57
server/internal/dto/task.go
Normal file
57
server/internal/dto/task.go
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
package dto
|
||||||
|
|
||||||
|
import (
|
||||||
|
"smoop-api/internal/models"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TaskDto struct {
|
||||||
|
ID uint `json:"id"`
|
||||||
|
DeviceGUID string `json:"deviceGuid"`
|
||||||
|
Type models.DeviceTaskType `json:"type"`
|
||||||
|
Payload string `json:"payload"` // raw JSON string
|
||||||
|
Status models.TaskStatus `json:"status"`
|
||||||
|
ErrorMsg string `json:"error,omitempty"`
|
||||||
|
Result string `json:"result,omitempty"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
StartedAt *time.Time `json:"startedAt,omitempty"`
|
||||||
|
FinishedAt *time.Time `json:"finishedAt,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func MapTask(t models.DEviceTask) TaskDto {
|
||||||
|
return TaskDto{
|
||||||
|
ID: t.ID,
|
||||||
|
DeviceGUID: t.DeviceGUID,
|
||||||
|
Type: t.Type,
|
||||||
|
Payload: t.Payload,
|
||||||
|
Status: t.Status,
|
||||||
|
ErrorMsg: t.ErrorMsg,
|
||||||
|
Result: t.Result,
|
||||||
|
CreatedAt: t.CreatedAt,
|
||||||
|
StartedAt: t.StartedAt,
|
||||||
|
FinishedAt: t.FinishedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a new task (server/user -> device)
|
||||||
|
type CreateTaskDto struct {
|
||||||
|
Type models.DeviceTaskType `json:"type" binding:"required,oneof=start_stream stop_stream start_recording stop_recording update_config set_deep_sleep"`
|
||||||
|
// Pass raw JSON string as payload (e.g. {"sleepTimeout":5,"jitterMs":50,"recordingDurationSec":60})
|
||||||
|
// Keep it string to let device/server evolve freely.
|
||||||
|
Payload string `json:"payload" binding:"required"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Device polls: single next task
|
||||||
|
type NextTaskResponseDto struct {
|
||||||
|
HasTask bool `json:"hasTask"`
|
||||||
|
Task *TaskDto `json:"task,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Device posts result
|
||||||
|
type TaskResultDto struct {
|
||||||
|
TaskID uint `json:"taskId" binding:"required"`
|
||||||
|
// success=true => Finished, success=false => Error
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Result string `json:"result"` // raw JSON result from device
|
||||||
|
ErrorMsg string `json:"error"` // device-side reason if !Success
|
||||||
|
}
|
||||||
251
server/internal/handlers/tasks.go
Normal file
251
server/internal/handlers/tasks.go
Normal file
@@ -0,0 +1,251 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"net/http"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/gin-gonic/gin"
|
||||||
|
"gorm.io/gorm"
|
||||||
|
|
||||||
|
"smoop-api/internal/dto"
|
||||||
|
"smoop-api/internal/models"
|
||||||
|
)
|
||||||
|
|
||||||
|
type TasksHandler struct {
|
||||||
|
db *gorm.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewTasksHandler(db *gorm.DB) *TasksHandler { return &TasksHandler{db: db} }
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// 1) Device heartbeat + fetch next task
|
||||||
|
// GET /tasks/:guid
|
||||||
|
// Returns 204 (no task) OR next pending task (and marks it Running)
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
func (h *TasksHandler) DeviceNextTask(c *gin.Context) {
|
||||||
|
guid := c.Param("guid")
|
||||||
|
|
||||||
|
// Optional: verify device exists to avoid orphan tasks
|
||||||
|
var dev models.Device
|
||||||
|
if err := h.db.First(&dev, "guid = ?", guid).Error; err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
c.JSON(http.StatusNotFound, gin.H{"error": "device not found"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var task models.DEviceTask
|
||||||
|
now := time.Now()
|
||||||
|
|
||||||
|
// Atomically pick the oldest pending task for device and mark Running
|
||||||
|
err := h.db.Transaction(func(tx *gorm.DB) error {
|
||||||
|
// FOR UPDATE SKIP LOCKED emulation is not universal in GORM;
|
||||||
|
// simplest: select oldest pending, then optimistic update.
|
||||||
|
if err := tx.
|
||||||
|
// NOTE: if using Postgres, you can append `Clauses(clause.Locking{Strength: "UPDATE SKIP LOCKED"})`
|
||||||
|
// for better concurrency. Keeping generic here.
|
||||||
|
Where("device_guid = ? AND status = ?", guid, models.TaskStatusPending).
|
||||||
|
Order("created_at ASC").
|
||||||
|
Take(&task).Error; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
task.Status = models.TaskStatusRunning
|
||||||
|
task.StartedAt = &now
|
||||||
|
return tx.Save(&task).Error
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
// Heartbeat: no task right now
|
||||||
|
c.Status(http.StatusNoContent)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "update failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, dto.NextTaskResponseDto{
|
||||||
|
HasTask: true,
|
||||||
|
Task: ptr(dto.MapTask(task)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// 2) Device posts result
|
||||||
|
// POST /tasks/:guid
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
func (h *TasksHandler) DevicePostResult(c *gin.Context) {
|
||||||
|
guid := c.Param("guid")
|
||||||
|
var req dto.TaskResultDto
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var task models.DEviceTask
|
||||||
|
if err := h.db.Where("id = ? AND device_guid = ?", req.TaskID, guid).First(&task).Error; err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
c.JSON(http.StatusNotFound, gin.H{"error": "task not found"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Only Running tasks can be completed
|
||||||
|
if task.Status != models.TaskStatusRunning {
|
||||||
|
c.JSON(http.StatusConflict, gin.H{"error": "task not in running state"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
if req.Success {
|
||||||
|
task.Status = models.TaskStatusFinished
|
||||||
|
task.Result = req.Result
|
||||||
|
task.ErrorMsg = ""
|
||||||
|
} else {
|
||||||
|
task.Status = models.TaskStatusError
|
||||||
|
task.Result = req.Result
|
||||||
|
task.ErrorMsg = req.ErrorMsg
|
||||||
|
}
|
||||||
|
task.FinishedAt = &now
|
||||||
|
|
||||||
|
if err := h.db.Save(&task).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "save failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.JSON(http.StatusOK, dto.MapTask(task))
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// 3) Create task for device (user/admin creates work item)
|
||||||
|
// POST /device/:guid/task
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
func (h *TasksHandler) CreateTask(c *gin.Context) {
|
||||||
|
guid := c.Param("guid")
|
||||||
|
|
||||||
|
// Permission check: device must be visible for user (DeviceAccessFilter already applied)
|
||||||
|
if err := h.requireDeviceVisible(c, guid); err != nil {
|
||||||
|
// err already wrote response
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var req dto.CreateTaskDto
|
||||||
|
if err := c.ShouldBindJSON(&req); err != nil {
|
||||||
|
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate that device exists
|
||||||
|
var d models.Device
|
||||||
|
if err := h.db.Where("guid = ?", guid).First(&d).Error; err != nil {
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
c.JSON(http.StatusNotFound, gin.H{"error": "device not found"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "lookup failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
task := models.DEviceTask{
|
||||||
|
DeviceGUID: guid,
|
||||||
|
Type: req.Type,
|
||||||
|
Payload: req.Payload, // raw JSON string
|
||||||
|
Status: models.TaskStatusPending,
|
||||||
|
}
|
||||||
|
if err := h.db.Create(&task).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "create failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusCreated, dto.MapTask(task))
|
||||||
|
}
|
||||||
|
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
// 4) List tasks for device (with status & pagination)
|
||||||
|
// GET /device/:guid/tasks?offset=0&limit=50&status=pending|running|finished|error
|
||||||
|
// -----------------------------------------------------------------------------
|
||||||
|
func (h *TasksHandler) ListDeviceTasks(c *gin.Context) {
|
||||||
|
guid := c.Param("guid")
|
||||||
|
|
||||||
|
// Permission check
|
||||||
|
if err := h.requireDeviceVisible(c, guid); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
offset := atoiDefault(c.DefaultQuery("offset", "0"), 0)
|
||||||
|
limit := atoiDefault(c.DefaultQuery("limit", "50"), 50)
|
||||||
|
if limit <= 0 || limit > 200 {
|
||||||
|
limit = 50
|
||||||
|
}
|
||||||
|
filterStatus := c.Query("status") // optional
|
||||||
|
|
||||||
|
q := h.db.Model(&models.DEviceTask{}).Where("device_guid = ?", guid)
|
||||||
|
if filterStatus != "" {
|
||||||
|
q = q.Where("status = ?", filterStatus)
|
||||||
|
}
|
||||||
|
|
||||||
|
var total int64
|
||||||
|
if err := q.Count(&total).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "count failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var tasks []models.DEviceTask
|
||||||
|
if err := q.Order("id DESC").Offset(offset).Limit(limit).Find(&tasks).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "query failed"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
out := make([]dto.TaskDto, 0, len(tasks))
|
||||||
|
for _, t := range tasks {
|
||||||
|
out = append(out, dto.MapTask(t))
|
||||||
|
}
|
||||||
|
c.JSON(http.StatusOK, gin.H{
|
||||||
|
"tasks": out,
|
||||||
|
"offset": offset,
|
||||||
|
"limit": limit,
|
||||||
|
"total": total,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- helpers -----------------------------------------------------------------
|
||||||
|
|
||||||
|
func (h *TasksHandler) requireDeviceVisible(c *gin.Context, guid string) error {
|
||||||
|
// Admins can always see it; regular users must be assigned
|
||||||
|
uc, ok := GetUserContext(c)
|
||||||
|
if !ok {
|
||||||
|
c.JSON(http.StatusUnauthorized, gin.H{"error": "unauthorized"})
|
||||||
|
return errors.New("unauthorized")
|
||||||
|
}
|
||||||
|
if uc.Role == models.RoleAdmin {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// check assignment
|
||||||
|
var cnt int64
|
||||||
|
if err := h.db.Table("user_devices").
|
||||||
|
Where("user_id = ? AND device_guid = ?", uc.ID, guid).
|
||||||
|
Count(&cnt).Error; err != nil {
|
||||||
|
c.JSON(http.StatusInternalServerError, gin.H{"error": "permission check failed"})
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if cnt == 0 {
|
||||||
|
c.JSON(http.StatusForbidden, gin.H{"error": "forbidden"})
|
||||||
|
return errors.New("forbidden")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func atoiDefault(s string, def int) int {
|
||||||
|
n, err := strconv.Atoi(s)
|
||||||
|
if err != nil {
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
|
||||||
|
func ptr[T any](v T) *T { return &v }
|
||||||
51
server/internal/models/task.go
Normal file
51
server/internal/models/task.go
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
package models
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
type TaskStatus string
|
||||||
|
type DeviceTaskType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
TaskStatusPending TaskStatus = "pending"
|
||||||
|
TaskStatusRunning TaskStatus = "running"
|
||||||
|
TaskStatusFinished TaskStatus = "finished"
|
||||||
|
TaskStatusError TaskStatus = "error"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// 1. start/stop audiostream
|
||||||
|
TaskTypeStartStream DeviceTaskType = "start_stream"
|
||||||
|
TaskTypeStopStream DeviceTaskType = "stop_stream"
|
||||||
|
|
||||||
|
// 2. start/stop recording
|
||||||
|
TaskTypeStartRecording DeviceTaskType = "start_recording"
|
||||||
|
TaskTypeStopRecording DeviceTaskType = "stop_recording"
|
||||||
|
|
||||||
|
// 3. change configuration (sleep timeout, jitter, recording duration)
|
||||||
|
TaskTypeUpdateConfig DeviceTaskType = "update_config"
|
||||||
|
|
||||||
|
// 4. set deep sleep duration (minutes)
|
||||||
|
TaskTypeSetDeepSleep DeviceTaskType = "set_deep_sleep"
|
||||||
|
)
|
||||||
|
|
||||||
|
type DEviceTask struct {
|
||||||
|
ID uint `gorm:"primaryKey"`
|
||||||
|
DeviceGUID string `gorm:"index;not null"`
|
||||||
|
Type DeviceTaskType `gorm:"type:varchar(64);not null"`
|
||||||
|
// JSON payload from server to device (parameters)
|
||||||
|
Payload string `gorm:"type:text;not null;default:'{}'"`
|
||||||
|
Status TaskStatus `gorm:"type:varchar(16);not null;index"`
|
||||||
|
// Optional error/reason from server or device
|
||||||
|
ErrorMsg string `gorm:"type:text"`
|
||||||
|
// Raw result JSON from device (success path)
|
||||||
|
Result string `gorm:"type:text"`
|
||||||
|
|
||||||
|
// State timing
|
||||||
|
CreatedAt time.Time
|
||||||
|
UpdatedAt time.Time
|
||||||
|
StartedAt *time.Time // when device fetched/acknowledged (Running)
|
||||||
|
FinishedAt *time.Time // when device posted result (Finished/Error)
|
||||||
|
|
||||||
|
// Optional: small attempt/lease system if you ever need retries/timeouts
|
||||||
|
// Attempts int `gorm:"not null;default:0"`
|
||||||
|
}
|
||||||
@@ -32,6 +32,8 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
|
|||||||
/// --- GPS tracker handler
|
/// --- GPS tracker handler
|
||||||
trackersH := handlers.NewTrackersHandler(db)
|
trackersH := handlers.NewTrackersHandler(db)
|
||||||
|
|
||||||
|
tasksH := handlers.NewTasksHandler(db)
|
||||||
|
|
||||||
// --- Public auth
|
// --- Public auth
|
||||||
r.POST("/auth/signup", authH.SignUp)
|
r.POST("/auth/signup", authH.SignUp)
|
||||||
r.POST("/auth/signin", authH.SignIn)
|
r.POST("/auth/signin", authH.SignIn)
|
||||||
@@ -55,6 +57,8 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
|
|||||||
r.POST("/devices/:guid/add_to_user", authMW, devH.AddToUser)
|
r.POST("/devices/:guid/add_to_user", authMW, devH.AddToUser)
|
||||||
r.POST("/devices/:guid/set_users", authMW, adminOnly, devH.SetUsers)
|
r.POST("/devices/:guid/set_users", authMW, adminOnly, devH.SetUsers)
|
||||||
r.POST("/devices/:guid/remove_from_user", authMW, devH.RemoveFromUser)
|
r.POST("/devices/:guid/remove_from_user", authMW, devH.RemoveFromUser)
|
||||||
|
r.POST("/device/:guid/task", authMW, middleware.DeviceAccessFilter(), tasksH.CreateTask)
|
||||||
|
r.GET("/device/:guid/tasks", authMW, middleware.DeviceAccessFilter(), tasksH.ListDeviceTasks)
|
||||||
|
|
||||||
r.POST("/records/upload", recH.Upload)
|
r.POST("/records/upload", recH.Upload)
|
||||||
r.GET("/records", authMW, recH.List)
|
r.GET("/records", authMW, recH.List)
|
||||||
@@ -80,6 +84,11 @@ func Build(db *gorm.DB, minio *minio.Client, cfg *config.Config) *gin.Engine {
|
|||||||
r.POST("/trackers/create", authMW, trackersH.Create)
|
r.POST("/trackers/create", authMW, trackersH.Create)
|
||||||
r.POST("/trackers/:guid/rename", authMW, trackersH.Rename)
|
r.POST("/trackers/:guid/rename", authMW, trackersH.Rename)
|
||||||
r.POST("/trackers/:guid/set_users", authMW, adminOnly, trackersH.SetUsers)
|
r.POST("/trackers/:guid/set_users", authMW, adminOnly, trackersH.SetUsers)
|
||||||
|
|
||||||
|
// --- Device Job/Task API
|
||||||
|
r.GET("/tasks/:guid", tasksH.DeviceNextTask) // heartbeat + fetch next task
|
||||||
|
r.POST("/tasks/:guid", tasksH.DevicePostResult) // device posts result
|
||||||
|
|
||||||
// sensible defaults
|
// sensible defaults
|
||||||
r.MaxMultipartMemory = 64 << 20 // 64 MiB
|
r.MaxMultipartMemory = 64 << 20 // 64 MiB
|
||||||
_ = time.Now() // appease linters
|
_ = time.Now() // appease linters
|
||||||
|
|||||||
Reference in New Issue
Block a user