diff --git a/conn/node.go b/conn/node.go
index 605c00d33fc..16014832d6c 100644
--- a/conn/node.go
+++ b/conn/node.go
@@ -555,7 +555,11 @@ func (n *Node) DeletePeer(pid uint64) {
 
 var errInternalRetry = errors.New("Retry proposal again")
 
-func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
+// ProposeConfChange proposes a Raft configuration change (add, remove, or
+// update a node) and blocks until it is committed or the context expires.
+// It is used by both the conn package internally and by the zero package
+// (for address reconciliation via ConfChangeUpdateNode).
+func (n *Node) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
 	cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
 	defer cancel()
 
@@ -599,7 +603,7 @@ func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
 	for err == errInternalRetry {
 		glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
 		glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
-		err = n.proposeConfChange(ctx, cc)
+		err = n.ProposeConfChange(ctx, cc)
 	}
 	return err
 }
@@ -618,7 +622,7 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
 	}
 	err := errInternalRetry
 	for err == errInternalRetry {
-		err = n.proposeConfChange(ctx, cc)
+		err = n.ProposeConfChange(ctx, cc)
 	}
 	return err
 }
diff --git a/conn/pool.go b/conn/pool.go
index e81a32b5212..e8785cc7c7d 100644
--- a/conn/pool.go
+++ b/conn/pool.go
@@ -124,6 +124,12 @@ func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
 	}
 }
 
+// Remove disconnects and removes the pool for addr. It is a no-op if addr is
+// not in the pool. Used to clean up stale connections after an address change.
+func (p *Pools) Remove(addr string) {
+	p.remove(addr)
+}
+
 func (p *Pools) remove(addr string) {
 	p.Lock()
 	defer p.Unlock()
diff --git a/dgraph/cmd/debug/wal.go b/dgraph/cmd/debug/wal.go
index 4103d955ad6..5cb3b42cc7b 100644
--- a/dgraph/cmd/debug/wal.go
+++ b/dgraph/cmd/debug/wal.go
@@ -35,6 +35,18 @@ func printEntry(es raftpb.Entry, pending map[uint64]bool, isZero bool) {
 	fmt.Fprintf(&buf, "%d . %d . %v . %-6s . %8d .", es.Term, es.Index, es.Type, humanize.Bytes(uint64(es.Size())), key)
 	if es.Type == raftpb.EntryConfChange {
+		var cc raftpb.ConfChange
+		if err := cc.Unmarshal(es.Data); err != nil {
+			fmt.Fprintf(&buf, " [ConfChange unmarshal error: %v]", err)
+			return
+		}
+		fmt.Fprintf(&buf, " Type: %s . NodeID: %#x .", cc.Type, cc.NodeID)
+		if len(cc.Context) > 0 {
+			var rc pb.RaftContext
+			if err := proto.Unmarshal(cc.Context, &rc); err == nil {
+				fmt.Fprintf(&buf, " RaftContext: %s", rc.String())
+			}
+		}
 		return
 	}
 	if len(es.Data) == 0 {
diff --git a/dgraph/cmd/zero/raft.go b/dgraph/cmd/zero/raft.go
index 2c0cabc727b..f9fea51f82b 100644
--- a/dgraph/cmd/zero/raft.go
+++ b/dgraph/cmd/zero/raft.go
@@ -500,6 +500,28 @@ func (n *node) applyConfChange(e raftpb.Entry) {
 		n.DeletePeer(cc.NodeID)
 		n.server.removeZero(cc.NodeID)
+	} else if cc.Type == raftpb.ConfChangeUpdateNode && len(cc.Context) > 0 {
+		// ConfChangeUpdateNode is a Raft-provided mechanism for updating node
+		// metadata (e.g. addresses) without changing cluster membership. It is
+		// a no-op inside the Raft library; the application is responsible for
+		// interpreting the Context bytes. We use it to fix stale Zero addresses
+		// that were baked into the WAL at initial bootstrap.
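+		//
+		// The Context bytes are the pb.RaftContext marshalled by
+		// reconcileZeroAddresses; x.Check is acceptable here because a
+		// corrupt Context would mean a corrupt committed Raft entry.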
+		var rc pb.RaftContext
+		x.Check(proto.Unmarshal(cc.Context, &rc))
+
+		// Drop the stale pool before connecting to the new address so that
+		// repeated dial errors to the old address are eliminated immediately.
+		if oldMember, ok := n.server.membershipState().GetZeros()[rc.Id]; ok {
+			if oldMember.GetAddr() != rc.Addr {
+				conn.GetPools().Remove(oldMember.GetAddr())
+			}
+		}
+		go n.Connect(rc.Id, rc.Addr)
+
+		m := &pb.Member{Id: rc.Id, Addr: rc.Addr, GroupId: 0, Learner: rc.IsLearner}
+		n.server.storeZero(m)
+		glog.Infof("Applied ConfChangeUpdateNode for Zero %#x: addr=%q", rc.Id, rc.Addr)
 	} else if len(cc.Context) > 0 {
 		var rc pb.RaftContext
 		x.Check(proto.Unmarshal(cc.Context, &rc))
@@ -696,12 +718,89 @@ func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
 		select {
 		case <-ticker:
 			n.server.updateZeroLeader()
+			n.reconcileZeroAddresses()
 		case <-closer.HasBeenClosed():
 			return
 		}
 	}
 }
 
+// reconcileZeroAddresses detects mismatches between the current --my address
+// (or transport-layer peer addresses) and what is stored in MembershipState,
+// then proposes a ConfChangeUpdateNode through Raft to correct them. This
+// ensures that stale addresses baked into the WAL at initial bootstrap are
+// replaced with the current addresses after a restart. Only the leader can
+// propose, so this function is a no-op on followers.
+func (n *node) reconcileZeroAddresses() {
+	if !n.AmLeader() {
+		return
+	}
+
+	state := n.server.membershipState()
+	if state == nil {
+		return
+	}
+
+	for id, zero := range state.GetZeros() {
+		var correctAddr string
+
+		if id == n.Id {
+			correctAddr = n.RaftContext.Addr
+		} else if peerAddr, ok := n.Peer(id); ok {
+			correctAddr = peerAddr
+		} else {
+			continue
+		}
+
+		if correctAddr == zero.GetAddr() {
+			continue
+		}
+
+		// Validate: ensure no other Zero already claims this address.
+		duplicate := false
+		for otherId, otherZero := range state.GetZeros() {
+			if otherId != id && otherZero.GetAddr() == correctAddr {
+				glog.Warningf("Skipping address reconciliation for Zero %#x: "+
+					"address %q already used by Zero %#x", id, correctAddr, otherId)
+				duplicate = true
+				break
+			}
+		}
+		if duplicate {
+			continue
+		}
+
+		glog.Infof("Zero %#x address mismatch: MembershipState has %q, expected %q. "+
+			"Proposing ConfChangeUpdateNode.", id, zero.GetAddr(), correctAddr)
+
+		rc := &pb.RaftContext{
+			Id:    id,
+			Addr:  correctAddr,
+			Group: 0,
+		}
+		data, err := proto.Marshal(rc)
+		if err != nil {
+			glog.Errorf("Error marshalling RaftContext for address update: %v", err)
+			continue
+		}
+
+		cc := raftpb.ConfChange{
+			Type:    raftpb.ConfChangeUpdateNode,
+			NodeID:  id,
+			Context: data,
+		}
+		if err := n.ProposeConfChange(n.ctx, cc); err != nil {
+			glog.Errorf("Failed to propose ConfChangeUpdateNode for Zero %#x: %v", id, err)
+		}
+		// Propose one update at a time to respect Raft's single pending
+		// ConfChange constraint. The next mismatch will be handled in the
+		// following periodic cycle.
+		return
+	}
+	// V(2): emitted on every 10s tick, too noisy for production at Info level.
+	glog.V(2).Infof("Zero address reconciliation complete: all addresses up to date")
+}
+
 func (n *node) checkQuorum(closer *z.Closer) {
 	defer closer.Done()
 	ticker := time.Tick(time.Second)
@@ -923,6 +1022,10 @@ func (n *node) Run() {
 				if rd.RaftState == raft.StateLeader && !leader {
 					glog.Infoln("I've become the leader, updating leases.")
 					n.server.updateLeases()
+					// Eagerly reconcile on leader election so stale WAL
+					// addresses are corrected without waiting for the
+					// periodic 10s tick.
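+					// Run it in a goroutine: ProposeConfChange blocks until
+					// the change commits, and Run's Ready loop must keep
+					// draining for any proposal to commit at all.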
+					go n.reconcileZeroAddresses()
 				}
 				leader = rd.RaftState == raft.StateLeader
 				// group id hardcoded as 0
diff --git a/dgraph/cmd/zero/zero.go b/dgraph/cmd/zero/zero.go
index 8903ec4a52c..c640d93e386 100644
--- a/dgraph/cmd/zero/zero.go
+++ b/dgraph/cmd/zero/zero.go
@@ -515,6 +515,17 @@ func (s *Server) Connect(ctx context.Context,
 		return nil, err
 	}
 
+	// Ensure this Zero's own address in MembershipState reflects the current
+	// --my flag, even before ConfChangeUpdateNode has been committed through
+	// Raft. This prevents Alphas from receiving a stale address during the
+	// brief window between restart and reconciliation.
+	myAddr := s.Node.RaftContext.Addr
+	if myId := s.Node.Id; myAddr != "" {
+		if z, ok := ms.GetZeros()[myId]; ok && z.GetAddr() != myAddr {
+			z.Addr = myAddr
+		}
+	}
+
 	if m.ClusterInfoOnly {
 		// This request only wants to access the membership state, and nothing else. Most likely
 		// from our clients.
diff --git a/dgraphtest/dgraph.go b/dgraphtest/dgraph.go
index 44d25a6eeed..bc7fdcea707 100644
--- a/dgraphtest/dgraph.go
+++ b/dgraphtest/dgraph.go
@@ -88,11 +88,12 @@ type dnode interface {
 }
 
 type zero struct {
-	id            int    // 0, 1, 2
-	containerID   string // container ID in docker world
-	containerName string // something like test-1234_zero2
-	aliasName     string // something like alpha0, zero1
-	isRunning     bool
+	id             int    // 0, 1, 2
+	containerID    string // container ID in docker world
+	containerName  string // something like test-1234_zero2
+	aliasName      string // something like alpha0, zero1
+	isRunning      bool
+	myAddrOverride string // if set, overrides the --my flag value
 }
 
 func (z *zero) cname() string {
@@ -128,7 +129,11 @@ func (z *zero) bindings(offset int) nat.PortMap {
 }
 
 func (z *zero) cmd(c *LocalCluster) []string {
-	zcmd := []string{"/gobin/dgraph", "zero", fmt.Sprintf("--my=%s:%v", z.aname(), zeroGrpcPort), "--bindall",
+	myAddr := fmt.Sprintf("%s:%v", z.aname(), zeroGrpcPort)
+	if z.myAddrOverride != "" {
+		myAddr = z.myAddrOverride
+	}
+	zcmd := []string{"/gobin/dgraph", "zero", fmt.Sprintf("--my=%s", myAddr), "--bindall",
 		fmt.Sprintf(`--replicas=%v`, c.conf.replicas), "--logtostderr", fmt.Sprintf("-v=%d", c.conf.verbosity)}
 
 	if c.lowerThanV21 {
diff --git a/dgraphtest/local_cluster.go b/dgraphtest/local_cluster.go
index 33d7f5d5d79..5e102acdf84 100644
--- a/dgraphtest/local_cluster.go
+++ b/dgraphtest/local_cluster.go
@@ -596,6 +596,75 @@ func (c *LocalCluster) StopZero(id int) error {
 	return c.stopContainer(c.zeros[id])
 }
 
+// GetZeroContainerName returns the Docker container name for the specified Zero,
+// which also serves as a DNS alias on the cluster network.
+func (c *LocalCluster) GetZeroContainerName(id int) (string, error) {
+	if id >= c.conf.numZeros {
+		return "", fmt.Errorf("invalid id of zero: %v", id)
+	}
+	return c.zeros[id].containerName, nil
+}
+
+// SetZeroMyAddr overrides the --my flag for the specified Zero node. The next
+// time the container is recreated (via RecreateZero), this address will be
+// used instead of the default container alias.
+func (c *LocalCluster) SetZeroMyAddr(id int, addr string) error {
+	if id >= c.conf.numZeros {
+		return fmt.Errorf("invalid id of zero: %v", id)
+	}
+	c.zeros[id].myAddrOverride = addr
+	return nil
+}
+
+// RecreateZero destroys the Zero container and creates a new one with the
+// current command-line flags (e.g. a different --my address). The Zero's data
+// directory is preserved across the recreation by extracting it from the old
+// (stopped) container and injecting it into the new one.
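+// Docker's CopyFromContainer and CopyToContainer both exchange tar streams,
+// which is what lets the directory round-trip without a shared volume.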
+// This simulates a process restart with persistent storage — the exact
+// scenario where stale WAL addresses surface.
+func (c *LocalCluster) RecreateZero(id int) error {
+	if id >= c.conf.numZeros {
+		return fmt.Errorf("invalid id of zero: %v", id)
+	}
+	z := c.zeros[id]
+
+	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
+	defer cancel()
+
+	// Extract the data directory from the stopped container so the WAL survives.
+	dataReader, _, err := c.dcli.CopyFromContainer(ctx, z.cid(), zeroWorkingDir)
+	if err != nil {
+		return errors.Wrapf(err, "error copying data from zero container [%v]", z.cname())
+	}
+	defer dataReader.Close()
+
+	// Read the tar into memory (Zero WAL is small).
+	dataTar, err := io.ReadAll(dataReader)
+	if err != nil {
+		return errors.Wrap(err, "error reading zero data tar")
+	}
+
+	ro := container.RemoveOptions{RemoveVolumes: true, Force: true}
+	if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil {
+		return errors.Wrapf(err, "error removing zero container [%v]", z.cname())
+	}
+
+	cid, err := c.createContainer(z)
+	if err != nil {
+		return errors.Wrapf(err, "error recreating zero container [%v]", z.cname())
+	}
+	z.containerID = cid
+
+	// Inject the data directory into the new container. CopyToContainer expects
+	// a tar archive rooted at the parent of the target path.
+	if err := c.dcli.CopyToContainer(ctx, cid, "/data", bytes.NewReader(dataTar),
+		container.CopyToContainerOptions{}); err != nil {
+		return errors.Wrapf(err, "error restoring data to zero container [%v]", z.cname())
+	}
+
+	return nil
+}
+
 func (c *LocalCluster) StopAlpha(id int) error {
 	if id >= c.conf.numAlphas {
 		return fmt.Errorf("invalid id of alpha: %v", id)
diff --git a/dgraphtest/zero_state.go b/dgraphtest/zero_state.go
new file mode 100644
index 00000000000..ac6a82ab7a6
--- /dev/null
+++ b/dgraphtest/zero_state.go
@@ -0,0 +1,287 @@
+/*
+ * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package dgraphtest
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"time"
+
+	"github.com/pkg/errors"
+)
+
+// ZeroMember is a minimal view of a Zero member as reported by the /state
+// endpoint. Only the fields relevant to address reconciliation are included.
+type ZeroMember struct {
+	Addr   string `json:"addr"`
+	Leader bool   `json:"leader"`
+}
+
+// ZeroState is the subset of Zero's /state response that we care about for
+// testing. It intentionally mirrors only the fields used by tests, keeping
+// unmarshal resilient to unrelated schema changes.
+type ZeroState struct {
+	Zeros map[string]ZeroMember `json:"zeros"`
+}
+
+// GetZeroStateURL returns the full HTTP URL of a Zero's /state endpoint.
+func (c *LocalCluster) GetZeroStateURL(id int) (string, error) {
+	if id >= c.conf.numZeros {
+		return "", fmt.Errorf("invalid id of zero: %v", id)
+	}
+	pubPort, err := publicPort(c.dcli, c.zeros[id], zeroHttpPort)
+	if err != nil {
+		return "", err
+	}
+	return "http://0.0.0.0:" + pubPort + "/state", nil
+}
+
+// GetZeroState queries the /state endpoint on the specified Zero and returns
+// the parsed membership snapshot.
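+// A trimmed example of the JSON shape this expects (illustrative values):
+//
+//	{"zeros": {"1": {"addr": "zero1:5080", "leader": true}}}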
+func (c *LocalCluster) GetZeroState(id int) (*ZeroState, error) {
+	stateURL, err := c.GetZeroStateURL(id)
+	if err != nil {
+		return nil, err
+	}
+
+	resp, err := http.Get(stateURL)
+	if err != nil {
+		return nil, errors.Wrapf(err, "GET %s", stateURL)
+	}
+	defer func() { _ = resp.Body.Close() }()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, errors.Wrap(err, "reading /state body")
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("/state HTTP %d: %s", resp.StatusCode, body)
+	}
+
+	var state ZeroState
+	if err := json.Unmarshal(body, &state); err != nil {
+		return nil, errors.Wrapf(err, "unmarshal /state (body: %s)", string(body))
+	}
+	return &state, nil
+}
+
+// ZeroLocator identifies a Zero by its container index and Raft ID.
+type ZeroLocator struct {
+	ContainerIdx int
+	RaftID       string
+	Addr         string
+}
+
+// GetZeroLeader returns the locator of the current Zero leader, as reported by
+// the Zero at queryIdx.
+func (c *LocalCluster) GetZeroLeader(queryIdx int) (*ZeroLocator, error) {
+	return c.findZero(queryIdx, true)
+}
+
+// GetZeroFollower returns the locator of any Zero that is not the leader, as
+// reported by the Zero at queryIdx.
+func (c *LocalCluster) GetZeroFollower(queryIdx int) (*ZeroLocator, error) {
+	return c.findZero(queryIdx, false)
+}
+
+// GetZeroFollowers returns the locators of all non-leader Zeros, as reported
+// by the Zero at queryIdx.
+func (c *LocalCluster) GetZeroFollowers(queryIdx int) ([]ZeroLocator, error) {
+	state, err := c.GetZeroState(queryIdx)
+	if err != nil {
+		return nil, err
+	}
+	var followers []ZeroLocator
+	for id, z := range state.Zeros {
+		if z.Leader {
+			continue
+		}
+		idx, ok := containerIdxFromAddr(z.Addr)
+		if !ok {
+			return nil, fmt.Errorf("cannot map follower addr %q to container index", z.Addr)
+		}
+		followers = append(followers, ZeroLocator{ContainerIdx: idx, RaftID: id, Addr: z.Addr})
+	}
+	return followers, nil
+}
+
+func (c *LocalCluster) findZero(queryIdx int, wantLeader bool) (*ZeroLocator, error) {
+	state, err := c.GetZeroState(queryIdx)
+	if err != nil {
+		return nil, err
+	}
+	for id, z := range state.Zeros {
+		if z.Leader != wantLeader {
+			continue
+		}
+		idx, ok := containerIdxFromAddr(z.Addr)
+		if !ok {
+			return nil, fmt.Errorf("cannot map addr %q to container index", z.Addr)
+		}
+		return &ZeroLocator{ContainerIdx: idx, RaftID: id, Addr: z.Addr}, nil
+	}
+	role := "follower"
+	if wantLeader {
+		role = "leader"
+	}
+	return nil, fmt.Errorf("no %s found in /state from zero%d", role, queryIdx)
+}
+
+// WaitForZeroAddress polls /state until the member identified by raftID has
+// wantAddr. It first tries queryIdx; if that node's /state is unavailable
+// (e.g. it temporarily lost quorum during a leadership change) it falls back
+// to any other Zero in the cluster. The last observed address and an error
+// are returned if the target is not reached within timeout.
+func (c *LocalCluster) WaitForZeroAddress(queryIdx int, raftID, wantAddr string,
+	timeout, poll time.Duration) (string, error) {
+
+	deadline := time.Now().Add(timeout)
+	var lastAddr string
+	var lastErr error
+	for time.Now().Before(deadline) {
+		// Try queryIdx first, fall back to other nodes when quorum is transiently lost.
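+		// The fallback matters because /state performs a linearizable read
+		// and can fail on a node that has not yet seen a leader.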
+		for _, idx := range append([]int{queryIdx}, otherZeroIdxs(queryIdx, c.conf.numZeros)...) {
+			state, err := c.GetZeroState(idx)
+			if err != nil {
+				lastErr = err
+				continue
+			}
+			z, ok := state.Zeros[raftID]
+			if !ok {
+				lastErr = fmt.Errorf("raft id %s not present in /state", raftID)
+				continue
+			}
+			lastAddr = z.Addr
+			if lastAddr == wantAddr {
+				return lastAddr, nil
+			}
+			break
+		}
+		time.Sleep(poll)
+	}
+	if lastErr != nil {
+		return lastAddr, errors.Wrapf(lastErr,
+			"timed out waiting for zero %s addr=%q on zero%d (last seen %q)",
+			raftID, wantAddr, queryIdx, lastAddr)
+	}
+	return lastAddr, fmt.Errorf(
+		"timed out waiting for zero %s addr=%q on zero%d (last seen %q)",
+		raftID, wantAddr, queryIdx, lastAddr)
+}
+
+// otherZeroIdxs returns all zero indices except exclude, preserving order.
+func otherZeroIdxs(exclude, numZeros int) []int {
+	var out []int
+	for i := range numZeros {
+		if i != exclude {
+			out = append(out, i)
+		}
+	}
+	return out
+}
+
+// ChangeZeroAddress reconfigures the Zero at id with a new --my flag that
+// points to its Docker container name (a valid DNS alias on the cluster
+// network). The Zero is stopped, recreated with its WAL preserved, and
+// restarted. The new address is returned so callers can assert convergence.
+func (c *LocalCluster) ChangeZeroAddress(id int) (string, error) {
+	if err := c.StopZero(id); err != nil {
+		return "", err
+	}
+	containerName, err := c.GetZeroContainerName(id)
+	if err != nil {
+		return "", err
+	}
+	newAddr := containerName + ":" + zeroGrpcPort
+	if err := c.SetZeroMyAddr(id, newAddr); err != nil {
+		return "", err
+	}
+	if err := c.RecreateZero(id); err != nil {
+		return "", err
+	}
+	if err := c.StartZero(id); err != nil {
+		return "", err
+	}
+	return newAddr, nil
+}
+
+// containerIdxFromAddr extracts the numeric suffix from the "zeroN" segment
+// of a member's address and returns it as the container index used by tests.
+func containerIdxFromAddr(addr string) (int, bool) {
+	for i := range maxContainerIdxScan {
+		if strings.Contains(addr, fmt.Sprintf("zero%d", i)) {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
+// maxContainerIdxScan bounds the zeroN scan in containerIdxFromAddr. It is
+// larger than any realistic test topology.
+const maxContainerIdxScan = 32
+
+// GetZeroLogs returns the full container stdout/stderr log for the Zero at id.
+func (c *LocalCluster) GetZeroLogs(id int) (string, error) {
+	if id >= c.conf.numZeros {
+		return "", fmt.Errorf("invalid id of zero: %v", id)
+	}
+	return c.getLogs(c.zeros[id].containerID)
+}
+
+// WaitForZeroLog polls the Zero's container log until it contains substr, or
+// the timeout expires. It is the log-based equivalent of WaitForZeroAddress
+// and lets tests react to concrete events (e.g. a Raft proposal being applied)
+// instead of sleeping for an arbitrary duration.
+func (c *LocalCluster) WaitForZeroLog(id int, substr string, timeout, poll time.Duration) error {
+	deadline := time.Now().Add(timeout)
+	var lastErr error
+	for time.Now().Before(deadline) {
+		logs, err := c.GetZeroLogs(id)
+		if err != nil {
+			lastErr = err
+		} else if strings.Contains(logs, substr) {
+			return nil
+		}
+		time.Sleep(poll)
+	}
+	if lastErr != nil {
+		return errors.Wrapf(lastErr, "timed out waiting for log substring %q on zero%d",
+			substr, id)
+	}
+	return fmt.Errorf("timed out waiting for log substring %q on zero%d", substr, id)
+}
+
+// ZeroLogContains reports whether the Zero's current log output contains substr.
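+// Unlike WaitForZeroLog, it samples the log once and never blocks.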
+func (c *LocalCluster) ZeroLogContains(id int, substr string) (bool, error) {
+	logs, err := c.GetZeroLogs(id)
+	if err != nil {
+		return false, err
+	}
+	return strings.Contains(logs, substr), nil
+}
+
+// WaitForAnyZeroLog polls all Zero containers until any one of them contains
+// substr, or the timeout expires. Use this when a log marker may appear on any
+// node (e.g. after a leadership change caused by the test scenario itself).
+func (c *LocalCluster) WaitForAnyZeroLog(substr string, timeout, poll time.Duration) error {
+	deadline := time.Now().Add(timeout)
+	for time.Now().Before(deadline) {
+		for i := range c.conf.numZeros {
+			logs, err := c.GetZeroLogs(i)
+			if err != nil {
+				continue
+			}
+			if strings.Contains(logs, substr) {
+				return nil
+			}
+		}
+		time.Sleep(poll)
+	}
+	return fmt.Errorf("timed out waiting for log substring %q on any zero", substr)
+}
diff --git a/systest/integration2/zero_addr_test.go b/systest/integration2/zero_addr_test.go
new file mode 100644
index 00000000000..bb92b24fb48
--- /dev/null
+++ b/systest/integration2/zero_addr_test.go
@@ -0,0 +1,408 @@
+//go:build integration2
+
+/*
+ * SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package main
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/dgraph-io/dgraph/v25/dgraphtest"
+)
+
+// These tests verify that Dgraph Zero reconciles stale addresses that were
+// baked into its Raft WAL at initial bootstrap. Each test targets one of the
+// scenarios identified in the design:
+//
+//	 1. Leader changes address, reconciles itself (single-Zero cluster).
+//	 2. WAL replay — address survives a subsequent restart.
+//	 4. Leader changes address in multi-Zero cluster; followers apply.
+//	 5. Follower changes address; leader detects via Raft transport peer map.
+//	 6. Multiple Zeros change addresses simultaneously; sequential reconcile.
+//	 7. Old leader changes address after losing leadership; new leader fixes.
+//	 9. Follower restart: its own address converges for the Connect response.
+//	11. Duplicate address is rejected by the uniqueness check.
+//	12. Restart with unchanged --my triggers no proposal and no state change.
+//
+// Numbering follows the scenarios enumerated in the design discussion.
+//
+// Positive convergence is asserted via /state polling. "Negative" assertions
+// (no proposal, duplicate rejected) are anchored to concrete log markers
+// emitted by the fix instead of sleeping for a fixed duration, which keeps
+// the suite fast and avoids flakes on slow CI.
+
+const (
+	reconcileTimeout      = 30 * time.Second
+	multiReconcileTimeout = 120 * time.Second
+	reconcilePoll         = time.Second
+
+	// Log markers emitted by the Zero reconciliation code path. Keep these in
+	// sync with glog statements in dgraph/cmd/zero/raft.go.
+	logApplied      = "Applied ConfChangeUpdateNode"
+	logProposed     = "Proposing ConfChangeUpdateNode"
+	logBecameLeader = "I've become the leader"
+	// logReconcileComplete is emitted at V(2) at the end of reconcileZeroAddresses
+	// when the loop completes without making any proposal. It is used for negative
+	// assertions (e.g. "no change committed", "duplicate rejected") where /state
+	// polling alone is insufficient: the /state endpoint requires WaitLinearizableRead
+	// and is unavailable during leadership transitions, making it impossible to
+	// distinguish "address unchanged" from "endpoint temporarily unreachable".
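+	// A completed, proposal-free reconcile cycle is therefore the strongest
+	// negative signal observable from outside the process.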
+	// Test containers run with -v=2 so V(2) logs are visible in container output.
+	logReconcileComplete = "Zero address reconciliation complete: all addresses up to date"
+)
+
+// waitForAddr asserts a Zero's address converges within reconcileTimeout.
+func waitForAddr(t *testing.T, c *dgraphtest.LocalCluster, queryIdx int, raftID, want string) {
+	t.Helper()
+	addr, err := c.WaitForZeroAddress(queryIdx, raftID, want, reconcileTimeout, reconcilePoll)
+	require.NoError(t, err, "zero %s did not converge to %q on zero%d (last seen %q)",
+		raftID, want, queryIdx, addr)
+}
+
+// waitForAddrLong is the same as waitForAddr but with a longer deadline for
+// scenarios that must span multiple reconciliation cycles.
+func waitForAddrLong(t *testing.T, c *dgraphtest.LocalCluster, queryIdx int, raftID, want string) {
+	t.Helper()
+	addr, err := c.WaitForZeroAddress(queryIdx, raftID, want, multiReconcileTimeout, reconcilePoll)
+	require.NoError(t, err, "zero %s did not converge to %q on zero%d (last seen %q)",
+		raftID, want, queryIdx, addr)
+}
+
+// waitForZeroLog asserts a log marker appears on the given Zero within reconcileTimeout.
+func waitForZeroLog(t *testing.T, c *dgraphtest.LocalCluster, id int, substr string) {
+	t.Helper()
+	require.NoError(t,
+		c.WaitForZeroLog(id, substr, reconcileTimeout, reconcilePoll),
+		"expected log marker %q on zero%d", substr, id)
+}
+
+// requireNoLog asserts the given marker is not present in the Zero's logs.
+func requireNoLog(t *testing.T, c *dgraphtest.LocalCluster, id int, substr string) {
+	t.Helper()
+	found, err := c.ZeroLogContains(id, substr)
+	require.NoError(t, err)
+	require.False(t, found, "unexpected log marker %q on zero%d", substr, id)
+}
+
+// waitForAllZeros polls /state on zero0 until all numZeros members are visible,
+// ensuring the Raft group has fully formed before tests run. Required for
+// multi-Zero tests that call GetZeroLeader/GetZeroFollower immediately after Start().
+func waitForAllZeros(t *testing.T, c *dgraphtest.LocalCluster, numZeros int) {
+	t.Helper()
+	require.Eventually(t, func() bool {
+		state, err := c.GetZeroState(0)
+		if err != nil {
+			return false
+		}
+		return len(state.Zeros) == numZeros
+	}, reconcileTimeout, reconcilePoll,
+		"expected %d zeros in /state, cluster did not stabilize", numZeros)
+}
+
+func newSingleZeroCluster(t *testing.T) *dgraphtest.LocalCluster {
+	t.Helper()
+	return newCluster(t, 1)
+}
+
+func newMultiZeroCluster(t *testing.T) *dgraphtest.LocalCluster {
+	t.Helper()
+	return newCluster(t, 3)
+}
+
+func newCluster(t *testing.T, numZeros int) *dgraphtest.LocalCluster {
+	t.Helper()
+	conf := dgraphtest.NewClusterConfig().
+		WithNumAlphas(1).
+		WithNumZeros(numZeros).
+		WithReplicas(1)
+	c, err := dgraphtest.NewLocalCluster(conf)
+	require.NoError(t, err)
+	t.Cleanup(func() { c.Cleanup(t.Failed()) })
+	require.NoError(t, c.Start())
+	if numZeros > 1 {
+		waitForAllZeros(t, c, numZeros)
+	}
+	return c
+}
+
+// ---------------------------------------------------------------------------
+// Scenario 1: Leader changes address, reconciles itself (single Zero).
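+// With one Zero, the same node proposes, commits, and applies the update.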
+// --------------------------------------------------------------------------- + +func TestZeroAddr_LeaderSelfReconcile(t *testing.T) { + c := newSingleZeroCluster(t) + + leader, err := c.GetZeroLeader(0) + require.NoError(t, err) + + require.NoError(t, c.StopAlpha(0)) + newAddr, err := c.ChangeZeroAddress(leader.ContainerIdx) + require.NoError(t, err) + require.NoError(t, c.HealthCheck(true)) + + waitForAddr(t, c, 0, leader.RaftID, newAddr) + + require.NoError(t, c.StartAlpha(0)) + require.NoError(t, c.HealthCheck(false)) +} + +// --------------------------------------------------------------------------- +// Scenario 2: WAL replay — address survives a subsequent restart. +// --------------------------------------------------------------------------- + +func TestZeroAddr_SurvivesRestart(t *testing.T) { + c := newSingleZeroCluster(t) + + leader, err := c.GetZeroLeader(0) + require.NoError(t, err) + + require.NoError(t, c.StopAlpha(0)) + + // First restart: reconcile to a new address. The log marker proves the + // ConfChangeUpdateNode was both proposed and applied. + newAddr, err := c.ChangeZeroAddress(leader.ContainerIdx) + require.NoError(t, err) + require.NoError(t, c.HealthCheck(true)) + waitForAddr(t, c, 0, leader.RaftID, newAddr) + waitForZeroLog(t, c, leader.ContainerIdx, logApplied) + + // Second restart with the SAME --my. The WAL now carries the + // ConfChangeUpdateNode from the previous run; replay alone must restore + // the reconciled address. Poll /state immediately without sleeping — + // the address must already be correct on the first successful read. + require.NoError(t, c.StopZero(leader.ContainerIdx)) + require.NoError(t, c.RecreateZero(leader.ContainerIdx)) + require.NoError(t, c.StartZero(leader.ContainerIdx)) + require.NoError(t, c.HealthCheck(true)) + waitForAddr(t, c, 0, leader.RaftID, newAddr) + + require.NoError(t, c.StartAlpha(0)) + require.NoError(t, c.HealthCheck(false)) +} + +// --------------------------------------------------------------------------- +// Scenario 4: Leader changes address in multi-Zero cluster. +// --------------------------------------------------------------------------- + +func TestZeroAddr_LeaderChangeMultiZero(t *testing.T) { + c := newMultiZeroCluster(t) + + leader, err := c.GetZeroLeader(0) + require.NoError(t, err) + follower, err := c.GetZeroFollower(0) + require.NoError(t, err) + + newAddr, err := c.ChangeZeroAddress(leader.ContainerIdx) + require.NoError(t, err) + + // The restarted leader reconciles its own address; the ConfChangeUpdateNode + // is replicated to all followers which apply it deterministically. + // Convergence of /state on both nodes proves proposal and application. + waitForAddr(t, c, leader.ContainerIdx, leader.RaftID, newAddr) + waitForAddr(t, c, follower.ContainerIdx, leader.RaftID, newAddr) +} + +// --------------------------------------------------------------------------- +// Scenario 5: Follower changes address, leader reconciles via peer map. +// --------------------------------------------------------------------------- + +func TestZeroAddr_FollowerChangeMultiZero(t *testing.T) { + c := newMultiZeroCluster(t) + + follower, err := c.GetZeroFollower(0) + require.NoError(t, err) + leader, err := c.GetZeroLeader(0) + require.NoError(t, err) + + newAddr, err := c.ChangeZeroAddress(follower.ContainerIdx) + require.NoError(t, err) + + // Poll /state on both nodes until the new address is committed. 
+	// This covers the full path: follower reconnects → leader detects mismatch
+	// via transport peer map → proposes ConfChangeUpdateNode → all nodes apply.
+	waitForAddrLong(t, c, leader.ContainerIdx, follower.RaftID, newAddr)
+	waitForAddrLong(t, c, follower.ContainerIdx, follower.RaftID, newAddr)
+}
+
+// ---------------------------------------------------------------------------
+// Scenario 6: Two followers change addresses simultaneously.
+// ---------------------------------------------------------------------------
+
+func TestZeroAddr_MultipleFollowersChange(t *testing.T) {
+	c := newMultiZeroCluster(t)
+
+	leader, err := c.GetZeroLeader(0)
+	require.NoError(t, err)
+	followers, err := c.GetZeroFollowers(0)
+	require.NoError(t, err)
+	require.Len(t, followers, 2, "expected 2 followers in a 3-Zero cluster")
+
+	newAddrs := make([]string, len(followers))
+	for i, f := range followers {
+		addr, err := c.ChangeZeroAddress(f.ContainerIdx)
+		require.NoError(t, err)
+		newAddrs[i] = addr
+	}
+
+	// The leader proposes one ConfChangeUpdateNode per 10-second cycle
+	// (Raft's single-pending-ConfChange constraint), so both addresses
+	// should converge within the extended window.
+	for i, f := range followers {
+		waitForAddrLong(t, c, leader.ContainerIdx, f.RaftID, newAddrs[i])
+	}
+}
+
+// ---------------------------------------------------------------------------
+// Scenario 7: New leader reconciles stale address of the previous leader.
+// ---------------------------------------------------------------------------
+
+func TestZeroAddr_NewLeaderReconciles(t *testing.T) {
+	c := newMultiZeroCluster(t)
+
+	oldLeader, err := c.GetZeroLeader(0)
+	require.NoError(t, err)
+	require.NoError(t, c.StopZero(oldLeader.ContainerIdx))
+
+	newLeaderIdx := waitForNewLeader(t, c, oldLeader.ContainerIdx, 3)
+	t.Logf("new leader is zero%d", newLeaderIdx)
+
+	// Restart the old leader with a new --my; it will rejoin as a follower.
+	// The new leader must reconcile the stale address via its peer map.
+	containerName, err := c.GetZeroContainerName(oldLeader.ContainerIdx)
+	require.NoError(t, err)
+	newAddr := containerName + ":5080"
+	require.NoError(t, c.SetZeroMyAddr(oldLeader.ContainerIdx, newAddr))
+	require.NoError(t, c.RecreateZero(oldLeader.ContainerIdx))
+	require.NoError(t, c.StartZero(oldLeader.ContainerIdx))
+
+	waitForAddr(t, c, newLeaderIdx, oldLeader.RaftID, newAddr)
+}
+
+func waitForNewLeader(t *testing.T, c *dgraphtest.LocalCluster, excludeIdx, numZeros int) int {
+	t.Helper()
+	newLeaderIdx := -1
+	require.Eventually(t, func() bool {
+		for i := range numZeros {
+			if i == excludeIdx {
+				continue
+			}
+			leader, err := c.GetZeroLeader(i)
+			if err == nil && leader.ContainerIdx != excludeIdx {
+				newLeaderIdx = leader.ContainerIdx
+				return true
+			}
+		}
+		return false
+	}, reconcileTimeout, reconcilePoll, "no new leader elected after excluding zero%d", excludeIdx)
+	return newLeaderIdx
+}
+
+// ---------------------------------------------------------------------------
+// Scenario 9: Follower address converges in multi-Zero Connect flow.
+// ---------------------------------------------------------------------------
+
+func TestZeroAddr_FollowerConvergesOnSelf(t *testing.T) {
+	c := newMultiZeroCluster(t)
+
+	follower, err := c.GetZeroFollower(0)
+	require.NoError(t, err)
+
+	newAddr, err := c.ChangeZeroAddress(follower.ContainerIdx)
+	require.NoError(t, err)
+
+	// Regardless of which path corrects it (Raft commit or serving-layer
+	// override), the follower's own /state must converge to --my.
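+	// (The Connect-path override fixes the local view immediately; the Raft
+	// commit is what makes it durable cluster-wide.)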
+	// Uses the long timeout: the detection path (leader sees reconnect via
+	// transport peer map → 10s reconcile cycle → proposes ConfChange) is
+	// identical to scenario 5, which also uses multiReconcileTimeout.
+	waitForAddrLong(t, c, follower.ContainerIdx, follower.RaftID, newAddr)
+}
+
+// ---------------------------------------------------------------------------
+// Scenario 11: Duplicate --my is rejected by the uniqueness check.
+// ---------------------------------------------------------------------------
+
+func TestZeroAddr_DuplicateRejected(t *testing.T) {
+	c := newMultiZeroCluster(t)
+
+	leader, err := c.GetZeroLeader(0)
+	require.NoError(t, err)
+	follower, err := c.GetZeroFollower(0)
+	require.NoError(t, err)
+	originalFollowerAddr := follower.Addr
+
+	// Reconfigure the follower with the leader's address (deliberate collision).
+	require.NoError(t, c.StopZero(follower.ContainerIdx))
+	require.NoError(t, c.SetZeroMyAddr(follower.ContainerIdx, leader.Addr))
+	require.NoError(t, c.RecreateZero(follower.ContainerIdx))
+	require.NoError(t, c.StartZero(follower.ContainerIdx))
+
+	// Ensure the follower has rejoined the Raft group before waiting for the
+	// reconciliation log marker. Without this, logReconcileComplete could be
+	// satisfied by a cycle that fired before the duplicate address was visible
+	// to the leader's reconcile loop.
+	waitForAllZeros(t, c, 3)
+
+	// Wait for any Zero to complete a full reconciliation cycle. logReconcileComplete
+	// is emitted at the end of reconcileZeroAddresses() when no proposal was made —
+	// which covers both "all correct" and "duplicate skipped" cases. Its presence
+	// proves that reconcile ran and intentionally left the address unchanged.
+	require.NoError(t,
+		c.WaitForAnyZeroLog(logReconcileComplete, multiReconcileTimeout, reconcilePoll),
+		"expected log marker %q on any zero", logReconcileComplete)
+
+	// Verify via /state that the duplicate was not committed. Try each Zero
+	// in turn until one responds successfully.
+	var state *dgraphtest.ZeroState
+	for i := range 3 {
+		if s, err := c.GetZeroState(i); err == nil {
+			state = s
+			break
+		}
+	}
+	require.NotNil(t, state, "could not get /state from any zero")
+	require.Equal(t, originalFollowerAddr, state.Zeros[follower.RaftID].Addr,
+		"duplicate address must not overwrite existing member's address")
+}
+
+// ---------------------------------------------------------------------------
+// Scenario 12: Restart with unchanged --my is a no-op.
+// ---------------------------------------------------------------------------
+
+func TestZeroAddr_NoChangeIsNoOp(t *testing.T) {
+	c := newSingleZeroCluster(t)
+
+	leader, err := c.GetZeroLeader(0)
+	require.NoError(t, err)
+	originalAddr := leader.Addr
+
+	require.NoError(t, c.StopAlpha(0))
+	require.NoError(t, c.StopZero(leader.ContainerIdx))
+	require.NoError(t, c.RecreateZero(leader.ContainerIdx))
+	require.NoError(t, c.StartZero(leader.ContainerIdx))
+	require.NoError(t, c.HealthCheck(true))
+
+	// Wait for leader election, then wait for the reconciliation loop to
+	// complete at least one full cycle. The "complete" marker is emitted
+	// only when the loop runs and finds no mismatches, so its presence
+	// proves the loop ran — making the no-proposal assertion meaningful.
+	waitForZeroLog(t, c, leader.ContainerIdx, logBecameLeader)
+	waitForZeroLog(t, c, leader.ContainerIdx, logReconcileComplete)
+
+	requireNoLog(t, c, leader.ContainerIdx, logProposed)
+	requireNoLog(t, c, leader.ContainerIdx, logApplied)
+
+	state, err := c.GetZeroState(0)
+	require.NoError(t, err)
+	require.Equal(t, originalAddr, state.Zeros[leader.RaftID].Addr,
+		"address must not change when --my is unchanged")
+
+	require.NoError(t, c.StartAlpha(0))
+	require.NoError(t, c.HealthCheck(false))
+}
diff --git a/worker/groups.go b/worker/groups.go
index 5c5984e68da..427285cdf1d 100644
--- a/worker/groups.go
+++ b/worker/groups.go
@@ -323,10 +323,16 @@ func (g *groupi) applyState(myId uint64, state *pb.MembershipState) {
 			atomic.StoreUint64(&g.membershipChecksum, group.Checksum)
 		}
 	}
-	for _, member := range g.state.Zeros {
+	for id, member := range g.state.Zeros {
 		if x.WorkerConfig.MyAddr != member.Addr {
 			conn.GetPools().Connect(member.Addr, x.WorkerConfig.TLSClientConfig)
 		}
+		// Remove stale pool if this Zero changed address.
+		if oldState != nil {
+			if old, ok := oldState.Zeros[id]; ok && old.Addr != member.Addr {
+				conn.GetPools().Remove(old.Addr)
+			}
+		}
 	}
 	if !foundSelf {
 		// I'm not part of this cluster. I should crash myself.