Skip to content

Commit dbf0974

Browse files
Shivaji Kharse
authored and committed
add tests and fix issues related to logs
1 parent 07457d1 commit dbf0974

6 files changed

Lines changed: 675 additions & 84 deletions

File tree

conn/pool.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,12 @@ func (p *Pools) RemoveInvalid(state *pb.MembershipState) {
124124
}
125125
}
126126

127+
// Remove disconnects and removes the pool for addr. It is a no-op if addr is
128+
// not in the pool. Used to clean up stale connections after an address change.
129+
func (p *Pools) Remove(addr string) {
130+
p.remove(addr)
131+
}
132+
127133
func (p *Pools) remove(addr string) {
128134
p.Lock()
129135
defer p.Unlock()

dgraph/cmd/zero/raft.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,14 @@ func (n *node) applyConfChange(e raftpb.Entry) {
508508
// that were baked into the WAL at initial bootstrap.
509509
var rc pb.RaftContext
510510
x.Check(proto.Unmarshal(cc.Context, &rc))
511+
512+
// Drop the stale pool before connecting to the new address so that
513+
// repeated dial errors to the old address are eliminated immediately.
514+
if oldMember, ok := n.server.membershipState().GetZeros()[rc.Id]; ok {
515+
if oldMember.GetAddr() != rc.Addr {
516+
conn.GetPools().Remove(oldMember.GetAddr())
517+
}
518+
}
511519
go n.Connect(rc.Id, rc.Addr)
512520

513521
m := &pb.Member{Id: rc.Id, Addr: rc.Addr, GroupId: 0, Learner: rc.IsLearner}
@@ -789,6 +797,8 @@ func (n *node) reconcileZeroAddresses() {
789797
// following periodic cycle.
790798
return
791799
}
800+
// V(2): emitted on every 10s tick, too noisy for production at Info level.
801+
glog.V(2).Infof("Zero address reconciliation complete: all addresses up to date")
792802
}
793803

794804
func (n *node) checkQuorum(closer *z.Closer) {

dgraphtest/local_cluster.go

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1075,19 +1075,6 @@ func (c *LocalCluster) GetAlphaHttpClient(alphaID int) (*dgraphapi.HTTPClient, e
10751075
return dgraphapi.GetHttpClient(url, "")
10761076
}
10771077

1078-
// GetZeroStateURL returns the full HTTP URL for the /state endpoint of the
1079-
// specified Zero node.
1080-
func (c *LocalCluster) GetZeroStateURL(id int) (string, error) {
1081-
if id >= c.conf.numZeros {
1082-
return "", fmt.Errorf("invalid id of zero: %v", id)
1083-
}
1084-
pubPort, err := publicPort(c.dcli, c.zeros[id], zeroHttpPort)
1085-
if err != nil {
1086-
return "", err
1087-
}
1088-
return "http://0.0.0.0:" + pubPort + "/state", nil
1089-
}
1090-
10911078
// serverURL returns url to the 'server' 'endpoint'
10921079
func (c *LocalCluster) serverURL(server, endpoint string) (string, error) {
10931080
pubPort, err := publicPort(c.dcli, c.alphas[0], alphaHttpPort)

dgraphtest/zero_state.go

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/*
2+
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package dgraphtest
7+
8+
import (
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"strings"
14+
"time"
15+
16+
"github.com/pkg/errors"
17+
)
18+
19+
// ZeroMember is a minimal view of a Zero member as reported by the /state
20+
// endpoint. Only the fields relevant to address reconciliation are included.
21+
type ZeroMember struct {
22+
Addr string `json:"addr"`
23+
Leader bool `json:"leader"`
24+
}
25+
26+
// ZeroState is the subset of Zero's /state response that we care about for
27+
// testing. It intentionally mirrors only the fields used by tests, keeping
28+
// unmarshal resilient to unrelated schema changes.
29+
type ZeroState struct {
30+
Zeros map[string]ZeroMember `json:"zeros"`
31+
}
32+
33+
// GetZeroStateURL returns the full HTTP URL of a Zero's /state endpoint.
34+
func (c *LocalCluster) GetZeroStateURL(id int) (string, error) {
35+
if id >= c.conf.numZeros {
36+
return "", fmt.Errorf("invalid id of zero: %v", id)
37+
}
38+
pubPort, err := publicPort(c.dcli, c.zeros[id], zeroHttpPort)
39+
if err != nil {
40+
return "", err
41+
}
42+
return "http://0.0.0.0:" + pubPort + "/state", nil
43+
}
44+
45+
// GetZeroState queries the /state endpoint on the specified Zero and returns
46+
// the parsed membership snapshot.
47+
func (c *LocalCluster) GetZeroState(id int) (*ZeroState, error) {
48+
stateURL, err := c.GetZeroStateURL(id)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
resp, err := http.Get(stateURL)
54+
if err != nil {
55+
return nil, errors.Wrapf(err, "GET %s", stateURL)
56+
}
57+
defer func() { _ = resp.Body.Close() }()
58+
59+
body, err := io.ReadAll(resp.Body)
60+
if err != nil {
61+
return nil, errors.Wrap(err, "reading /state body")
62+
}
63+
if resp.StatusCode != http.StatusOK {
64+
return nil, fmt.Errorf("/state HTTP %d: %s", resp.StatusCode, body)
65+
}
66+
67+
var state ZeroState
68+
if err := json.Unmarshal(body, &state); err != nil {
69+
return nil, errors.Wrapf(err, "unmarshal /state (body: %s)", string(body))
70+
}
71+
return &state, nil
72+
}
73+
74+
// ZeroLocator identifies a Zero by its container index and Raft ID.
// Addr is the member address as reported by the /state endpoint.
type ZeroLocator struct {
	ContainerIdx int
	RaftID       string
	Addr         string
}
80+
81+
// GetZeroLeader returns the locator of the current Zero leader, as reported by
82+
// the Zero at queryIdx.
83+
func (c *LocalCluster) GetZeroLeader(queryIdx int) (*ZeroLocator, error) {
84+
return c.findZero(queryIdx, true)
85+
}
86+
87+
// GetZeroFollower returns the locator of any Zero that is not the leader, as
88+
// reported by the Zero at queryIdx.
89+
func (c *LocalCluster) GetZeroFollower(queryIdx int) (*ZeroLocator, error) {
90+
return c.findZero(queryIdx, false)
91+
}
92+
93+
// GetZeroFollowers returns the locators of all non-leader Zeros, as reported
94+
// by the Zero at queryIdx.
95+
func (c *LocalCluster) GetZeroFollowers(queryIdx int) ([]ZeroLocator, error) {
96+
state, err := c.GetZeroState(queryIdx)
97+
if err != nil {
98+
return nil, err
99+
}
100+
var followers []ZeroLocator
101+
for id, z := range state.Zeros {
102+
if z.Leader {
103+
continue
104+
}
105+
idx, ok := containerIdxFromAddr(z.Addr)
106+
if !ok {
107+
return nil, fmt.Errorf("cannot map follower addr %q to container index", z.Addr)
108+
}
109+
followers = append(followers, ZeroLocator{ContainerIdx: idx, RaftID: id, Addr: z.Addr})
110+
}
111+
return followers, nil
112+
}
113+
114+
func (c *LocalCluster) findZero(queryIdx int, wantLeader bool) (*ZeroLocator, error) {
115+
state, err := c.GetZeroState(queryIdx)
116+
if err != nil {
117+
return nil, err
118+
}
119+
for id, z := range state.Zeros {
120+
if z.Leader != wantLeader {
121+
continue
122+
}
123+
idx, ok := containerIdxFromAddr(z.Addr)
124+
if !ok {
125+
return nil, fmt.Errorf("cannot map addr %q to container index", z.Addr)
126+
}
127+
return &ZeroLocator{ContainerIdx: idx, RaftID: id, Addr: z.Addr}, nil
128+
}
129+
role := "follower"
130+
if wantLeader {
131+
role = "leader"
132+
}
133+
return nil, fmt.Errorf("no %s found in /state from zero%d", role, queryIdx)
134+
}
135+
136+
// WaitForZeroAddress polls /state until the member identified by raftID has
137+
// wantAddr. It first tries queryIdx; if that node's /state is unavailable
138+
// (e.g. it temporarily lost quorum during a leadership change) it falls back
139+
// to any other Zero in the cluster. The last observed address and an error
140+
// are returned if the target is not reached within timeout.
141+
func (c *LocalCluster) WaitForZeroAddress(queryIdx int, raftID, wantAddr string,
142+
timeout, poll time.Duration) (string, error) {
143+
144+
deadline := time.Now().Add(timeout)
145+
var lastAddr string
146+
var lastErr error
147+
for time.Now().Before(deadline) {
148+
// Try queryIdx first, fall back to other nodes when quorum is transiently lost.
149+
for _, idx := range append([]int{queryIdx}, otherZeroIdxs(queryIdx, c.conf.numZeros)...) {
150+
state, err := c.GetZeroState(idx)
151+
if err != nil {
152+
lastErr = err
153+
continue
154+
}
155+
z, ok := state.Zeros[raftID]
156+
if !ok {
157+
lastErr = fmt.Errorf("raft id %s not present in /state", raftID)
158+
continue
159+
}
160+
lastAddr = z.Addr
161+
if lastAddr == wantAddr {
162+
return lastAddr, nil
163+
}
164+
break
165+
}
166+
time.Sleep(poll)
167+
}
168+
if lastErr != nil {
169+
return lastAddr, errors.Wrapf(lastErr,
170+
"timed out waiting for zero %s addr=%q on zero%d (last seen %q)",
171+
raftID, wantAddr, queryIdx, lastAddr)
172+
}
173+
return lastAddr, fmt.Errorf(
174+
"timed out waiting for zero %s addr=%q on zero%d (last seen %q)",
175+
raftID, wantAddr, queryIdx, lastAddr)
176+
}
177+
178+
// otherZeroIdxs returns all zero indices except exclude, preserving order.
// The result is nil when every index is excluded (or numZeros is zero).
func otherZeroIdxs(exclude, numZeros int) []int {
	var out []int
	for i := 0; i < numZeros; i++ {
		if i == exclude {
			continue
		}
		out = append(out, i)
	}
	return out
}
188+
189+
// ChangeZeroAddress reconfigures the Zero at id with a new --my flag that
190+
// points to its Docker container name (a valid DNS alias on the cluster
191+
// network). The Zero is stopped, recreated with its WAL preserved, and
192+
// restarted. The new address is returned so callers can assert convergence.
193+
func (c *LocalCluster) ChangeZeroAddress(id int) (string, error) {
194+
if err := c.StopZero(id); err != nil {
195+
return "", err
196+
}
197+
containerName, err := c.GetZeroContainerName(id)
198+
if err != nil {
199+
return "", err
200+
}
201+
newAddr := containerName + ":" + zeroGrpcPort
202+
if err := c.SetZeroMyAddr(id, newAddr); err != nil {
203+
return "", err
204+
}
205+
if err := c.RecreateZero(id); err != nil {
206+
return "", err
207+
}
208+
if err := c.StartZero(id); err != nil {
209+
return "", err
210+
}
211+
return newAddr, nil
212+
}
213+
214+
// containerIdxFromAddr extracts the numeric suffix from the "zeroN" segment
// of a member's address and returns it as the container index used by tests.
// The scan runs from the highest candidate index downward so that "zero12"
// is matched before its prefix "zero1" can yield a wrong (smaller) index.
func containerIdxFromAddr(addr string) (int, bool) {
	for i := maxContainerIdxScan - 1; i >= 0; i-- {
		if strings.Contains(addr, fmt.Sprintf("zero%d", i)) {
			return i, true
		}
	}
	return -1, false
}

// maxContainerIdxScan bounds the zero N scan in containerIdxFromAddr. It is
// larger than any realistic test topology.
const maxContainerIdxScan = 32
228+
229+
// GetZeroLogs returns the full container stdout/stderr log for the Zero at id.
230+
func (c *LocalCluster) GetZeroLogs(id int) (string, error) {
231+
if id >= c.conf.numZeros {
232+
return "", fmt.Errorf("invalid id of zero: %v", id)
233+
}
234+
return c.getLogs(c.zeros[id].containerID)
235+
}
236+
237+
// WaitForZeroLog polls the Zero's container log until it contains substr, or
238+
// the timeout expires. It is the log-based equivalent of WaitForZeroAddress
239+
// and lets tests react to concrete events (e.g. a Raft proposal being applied)
240+
// instead of sleeping for an arbitrary duration.
241+
func (c *LocalCluster) WaitForZeroLog(id int, substr string, timeout, poll time.Duration) error {
242+
deadline := time.Now().Add(timeout)
243+
var lastErr error
244+
for time.Now().Before(deadline) {
245+
logs, err := c.GetZeroLogs(id)
246+
if err != nil {
247+
lastErr = err
248+
} else if strings.Contains(logs, substr) {
249+
return nil
250+
}
251+
time.Sleep(poll)
252+
}
253+
if lastErr != nil {
254+
return errors.Wrapf(lastErr, "timed out waiting for log substring %q on zero%d",
255+
substr, id)
256+
}
257+
return fmt.Errorf("timed out waiting for log substring %q on zero%d", substr, id)
258+
}
259+
260+
// ZeroLogContains reports whether the Zero's current log output contains substr.
261+
func (c *LocalCluster) ZeroLogContains(id int, substr string) (bool, error) {
262+
logs, err := c.GetZeroLogs(id)
263+
if err != nil {
264+
return false, err
265+
}
266+
return strings.Contains(logs, substr), nil
267+
}
268+
269+
// WaitForAnyZeroLog polls all Zero containers until any one of them contains
270+
// substr, or the timeout expires. Use this when a log marker may appear on any
271+
// node (e.g. after a leadership change caused by the test scenario itself).
272+
func (c *LocalCluster) WaitForAnyZeroLog(substr string, timeout, poll time.Duration) error {
273+
deadline := time.Now().Add(timeout)
274+
for time.Now().Before(deadline) {
275+
for i := range c.conf.numZeros {
276+
logs, err := c.GetZeroLogs(i)
277+
if err != nil {
278+
continue
279+
}
280+
if strings.Contains(logs, substr) {
281+
return nil
282+
}
283+
}
284+
time.Sleep(poll)
285+
}
286+
return fmt.Errorf("timed out waiting for log substring %q on any zero", substr)
287+
}

0 commit comments

Comments
 (0)