package main
import (
"encoding/json"
"fmt"
"math"
"sync"
"time"
"github.com/streadway/amqp"
)
// RobotTask describes a unit of work handed to the scheduler, together with
// the JSON names used when it is serialized.
type RobotTask struct {
// TaskID identifies the task; it is included in the task events published
// by the scheduler.
TaskID string `json:"task_id"`
// TaskType is matched against a robot's Capabilities when looking for
// candidate robots.
TaskType string `json:"task_type"`
// Priority is not read by the code visible in this part of the file.
// NOTE(review): ordering (higher vs. lower is more urgent) is unconfirmed.
Priority int `json:"priority"`
// EstDuration is the estimated duration; SubmitTask treats it as minutes
// when deriving a default Deadline.
EstDuration float64 `json:"est_duration"`
// AssignedTo is cleared to "" when assignment fails.
// NOTE(review): presumably holds a robot ID once assigned; confirm.
AssignedTo string `json:"assigned_to"`
// Status is set to "pending" on submission and "failed" when no robot can
// be selected; other values may be set elsewhere.
Status string `json:"status"`
// Parameters carries free-form task arguments; the visible code does not
// interpret them.
Parameters map[string]interface{} `json:"parameters"`
// CreatedAt is stamped by SubmitTask.
CreatedAt time.Time `json:"created_at"`
// Deadline is filled in by SubmitTask when left at its zero value.
Deadline time.Time `json:"deadline"`
}
// RobotInfo is the scheduler's record for one registered robot, together
// with the JSON names used when it is serialized.
type RobotInfo struct {
// RobotID is the key under which the robot is stored in the scheduler.
RobotID string `json:"robot_id"`
// Brand selects which registered BrandAdapter handles this robot.
Brand string `json:"brand"`
// Capabilities lists the task types the robot can take; a task's TaskType
// must appear here for the robot to be a candidate.
Capabilities []string `json:"capabilities"`
// BatteryLevel must be greater than 20.0 for the robot to be a candidate.
// NOTE(review): presumably a percentage; confirm the unit.
BatteryLevel float64 `json:"battery_level"`
// CurrentTask is not read or written by the code visible in this part of
// the file.
CurrentTask *RobotTask `json:"current_task"`
// Position is the robot's last known position.
Position Position `json:"position"`
// Status is set to "idle" by AddRobot; only "idle" robots are considered
// when assigning tasks.
Status string `json:"status"`
// LastHeartbeat is initialised to the registration time by AddRobot.
LastHeartbeat time.Time `json:"last_heartbeat"`
}
// Position is a three-component coordinate.
// NOTE(review): units and reference frame are not shown in this file section.
type Position struct {
X float64 `json:"x"`
Y float64 `json:"y"`
Z float64 `json:"z"`
}
// UnifiedScheduler holds the registered robots, the per-brand adapters, the
// task channels and the message-queue handles used to publish events.
// Instances are built by NewUnifiedScheduler.
type UnifiedScheduler struct {
// robots maps RobotID to the robot's record; accessed under mu.
robots map[string]*RobotInfo
// taskQueue is the buffered channel SubmitTask sends to and processTasks
// receives from.
taskQueue chan *RobotTask
// completedQueue is a buffered channel with the same capacity as
// taskQueue. NOTE(review): presumably drained by handleCompletedTasks,
// which is not visible here.
completedQueue chan *RobotTask
// mu guards robots and brandAdapters in the methods visible here.
mu sync.RWMutex
// brandAdapters maps a brand name to its adapter; accessed under mu.
brandAdapters map[string]BrandAdapter
// mqConn and mqChannel are set by initMessageQueue.
mqConn *amqp.Connection
mqChannel *amqp.Channel
// config is the configuration passed to NewUnifiedScheduler.
config SchedulerConfig
}
// BrandAdapter is the interface a robot brand integration must implement so
// the scheduler can talk to that brand's robots. Adapters are registered per
// brand name with RegisterBrandAdapter.
type BrandAdapter interface {
// SendCommand sends cmd to the robot identified by robotID.
SendCommand(robotID string, cmd Command) error
// GetStatus returns the current status reported for robotID.
GetStatus(robotID string) (RobotStatus, error)
// RegisterRobot is called by AddRobot with a copy of the robot's record;
// a non-nil error aborts the registration.
RegisterRobot(robotInfo RobotInfo) error
// HeartbeatCheck reports a boolean for robotID.
// NOTE(review): presumably true means the robot is alive; confirm with the
// caller, which is not visible here.
HeartbeatCheck(robotID string) bool
}
// Command is the payload passed to BrandAdapter.SendCommand, together with
// the JSON names used when it is serialized.
type Command struct {
// CmdType names the kind of command.
// NOTE(review): the set of valid values is not shown in this file section.
CmdType string `json:"cmd_type"`
// RobotID identifies the target robot.
RobotID string `json:"robot_id"`
// TaskID identifies the task the command relates to.
TaskID string `json:"task_id"`
// Parameters carries free-form command arguments.
Parameters map[string]interface{} `json:"parameters"`
// Timestamp records a time associated with the command.
// NOTE(review): presumably the creation time; confirm.
Timestamp time.Time `json:"timestamp"`
}
// RobotStatus is the status snapshot returned by BrandAdapter.GetStatus,
// together with the JSON names used when it is serialized.
type RobotStatus struct {
RobotID string `json:"robot_id"`
IsBusy bool `json:"is_busy"`
// NOTE(review): Battery is presumably on the same scale as
// RobotInfo.BatteryLevel; confirm.
Battery float64 `json:"battery"`
Position Position `json:"position"`
// CurrentTask is a string here (not a *RobotTask).
// NOTE(review): presumably a task ID; confirm.
CurrentTask string `json:"current_task"`
// NOTE(review): the range of TaskProgress (0-1 vs. 0-100) is not shown.
TaskProgress float64 `json:"task_progress"`
// NOTE(review): ErrorCode presumably uses 0 for "no error"; confirm.
ErrorCode int `json:"error_code"`
ErrorMsg string `json:"error_msg"`
Timestamp time.Time `json:"timestamp"`
}
// SchedulerConfig carries the settings passed to NewUnifiedScheduler,
// together with the JSON names used when it is serialized.
type SchedulerConfig struct {
// MaxRobots is the upper bound on registered robots enforced by AddRobot.
MaxRobots int `json:"max_robots"`
// TaskQueueSize is the buffer capacity of both the task queue and the
// completed-task queue.
TaskQueueSize int `json:"task_queue_size"`
// HeartbeatInterval is not read by the code visible in this part of the
// file. NOTE(review): unit unconfirmed.
HeartbeatInterval int `json:"heartbeat_interval"`
// TimeoutSeconds is not read by the code visible in this part of the file.
TimeoutSeconds int `json:"timeout_seconds"`
// OptimizationAlgorithm is not read by the code visible in this part of
// the file. NOTE(review): presumably selects the strategy used by
// findOptimalRobot; confirm.
OptimizationAlgorithm string `json:"optimization_algorithm"`
// LogLevel is not read by the code visible in this part of the file.
LogLevel string `json:"log_level"`
}
// NewUnifiedScheduler builds a scheduler from config, connects it to the
// message queue and starts its background goroutines.
//
// The task queue and the completed-task queue are both buffered channels of
// capacity config.TaskQueueSize.
//
// If the message queue cannot be initialised, no goroutine is started and the
// returned error wraps the underlying cause, so callers can inspect it with
// errors.Is / errors.As. The error text is unchanged from earlier versions.
//
// The goroutines started here (monitorRobots, processTasks,
// handleCompletedTasks) are not handed any stop signal by this function.
func NewUnifiedScheduler(config SchedulerConfig) (*UnifiedScheduler, error) {
	scheduler := &UnifiedScheduler{
		robots:         make(map[string]*RobotInfo),
		taskQueue:      make(chan *RobotTask, config.TaskQueueSize),
		completedQueue: make(chan *RobotTask, config.TaskQueueSize),
		brandAdapters:  make(map[string]BrandAdapter),
		config:         config,
	}

	if err := scheduler.initMessageQueue(); err != nil {
		// %w (not %v) keeps the original error in the chain.
		return nil, fmt.Errorf("初始化消息队列失败:%w", err)
	}

	go scheduler.monitorRobots()
	go scheduler.processTasks()
	go scheduler.handleCompletedTasks()

	return scheduler, nil
}
// initMessageQueue dials the AMQP broker, opens a channel and declares the
// "robot.scheduler" topic exchange. On success the connection and channel are
// stored on the scheduler; on any failure everything opened so far is closed
// and the error is returned unchanged.
//
// NOTE(review): the broker URL, including credentials, is hard-coded.
func (s *UnifiedScheduler) initMessageQueue() error {
	connection, dialErr := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if dialErr != nil {
		return dialErr
	}

	channel, openErr := connection.Channel()
	if openErr != nil {
		connection.Close()
		return openErr
	}

	const (
		exchangeName = "robot.scheduler"
		exchangeKind = "topic"
	)
	// Arguments after the kind are, in order: durable, autoDelete, internal,
	// noWait, and the extra argument table.
	if declareErr := channel.ExchangeDeclare(exchangeName, exchangeKind, true, false, false, false, nil); declareErr != nil {
		channel.Close()
		connection.Close()
		return declareErr
	}

	s.mqConn, s.mqChannel = connection, channel
	return nil
}
// RegisterBrandAdapter associates adapter with the given brand name, replacing
// any adapter previously stored under that name. The write is performed while
// holding the scheduler's write lock.
func (s *UnifiedScheduler) RegisterBrandAdapter(brand string, adapter BrandAdapter) {
	s.mu.Lock()
	defer s.mu.Unlock()

	registry := s.brandAdapters
	registry[brand] = adapter
}
// AddRobot registers robot with the scheduler.
//
// It returns an error when robot is nil, when the configured MaxRobots limit
// has been reached, when no adapter is registered for robot.Brand, or when the
// adapter's RegisterRobot call fails (the adapter error is wrapped, so
// errors.Is / errors.As work on the result).
//
// On success the robot's Status is set to "idle", its LastHeartbeat is set to
// the current time, it is stored under robot.RobotID (replacing any existing
// entry with that ID), and a "robot.registered" event is published.
//
// The scheduler's write lock is held for the whole call, including the
// adapter call and the event publication.
func (s *UnifiedScheduler) AddRobot(robot *RobotInfo) error {
	// Guard first: every later step dereferences robot.
	if robot == nil {
		return fmt.Errorf("机器人信息不能为空")
	}

	s.mu.Lock()
	defer s.mu.Unlock()

	if len(s.robots) >= s.config.MaxRobots {
		return fmt.Errorf("已达到最大机器人数量限制:%d", s.config.MaxRobots)
	}

	adapter, ok := s.brandAdapters[robot.Brand]
	if !ok {
		return fmt.Errorf("未找到品牌适配器:%s", robot.Brand)
	}

	// The adapter receives a copy taken before Status and LastHeartbeat are
	// updated below.
	if err := adapter.RegisterRobot(*robot); err != nil {
		return fmt.Errorf("注册机器人失败:%w", err)
	}

	robot.Status = "idle"
	robot.LastHeartbeat = time.Now()
	s.robots[robot.RobotID] = robot

	s.publishEvent("robot.registered", map[string]interface{}{
		"robot_id": robot.RobotID,
		"brand":    robot.Brand,
		"time":     time.Now(),
	})
	return nil
}
// SubmitTask stamps task as pending and tries to enqueue it without blocking.
//
// CreatedAt is set to the current time and Status to "pending". If Deadline
// is zero it defaults to CreatedAt plus 1.5 times EstDuration, with
// EstDuration interpreted as minutes.
//
// It returns an error when task is nil or when the task queue is full. On a
// successful enqueue a "task.submitted" event is published. Note that the
// task's fields are updated even when the queue turns out to be full.
func (s *UnifiedScheduler) SubmitTask(task *RobotTask) error {
	if task == nil {
		return fmt.Errorf("任务不能为空")
	}

	now := time.Now()
	task.CreatedAt = now
	task.Status = "pending"
	if task.Deadline.IsZero() {
		// Scale in float64 before converting. Converting EstDuration*1.5 to
		// time.Duration first truncates to whole minutes, so an estimate of
		// 1 minute would yield 1 minute instead of 1.5, and 0.5 would yield 0.
		allowance := time.Duration(task.EstDuration * 1.5 * float64(time.Minute))
		task.Deadline = now.Add(allowance)
	}

	select {
	case s.taskQueue <- task:
		s.publishEvent("task.submitted", map[string]interface{}{
			"task_id": task.TaskID,
			"type":    task.TaskType,
			"time":    time.Now(),
		})
		return nil
	default:
		// Non-blocking send: a full buffer is reported, not waited on.
		return fmt.Errorf("任务队列已满")
	}
}
// processTasks receives tasks from the task queue and hands each one to
// assignTask on its own goroutine. It returns once the queue has been closed
// and drained.
func (s *UnifiedScheduler) processTasks() {
	for {
		next, open := <-s.taskQueue
		if !open {
			return
		}
		go s.assignTask(next)
	}
}
// assignTask looks for a robot that can take task.
//
// Under the read lock it collects every robot that is "idle", lists
// task.TaskType among its capabilities and has a battery level above 20.0.
// If there is no such robot, or findOptimalRobot returns nil for the
// candidates, the task is marked "failed", its AssignedTo is cleared and a
// "task.failed" event is published with the reason.
//
// NOTE(review): as written here, the robot returned by findOptimalRobot is
// only checked for nil; nothing further is done with it in this function.
func (s *UnifiedScheduler) assignTask(task *RobotTask) {
	// reject marks the task as failed and publishes the failure event.
	reject := func(reason string) {
		task.Status = "failed"
		task.AssignedTo = ""
		s.publishEvent("task.failed", map[string]interface{}{
			"task_id": task.TaskID,
			"reason":  reason,
			"time":    time.Now(),
		})
	}

	eligible := make([]*RobotInfo, 0)
	s.mu.RLock()
	for _, r := range s.robots {
		if r.Status != "idle" {
			continue
		}
		if !contains(r.Capabilities, task.TaskType) {
			continue
		}
		if r.BatteryLevel > 20.0 {
			eligible = append(eligible, r)
		}
	}
	s.mu.RUnlock()

	if len(eligible) == 0 {
		reject("无可用机器人")
		return
	}

	chosen := s.findOptimalRobot(task, eligible)
	if chosen == nil {
		reject("匹配算法失败")
		return
	}
}