Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions conn/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,11 @@ func (n *Node) DeletePeer(pid uint64) {

var errInternalRetry = errors.New("Retry proposal again")

func (n *Node) proposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
// ProposeConfChange proposes a Raft configuration change (add, remove, or
// update a node) and blocks until it is committed or the context expires.
// It is used by both the conn package internally and by the zero package
// (for address reconciliation via ConfChangeUpdateNode).
func (n *Node) ProposeConfChange(ctx context.Context, conf raftpb.ConfChange) error {
cctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()

Expand Down Expand Up @@ -599,7 +603,7 @@ func (n *Node) addToCluster(ctx context.Context, rc *pb.RaftContext) error {
for err == errInternalRetry {
glog.Infof("Trying to add %#x to cluster. Addr: %v\n", pid, rc.Addr)
glog.Infof("Current confstate at %#x: %+v\n", n.Id, n.ConfState())
err = n.proposeConfChange(ctx, cc)
err = n.ProposeConfChange(ctx, cc)
}
return err
}
Expand All @@ -618,7 +622,7 @@ func (n *Node) ProposePeerRemoval(ctx context.Context, id uint64) error {
}
err := errInternalRetry
for err == errInternalRetry {
err = n.proposeConfChange(ctx, cc)
err = n.ProposeConfChange(ctx, cc)
}
return err
}
Expand Down
6 changes: 6 additions & 0 deletions conn/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
}
}

// Remove disconnects and removes the pool for addr. It is a no-op if addr is
// not in the pool. This is the exported wrapper around the unexported remove
// (which acquires p's lock), so other packages — e.g. zero, when applying a
// ConfChangeUpdateNode — can clean up stale connections after an address change.
func (p *Pools) Remove(addr string) {
	p.remove(addr)
}

