Skip to content

Commit 26a5710

Browse files
authored
basichost: fix deadlock with addrs_manager (#3348)
1 parent 8e8f76a commit 26a5710

File tree

7 files changed

+96
-38
lines changed

7 files changed

+96
-38
lines changed

config/config.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -623,7 +623,13 @@ func (cfg *Config) NewNode() (host.Host, error) {
623623
}
624624

625625
if cfg.Routing != nil {
626-
return &closableRoutedHost{App: app, RoutedHost: rh}, nil
626+
return &closableRoutedHost{
627+
closableBasicHost: closableBasicHost{
628+
App: app,
629+
BasicHost: bh,
630+
},
631+
RoutedHost: rh,
632+
}, nil
627633
}
628634
return &closableBasicHost{App: app, BasicHost: bh}, nil
629635
}

config/host.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@ func (h *closableBasicHost) Close() error {
2020
}
2121

2222
type closableRoutedHost struct {
23-
*fx.App
23+
// closableBasicHost is embedded here so that interface assertions on
24+
// BasicHost exported methods work correctly.
25+
closableBasicHost
2426
*routed.RoutedHost
2527
}
2628

2729
func (h *closableRoutedHost) Close() error {
2830
_ = h.App.Stop(context.Background())
31+
// The routed host will close the basic host
2932
return h.RoutedHost.Close()
3033
}

libp2p_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,23 @@ func TestCustomTCPDialer(t *testing.T) {
812812
require.ErrorContains(t, err, expectedErr.Error())
813813
}
814814

