@@ -29,13 +29,20 @@ type AgentService struct {
2929 AgentStreamMap map [uint ]AgentService_AgentStreamServer
3030 AgentStreamMutex sync.Mutex
3131 CacheAgentKey map [uint ]string
32- CacheAgentKeyMutex sync.Mutex
32+ CacheAgentKeyMutex sync.RWMutex
3333 CommandResultChannel map [string ]chan * CommandResult
3434 CommandResultChannelM sync.Mutex
3535
3636 DBConnection * database.DB
3737}
3838
39+ func (s * AgentService ) ValidateAgentKey (key string , id uint ) bool {
40+ s .CacheAgentKeyMutex .RLock ()
41+ defer s .CacheAgentKeyMutex .RUnlock ()
42+ _ , valid := utils .IsKeyPairValid (key , id , s .CacheAgentKey )
43+ return valid
44+ }
45+
3946func InitAgentService () error {
4047 var err error
4148 agentServOnce .Do (func () {
@@ -338,20 +345,35 @@ func (s *AgentService) ProcessCommand(stream PanelService_ProcessCommandServer)
338345 return status .Errorf (codes .Internal , "failed to send command to agent: %v" , err )
339346 }
340347
341- result := <- s .CommandResultChannel [cmdID ]
342- err = s .DBConnection .Upsert (
343- & models.AgentCommand {},
344- "agent_id = ? AND cmd_id = ?" ,
345- map [string ]interface {}{"command_status" : models .Executed , "result" : result .Result },
346- cmd .AgentId , cmdID ,
347- )
348- if err != nil {
349- catcher .Error ("failed to update command status" , err , map [string ]any {"process" : "agent-manager" })
350- }
348+ select {
349+ case result := <- s .CommandResultChannel [cmdID ]:
350+ err = s .DBConnection .Upsert (
351+ & models.AgentCommand {},
352+ "agent_id = ? AND cmd_id = ?" ,
353+ map [string ]interface {}{"command_status" : models .Executed , "result" : result .Result },
354+ cmd .AgentId , cmdID ,
355+ )
356+ if err != nil {
357+ catcher .Error ("failed to update command status" , err , map [string ]any {"process" : "agent-manager" })
358+ }
351359
352- err = stream .Send (result )
353- if err != nil {
354- return err
360+ err = stream .Send (result )
361+ if err != nil {
362+ return err
363+ }
364+ case <- time .After (5 * time .Minute ):
365+ s .CommandResultChannelM .Lock ()
366+ delete (s .CommandResultChannel , cmdID )
367+ s .CommandResultChannelM .Unlock ()
368+
369+ _ = s .DBConnection .Upsert (
370+ & models.AgentCommand {},
371+ "agent_id = ? AND cmd_id = ?" ,
372+ map [string ]interface {}{"command_status" : models .Error , "result" : "command timed out after 5 minutes" },
373+ cmd .AgentId , cmdID ,
374+ )
375+
376+ return status .Errorf (codes .DeadlineExceeded , "agent did not respond within 5 minutes" )
355377 }
356378
357379 s .CommandResultChannelM .Lock ()
0 commit comments