@@ -9,7 +9,10 @@ import (
99 "syscall"
1010 "time"
1111
12+ "github.com/rs/zerolog/log"
1213 "github.com/spf13/cobra"
14+ "google.golang.org/grpc/codes"
15+ "google.golang.org/grpc/status"
1316
1417 v1 "github.com/authzed/authzed-go/proto/authzed/api/v1"
1518
@@ -44,26 +47,29 @@ func RegisterWatchRelationshipCmd(parentCmd *cobra.Command) *cobra.Command {
4447
4548var watchCmd = & cobra.Command {
4649 Use : "watch [object_types, ...] [start_cursor]" ,
47- Short : "Watches the stream of relationship updates from the server" ,
50+ Short : "Watches the stream of relationship updates and schema updates from the server" ,
4851 Args : ValidationWrapper (cobra .RangeArgs (0 , 2 )),
4952 RunE : watchCmdFunc ,
5053 Deprecated : "please use `zed relationships watch` instead" ,
5154}
5255
5356var watchRelationshipsCmd = & cobra.Command {
5457 Use : "watch [object_types, ...] [start_cursor]" ,
55- Short : "Watches the stream of relationship updates from the server" ,
58+ Short : "Watches the stream of relationship updates and schema updates from the server" ,
5659 Args : ValidationWrapper (cobra .RangeArgs (0 , 2 )),
5760 RunE : watchCmdFunc ,
5861}
5962
6063func watchCmdFunc (cmd * cobra.Command , _ []string ) error {
61- console .Printf ("starting watch stream over types %v and revision %v\n " , watchObjectTypes , watchRevision )
62-
63- cli , err := client .NewClient (cmd )
64+ client , err := client .NewClient (cmd )
6465 if err != nil {
6566 return err
6667 }
68+ return watchCmdFuncImpl (cmd , client , processResponse )
69+ }
70+
71+ func watchCmdFuncImpl (cmd * cobra.Command , watchClient v1.WatchServiceClient , processResponse func (resp * v1.WatchResponse )) error {
72+ console .Printf ("starting watch stream over types %v and revision %v\n " , watchObjectTypes , watchRevision )
6773
6874 relFilters := make ([]* v1.RelationshipFilter , 0 , len (watchRelationshipFilters ))
6975 for _ , filter := range watchRelationshipFilters {
@@ -74,21 +80,26 @@ func watchCmdFunc(cmd *cobra.Command, _ []string) error {
7480 relFilters = append (relFilters , relFilter )
7581 }
7682
83+ ctx , cancel := context .WithCancel (cmd .Context ())
84+ defer cancel ()
85+
86+ signalctx , interruptCancel := signal .NotifyContext (ctx , os .Interrupt , syscall .SIGTERM , syscall .SIGINT )
87+ defer interruptCancel ()
88+
7789 req := & v1.WatchRequest {
7890 OptionalObjectTypes : watchObjectTypes ,
7991 OptionalRelationshipFilters : relFilters ,
92+ OptionalUpdateKinds : []v1.WatchKind {
93+ v1 .WatchKind_WATCH_KIND_INCLUDE_CHECKPOINTS , // keeps connection open during quiet periods
94+ v1 .WatchKind_WATCH_KIND_INCLUDE_SCHEMA_UPDATES ,
95+ },
8096 }
97+
8198 if watchRevision != "" {
8299 req .OptionalStartCursor = & v1.ZedToken {Token : watchRevision }
83100 }
84101
85- ctx , cancel := context .WithCancel (cmd .Context ())
86- defer cancel ()
87-
88- signalctx , interruptCancel := signal .NotifyContext (ctx , os .Interrupt , syscall .SIGTERM , syscall .SIGINT )
89- defer interruptCancel ()
90-
91- watchStream , err := cli .Watch (ctx , req )
102+ watchStream , err := watchClient .Watch (ctx , req )
92103 if err != nil {
93104 return err
94105 }
@@ -104,40 +115,74 @@ func watchCmdFunc(cmd *cobra.Command, _ []string) error {
104115 default :
105116 resp , err := watchStream .Recv ()
106117 if err != nil {
107- return err
108- }
118+ ok , err := isRetryable (err )
119+ if ! ok {
120+ return err
121+ }
109122
110- for _ , update := range resp .Updates {
111- if watchTimestamps {
112- console .Printf ("%v: " , time .Now ())
123+ log .Trace ().Err (err ).Msg ("will retry from the last known revision " + watchRevision )
124+ req .OptionalStartCursor = & v1.ZedToken {Token : watchRevision }
125+ watchStream , err = watchClient .Watch (ctx , req )
126+ if err != nil {
127+ return err
113128 }
129+ continue
130+ }
114131
115- switch update .Operation {
116- case v1 .RelationshipUpdate_OPERATION_CREATE :
117- console .Printf ("CREATED " )
132+ processResponse (resp )
133+ }
134+ }
135+ }
118136
119- case v1 .RelationshipUpdate_OPERATION_DELETE :
120- console .Printf ("DELETED " )
137+ func isRetryable (err error ) (bool , error ) {
138+ statusErr , ok := status .FromError (err )
139+ if ! ok || (statusErr .Code () != codes .Unavailable ) {
140+ return false , err
141+ }
142+ return true , nil
143+ }
121144
122- case v1 .RelationshipUpdate_OPERATION_TOUCH :
123- console .Printf ("TOUCHED " )
124- }
145+ func processResponse (resp * v1.WatchResponse ) {
146+ if resp .ChangesThrough != nil {
147+ watchRevision = resp .ChangesThrough .Token
148+ }
125149
126- subjectRelation := ""
127- if update .Relationship .Subject .OptionalRelation != "" {
128- subjectRelation = " " + update .Relationship .Subject .OptionalRelation
129- }
150+ if resp .SchemaUpdated {
151+ if watchTimestamps {
152+ console .Printf ("%v: " , time .Now ())
153+ }
154+ console .Println ("SCHEMA UPDATED" )
155+ }
130156
131- console .Printf ("%s:%s %s %s:%s%s\n " ,
132- update .Relationship .Resource .ObjectType ,
133- update .Relationship .Resource .ObjectId ,
134- update .Relationship .Relation ,
135- update .Relationship .Subject .Object .ObjectType ,
136- update .Relationship .Subject .Object .ObjectId ,
137- subjectRelation ,
138- )
139- }
157+ for _ , update := range resp .Updates {
158+ if watchTimestamps {
159+ console .Printf ("%v: " , time .Now ())
140160 }
161+
162+ switch update .Operation {
163+ case v1 .RelationshipUpdate_OPERATION_CREATE :
164+ console .Printf ("CREATED " )
165+
166+ case v1 .RelationshipUpdate_OPERATION_DELETE :
167+ console .Printf ("DELETED " )
168+
169+ case v1 .RelationshipUpdate_OPERATION_TOUCH :
170+ console .Printf ("TOUCHED " )
171+ }
172+
173+ subjectRelation := ""
174+ if update .Relationship .Subject .OptionalRelation != "" {
175+ subjectRelation = " " + update .Relationship .Subject .OptionalRelation
176+ }
177+
178+ console .Printf ("%s:%s %s %s:%s%s\n " ,
179+ update .Relationship .Resource .ObjectType ,
180+ update .Relationship .Resource .ObjectId ,
181+ update .Relationship .Relation ,
182+ update .Relationship .Subject .Object .ObjectType ,
183+ update .Relationship .Subject .Object .ObjectId ,
184+ subjectRelation ,
185+ )
141186 }
142187}
143188
0 commit comments