@@ -15,144 +15,186 @@ import (
1515 "path/filepath"
1616
1717 "github.com/dgraph-io/badger/v4"
18+ "github.com/dgraph-io/dgo/v250"
1819 apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
1920 "github.com/dgraph-io/ristretto/v2/z"
2021
2122 "github.com/golang/glog"
2223 "golang.org/x/sync/errgroup"
23- "google.golang.org/grpc"
2424)
2525
2626// newClient creates a new import client with the specified endpoint and gRPC options.
27- func newClient (endpoint string , opts grpc. DialOption ) (apiv2.DgraphClient , error ) {
28- conn , err := grpc . NewClient ( endpoint , opts )
27+ func newClient (connectionString string ) (apiv2.DgraphClient , error ) {
28+ dg , err := dgo . Open ( connectionString )
2929 if err != nil {
30- return nil , fmt .Errorf ("failed to connect to endpoint [%s]: %w" , endpoint , err )
30+ return nil , fmt .Errorf ("failed to connect to endpoint [%s]: %w" , connectionString , err )
3131 }
3232
33- glog .Infof ("Successfully connected to Dgraph endpoint: %s" , endpoint )
34- return apiv2 . NewDgraphClient ( conn ) , nil
33+ glog .Infof ("[import] Successfully connected to Dgraph endpoint: %s" , connectionString )
34+ return dg . GetAPIv2Client ()[ 0 ] , nil
3535}
3636
37- func Import (ctx context.Context , endpoint string , opts grpc. DialOption , bulkOutDir string ) error {
38- dg , err := newClient (endpoint , opts )
37+ func Import (ctx context.Context , connectionString string , bulkOutDir string ) error {
38+ dg , err := newClient (connectionString )
3939 if err != nil {
4040 return err
4141 }
42- resp , err := startPDirStream (ctx , dg )
42+ resp , err := initiateSnapshotStream (ctx , dg )
4343 if err != nil {
4444 return err
4545 }
4646
47- return sendPDir (ctx , dg , bulkOutDir , resp .Groups )
47+ return streamSnapshot (ctx , dg , bulkOutDir , resp .Groups )
4848}
4949
50- // startPDirStream initiates a snapshot stream session with the Dgraph server.
51- func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.InitiatePDirStreamResponse , error ) {
52- glog .Info ("Initiating pdir stream" )
53- req := & apiv2.InitiatePDirStreamRequest {}
54- resp , err := dc .InitiatePDirStream (ctx , req )
50+ // initiateSnapshotStream initiates a snapshot stream session with the Dgraph server.
51+ func initiateSnapshotStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.UpdateExtSnapshotStreamingStateResponse , error ) {
52+ glog .Info ("[import] Initiating external snapshot stream" )
53+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
54+ Start : true ,
55+ }
56+ resp , err := dc .UpdateExtSnapshotStreamingState (ctx , req )
5557 if err != nil {
56- glog .Errorf ("failed to initiate pdir stream: %v" , err )
57- return nil , fmt .Errorf ("failed to initiate pdir stream: %v" , err )
58+ glog .Errorf ("[import] failed to initiate external snapshot stream: %v" , err )
59+ return nil , fmt .Errorf ("failed to initiate external snapshot stream: %v" , err )
5860 }
59- glog .Info ("Pdir stream initiated successfully" )
61+ glog .Info ("[import] External snapshot stream initiated successfully" )
6062 return resp , nil
6163}
6264
63- // sendPDir takes a p directory and a set of group IDs and streams the data from the
65+ // streamSnapshot takes a p directory and a set of group IDs and streams the data from the
6466// p directory to the corresponding group IDs. It first scans the provided directory for
6567// subdirectories named with numeric group IDs.
66- func sendPDir (ctx context.Context , dg apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
67- glog .Infof ("Starting to stream pdir from directory: %s" , baseDir )
68+ func streamSnapshot (ctx context.Context , dc apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
69+ glog .Infof ("[import] Starting to stream snapshot from directory: %s" , baseDir )
6870
69- errG , ctx := errgroup .WithContext (ctx )
71+ errG , errGrpCtx := errgroup .WithContext (ctx )
7072 for _ , group := range groups {
71- group := group
7273 errG .Go (func () error {
7374 pDir := filepath .Join (baseDir , fmt .Sprintf ("%d" , group - 1 ), "p" )
7475 if _ , err := os .Stat (pDir ); err != nil {
7576 return fmt .Errorf ("p directory does not exist for group [%d]: [%s]" , group , pDir )
7677 }
77-
78- glog .Infof ("Streaming data for group [%d] from directory: [%s]" , group , pDir )
79- if err := streamData (ctx , dg , pDir , group ); err != nil {
80- glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
78+ glog .Infof ("[import] Streaming data for group [%d] from directory: [%s]" , group , pDir )
79+ if err := streamSnapshotForGroup (errGrpCtx , dc , pDir , group ); err != nil {
80+ glog .Errorf ("[import] Failed to stream data for group [%v] from directory: [%s]: %v" , group , pDir , err )
8181 return err
8282 }
8383
8484 return nil
8585 })
8686 }
87+
8788 if err := errG .Wait (); err != nil {
89+ glog .Errorf ("[import] failed to stream external snapshot: %v" , err )
90+ // If errors occurs during streaming of the external snapshot, we drop all the data and
91+ // go back to ensure a clean slate and the cluster remains in working state.
92+ glog .Info ("[import] dropping all the data and going back to clean slate" )
93+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
94+ Start : false ,
95+ Finish : true ,
96+ DropData : true ,
97+ }
98+ if _ , err := dc .UpdateExtSnapshotStreamingState (ctx , req ); err != nil {
99+ return fmt .Errorf ("failed to turn off drain mode: %v" , err )
100+ }
101+
102+ glog .Info ("[import] successfully disabled drain mode" )
88103 return err
89104 }
90105
91- glog .Infof ("Completed streaming all pdirs" )
106+ glog .Info ("[import] Completed streaming external snapshot" )
107+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
108+ Start : false ,
109+ Finish : true ,
110+ DropData : false ,
111+ }
112+ if _ , err := dc .UpdateExtSnapshotStreamingState (ctx , req ); err != nil {
113+ glog .Errorf ("[import] failed to disable drain mode: %v" , err )
114+ return fmt .Errorf ("failed to disable drain mode: %v" , err )
115+ }
116+ glog .Info ("[import] successfully disable drain mode" )
92117 return nil
93118}
94119
95- // streamData handles the actual data streaming process for a single group.
120+ // streamSnapshotForGroup handles the actual data streaming process for a single group.
96121// It opens the BadgerDB at the specified directory and streams all data to the server.
97- func streamData (ctx context.Context , dg apiv2.DgraphClient , pdir string , groupId uint32 ) error {
122+ func streamSnapshotForGroup (ctx context.Context , dc apiv2.DgraphClient , pdir string , groupId uint32 ) error {
98123 glog .Infof ("Opening stream for group %d from directory %s" , groupId , pdir )
99124
100125 // Initialize stream with the server
101- out , err := dg . StreamPDir (ctx )
126+ out , err := dc . StreamExtSnapshot (ctx )
102127 if err != nil {
103- return fmt .Errorf ("failed to start pdir stream for group %d: %w" , groupId , err )
128+ return fmt .Errorf ("failed to start external snapshot stream for group %d: %w" , groupId , err )
104129 }
105130
131+ defer func () {
132+ if _ , err := out .CloseAndRecv (); err != nil {
133+ glog .Errorf ("failed to close the stream for group [%v]: %v" , groupId , err )
134+ }
135+
136+ glog .Infof ("[import] Group [%v]: Received ACK " , groupId )
137+ }()
138+
106139 // Open the BadgerDB instance at the specified directory
107140 opt := badger .DefaultOptions (pdir )
108141 ps , err := badger .OpenManaged (opt )
109142 if err != nil {
110- return fmt .Errorf ("failed to open BadgerDB at [%s]: %w" , pdir , err )
143+ glog .Errorf ("failed to open BadgerDB at [%s]: %v" , pdir , err )
144+ return fmt .Errorf ("failed to open BadgerDB at [%v]: %v" , pdir , err )
111145 }
112146
113147 defer func () {
114148 if err := ps .Close (); err != nil {
115- glog .Warningf ("Error closing BadgerDB: %v" , err )
149+ glog .Warningf ("[import] Error closing BadgerDB: %v" , err )
116150 }
117151 }()
118152
119153 // Send group ID as the first message in the stream
120- glog .Infof ("Sending group ID [%d] to server " , groupId )
121- groupReq := & apiv2.StreamPDirRequest {GroupId : groupId }
154+ glog .Infof ("[import] Sending request for streaming external snapshot for group ID [%v] " , groupId )
155+ groupReq := & apiv2.StreamExtSnapshotRequest {GroupId : groupId }
122156 if err := out .Send (groupReq ); err != nil {
123- return fmt .Errorf ("failed to send group ID [%d]: %w" , groupId , err )
157+ return fmt .Errorf ("failed to send request for streaming external snapshot for group ID [%v] to the server: %w" ,
158+ groupId , err )
124159 }
125160
126161 // Configure and start the BadgerDB stream
127- glog .Infof ("Starting BadgerDB stream for group [%d]" , groupId )
162+ glog .Infof ("[import] Starting BadgerDB stream for group [%v]" , groupId )
163+
164+ if err := streamBadger (ctx , ps , out , groupId ); err != nil {
165+ return fmt .Errorf ("badger streaming failed for group [%v]: %v" , groupId , err )
166+ }
167+
168+ return nil
169+ }
170+
171+ // streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
172+ // It creates a new stream at the maximum sequence number and sends the data to the specified group.
173+ // It also sends a final 'done' signal to mark completion.
174+ func streamBadger (ctx context.Context , ps * badger.DB , out apiv2.Dgraph_StreamExtSnapshotClient , groupId uint32 ) error {
128175 stream := ps .NewStreamAt (math .MaxUint64 )
129- stream .LogPrefix = fmt . Sprintf ( " Sending P dir for group [%d] " , groupId )
176+ stream .LogPrefix = "[import] Sending external snapshot to group [" + fmt . Sprintf ( "%d " , groupId ) + "]"
130177 stream .KeyToList = nil
131178 stream .Send = func (buf * z.Buffer ) error {
132179 p := & apiv2.StreamPacket {Data : buf .Bytes ()}
133- if err := out .Send (& apiv2.StreamPDirRequest { StreamPacket : p }); err != nil && ! errors .Is (err , io .EOF ) {
180+ if err := out .Send (& apiv2.StreamExtSnapshotRequest { Pkt : p }); err != nil && ! errors .Is (err , io .EOF ) {
134181 return fmt .Errorf ("failed to send data chunk: %w" , err )
135182 }
136183 return nil
137184 }
138185
139186 // Execute the stream process
140187 if err := stream .Orchestrate (ctx ); err != nil {
141- return fmt .Errorf ("stream orchestration failed for group [%d ]: %w" , groupId , err )
188+ return fmt .Errorf ("stream orchestration failed for group [%v ]: %w, badger path: %s " , groupId , err , ps . Opts (). Dir )
142189 }
143190
144191 // Send the final 'done' signal to mark completion
145- glog .Infof ("Sending completion signal for group [%d]" , groupId )
192+ glog .Infof ("[import] Sending completion signal for group [%d]" , groupId )
146193 done := & apiv2.StreamPacket {Done : true }
147194
148- if err := out .Send (& apiv2.StreamPDirRequest { StreamPacket : done }); err != nil && ! errors .Is (err , io .EOF ) {
195+ if err := out .Send (& apiv2.StreamExtSnapshotRequest { Pkt : done }); err != nil && ! errors .Is (err , io .EOF ) {
149196 return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
150197 }
151- // Wait for acknowledgment from the server
152- if _ , err := out .CloseAndRecv (); err != nil {
153- return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
154- }
155- glog .Infof ("Group [%d]: Received ACK " , groupId )
156198
157199 return nil
158200}
0 commit comments