Skip to content

Commit 666d6ac

Browse files
committed
fix: Rewrite regristy concurrency to avoid deadlocks
1 parent db41e19 commit 666d6ac

2 files changed

Lines changed: 114 additions & 67 deletions

File tree

internal/cli/parse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ type parseOpts struct {
2727
type parserFactory func() (source.Parser, error)
2828

2929
func newParseCmd() *cobra.Command {
30-
opts := parseOpts{maxDepth: 10, maxNodes: 100}
30+
opts := parseOpts{maxDepth: 10, maxNodes: 5000}
3131

3232
cmd := &cobra.Command{
3333
Use: "parse",

pkg/source/registry.go

Lines changed: 113 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ type RepoInfo struct {
3030
Version string
3131
ProjectURLs map[string]string
3232
HomePage string
33-
ManifestFile string // e.g., "Cargo.toml", "pyproject.toml", "package.json"
33+
ManifestFile string
3434
}
3535

3636
type Options struct {
@@ -68,18 +68,6 @@ type PackageInfo interface {
6868

6969
type fetchFunc[T PackageInfo] func(ctx context.Context, name string, refresh bool) (T, error)
7070

71-
type job struct {
72-
name string
73-
depth int
74-
}
75-
76-
type result[T PackageInfo] struct {
77-
name string
78-
info T
79-
depth int
80-
err error
81-
}
82-
8371
func Parse[T PackageInfo](ctx context.Context, root string, opts Options, fetch fetchFunc[T]) (*dag.DAG, error) {
8472
opts = opts.withDefaults()
8573

@@ -90,13 +78,26 @@ func Parse[T PackageInfo](ctx context.Context, root string, opts Options, fetch
9078
g: dag.New(nil),
9179
visited: make(map[string]bool),
9280
meta: make(map[string]map[string]any),
93-
jobs: make(chan job, numWorkers),
94-
results: make(chan result[T], numWorkers),
81+
jobs: make(chan job, numWorkers*2),
82+
results: make(chan result[T], numWorkers*2),
83+
done: make(chan struct{}),
9584
}
9685

9786
return p.parse(root)
9887
}
9988

89+
type job struct {
90+
name string
91+
depth int
92+
}
93+
94+
type result[T PackageInfo] struct {
95+
name string
96+
info T
97+
depth int
98+
err error
99+
}
100+
100101
type parser[T PackageInfo] struct {
101102
ctx context.Context
102103
opts Options
@@ -108,30 +109,40 @@ type parser[T PackageInfo] struct {
108109

109110
jobs chan job
110111
results chan result[T]
112+
done chan struct{}
113+
114+
mu sync.Mutex
115+
inflight int64
116+
nodeCount int32
117+
}
111118

112-
mu sync.Mutex
113-
active int
119+
func (p *parser[T]) adjustInflight(delta int64) {
120+
p.mu.Lock()
121+
p.inflight += delta
122+
isDone := p.inflight == 0
123+
p.mu.Unlock()
124+
125+
if isDone {
126+
close(p.done)
127+
}
114128
}
115129

116130
func (p *parser[T]) parse(root string) (*dag.DAG, error) {
117-
var wg sync.WaitGroup
131+
var workerWg sync.WaitGroup
118132
for range numWorkers {
119-
wg.Add(1)
133+
workerWg.Add(1)
120134
go func() {
121-
defer wg.Done()
135+
defer workerWg.Done()
122136
p.worker()
123137
}()
124138
}
125139

126-
go func() {
127-
wg.Wait()
128-
close(p.results)
129-
}()
130-
131140
p.submit(job{name: root, depth: 0})
132141

133142
rootErr := p.processResults(root)
143+
134144
close(p.jobs)
145+
workerWg.Wait()
135146

136147
if rootErr != nil {
137148
return nil, rootErr
@@ -144,92 +155,128 @@ func (p *parser[T]) parse(root string) (*dag.DAG, error) {
144155
func (p *parser[T]) worker() {
145156
for j := range p.jobs {
146157
if p.ctx.Err() != nil {
147-
return
158+
p.adjustInflight(-1) // job cancelled
159+
continue
148160
}
149161
info, err := p.fetch(p.ctx, j.name, p.opts.Refresh)
150162
p.results <- result[T]{name: j.name, info: info, depth: j.depth, err: err}
151163
}
152164
}
153165

154-
func (p *parser[T]) submit(j job) {
166+
func (p *parser[T]) submit(j job) bool {
155167
p.mu.Lock()
156168
if p.visited[j.name] {
157169
p.mu.Unlock()
158-
return
170+
return false
159171
}
160172
p.visited[j.name] = true
161-
p.active++
173+
p.inflight++
162174
p.mu.Unlock()
163175

164176
p.jobs <- j
177+
return true
165178
}
166179

167180
func (p *parser[T]) processResults(root string) error {
168-
for r := range p.results {
169-
p.decrementActive()
170-
171-
if r.err != nil {
172-
if r.name == root {
173-
return r.err
181+
for {
182+
select {
183+
case r := <-p.results:
184+
if err := p.handleResult(r, root); err != nil {
185+
return err
174186
}
175-
p.opts.Logger("failed to fetch %s: %v", r.name, r.err)
176-
} else {
177-
p.processResult(r)
187+
188+
case <-p.done:
189+
return nil
190+
191+
case <-p.ctx.Done():
192+
return p.ctx.Err()
178193
}
194+
}
195+
}
179196

180-
if p.isDone() {
181-
break
197+
func (p *parser[T]) handleResult(r result[T], root string) error {
198+
defer p.adjustInflight(-1)
199+
200+
if r.err != nil {
201+
if r.name == root {
202+
return r.err
182203
}
204+
p.opts.Logger("failed to fetch %s: %v", r.name, r.err)
205+
return nil
183206
}
207+
208+
p.addNode(r)
209+
p.submitDependencies(r)
184210
return nil
185211
}
186212

187-
func (p *parser[T]) processResult(r result[T]) {
213+
func (p *parser[T]) addNode(r result[T]) {
188214
_ = p.g.AddNode(dag.Node{ID: r.name})
189215

216+
p.mu.Lock()
217+
p.nodeCount++
218+
p.mu.Unlock()
219+
190220
meta := enrichMetadata(p.ctx, r.info, p.opts)
191-
p.storeMeta(r.name, meta)
221+
if len(meta) > 0 {
222+
p.mu.Lock()
223+
p.meta[r.name] = meta
224+
p.mu.Unlock()
225+
}
226+
}
192227

193-
if r.depth >= p.opts.MaxDepth || len(p.visited) >= p.opts.MaxNodes {
228+
func (p *parser[T]) submitDependencies(r result[T]) {
229+
if r.depth >= p.opts.MaxDepth {
194230
return
195231
}
196232

197-
for _, dep := range r.info.GetDependencies() {
198-
_ = p.g.AddNode(dag.Node{ID: dep})
199-
_ = p.g.AddEdge(dag.Edge{From: r.name, To: dep})
200-
p.submit(job{name: dep, depth: r.depth + 1})
233+
deps := r.info.GetDependencies()
234+
if len(deps) == 0 {
235+
return
201236
}
202-
}
203237

204-
func (p *parser[T]) applyMetadata() {
238+
// Add edges and collect jobs
205239
p.mu.Lock()
206-
defer p.mu.Unlock()
207-
for id, m := range p.meta {
208-
if n, ok := p.g.Node(id); ok {
209-
n.Meta = m
240+
nodeCount := p.nodeCount
241+
p.mu.Unlock()
242+
243+
var toSubmit []job
244+
for _, dep := range deps {
245+
_ = p.g.AddNode(dag.Node{ID: dep})
246+
_ = p.g.AddEdge(dag.Edge{From: r.name, To: dep})
247+
248+
if int(nodeCount) < p.opts.MaxNodes {
249+
toSubmit = append(toSubmit, job{name: dep, depth: r.depth + 1})
210250
}
211251
}
212-
}
213252

214-
func (p *parser[T]) decrementActive() {
253+
if len(toSubmit) == 0 {
254+
return
255+
}
256+
257+
// Reserve a slot for the async submitter BEFORE spawning it.
258+
// This prevents processResults from exiting prematurely.
215259
p.mu.Lock()
216-
p.active--
260+
p.inflight++
217261
p.mu.Unlock()
262+
263+
go func() {
264+
defer p.adjustInflight(-1) // release slot when done submitting
265+
266+
for _, j := range toSubmit {
267+
p.submit(j)
268+
}
269+
}()
218270
}
219271

220-
func (p *parser[T]) isDone() bool {
272+
func (p *parser[T]) applyMetadata() {
221273
p.mu.Lock()
222274
defer p.mu.Unlock()
223-
return p.active == 0
224-
}
225-
226-
func (p *parser[T]) storeMeta(name string, meta map[string]any) {
227-
if len(meta) == 0 {
228-
return
275+
for id, m := range p.meta {
276+
if n, ok := p.g.Node(id); ok {
277+
n.Meta = m
278+
}
229279
}
230-
p.mu.Lock()
231-
p.meta[name] = meta
232-
p.mu.Unlock()
233280
}
234281

235282
func enrichMetadata(ctx context.Context, info PackageInfo, opts Options) map[string]any {

0 commit comments

Comments
 (0)