Skip to content

Commit 2da01c5

Browse files
shiva-istari (Shivaji Kharse)
and authored
fix: zero address change (#9680)
**Description** When a Zero node is bootstrapped, its --my address is permanently recorded in the Raft WAL via a ConfChangeAddNode entry. On every subsequent restart, WAL replay restores that original address into MembershipState, overwriting the current --my flag. This causes Alphas to connect to a stale address (e.g., localhost:5080 from an initial bulk load) even when Zero is restarted with a production FQDN. This PR introduces a leader-driven reconciliation loop that detects mismatches between the live --my address and what is stored in MembershipState, and proposes a ConfChangeUpdateNode through Raft to durably correct the address across all nodes. An in-memory override in the Connect RPC ensures Alphas receive the correct address immediately, even before the Raft entry is committed. fixes #9676 --------- Co-authored-by: Shivaji Kharse <shiva@Shivajis-MacBook-Pro.local>
1 parent 55e3b79 commit 2da01c5

10 files changed

Lines changed: 921 additions & 10 deletions

File tree

conn/node.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,11 @@ func (n *Node) DeletePeer(pid uint64) {
555555

556556
var errInternalRetry = errors.New("Retry proposal again")
557557

558-
func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
558+
// ProposeConfChange proposes a Raft configuration change (add, remove, or
559+
// update a node) and blocks until it is committed or the context expires.
560+
// It is used by both the conn package internally and by the zero package
561+
// (for address reconciliation via ConfChangeUpdateNode).
562+
func (n *Node) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
559563
cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
560564
defer cancel()
561565

@@ -599,7 +603,7 @@ func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
599603
for err == errInternalRetry {
600604
glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
601605
glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
602-
err = n.proposeConfChange(ctx, cc)
606+
err = n.ProposeConfChange(ctx, cc)
603607
}
604608
return err
605609
}
@@ -618,7 +622,7 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
618622
}
619623
err := errInternalRetry
620624
for err == errInternalRetry {
621-
err = n.proposeConfChange(ctx, cc)
625+
err = n.ProposeConfChange(ctx, cc)
622626
}
623627
return err
624628
}

conn/pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
124124
}
125125
}
126126