func (p *Pools) remove(addr string) {
p.Lock()
defer p.Unlock()
Expand Down
12 changes: 12 additions & 0 deletions dgraph/cmd/debug/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ func printEntry(es raftpb.Entry, pending map[uint64]bool, isZero bool) {
fmt.Fprintf(&buf, "%d . %d . %v . %-6s . %8d .", es.Term, es.Index, es.Type,
humanize.Bytes(uint64(es.Size())), key)
if es.Type == raftpb.EntryConfChange {
var cc raftpb.ConfChange
if err := cc.Unmarshal(es.Data); err != nil {
fmt.Fprintf(&buf, " [ConfChange unmarshal error: %v]", err)
return
}
fmt.Fprintf(&buf, " Type: %s . NodeID: %#x .", cc.Type, cc.NodeID)
if len(cc.Context) > 0 {
var rc pb.RaftContext
if err := proto.Unmarshal(cc.Context, &rc); err == nil {
fmt.Fprintf(&buf, " RaftContext: %s", rc.String())
}
}
return
}
if len(es.Data) == 0 {
Expand Down
103 changes: 103 additions & 0 deletions dgraph/cmd/zero/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,6 +500,28 @@ func (n *node) applyConfChange(e raftpb.Entry) {
n.DeletePeer(cc.NodeID)
n.server.removeZero(cc.NodeID)

} else if cc.Type == raftpb.ConfChangeUpdateNode && len(cc.Context) > 0 {
// ConfChangeUpdateNode is a Raft-provided mechanism for updating node
// metadata (e.g. addresses) without changing cluster membership. It is
// a no-op inside the Raft library; the application is responsible for
// interpreting the Context bytes. We use it to fix stale Zero addresses
// that were baked into the WAL at initial bootstrap.
var rc pb.RaftContext
x.Check(proto.Unmarshal(cc.Context, &rc))

// Drop the stale pool before connecting to the new address so that
// repeated dial errors to the old address are eliminated immediately.
if oldMember, ok := n.server.membershipState().GetZeros()[rc.Id]; ok {
if oldMember.GetAddr() != rc.Addr {
conn.GetPools().Remove(oldMember.GetAddr())
}
}
go n.Connect(rc.Id, rc.Addr)

m := &pb.Member{Id: rc.Id, Addr: rc.Addr, GroupId: 0, Learner: rc.IsLearner}
n.server.storeZero(m)
glog.Infof("Applied ConfChangeUpdateNode for Zero %#x: addr=%q", rc.Id, rc.Addr)

} else if len(cc.Context) > 0 {
var rc pb.RaftContext
x.Check(proto.Unmarshal(cc.Context, &rc))
Expand Down Expand Up @@ -696,12 +718,89 @@ func (n *node) updateZeroMembershipPeriodically(closer *z.Closer) {
select {
case <-ticker:
n.server.updateZeroLeader()
n.reconcileZeroAddresses()
case <-closer.HasBeenClosed():
return
}
}
}

// reconcileZeroAddresses detects mismatches between the current --my address
// (or transport-layer peer addresses) and what is stored in MembershipState,
// then proposes a ConfChangeUpdateNode through Raft to correct them. This
// ensures that stale addresses baked into the WAL at initial bootstrap are
// replaced with the current addresses after a restart. Only the leader can
// propose, so this function is a no-op on followers.
func (n *node) reconcileZeroAddresses() {
	// Followers never propose; the leader runs this from its own ticker.
	if !n.AmLeader() {
		return
	}

	state := n.server.membershipState()
	if state == nil {
		return
	}

	zeros := state.GetZeros()
	for id, member := range zeros {
		// Determine the authoritative address for this Zero: our own
		// RaftContext for ourselves, the transport-layer peer address for
		// everyone else. Zeros with no known peer address are left alone.
		var correctAddr string
		if id == n.Id {
			correctAddr = n.RaftContext.Addr
		} else {
			peerAddr, ok := n.Peer(id)
			if !ok {
				continue
			}
			correctAddr = peerAddr
		}

		if correctAddr == member.GetAddr() {
			continue // already up to date
		}

		// Validate: ensure no other Zero already claims this address.
		var taken bool
		for otherId, otherZero := range zeros {
			if otherId != id && otherZero.GetAddr() == correctAddr {
				glog.Warningf("Skipping address reconciliation for Zero %#x: "+
					"address %q already used by Zero %#x", id, correctAddr, otherId)
				taken = true
				break
			}
		}
		if taken {
			continue
		}

		glog.Infof("Zero %#x address mismatch: MembershipState has %q, expected %q. "+
			"Proposing ConfChangeUpdateNode.", id, member.GetAddr(), correctAddr)

		data, err := proto.Marshal(&pb.RaftContext{
			Id:    id,
			Addr:  correctAddr,
			Group: 0,
		})
		if err != nil {
			glog.Errorf("Error marshalling RaftContext for address update: %v", err)
			continue
		}

		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeUpdateNode,
			NodeID:  id,
			Context: data,
		}
		if err := n.ProposeConfChange(n.ctx, cc); err != nil {
			glog.Errorf("Failed to propose ConfChangeUpdateNode for Zero %#x: %v", id, err)
		}
		// Propose one update at a time to respect Raft's single pending
		// ConfChange constraint. The next mismatch will be handled in the
		// following periodic cycle.
		return
	}
	// V(2): emitted on every 10s tick, too noisy for production at Info level.
	glog.V(2).Infof("Zero address reconciliation complete: all addresses up to date")
}

func (n *node) checkQuorum(closer *z.Closer) {
defer closer.Done()
ticker := time.Tick(time.Second)
Expand Down Expand Up @@ -923,6 +1022,10 @@ func (n *node) Run() {
if rd.RaftState == raft.StateLeader && !leader {
glog.Infoln("I've become the leader, updating leases.")
n.server.updateLeases()
// Eagerly reconcile on leader election so stale WAL
// addresses are corrected without waiting for the
// periodic 10s tick.
go n.reconcileZeroAddresses()
}
leader = rd.RaftState == raft.StateLeader
// group id hardcoded as 0
Expand Down
11 changes: 11 additions & 0 deletions dgraph/cmd/zero/zero.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,17 @@ func (s *Server) Connect(ctx context.Context,
return nil, err
}

// Ensure this Zero's own address in MembershipState reflects the current
// --my flag, even before ConfChangeUpdateNode has been committed through
// Raft. This prevents Alphas from receiving a stale address during the
// brief window between restart and reconciliation.
myAddr := s.Node.RaftContext.Addr
if myId := s.Node.Id; myAddr != "" {
if z, ok := ms.GetZeros()[myId]; ok && z.GetAddr() != myAddr {
z.Addr = myAddr
}
}

if m.ClusterInfoOnly {
// This request only wants to access the membership state, and nothing else. Most likely
// from our clients.
Expand Down
17 changes: 11 additions & 6 deletions dgraphtest/dgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,11 +88,12 @@ type dnode interface {
}

type zero struct {
id int // 0, 1, 2
containerID string // container ID in docker world
containerName string // something like test-1234_zero2
aliasName string // something like alpha0, zero1
isRunning bool
id int // 0, 1, 2
containerID string // container ID in docker world
containerName string // something like test-1234_zero2
aliasName string // something like alpha0, zero1
isRunning bool
myAddrOverride string // if set, overrides the --my flag value
}

func (z *zero) cname() string {
Expand Down Expand Up @@ -128,7 +129,11 @@ func (z *zero) bindings(offset int) nat.PortMap {
}

func (z *zero) cmd(c *LocalCluster) []string {
zcmd := []string{"/gobin/dgraph", "zero", fmt.Sprintf("--my=%s:%v", z.aname(), zeroGrpcPort), "--bindall",
myAddr := fmt.Sprintf("%s:%v", z.aname(), zeroGrpcPort)
if z.myAddrOverride != "" {
myAddr = z.myAddrOverride
}
zcmd := []string{"/gobin/dgraph", "zero", fmt.Sprintf("--my=%s", myAddr), "--bindall",
fmt.Sprintf(`--replicas=%v`, c.conf.replicas), "--logtostderr", fmt.Sprintf("-v=%d", c.conf.verbosity)}

if c.lowerThanV21 {
Expand Down
69 changes: 69 additions & 0 deletions dgraphtest/local_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,75 @@ func (c *LocalCluster) StopZero(id int) error {
return c.stopContainer(c.zeros[id])
}

// GetZeroContainerName returns the Docker container name for the specified Zero,
// which also serves as a DNS alias on the cluster network. It returns an error
// for an out-of-range id.
func (c *LocalCluster) GetZeroContainerName(id int) (string, error) {
	// Guard both bounds: a negative id would otherwise panic on the slice index.
	if id < 0 || id >= c.conf.numZeros {
		return "", fmt.Errorf("invalid id of zero: %v", id)
	}
	return c.zeros[id].containerName, nil
}

// SetZeroMyAddr overrides the --my flag for the specified Zero node. The next
// time the container is recreated (via RecreateZero), this address will be
// used instead of the default container alias. It returns an error for an
// out-of-range id.
func (c *LocalCluster) SetZeroMyAddr(id int, addr string) error {
	// Guard both bounds: a negative id would otherwise panic on the slice index.
	if id < 0 || id >= c.conf.numZeros {
		return fmt.Errorf("invalid id of zero: %v", id)
	}
	c.zeros[id].myAddrOverride = addr
	return nil
}

// RecreateZero destroys the Zero container and creates a new one with the
// current command-line flags (e.g. a different --my address). The Zero's data
// directory is preserved across the recreation by extracting it from the old
// (stopped) container and injecting it into the new one. This simulates a
// process restart with persistent storage — the exact scenario where stale WAL
// addresses surface. It returns an error for an out-of-range id or for any
// Docker API failure along the way.
func (c *LocalCluster) RecreateZero(id int) error {
	// Guard both bounds: a negative id would otherwise panic on the slice index.
	if id < 0 || id >= c.conf.numZeros {
		return fmt.Errorf("invalid id of zero: %v", id)
	}
	z := c.zeros[id]

	ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
	defer cancel()

	// Extract the data directory from the stopped container so the WAL survives.
	dataReader, _, err := c.dcli.CopyFromContainer(ctx, z.cid(), zeroWorkingDir)
	if err != nil {
		return errors.Wrapf(err, "error copying data from zero container [%v]", z.cname())
	}
	defer dataReader.Close()

	// Read the tar into memory (Zero WAL is small).
	dataTar, err := io.ReadAll(dataReader)
	if err != nil {
		return errors.Wrap(err, "error reading zero data tar")
	}

	// Remove the old container only after its data has been captured, so a
	// copy failure above leaves the cluster untouched.
	ro := container.RemoveOptions{RemoveVolumes: true, Force: true}
	if err := c.dcli.ContainerRemove(ctx, z.cid(), ro); err != nil {
		return errors.Wrapf(err, "error removing zero container [%v]", z.cname())
	}

	cid, err := c.createContainer(z)
	if err != nil {
		return errors.Wrapf(err, "error recreating zero container [%v]", z.cname())
	}
	z.containerID = cid

	// Inject the data directory into the new container. CopyToContainer expects
	// a tar archive rooted at the parent of the target path.
	if err := c.dcli.CopyToContainer(ctx, cid, "/data", bytes.NewReader(dataTar),
		container.CopyToContainerOptions{}); err != nil {
		return errors.Wrapf(err, "error restoring data to zero container [%v]", z.cname())
	}

	return nil
}

func (c *LocalCluster) StopAlpha(id int) error {
if id >= c.conf.numAlphas {
return fmt.Errorf("invalid id of alpha: %v", id)
Expand Down
Loading
Loading