@@ -197,6 +197,10 @@ public function readAll()
197197 // are to be discarded.
198198
199199 foreach ($ this ->stream as $ readRowsResponse ) {
200+ if ($ lastScannedRowKey = $ readRowsResponse ->getLastScannedRowKey ()) {
201+ // The server sends the response with a "last_scanned_row_key" in some cases
202+ $ this ->prevRowKey = $ lastScannedRowKey ;
203+ }
200204 foreach ($ readRowsResponse ->getChunks () as $ chunk ) {
201205 switch ($ this ->state ) {
202206 case self ::$ rowStateEnum ['NEW_ROW ' ]:
@@ -234,7 +238,6 @@ private function updateReadRowsRequest($request, $options, $prevRowKey)
234238 $ options ['requestCompleted ' ] = true ;
235239 }
236240 }
237-
238241 if ($ request ->hasRows ()) {
239242 $ rowSet = $ request ->getRows ();
240243 if (count ($ rowSet ->getRowKeys ()) > 0 ) {
@@ -252,7 +255,8 @@ private function updateReadRowsRequest($request, $options, $prevRowKey)
252255 if (($ range ->getEndKeyOpen () && $ prevRowKey > $ range ->getEndKeyOpen ())
253256 || ($ range ->getEndKeyClosed () && $ prevRowKey >= $ range ->getEndKeyClosed ())) {
254257 continue ;
255- } elseif ((!$ range ->getStartKeyOpen () || $ prevRowKey > $ range ->getStartKeyOpen ())
258+ }
259+ if ((!$ range ->getStartKeyOpen () || $ prevRowKey > $ range ->getStartKeyOpen ())
256260 && (!$ range ->getStartKeyClosed () || $ prevRowKey >= $ range ->getStartKeyClosed ())) {
257261 $ range ->setStartKeyOpen ($ prevRowKey );
258262 }
@@ -270,8 +274,10 @@ private function updateReadRowsRequest($request, $options, $prevRowKey)
270274 $ options ['requestCompleted ' ] = true ;
271275 }
272276 } else {
273- $ range = (new RowRange ())->setStartKeyOpen ($ prevRowKey );
274- $ options ['rows ' ] = (new RowSet ())->setRowRanges ([$ range ]);
277+ $ range = (new RowRange ());
278+ $ range ->setStartKeyOpen ($ prevRowKey );
279+ $ rowset = (new RowSet ())->setRowRanges ([$ range ]);
280+ $ request ->setRows ($ rowset );
275281 }
276282
277283 return [$ request , $ options ];
0 commit comments