Skip to content

Commit e5a326f

Browse files
committed
refactor(modules-config): improve resilience and code organization
Add resilient module synchronization: - Implement periodic retry mechanism (5-minute interval) - Add StartPeriodicRetry for automatic recovery
1 parent 8b4743e commit e5a326f

File tree

2 files changed

+168
-82
lines changed

2 files changed

+168
-82
lines changed

plugins/modules-config/config/config.go

Lines changed: 166 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,30 @@ type pluginConnection struct {
2222
done <-chan struct{}
2323
}
2424

25+
type failedModule struct {
26+
moduleName string
27+
pluginType PluginType
28+
lastAttempt time.Time
29+
retries int
30+
lastError string
31+
}
32+
2533
type ConfigServer struct {
2634
UnimplementedConfigServiceServer
2735

28-
mu sync.RWMutex
29-
plugins map[PluginType][]*pluginConnection
30-
cache map[PluginType]*ConfigurationSection
36+
mu sync.RWMutex
37+
plugins map[PluginType][]*pluginConnection
38+
cache map[PluginType]*ConfigurationSection
39+
failedModules map[PluginType]*failedModule
40+
failedMu sync.RWMutex
3141
}
3242

3343
func GetConfigServer() *ConfigServer {
3444
configOnce.Do(func() {
3545
configServer = &ConfigServer{
36-
plugins: make(map[PluginType][]*pluginConnection),
37-
cache: make(map[PluginType]*ConfigurationSection),
46+
plugins: make(map[PluginType][]*pluginConnection),
47+
cache: make(map[PluginType]*ConfigurationSection),
48+
failedModules: make(map[PluginType]*failedModule),
3849
}
3950
})
4051
return configServer
@@ -120,96 +131,124 @@ func (s *ConfigServer) monitorDisconnect(t PluginType, conn *pluginConnection) {
120131
}
121132

122133
func (s *ConfigServer) NotifyUpdate(moduleName string, section *ConfigurationSection) {
123-
pluginType := PluginType_UNKNOWN
124-
125-
switch moduleName {
126-
case "AWS_IAM_USER":
127-
pluginType = PluginType_AWS_IAM_USER
128-
case "AZURE":
129-
pluginType = PluginType_AZURE
130-
case "BITDEFENDER":
131-
pluginType = PluginType_BITDEFENDER
132-
case "GCP":
133-
pluginType = PluginType_GCP
134-
case "O365":
135-
pluginType = PluginType_O365
136-
case "SOC_AI":
137-
pluginType = PluginType_SOC_AI
138-
case "SOPHOS":
139-
pluginType = PluginType_SOPHOS
140-
case "CROWDSTRIKE":
141-
pluginType = PluginType_CROWDSTRIKE
142-
default:
143-
catcher.Error("unknown module name", nil, map[string]any{"process": "plugin_com.utmstack.modules-config", "module": moduleName})
134+
pluginType, exists := AllModules[moduleName]
135+
if !exists {
136+
catcher.Error("unknown module name", nil, map[string]any{
137+
"process": "plugin_com.utmstack.modules-config",
138+
"module": moduleName,
139+
})
144140
return
145141
}
146142

143+
connections := s.updateCache(pluginType, section)
144+
s.clearFailedModule(pluginType, moduleName)
145+
s.notifyConnectedPlugins(connections, section, moduleName)
146+
}
147+
148+
func (s *ConfigServer) fetchModuleConfig(backend, moduleName, internalKey string) (*ConfigurationSection, int, error) {
149+
url := fmt.Sprintf("%s/api/utm-modules/module-details-decrypted?nameShort=%s&serverId=1", backend, moduleName)
150+
151+
response, status, err := utils.DoReq[ConfigurationSection](
152+
url,
153+
nil,
154+
"GET",
155+
map[string]string{"Utm-Internal-Key": internalKey},
156+
true,
157+
)
158+
159+
if err != nil || status != http.StatusOK {
160+
return nil, status, err
161+
}
162+
163+
return &response, status, nil
164+
}
165+
166+
func (s *ConfigServer) updateCache(pluginType PluginType, config *ConfigurationSection) []*pluginConnection {
147167
s.mu.Lock()
148-
s.cache[pluginType] = section
149-
connectedPlugins := append([]*pluginConnection{}, s.plugins[pluginType]...)
150-
s.mu.Unlock()
168+
defer s.mu.Unlock()
151169

152-
if len(connectedPlugins) == 0 {
153-
catcher.Info(fmt.Sprintf("No active connections for plugin type: %s", pluginType), map[string]any{"process": "plugin_com.utmstack.modules-config"})
170+
s.cache[pluginType] = config
171+
return append([]*pluginConnection{}, s.plugins[pluginType]...)
172+
}
173+
174+
func (s *ConfigServer) clearFailedModule(pluginType PluginType, moduleName string) {
175+
s.failedMu.Lock()
176+
defer s.failedMu.Unlock()
177+
178+
if _, exists := s.failedModules[pluginType]; exists {
179+
delete(s.failedModules, pluginType)
180+
catcher.Info(
181+
fmt.Sprintf("Module %s removed from retry list after successful config update", moduleName), map[string]any{
182+
"process": "plugin_com.utmstack.modules-config",
183+
"pluginType": pluginType,
184+
},
185+
)
186+
}
187+
}
188+
189+
func (s *ConfigServer) notifyConnectedPlugins(connections []*pluginConnection, config *ConfigurationSection, moduleName string) {
190+
if len(connections) == 0 {
191+
catcher.Info("No active connections for plugin type", map[string]any{
192+
"process": "plugin_com.utmstack.modules-config",
193+
"module": moduleName,
194+
},
195+
)
154196
return
155197
}
156198

157-
for _, conn := range connectedPlugins {
158-
err := conn.stream.Send(&BiDirectionalMessage{
159-
Payload: &BiDirectionalMessage_Config{
160-
Config: section,
199+
for _, conn := range connections {
200+
if err := conn.stream.Send(&BiDirectionalMessage{
201+
Payload: &BiDirectionalMessage_Config{Config: config},
202+
}); err != nil {
203+
catcher.Error("failed to send config update", err, map[string]any{
204+
"process": "plugin_com.utmstack.modules-config",
205+
"module": moduleName,
161206
},
162-
})
163-
if err != nil {
164-
catcher.Error("error sending configuration update", err, map[string]any{"process": "plugin_com.utmstack.modules-config"})
165-
continue
207+
)
166208
}
167209
}
168210
}
169211

170-
func (s *ConfigServer) syncModuleWithRetry(
171-
moduleName string,
172-
pluginType PluginType,
173-
backend string,
174-
internalKey string,
175-
) error {
212+
func (s *ConfigServer) trackFailure(pluginType PluginType, moduleName string, err error) {
213+
s.failedMu.Lock()
214+
defer s.failedMu.Unlock()
215+
216+
errMsg := "unknown error"
217+
if err != nil {
218+
errMsg = err.Error()
219+
}
220+
221+
if existing, exists := s.failedModules[pluginType]; exists {
222+
existing.retries++
223+
existing.lastAttempt = time.Now()
224+
existing.lastError = errMsg
225+
} else {
226+
s.failedModules[pluginType] = &failedModule{
227+
moduleName: moduleName,
228+
pluginType: pluginType,
229+
lastAttempt: time.Now(),
230+
retries: 1,
231+
lastError: errMsg,
232+
}
233+
}
234+
}
235+
236+
func (s *ConfigServer) syncModuleWithRetry(moduleName string, pluginType PluginType, backend string, internalKey string) error {
176237
const maxRetries = 5
177238
baseDelay := 2 * time.Second
178239

179-
url := fmt.Sprintf("%s/api/utm-modules/module-details-decrypted?nameShort=%s&serverId=1", backend, moduleName)
240+
var lastErr error
241+
var lastStatus int
180242

181243
for attempt := 0; attempt <= maxRetries; attempt++ {
182-
response, status, err := utils.DoReq[ConfigurationSection](
183-
url,
184-
nil,
185-
"GET",
186-
map[string]string{"Utm-Internal-Key": internalKey},
187-
true,
188-
)
244+
config, status, err := s.fetchModuleConfig(backend, moduleName, internalKey)
245+
lastStatus = status
246+
lastErr = err
189247

190248
if err == nil && status == http.StatusOK {
191-
s.mu.Lock()
192-
s.cache[pluginType] = &response
193-
connectedPlugins := append([]*pluginConnection{}, s.plugins[pluginType]...)
194-
s.mu.Unlock()
195-
196-
if len(connectedPlugins) > 0 {
197-
for _, conn := range connectedPlugins {
198-
if err := conn.stream.Send(&BiDirectionalMessage{
199-
Payload: &BiDirectionalMessage_Config{Config: &response},
200-
}); err != nil {
201-
catcher.Error(
202-
"failed to send late-arrival config",
203-
err,
204-
map[string]any{
205-
"process": "plugin_com.utmstack.modules-config",
206-
"module": moduleName,
207-
},
208-
)
209-
}
210-
}
211-
}
212-
249+
connections := s.updateCache(pluginType, config)
250+
s.clearFailedModule(pluginType, moduleName)
251+
s.notifyConnectedPlugins(connections, config, moduleName)
213252
return nil
214253
}
215254

@@ -219,10 +258,14 @@ func (s *ConfigServer) syncModuleWithRetry(
219258
}
220259
}
221260

222-
return catcher.Error("failed to sync module after max retries", nil, map[string]any{
223-
"process": "plugin_com.utmstack.modules-config",
224-
"module": moduleName,
225-
"retries": maxRetries + 1,
261+
s.trackFailure(pluginType, moduleName, lastErr)
262+
263+
return catcher.Error("failed to sync module after max retries, will retry periodically", lastErr, map[string]any{
264+
"process": "plugin_com.utmstack.modules-config",
265+
"module": moduleName,
266+
"pluginType": pluginType,
267+
"retries": maxRetries + 1,
268+
"status_code": lastStatus,
226269
},
227270
)
228271
}
@@ -249,8 +292,49 @@ func (s *ConfigServer) SyncConfigs(backend string, internalKey string) {
249292
}
250293

251294
if err := g.Wait(); err != nil {
252-
catcher.Error("module config sync failed", err, map[string]any{
253-
"process": "plugin_com.utmstack.modules-config",
254-
})
295+
catcher.Error("module config sync failed", err, map[string]any{"process": "plugin_com.utmstack.modules-config"})
296+
}
297+
}
298+
299+
func (s *ConfigServer) StartPeriodicRetry(backend string, internalKey string) {
300+
const retryInterval = 5 * time.Minute
301+
302+
ticker := time.NewTicker(retryInterval)
303+
defer ticker.Stop()
304+
305+
for range ticker.C {
306+
s.failedMu.RLock()
307+
toRetry := make([]*failedModule, 0, len(s.failedModules))
308+
for _, fm := range s.failedModules {
309+
toRetry = append(toRetry, fm)
310+
}
311+
s.failedMu.RUnlock()
312+
313+
if len(toRetry) == 0 {
314+
continue
315+
}
316+
317+
catcher.Info(fmt.Sprintf("Retrying %d failed module(s)", len(toRetry)), map[string]any{"process": "plugin_com.utmstack.modules-config"})
318+
319+
for _, fm := range toRetry {
320+
err := s.syncModuleWithRetry(fm.moduleName, fm.pluginType, backend, internalKey)
321+
322+
if err != nil {
323+
s.failedMu.RLock()
324+
currentRetries := 0
325+
if existing, ok := s.failedModules[fm.pluginType]; ok {
326+
currentRetries = existing.retries
327+
}
328+
s.failedMu.RUnlock()
329+
330+
catcher.Error(fmt.Sprintf("Module sync retry failed (attempt %d)", currentRetries), err, map[string]any{
331+
"process": "plugin_com.utmstack.modules-config",
332+
"module": fm.moduleName,
333+
"pluginType": fm.pluginType,
334+
"retries": currentRetries,
335+
},
336+
)
337+
}
338+
}
255339
}
256340
}

plugins/modules-config/handlers.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ func startGRPCServer() error {
2525
config.RegisterConfigServiceServer(server, config.GetConfigServer())
2626
config.GetConfigServer().SyncConfigs(BackendService, InternalKey)
2727

28+
go config.GetConfigServer().StartPeriodicRetry(BackendService, InternalKey)
29+
2830
if err := server.Serve(listener); err != nil {
2931
return catcher.Error("failed to serve grpc", err, map[string]any{"process": "plugin_com.utmstack.modules-config"})
3032
}

0 commit comments

Comments
 (0)