|
9 | 9 |
|
10 | 10 | "github.com/threatwinds/go-sdk/catcher" |
11 | 11 | "github.com/threatwinds/go-sdk/utils" |
| 12 | + "golang.org/x/sync/errgroup" |
12 | 13 | ) |
13 | 14 |
|
14 | 15 | var ( |
@@ -45,7 +46,11 @@ func (s *ConfigServer) GetModuleGroup(moduleName PluginType) *ConfigurationSecti |
45 | 46 |
|
46 | 47 | section, exists := s.cache[moduleName] |
47 | 48 | if !exists { |
48 | | - catcher.Error("module group not found", fmt.Errorf("module: %s", moduleName), map[string]any{"process": "plugin_com.utmstack.modules-config"}) |
| 49 | + catcher.Error("module group not found", nil, map[string]any{ |
| 50 | + "process": "plugin_com.utmstack.modules-config", |
| 51 | + "module": moduleName, |
| 52 | + }, |
| 53 | + ) |
49 | 54 | return nil |
50 | 55 | } |
51 | 56 |
|
@@ -89,7 +94,11 @@ func (s *ConfigServer) StreamConfig(stream ConfigService_StreamConfigServer) err |
89 | 94 | go s.monitorDisconnect(pluginType, conn) |
90 | 95 |
|
91 | 96 | default: |
92 | | - catcher.Error("unexpected message type", fmt.Errorf("received: %T", payload), map[string]any{"process": "plugin_com.utmstack.modules-config"}) |
| 97 | + catcher.Error("unexpected message type", nil, map[string]any{ |
| 98 | + "process": "plugin_com.utmstack.modules-config", |
| 99 | + "message_type": fmt.Sprintf("%T", payload), |
| 100 | + }, |
| 101 | + ) |
93 | 102 | } |
94 | 103 | } |
95 | 104 |
|
@@ -131,59 +140,117 @@ func (s *ConfigServer) NotifyUpdate(moduleName string, section *ConfigurationSec |
131 | 140 | case "CROWDSTRIKE": |
132 | 141 | pluginType = PluginType_CROWDSTRIKE |
133 | 142 | default: |
134 | | - _ = catcher.Error("unknown module name", fmt.Errorf("module: %s", moduleName), map[string]any{"process": "plugin_com.utmstack.modules-config"}) |
| 143 | + catcher.Error("unknown module name", nil, map[string]any{"process": "plugin_com.utmstack.modules-config", "module": moduleName}) |
135 | 144 | return |
136 | 145 | } |
137 | 146 |
|
138 | 147 | s.mu.Lock() |
139 | | - defer s.mu.Unlock() |
140 | | - |
141 | 148 | s.cache[pluginType] = section |
| 149 | + connectedPlugins := append([]*pluginConnection{}, s.plugins[pluginType]...) |
| 150 | + s.mu.Unlock() |
142 | 151 |
|
143 | | - if len(s.plugins[pluginType]) == 0 { |
| 152 | + if len(connectedPlugins) == 0 { |
144 | 153 | catcher.Info(fmt.Sprintf("No active connections for plugin type: %s", pluginType), map[string]any{"process": "plugin_com.utmstack.modules-config"}) |
145 | 154 | return |
146 | 155 | } |
147 | 156 |
|
148 | | - for _, conn := range s.plugins[pluginType] { |
| 157 | + for _, conn := range connectedPlugins { |
149 | 158 | err := conn.stream.Send(&BiDirectionalMessage{ |
150 | 159 | Payload: &BiDirectionalMessage_Config{ |
151 | 160 | Config: section, |
152 | 161 | }, |
153 | 162 | }) |
154 | 163 | if err != nil { |
155 | | - _ = catcher.Error("error sending configuration update", err, map[string]any{"process": "plugin_com.utmstack.modules-config"}) |
| 164 | + catcher.Error("error sending configuration update", err, map[string]any{"process": "plugin_com.utmstack.modules-config"}) |
156 | 165 | continue |
157 | 166 | } |
158 | 167 | } |
159 | 168 | } |
160 | 169 |
|
161 | | -func (s *ConfigServer) SyncConfigs(backend string, internalKey string) { |
162 | | - var AllModules = map[string]PluginType{ |
163 | | - "AWS_IAM_USER": PluginType_AWS_IAM_USER, |
164 | | - "AZURE": PluginType_AZURE, |
165 | | - "BITDEFENDER": PluginType_BITDEFENDER, |
166 | | - "GCP": PluginType_GCP, |
167 | | - "O365": PluginType_O365, |
168 | | - "SOC_AI": PluginType_SOC_AI, |
169 | | - "SOPHOS": PluginType_SOPHOS, |
170 | | - "CROWDSTRIKE": PluginType_CROWDSTRIKE, |
171 | | - } |
| 170 | +func (s *ConfigServer) syncModuleWithRetry( |
| 171 | + moduleName string, |
| 172 | + pluginType PluginType, |
| 173 | + backend string, |
| 174 | + internalKey string, |
| 175 | +) error { |
| 176 | + const maxRetries = 5 |
| 177 | + baseDelay := 2 * time.Second |
172 | 178 |
|
173 | | - for name, t := range AllModules { |
174 | | - url := fmt.Sprintf("%s/api/utm-modules/module-details-decrypted?nameShort=%s&serverId=1", backend, name) |
175 | | - |
176 | | - for { |
177 | | - response, status, err := utils.DoReq[ConfigurationSection](url, nil, "GET", map[string]string{"Utm-Internal-Key": internalKey}, true) |
178 | | - if err == nil && status == http.StatusOK { |
179 | | - s.mu.Lock() |
180 | | - s.cache[t] = &response |
181 | | - s.mu.Unlock() |
182 | | - break |
| 179 | + url := fmt.Sprintf("%s/api/utm-modules/module-details-decrypted?nameShort=%s&serverId=1", backend, moduleName) |
| 180 | + |
| 181 | + for attempt := 0; attempt <= maxRetries; attempt++ { |
| 182 | + response, status, err := utils.DoReq[ConfigurationSection]( |
| 183 | + url, |
| 184 | + nil, |
| 185 | + "GET", |
| 186 | + map[string]string{"Utm-Internal-Key": internalKey}, |
| 187 | + true, |
| 188 | + ) |
| 189 | + |
| 190 | + if err == nil && status == http.StatusOK { |
| 191 | + s.mu.Lock() |
| 192 | + s.cache[pluginType] = &response |
| 193 | + connectedPlugins := append([]*pluginConnection{}, s.plugins[pluginType]...) |
| 194 | + s.mu.Unlock() |
| 195 | + |
| 196 | + if len(connectedPlugins) > 0 { |
| 197 | + for _, conn := range connectedPlugins { |
| 198 | + if err := conn.stream.Send(&BiDirectionalMessage{ |
| 199 | + Payload: &BiDirectionalMessage_Config{Config: &response}, |
| 200 | + }); err != nil { |
| 201 | + catcher.Error( |
| 202 | + "failed to send late-arrival config", |
| 203 | + err, |
| 204 | + map[string]any{ |
| 205 | + "process": "plugin_com.utmstack.modules-config", |
| 206 | + "module": moduleName, |
| 207 | + }, |
| 208 | + ) |
| 209 | + } |
| 210 | + } |
183 | 211 | } |
184 | 212 |
|
185 | | - fmt.Printf("Error fetching configuration for %s: %v, status code: %d. Retrying...\n", name, err, status) |
186 | | - time.Sleep(5 * time.Second) |
| 213 | + return nil |
| 214 | + } |
| 215 | + |
| 216 | + if attempt < maxRetries { |
| 217 | + delay := time.Duration(1<<attempt) * baseDelay |
| 218 | + time.Sleep(delay) |
187 | 219 | } |
188 | 220 | } |
| 221 | + |
| 222 | + return catcher.Error("failed to sync module after max retries", nil, map[string]any{ |
| 223 | + "process": "plugin_com.utmstack.modules-config", |
| 224 | + "module": moduleName, |
| 225 | + "retries": maxRetries + 1, |
| 226 | + }, |
| 227 | + ) |
| 228 | +} |
| 229 | + |
| 230 | +var AllModules = map[string]PluginType{ |
| 231 | + "AWS_IAM_USER": PluginType_AWS_IAM_USER, |
| 232 | + "AZURE": PluginType_AZURE, |
| 233 | + "BITDEFENDER": PluginType_BITDEFENDER, |
| 234 | + "GCP": PluginType_GCP, |
| 235 | + "O365": PluginType_O365, |
| 236 | + "SOC_AI": PluginType_SOC_AI, |
| 237 | + "SOPHOS": PluginType_SOPHOS, |
| 238 | + "CROWDSTRIKE": PluginType_CROWDSTRIKE, |
| 239 | +} |
| 240 | + |
| 241 | +func (s *ConfigServer) SyncConfigs(backend string, internalKey string) { |
| 242 | + g := errgroup.Group{} |
| 243 | + g.SetLimit(4) |
| 244 | + |
| 245 | + for name, t := range AllModules { |
| 246 | + g.Go(func() error { |
| 247 | + return s.syncModuleWithRetry(name, t, backend, internalKey) |
| 248 | + }) |
| 249 | + } |
| 250 | + |
| 251 | + if err := g.Wait(); err != nil { |
| 252 | + catcher.Error("module config sync failed", err, map[string]any{ |
| 253 | + "process": "plugin_com.utmstack.modules-config", |
| 254 | + }) |
| 255 | + } |
189 | 256 | } |
0 commit comments