815+
func TestBasicHostInterfaceAssertion(t *testing.T) {
816+
mockRouter := &mockPeerRouting{}
817+
h, err := New(
818+
NoListenAddrs,
819+
Routing(func(host.Host) (routing.PeerRouting, error) { return mockRouter, nil }),
820+
DisableRelay(),
821+
)
822+
require.NoError(t, err)
823+
defer h.Close()
824+
825+
require.NotNil(t, h)
826+
require.NotEmpty(t, h.ID())
827+
828+
_, ok := h.(interface{ AllAddrs() []ma.Multiaddr })
829+
require.True(t, ok)
830+
}
831+
815832
func BenchmarkAllAddrs(b *testing.B) {
816833
h, err := New()
817834

p2p/host/basic/addrs_manager.go

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@ type addrsManager struct {
5353
addrsUpdatedChan chan struct{}
5454

5555
// triggerAddrsUpdateChan is used to trigger an addresses update.
56-
triggerAddrsUpdateChan chan struct{}
56+
triggerAddrsUpdateChan chan chan struct{}
57+
// started is used to check whether the addrsManager has started.
58+
started atomic.Bool
5759
// triggerReachabilityUpdate is notified when reachable addrs are updated.
5860
triggerReachabilityUpdate chan struct{}
5961

@@ -87,7 +89,7 @@ func newAddrsManager(
8789
observedAddrsManager: observedAddrsManager,
8890
natManager: natmgr,
8991
addrsFactory: addrsFactory,
90-
triggerAddrsUpdateChan: make(chan struct{}, 1),
92+
triggerAddrsUpdateChan: make(chan chan struct{}, 1),
9193
triggerReachabilityUpdate: make(chan struct{}, 1),
9294
addrsUpdatedChan: addrsUpdatedChan,
9395
interfaceAddrs: &interfaceAddrsCache{},
@@ -115,7 +117,6 @@ func (a *addrsManager) Start() error {
115117
return fmt.Errorf("error starting addrs reachability tracker: %s", err)
116118
}
117119
}
118-
119120
return a.startBackgroundWorker()
120121
}
121122

@@ -140,16 +141,24 @@ func (a *addrsManager) NetNotifee() network.Notifiee {
140141
// Updating addrs in sync provides the nice property that
141142
// host.Addrs() just after host.Network().Listen(x) will return x
142143
return &network.NotifyBundle{
143-
ListenF: func(network.Network, ma.Multiaddr) { a.triggerAddrsUpdate() },
144-
ListenCloseF: func(network.Network, ma.Multiaddr) { a.triggerAddrsUpdate() },
144+
ListenF: func(network.Network, ma.Multiaddr) { a.updateAddrsSync() },
145+
ListenCloseF: func(network.Network, ma.Multiaddr) { a.updateAddrsSync() },
145146
}
146147
}
147148

148-
func (a *addrsManager) triggerAddrsUpdate() {
149-
a.updateAddrs(false, nil)
149+
func (a *addrsManager) updateAddrsSync() {
150+
// This prevents a deadlock where addrs updates before starting the manager are ignored
151+
if !a.started.Load() {
152+
return
153+
}
154+
ch := make(chan struct{})
150155
select {
151-
case a.triggerAddrsUpdateChan <- struct{}{}:
152-
default:
156+
case a.triggerAddrsUpdateChan <- ch:
157+
select {
158+
case <-ch:
159+
case <-a.ctx.Done():
160+
}
161+
case <-a.ctx.Done():
153162
}
154163
}
155164

@@ -177,7 +186,7 @@ func (a *addrsManager) startBackgroundWorker() error {
177186
}
178187
err2 := autonatReachabilitySub.Close()
179188
if err2 != nil {
180-
err2 = fmt.Errorf("error closing autonat reachability: %w", err1)
189+
err2 = fmt.Errorf("error closing autonat reachability: %w", err2)
181190
}
182191
err = fmt.Errorf("error subscribing to autonat reachability: %s", err)
183192
return errors.Join(err, err1, err2)
@@ -200,9 +209,11 @@ func (a *addrsManager) startBackgroundWorker() error {
200209
}
201210
default:
202211
}
212+
// this ensures that listens concurrent with Start are reflected correctly after Start exits.
213+
a.started.Store(true)
203214
// update addresses before starting the worker loop. This ensures that any address updates
204215
// before calling addrsManager.Start are correctly reported after Start returns.
205-
a.updateAddrs(true, relayAddrs)
216+
a.updateAddrs(relayAddrs)
206217

207218
a.wg.Add(1)
208219
go a.background(autoRelayAddrsSub, autonatReachabilitySub, emitter, relayAddrs)
@@ -227,13 +238,18 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
227238
ticker := time.NewTicker(addrChangeTickrInterval)
228239
defer ticker.Stop()
229240
var previousAddrs hostAddrs
241+
var notifCh chan struct{}
230242
for {
231-
currAddrs := a.updateAddrs(true, relayAddrs)
243+
currAddrs := a.updateAddrs(relayAddrs)
244+
if notifCh != nil {
245+
close(notifCh)
246+
notifCh = nil
247+
}
232248
a.notifyAddrsChanged(emitter, previousAddrs, currAddrs)
233249
previousAddrs = currAddrs
234250
select {
235251
case <-ticker.C:
236-
case <-a.triggerAddrsUpdateChan:
252+
case notifCh = <-a.triggerAddrsUpdateChan:
237253
case <-a.triggerReachabilityUpdate:
238254
case e := <-autoRelayAddrsSub.Out():
239255
if evt, ok := e.(event.EvtAutoRelayAddrsUpdated); ok {
@@ -250,26 +266,18 @@ func (a *addrsManager) background(autoRelayAddrsSub, autonatReachabilitySub even
250266
}
251267

252268
// updateAddrs updates the addresses of the host and returns the new updated
253-
// addrs
254-
func (a *addrsManager) updateAddrs(updateRelayAddrs bool, relayAddrs []ma.Multiaddr) hostAddrs {
255-
// Must lock while doing both recompute and update as this method is called from
256-
// multiple goroutines.
257-
a.addrsMx.Lock()
258-
defer a.addrsMx.Unlock()
259-
269+
// addrs. This must only be called from the background goroutine or from the Start method otherwise
270+
// we may end up with stale addrs.
271+
func (a *addrsManager) updateAddrs(relayAddrs []ma.Multiaddr) hostAddrs {
260272
localAddrs := a.getLocalAddrs()
261273
var currReachableAddrs, currUnreachableAddrs, currUnknownAddrs []ma.Multiaddr
262274
if a.addrsReachabilityTracker != nil {
263275
currReachableAddrs, currUnreachableAddrs, currUnknownAddrs = a.getConfirmedAddrs(localAddrs)
264276
}
265-
if !updateRelayAddrs {
266-
relayAddrs = a.currentAddrs.relayAddrs
267-
} else {
268-
// Copy the callers slice
269-
relayAddrs = slices.Clone(relayAddrs)
270-
}
277+
relayAddrs = slices.Clone(relayAddrs)
271278
currAddrs := a.getAddrs(slices.Clone(localAddrs), relayAddrs)
272279

280+
a.addrsMx.Lock()
273281
a.currentAddrs = hostAddrs{
274282
addrs: append(a.currentAddrs.addrs[:0], currAddrs...),
275283
localAddrs: append(a.currentAddrs.localAddrs[:0], localAddrs...),
@@ -278,6 +286,7 @@ func (a *addrsManager) updateAddrs(updateRelayAddrs bool, relayAddrs []ma.Multia
278286
unknownAddrs: append(a.currentAddrs.unknownAddrs[:0], currUnknownAddrs...),
279287
relayAddrs: append(a.currentAddrs.relayAddrs[:0], relayAddrs...),
280288
}
289+
a.addrsMx.Unlock()
281290

282291
return hostAddrs{
283292
localAddrs: localAddrs,
@@ -315,7 +324,8 @@ func (a *addrsManager) notifyAddrsChanged(emitter event.Emitter, previous, curre
315324
if areAddrsDifferent(previous.reachableAddrs, current.reachableAddrs) ||
316325
areAddrsDifferent(previous.unreachableAddrs, current.unreachableAddrs) ||
317326
areAddrsDifferent(previous.unknownAddrs, current.unknownAddrs) {
318-
log.Debugf("host reachable addrs updated: %s", current.localAddrs)
327+
log.Debugf("host reachable addrs updated: reachable: %s, unreachable: %s, unknown: %s",
328+
current.reachableAddrs, current.unreachableAddrs, current.unknownAddrs)
319329
if err := emitter.Emit(event.EvtHostReachableAddrsChanged{
320330
Reachable: slices.Clone(current.reachableAddrs),
321331
Unreachable: slices.Clone(current.unreachableAddrs),

p2p/host/basic/addrs_manager_test.go

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"errors"
66
"fmt"
77
"slices"
8+
"sync/atomic"
89
"testing"
910
"time"
1011

@@ -13,6 +14,7 @@ import (
1314
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
1415
"github.com/libp2p/go-libp2p/p2p/protocol/autonatv2"
1516
ma "github.com/multiformats/go-multiaddr"
17+
"github.com/multiformats/go-multiaddr/matest"
1618
manet "github.com/multiformats/go-multiaddr/net"
1719
"github.com/prometheus/client_golang/prometheus"
1820
"github.com/stretchr/testify/assert"
@@ -239,7 +241,7 @@ func TestAddrsManager(t *testing.T) {
239241
},
240242
ListenAddrs: func() []ma.Multiaddr { return []ma.Multiaddr{lhquic, lhtcp} },
241243
})
242-
am.triggerAddrsUpdate()
244+
am.updateAddrsSync()
243245
require.EventuallyWithT(t, func(collect *assert.CollectT) {
244246
expected := []ma.Multiaddr{publicQUIC, lhquic, lhtcp}
245247
assert.ElementsMatch(collect, am.Addrs(), expected, "%s\n%s", am.Addrs(), expected)
@@ -315,7 +317,7 @@ func TestAddrsManager(t *testing.T) {
315317
},
316318
ListenAddrs: func() []ma.Multiaddr { return []ma.Multiaddr{lhquic, lhtcp} },
317319
})
318-
am.triggerAddrsUpdate()
320+
am.updateAddrsSync()
319321
expected := []ma.Multiaddr{lhquic, lhtcp, publicTCP, publicQUIC}
320322
require.EventuallyWithT(t, func(collect *assert.CollectT) {
321323
assert.ElementsMatch(collect, am.Addrs(), expected, "%s\n%s", am.Addrs(), expected)
@@ -343,7 +345,7 @@ func TestAddrsManager(t *testing.T) {
343345
},
344346
ListenAddrs: func() []ma.Multiaddr { return []ma.Multiaddr{lhquic} },
345347
})
346-
am.triggerAddrsUpdate()
348+
am.updateAddrsSync()
347349
expected := []ma.Multiaddr{lhquic}
348350
expected = append(expected, quicAddrs[:maxObservedAddrsPerListenAddr]...)
349351
require.EventuallyWithT(t, func(collect *assert.CollectT) {
@@ -428,12 +430,32 @@ func TestAddrsManager(t *testing.T) {
428430
require.Contains(t, am.Addrs(), publicTCP)
429431
require.NotContains(t, am.Addrs(), publicQUIC)
430432
close(updateChan)
431-
am.triggerAddrsUpdate()
433+
am.updateAddrsSync()
432434
require.EventuallyWithT(t, func(collect *assert.CollectT) {
433435
assert.Contains(collect, am.Addrs(), publicQUIC)
434436
assert.NotContains(collect, am.Addrs(), publicTCP)
435437
}, 1*time.Second, 50*time.Millisecond)
436438
})
439+
440+
t.Run("addrs factory depends on confirmed addrs", func(t *testing.T) {
441+
var amp atomic.Pointer[addrsManager]
442+
q1 := ma.StringCast("/ip4/1.1.1.1/udp/1/quic-v1")
443+
addrsFactory := func(_ []ma.Multiaddr) []ma.Multiaddr {
444+
if amp.Load() == nil {
445+
return nil
446+
}
447+
// r is empty as there's no reachability tracker
448+
r, _, _ := amp.Load().ConfirmedAddrs()
449+
return append(r, q1)
450+
}
451+
am := newAddrsManagerTestCase(t, addrsManagerArgs{
452+
AddrsFactory: addrsFactory,
453+
ListenAddrs: func() []ma.Multiaddr { return []ma.Multiaddr{lhquic, lhtcp} },
454+
})
455+
amp.Store(am.addrsManager)
456+
am.updateAddrsSync()
457+
matest.AssertEqualMultiaddrs(t, []ma.Multiaddr{q1}, am.Addrs())
458+
})
437459
}
438460

439461
func TestAddrsManagerReachabilityEvent(t *testing.T) {

p2p/host/basic/basic_host.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,6 @@ func NewHost(n network.Network, opts *HostOpts) (*BasicHost, error) {
262262
if err != nil {
263263
return nil, fmt.Errorf("failed to create address service: %w", err)
264264
}
265-
// register to be notified when the network's listen addrs change,
266-
// so we can update our address set and push events if needed
267-
h.Network().Notify(h.addressManager.NetNotifee())
268265

269266
if opts.EnableHolePunching {
270267
if opts.EnableMetrics {
@@ -336,6 +333,9 @@ func (h *BasicHost) Start() {
336333
log.Errorf("autonat v2 failed to start: %s", err)
337334
}
338335
}
336+
// register to be notified when the network's listen addrs change,
337+
// so we can update our address set and push events if needed
338+
h.Network().Notify(h.addressManager.NetNotifee())
339339
if err := h.addressManager.Start(); err != nil {
340340
log.Errorf("address service failed to start: %s", err)
341341
}
@@ -857,7 +857,6 @@ func (h *BasicHost) Close() error {
857857
if h.cmgr != nil {
858858
h.cmgr.Close()
859859
}
860-
h.addressManager.Close()
861860

862861
if h.ids != nil {
863862
h.ids.Close()
@@ -882,6 +881,7 @@ func (h *BasicHost) Close() error {
882881
log.Errorf("swarm close failed: %v", err)
883882
}
884883

884+
h.addressManager.Close()
885885
h.psManager.Close()
886886
if h.Peerstore() != nil {
887887
h.Peerstore().Close()

p2p/host/basic/basic_host_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ func TestHostAddrChangeDetection(t *testing.T) {
737737
lk.Lock()
738738
currentAddrSet = i
739739
lk.Unlock()
740-
h.addressManager.triggerAddrsUpdate()
740+
h.addressManager.updateAddrsSync()
741741
evt := waitForAddrChangeEvent(ctx, sub, t)
742742
if !updatedAddrEventsEqual(expectedEvents[i-1], evt) {
743743
t.Errorf("change events not equal: \n\texpected: %v \n\tactual: %v", expectedEvents[i-1], evt)

0 commit comments

Comments
 (0)