127+
// Remove disconnects and removes the pool for addr. It is a no-op if addr is
128+
// not in the pool. Used to clean up stale connections after an address change.
129+
func (p *Pools) Remove(addr string) {
130+
p.remove(addr)
131+
}
132+
127133
func (p *Pools) remove(addr string) {
128134
p.Lock()
129135
defer p.Unlock()

dgraph/cmd/debug/wal.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ func printEntry(es raftpb.Entry, pending map[uint64]bool, isZero bool) {
3535
fmt.Fprintf(&buf, "%d . %d . %v . %-6s . %8d .", es.Term, es.Index, es.Type,
3636
humanize.Bytes(uint64(es.Size())), key)
3737
if es.Type == raftpb.EntryConfChange {
38+
var cc raftpb.ConfChange
39+
if err := cc.Unmarshal(es.Data); err != nil {
40+
fmt.Fprintf(&buf, " [ConfChange unmarshal error: %v]", err)
41+
return
42+
}
43+
fmt.Fprintf(&buf, " Type: %s . NodeID: %#x .", cc.Type, cc.NodeID)
44+
if len(cc.Context) > 0 {
45+
var rc pb.RaftContext
46+
if err := proto.Unmarshal(cc.Context, &rc); err == nil {
47+
fmt.Fprintf(&buf, " RaftContext: %s", rc.String())
48+
}
49+
}
3850
return
3951
}
4052
if len(es.Data) == 0 {

dgraph/cmd/zero/raft.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -500,6 +500,28 @@ func (n *node) applyConfChange(e raftpb.Entry) {
500500
n.DeletePeer(cc.NodeID)
501501
n.server.removeZero(cc.NodeID)
502502

503+
} else if cc.Type == raftpb.ConfChangeUpdateNode && len(cc.Context) > 0 {
504+
// ConfChangeUpdateNode is a Raft-provided mechanism for updating node
505+
// metadata (e.g. addresses) without changing cluster membership. It is
506+
// a no-op inside the Raft library; the application is responsible for
507+
// interpreting the Context bytes. We use it to fix stale Zero addresses
508+
// that were baked into the WAL at initial bootstrap.
509+
var rc pb.RaftContext
510+
x.Check(proto.Unmarshal(cc.Context, &rc))
511+
512+
// Drop the stale pool before connecting to the new address so that
513+
// repeated dial errors to the old address are eliminated immediately.
514+
if oldMember, ok := n.server.membershipState().GetZeros()[rc.Id]; ok {
515+
if oldMember.GetAddr() != rc.Addr {
516+
conn.GetPools().Remove(oldMember.GetAddr())
517+
}
518+
}
519+
go n.Connect(rc.Id, rc.Addr)
520+
521+
m := &pb.Member{Id: rc.Id, Addr: rc.Addr, GroupId: 0, Learner: rc.IsLearner}
522+
n.server.storeZero(m)
523+
glog.Infof("Applied ConfChangeUpdateNode for Zero %#x: addr=%q", rc.Id, rc.Addr)
524+
503525
} else if len(cc.Context) > 0 {
504526
var rc pb.RaftContext
505527
x.Check(proto.Unmarshal(cc.Context, &rc))
@@ -696,12 +718,89 @@ func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
696718
select {
697719
case <-ticker:
698720
n.server.updateZeroLeader()
721+
n.reconcileZeroAddresses()
699722
case <-closer.HasBeenClosed():
700723
return
701724
}
702725
}
703726
}
704727

728+
// reconcileZeroAddresses detects mismatches between the current --my address
729+
// (or transport-layer peer addresses) and what is stored in MembershipState,
730+
// then proposes a ConfChangeUpdateNode through Raft to correct them. This
731+
// ensures that stale addresses baked into the WAL at initial bootstrap are
732+
// replaced with the current addresses after a restart. Only the leader can
733+
// propose, so this function is a no-op on followers.
734+
func (n *node) reconcileZeroAddresses() {
735+
if !n.AmLeader() {
736+
return
737+
}
738+
739+
state := n.server.membershipState()
740+
if state == nil {
741+
return
742+
}
743+
744+
for id, zero := range state.GetZeros() {
745+
var correctAddr string
746+
747+
if id == n.Id {
748+
correctAddr = n.RaftContext.Addr
749+
} else if peerAddr, ok := n.Peer(id); ok {
750+
correctAddr = peerAddr
751+
} else {
752+
continue
753+
}
754+
755+
if correctAddr == zero.GetAddr() {
756+
continue
757+
}
758+
759+
// Validate: ensure no other Zero already claims this address.
760+
duplicate := false
761+
for otherId, otherZero := range state.GetZeros() {
762+
if otherId != id && otherZero.GetAddr() == correctAddr {
763+
glog.Warningf("Skipping address reconciliation for Zero %#x: "+
764+
"address %q already used by Zero %#x", id, correctAddr, otherId)
765+
duplicate = true
766+
break
767+
}
768+
}
769+
if duplicate {
770+
continue
771+
}
772+
773+
glog.Infof("Zero %#x address mismatch: MembershipState has %q, expected %q. "+
774+
"Proposing ConfChangeUpdateNode.", id, zero.GetAddr(), correctAddr)
775+
776+
rc := &pb.RaftContext{
777+
Id: id,
778+
Addr: correctAddr,
779+
Group: 0,
780+
}
781+
data, err := proto.Marshal(rc)
782+
if err != nil {
783+
glog.Errorf("Error marshalling RaftContext for address update: %v", err)
784+
continue
785+
}
786+
787+
cc := raftpb.ConfChange{
788+
Type: raftpb.ConfChangeUpdateNode,
789+
NodeID: id,
790+
Context: data,
791+
}
792+
if err := n.ProposeConfChange(n.ctx, cc); err != nil {
793+
glog.Errorf("Failed to propose ConfChangeUpdateNode for Zero %#x: %v", id, err)
794+
}
795+
// Propose one update at a time to respect Raft's single pending
796+
// ConfChange constraint. The next mismatch will be handled in the
797+
// following periodic cycle.
798+
return
799+
}
800+
// V(2): emitted on every 10s tick, too noisy for production at Info level.
801+
glog.V(2).Infof("Zero address reconciliation complete: all addresses up to date")
802+
}
803+
705804
func (n *node) checkQuorum(closer *z.Closer) {
706805
defer closer.Done()
707806
ticker := time.Tick(time.Second)
@@ -923,6 +1022,10 @@ func (n *node) Run() {
9231022
if rd.RaftState == raft.StateLeader && !leader {
9241023
glog.Infoln("I've become the leader, updating leases.")
9251024
n.server.updateLeases()
1025+
// Eagerly reconcile on leader election so stale WAL
1026+
// addresses are corrected without waiting for the
1027+
// periodic 10s tick.
1028+
go n.reconcileZeroAddresses()
9261029
}
9271030
leader = rd.RaftState == raft.StateLeader
9281031
// group id hardcoded as 0

dgraph/cmd/zero/zero.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -515,6 +515,17 @@ func (s *Server) Connect(ctx context.Context,
515515
return nil, err
516516
}
517517

518+
// Ensure this Zero's own address in MembershipState reflects the current
519+
// --my flag, even before ConfChangeUpdateNode has been committed through
520+
// Raft. This prevents Alphas from receiving a stale address during the
521+
// brief window between restart and reconciliation.
522+
myAddr := s.Node.RaftContext.Addr
523+
if myId := s.Node.Id; myAddr != "" {
524+
if z, ok := ms.GetZeros()[myId]; ok && z.GetAddr() != myAddr {
525+
z.Addr = myAddr
526+
}
527+
}
528+
518529
if m.ClusterInfoOnly {
519530
// This request only wants to access the membership state, and nothing else. Most likely
520531
// from our clients.

dgraphtest/dgraph.go

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,12 @@ type dnode interface {
8888
}
8989

9090
type zero struct {
91-
id int // 0, 1, 2
92-
containerID string // container ID in docker world
93-
containerName string // something like test-1234_zero2
94-
aliasName string // something like alpha0, zero1
95-
isRunning bool
91+
id int // 0, 1, 2
92+
containerID string // container ID in docker world
93+
containerName string // something like test-1234_zero2
94+
aliasName string // something like alpha0, zero1
95+
isRunning bool
96+
myAddrOverride string // if set, overrides the --my flag value
9697
}
9798

9899
func (z *zero) cname() string {
@@ -128,7 +129,11 @@ func (z *zero) bindings(offset int) nat.PortMap {
128129
}
129130

130131
func (z *zero) cmd(c *LocalCluster) []string {
131-
zcmd := []string{"/gobin/dgraph", "zero", fmt.Sprintf("--my=%s:%v", z.aname(), zeroGrpcPort), "--bindall",
132+
myAddr := fmt.Sprintf("%s:%v", z.aname(), zeroGrpcPort)
133+
if z.myAddrOverride != "" {
134+
myAddr = z.myAddrOverride
135+
}
136+
zcmd := []string{"/gobin/dgraph", "zero", fmt.Sprintf("--my=%s", myAddr), "--bindall",
132137
fmt.Sprintf(`--replicas=%v`, c.conf.replicas), "--logtostderr", fmt.Sprintf("-v=%d", c.conf.verbosity)}
133138

134139
if c.lowerThanV21 {

dgraphtest/local_cluster.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,75 @@ func (c *LocalCluster) StopZero(id int) error {
596596
return c.stopContainer(c.zeros[id])
597597
}
598598

599+
// GetZeroContainerName returns the Docker container name for the specified Zero,
600+
// which also serves as a DNS alias on the cluster network.
601+
func (c *LocalCluster) GetZeroContainerName(id int) (string, error) {
602+
if id >= c.conf.numZeros {
603+
return "", fmt.Errorf("invalid id of zero: %v", id)
604+
}
605+
return c.zeros[id].containerName, nil
606+
}
607+
608+
// SetZeroMyAddr overrides the --my flag for the specified Zero node. The next
609+
// time the container is recreated (via RecreateZero), this address will be
610+
// used instead of the default container alias.
611+
func (c *LocalCluster) SetZeroMyAddr(id int, addr string) error {
612+
if id >= c.conf.numZeros {
613+
return fmt.Errorf("invalid id of zero: %v", id)
614+
}
615+
c.zeros[id].myAddrOverride = addr
616+
return nil
617+
}
618+
619+
// RecreateZero destroys the Zero container and creates a new one with the
620+
// current command-line flags (e.g. a different --my address). The Zero's data
621+
// directory is preserved across the recreation by extracting it from the old
622+
// (stopped) container and injecting it into the new one. This simulates a
623+
// process restart with persistent storage — the exact scenario where stale WAL
624+
// addresses surface.
625+
func (c *LocalCluster) RecreateZero(id int) error {
626+
if id >= c.conf.numZeros {
627+
return fmt.Errorf("invalid id of zero: %v", id)
628+
}
629+
z := c.zeros[id]
630+
631+
ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
632+
defer cancel()
633+
634+
// Extract the data directory from the stopped container so the WAL survives.
635+
dataReader, _, err := c.dcli.CopyFromContainer(ctx, z.cid(), zeroWorkingDir)
636+
if err != nil {
637+
return errors.Wrapf(err, "error copying data from zero container [%v]", z.cname())
638+
}
639+
defer dataReader.Close()
640+
641+
// Read the tar into memory (Zero WAL is small).
642+
dataTar, err := io.ReadAll(dataReader)
643+
if err != nil {
644+
return errors.Wrap(err, "error reading zero data tar")
645+
}
646+
647+
ro := container.RemoveOptions{RemoveVolumes: true, Force: true}
648+
if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil {
649+
return errors.Wrapf(err, "error removing zero container [%v]", z.cname())
650+
}
651+
652+
cid, err := c.createContainer(z)
653+
if err != nil {
654+
return errors.Wrapf(err, "error recreating zero container [%v]", z.cname())
655+
}
656+
z.containerID = cid
657+
658+
// Inject the data directory into the new container. CopyToContainer expects
659+
// a tar archive rooted at the parent of the target path.
660+
if err := c.dcli.CopyToContainer(ctx, cid, "/data", bytes.NewReader(dataTar),
661+
container.CopyToContainerOptions{}); err != nil {
662+
return errors.Wrapf(err, "error restoring data to zero container [%v]", z.cname())
663+
}
664+
665+
return nil
666+
}
667+
599668
func (c *LocalCluster) StopAlpha(id int) error {
600669
if id >= c.conf.numAlphas {
601670
return fmt.Errorf("invalid id of alpha: %v", id)

0 commit comments

Comments (0)