Skip to content

Commit a440855

Browse files
committed
fix: implement more robust retries for Watch
1 parent 80884b3 commit a440855

4 files changed

Lines changed: 246 additions & 59 deletions

File tree

internal/client/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ const (
4444
defaultRetryJitterFraction = 0.5
4545
importBulkRoute = "/authzed.api.v1.PermissionsService/ImportBulkRelationships"
4646
exportBulkRoute = "/authzed.api.v1.PermissionsService/ExportBulkRelationships"
47+
watchRoute = "/authzed.api.v1.WatchService/Watch"
4748
)
4849

4950
// NewClient defines an (overridable) means of creating a new client.
@@ -235,7 +236,7 @@ func DialOptsFromFlags(cmd *cobra.Command, token storage.Token) ([]grpc.DialOpti
235236
// retrying the bulk import in backup/restore logic is handled manually.
236237
// retrying bulk export is also handled manually, because the default behavior is
237238
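// to start at the beginning of the stream, which produces duplicate values.
// retrying watch is also handled manually, so that the stream resumes from the last received revision.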
238-
selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute))),
239+
selector.StreamClientInterceptor(retry.StreamClientInterceptor(retryOpts...), selector.MatchFunc(isNoneOf(importBulkRoute, exportBulkRoute, watchRoute))),
239240
}
240241

241242
if !cobrautil.MustGetBool(cmd, "skip-version-check") {

internal/client/client_test.go

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -133,30 +133,36 @@ func TestGetCurrentTokenWithCLIOverrideWithoutSecretFile(t *testing.T) {
133133
require.Equal(&bTrue, token.Insecure)
134134
}
135135

136-
type fakeSchemaServer struct {
136+
type fakeServer struct {
137137
v1.UnimplementedSchemaServiceServer
138138
v1.UnimplementedExperimentalServiceServer
139+
v1.UnimplementedWatchServiceServer
139140
v1.UnimplementedPermissionsServiceServer
140141
testFunc func()
141142
}
142143

143-
func (fss *fakeSchemaServer) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) {
144+
func (fss *fakeServer) ReadSchema(_ context.Context, _ *v1.ReadSchemaRequest) (*v1.ReadSchemaResponse, error) {
144145
fss.testFunc()
145146
return nil, status.Error(codes.Unavailable, "")
146147
}
147148

148-
func (fss *fakeSchemaServer) ImportBulkRelationships(grpc.ClientStreamingServer[v1.ImportBulkRelationshipsRequest, v1.ImportBulkRelationshipsResponse]) error {
149+
func (fss *fakeServer) ImportBulkRelationships(grpc.ClientStreamingServer[v1.ImportBulkRelationshipsRequest, v1.ImportBulkRelationshipsResponse]) error {
149150
fss.testFunc()
150151
return status.Errorf(codes.Aborted, "")
151152
}
152153

154+
func (fss *fakeServer) Watch(*v1.WatchRequest, grpc.ServerStreamingServer[v1.WatchResponse]) error {
155+
fss.testFunc()
156+
return status.Errorf(codes.Unavailable, "")
157+
}
158+
153159
func TestRetries(t *testing.T) {
154160
ctx := t.Context()
155161
var callCount uint
156162
lis := bufconn.Listen(1024 * 1024)
157163
s := grpc.NewServer()
158164

159-
fakeServer := &fakeSchemaServer{testFunc: func() {
165+
fakeServer := &fakeServer{testFunc: func() {
160166
callCount++
161167
}}
162168
v1.RegisterSchemaServiceServer(s, fakeServer)
@@ -185,22 +191,25 @@ func TestRetries(t *testing.T) {
185191
c, err := authzed.NewClient("passthrough://bufnet", dialOpts...)
186192
require.NoError(t, err)
187193

188-
_, err = c.ReadSchema(ctx, &v1.ReadSchemaRequest{})
189-
grpcutil.RequireStatus(t, codes.Unavailable, err)
190-
require.Equal(t, retries, callCount)
194+
t.Run("read_schema", func(t *testing.T) {
195+
_, err = c.ReadSchema(ctx, &v1.ReadSchemaRequest{})
196+
grpcutil.RequireStatus(t, codes.Unavailable, err)
197+
require.Equal(t, retries, callCount)
198+
})
191199
}
192200

193-
func TestDoesNotRetryBackupRestore(t *testing.T) {
201+
func TestDoesNotRetry(t *testing.T) {
194202
ctx := t.Context()
195203
var callCount uint
196204
lis := bufconn.Listen(1024 * 1024)
197205
s := grpc.NewServer()
198206

199-
fakeServer := &fakeSchemaServer{testFunc: func() {
207+
fakeServer := &fakeServer{testFunc: func() {
200208
callCount++
201209
}}
202210
v1.RegisterPermissionsServiceServer(s, fakeServer)
203211
v1.RegisterExperimentalServiceServer(s, fakeServer)
212+
v1.RegisterWatchServiceServer(s, fakeServer)
204213

205214
go func() {
206215
_ = s.Serve(lis)
@@ -226,20 +235,23 @@ func TestDoesNotRetryBackupRestore(t *testing.T) {
226235
c, err := authzed.NewClientWithExperimentalAPIs("passthrough://bufnet", dialOpts...)
227236
require.NoError(t, err)
228237

229-
ibc, err := c.ImportBulkRelationships(ctx)
230-
require.NoError(t, err)
231-
err = ibc.SendMsg(&v1.ImportBulkRelationshipsRequest{})
232-
require.NoError(t, err)
233-
_, err = ibc.CloseAndRecv()
234-
grpcutil.RequireStatus(t, codes.Aborted, err)
235-
require.Equal(t, uint(1), callCount)
238+
t.Run("import_bulk", func(t *testing.T) {
239+
ibc, err := c.ImportBulkRelationships(ctx)
240+
require.NoError(t, err)
241+
err = ibc.SendMsg(&v1.ImportBulkRelationshipsRequest{})
242+
require.NoError(t, err)
243+
_, err = ibc.CloseAndRecv()
244+
grpcutil.RequireStatus(t, codes.Aborted, err)
245+
require.Equal(t, uint(1), callCount)
246+
})
236247

237-
callCount = 0
238-
bic, err := c.ImportBulkRelationships(ctx)
239-
require.NoError(t, err)
240-
err = bic.SendMsg(&v1.ImportBulkRelationshipsRequest{})
241-
require.NoError(t, err)
242-
_, err = bic.CloseAndRecv()
243-
grpcutil.RequireStatus(t, codes.Aborted, err)
244-
require.Equal(t, uint(1), callCount)
248+
t.Run("watch", func(t *testing.T) {
249+
callCount = 0
250+
watchReq, err := c.Watch(ctx, &v1.WatchRequest{})
251+
require.NoError(t, err)
252+
resp, err := watchReq.Recv()
253+
require.Nil(t, resp)
254+
grpcutil.RequireStatus(t, codes.Unavailable, err)
255+
require.Equal(t, uint(1), callCount)
256+
})
245257
}

internal/commands/watch.go

Lines changed: 80 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import (
99
"syscall"
1010
"time"
1111

12+
"github.com/rs/zerolog/log"
1213
"github.com/spf13/cobra"
14+
"google.golang.org/grpc/codes"
15+
"google.golang.org/grpc/status"
1316

1417
v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
1518

@@ -44,20 +47,24 @@ func RegisterWatchRelationshipCmd(parentCmd *cobra.Command) *cobra.Command {
4447

4548
var watchCmd = &cobra.Command{
4649
Use: "watch [object_types, ...] [start_cursor]",
47-
Short: "Watches the stream of relationship updates from the server",
50+
Short: "Watches the stream of relationship updates and schema updates from the server",
4851
Args: ValidationWrapper(cobra.RangeArgs(0, 2)),
4952
RunE: watchCmdFunc,
5053
Deprecated: "please use `zed relationships watch` instead",
5154
}
5255

5356
var watchRelationshipsCmd = &cobra.Command{
5457
Use: "watch [object_types, ...] [start_cursor]",
55-
Short: "Watches the stream of relationship updates from the server",
58+
Short: "Watches the stream of relationship updates and schema updates from the server",
5659
Args: ValidationWrapper(cobra.RangeArgs(0, 2)),
5760
RunE: watchCmdFunc,
5861
}
5962

6063
func watchCmdFunc(cmd *cobra.Command, _ []string) error {
64+
return watchCmdFuncImpl(cmd, processResponse)
65+
}
66+
67+
func watchCmdFuncImpl(cmd *cobra.Command, processResponse func(resp *v1.WatchResponse)) error {
6168
console.Printf("starting watch stream over types %v and revision %v\n", watchObjectTypes, watchRevision)
6269

6370
cli, err := client.NewClient(cmd)
@@ -74,20 +81,25 @@ func watchCmdFunc(cmd *cobra.Command, _ []string) error {
7481
relFilters = append(relFilters, relFilter)
7582
}
7683

84+
ctx, cancel := context.WithCancel(cmd.Context())
85+
defer cancel()
86+
87+
signalctx, interruptCancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
88+
defer interruptCancel()
89+
7790
req := &v1.WatchRequest{
7891
OptionalObjectTypes: watchObjectTypes,
7992
OptionalRelationshipFilters: relFilters,
93+
OptionalUpdateKinds: []v1.WatchKind{
94+
v1.WatchKind_WATCH_KIND_INCLUDE_CHECKPOINTS,
95+
v1.WatchKind_WATCH_KIND_INCLUDE_SCHEMA_UPDATES,
96+
},
8097
}
98+
8199
if watchRevision != "" {
82100
req.OptionalStartCursor = &v1.ZedToken{Token: watchRevision}
83101
}
84102

85-
ctx, cancel := context.WithCancel(cmd.Context())
86-
defer cancel()
87-
88-
signalctx, interruptCancel := signal.NotifyContext(ctx, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
89-
defer interruptCancel()
90-
91103
watchStream, err := cli.Watch(ctx, req)
92104
if err != nil {
93105
return err
@@ -104,40 +116,74 @@ func watchCmdFunc(cmd *cobra.Command, _ []string) error {
104116
default:
105117
resp, err := watchStream.Recv()
106118
if err != nil {
107-
return err
108-
}
119+
err, ok := isRetryable(err)
120+
if !ok {
121+
return err
122+
}
109123

110-
for _, update := range resp.Updates {
111-
if watchTimestamps {
112-
console.Printf("%v: ", time.Now())
124+
log.Trace().Err(err).Msg("will retry from the last known revision " + watchRevision)
125+
req.OptionalStartCursor = &v1.ZedToken{Token: watchRevision}
126+
watchStream, err = cli.Watch(ctx, req)
127+
if err != nil {
128+
return err
113129
}
130+
continue
131+
}
114132

115-
switch update.Operation {
116-
case v1.RelationshipUpdate_OPERATION_CREATE:
117-
console.Printf("CREATED ")
133+
processResponse(resp)
134+
}
135+
}
136+
}
118137

119-
case v1.RelationshipUpdate_OPERATION_DELETE:
120-
console.Printf("DELETED ")
138+
func isRetryable(err error) (error, bool) {
139+
statusErr, ok := status.FromError(err)
140+
if !ok || (statusErr.Code() != codes.Unavailable) {
141+
return err, false
142+
}
143+
return nil, true
144+
}
121145

122-
case v1.RelationshipUpdate_OPERATION_TOUCH:
123-
console.Printf("TOUCHED ")
124-
}
146+
func processResponse(resp *v1.WatchResponse) {
147+
if resp.ChangesThrough != nil {
148+
watchRevision = resp.ChangesThrough.Token
149+
}
125150

126-
subjectRelation := ""
127-
if update.Relationship.Subject.OptionalRelation != "" {
128-
subjectRelation = " " + update.Relationship.Subject.OptionalRelation
129-
}
151+
if resp.SchemaUpdated {
152+
if watchTimestamps {
153+
console.Printf("%v: ", time.Now())
154+
}
155+
console.Println("SCHEMA UPDATED")
156+
}
130157

131-
console.Printf("%s:%s %s %s:%s%s\n",
132-
update.Relationship.Resource.ObjectType,
133-
update.Relationship.Resource.ObjectId,
134-
update.Relationship.Relation,
135-
update.Relationship.Subject.Object.ObjectType,
136-
update.Relationship.Subject.Object.ObjectId,
137-
subjectRelation,
138-
)
139-
}
158+
for _, update := range resp.Updates {
159+
if watchTimestamps {
160+
console.Printf("%v: ", time.Now())
140161
}
162+
163+
switch update.Operation {
164+
case v1.RelationshipUpdate_OPERATION_CREATE:
165+
console.Printf("CREATED ")
166+
167+
case v1.RelationshipUpdate_OPERATION_DELETE:
168+
console.Printf("DELETED ")
169+
170+
case v1.RelationshipUpdate_OPERATION_TOUCH:
171+
console.Printf("TOUCHED ")
172+
}
173+
174+
subjectRelation := ""
175+
if update.Relationship.Subject.OptionalRelation != "" {
176+
subjectRelation = " " + update.Relationship.Subject.OptionalRelation
177+
}
178+
179+
console.Printf("%s:%s %s %s:%s%s\n",
180+
update.Relationship.Resource.ObjectType,
181+
update.Relationship.Resource.ObjectId,
182+
update.Relationship.Relation,
183+
update.Relationship.Subject.Object.ObjectType,
184+
update.Relationship.Subject.Object.ObjectId,
185+
subjectRelation,
186+
)
141187
}
142188
}
143189

0 commit comments

Comments
 (0)