Skip to content

Commit 1b4a1c9

Browse files
authored
feat(Bigtable): expose totalTimeoutMillis in RetrySettings for Read and Mutate Rows (#8223)
1 parent 00bd9e2 commit 1b4a1c9

5 files changed

Lines changed: 118 additions & 47 deletions

File tree

Bigtable/src/BigtableClient.php

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Google\ApiCore\ClientOptionsTrait;
2222
use Google\ApiCore\CredentialsWrapper;
2323
use Google\ApiCore\Serializer;
24+
use Google\ApiCore\Transport\GrpcTransport;
25+
use Google\ApiCore\Transport\RestTransport;
2426
use Google\ApiCore\Transport\TransportInterface;
2527
use Google\ApiCore\ValidationException;
2628
use Google\Auth\FetchAuthTokenInterface;
@@ -94,14 +96,13 @@ class BigtableClient
9496
* This option accepts either a path to a credentials file, or a
9597
* decoded credentials file as a PHP array.
9698
* *Advanced usage*: In addition, this option can also accept a
97-
* pre-constructed {@see Google\Auth\FetchAuthTokenInterface} object
98-
* or {@see Google\ApiCore\CredentialsWrapper} object. Note that when
99-
* one of these objects are provided, any settings in
100-
* $config['credentialsConfig'] will be ignored.
99+
* pre-constructed {@see FetchAuthTokenInterface} object
100+
* or {@see CredentialsWrapper} object. Note that when one of
101+
* these objects are provided, any settings in
102+
* `$config['credentialsConfig']` will be ignored.
101103
* @type array $credentialsConfig Options used to configure credentials,
102104
* including auth token caching, for the client. For a full list of
103-
* supporting configuration options, see
104-
* {@see Google\ApiCore\CredentialsWrapper}.
105+
* supporting configuration options, see {@see CredentialsWrapper}.
105106
* @type bool $disableRetries Determines whether or not retries defined by
106107
* the client configuration should be disabled. **Defaults to**
107108
* `false`.
@@ -112,9 +113,8 @@ class BigtableClient
112113
* executing network requests. May be either the string `rest` or
113114
* `grpc`. **Defaults to** `grpc` if gRPC support is detected on
114115
* the system. *Advanced usage*: Additionally, it is possible to
115-
* pass in an already instantiated
116-
* {@see Google\ApiCore\Transport\TransportInterface} object. Note
117-
* that when this object is provided, any settings in
116+
* pass in an already instantiated {@see TransportInterface}
117+
* object. Note that when this object is provided, any settings in
118118
* $config['transportConfig'] and the $config['apiEndpoint']
119119
* setting will be ignored.
120120
* @type array $transportConfig Configuration options that will be used to
@@ -124,9 +124,8 @@ class BigtableClient
124124
* 'grpc' => [...],
125125
* 'rest' => [...]
126126
* ];
127-
* See the `build` method on {@see Google\ApiCore\Transport\GrpcTransport}
128-
* and {@see Google\ApiCore\Transport\RestTransport} for the
129-
* supported options.
127+
* See the `build` method on {@see GrpcTransport} and
128+
* {@see RestTransport} for the supported options.
130129
* @type string $quotaProject Specifies a user project to bill for
131130
* access charges associated with the request.
132131
* @type bool $pingAndWarm EXPERIMENTAL When true, calls the

Bigtable/src/ResumableStream.php

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ class ResumableStream implements \IteratorAggregate
8484
*/
8585
private $delayFunction;
8686

87+
private int $totalTimeoutMillis;
88+
8789
/**
8890
* Constructs a resumable stream.
8991
*
@@ -94,11 +96,16 @@ class ResumableStream implements \IteratorAggregate
9496
* calling `$apiFunction`.
9597
* @param callable $retryFunction Function which determines whether to retry or not.
9698
* @param array $callOptions {
97-
* @option RetrySettings|array $retrySettings {
98-
* @option int $maxRetries Number of times to retry. **Defaults to** `3`.
99-
* Only maxRetries works for RetrySettings in this API.
100-
* }
101-
* }
99+
* Configuration options.
100+
*
101+
* @type RetrySettings|array $retrySettings {
102+
* Only $maxRetries and $totalTimeoutMillis work for RetrySettings in this API.
103+
*
104+
* @type int $maxRetries Number of times to retry. **Defaults to** `3`.
105+
* @type int $totalTimeoutMillis The max accumulative timeout in total for the
106+
* operation. `-1` signifies no total timeout. **Defaults to** `-1`.
107+
* }
108+
* }
102109
*/
103110
public function __construct(
104111
GapicClient $gapicClient,
@@ -112,6 +119,7 @@ public function __construct(
112119
$this->method = $method;
113120
$this->request = $request;
114121
$this->retries = $this->getMaxRetries($callOptions);
122+
$this->totalTimeoutMillis = $this->getTotalTimeoutMillis($callOptions);
115123
$this->argumentFunction = $argumentFunction;
116124
$this->retryFunction = $retryFunction;
117125
$this->callOptions = $callOptions;
@@ -149,7 +157,9 @@ public function readAll()
149157
{
150158
// Reset $currentAttempts on successful row read, but keep total attempts for the header.
151159
$currentAttempt = $totalAttempt = 0;
160+
$startTimeMillis = floor(microtime(true) * 1000);
152161
do {
162+
$this->checkTotalTimeout($startTimeMillis);
153163
$ex = null;
154164
list($this->request, $this->callOptions) =
155165
($this->argumentFunction)($this->request, $this->callOptions);
@@ -220,4 +230,32 @@ private function getMaxRetries(array $options): int
220230

221231
return $retrySettings['maxRetries'] ?? ResumableStream::DEFAULT_MAX_RETRIES;
222232
}
233+
234+
private function getTotalTimeoutMillis(array $options): ?int
235+
{
236+
$retrySettings = $options['retrySettings'] ?? [];
237+
238+
if ($retrySettings instanceof RetrySettings) {
239+
return $retrySettings->getTotalTimeoutMillis();
240+
}
241+
242+
return $retrySettings['totalTimeoutMillis'] ?? -1;
243+
}
244+
245+
private function checkTotalTimeout(int $startTimeMillis): void
246+
{
247+
if ($this->totalTimeoutMillis >= 0) {
248+
$elapsedTimeMillis = floor(microtime(true) * 1000) - $startTimeMillis;
249+
if ($elapsedTimeMillis > $this->totalTimeoutMillis) {
250+
throw new ApiException('Operation timeout exceeeded ', Code::DEADLINE_EXCEEDED);
251+
}
252+
$remainingTimeMillis = $this->totalTimeoutMillis - $elapsedTimeMillis;
253+
// Set timeoutMillis if it's unset or less than the remaining time
254+
// in order to preempt requests from exceeding total timeout
255+
if (!isset($this->callOptions['timeoutMillis'])
256+
|| $this->callOptions['timeoutMillis'] > $remainingTimeMillis) {
257+
$this->callOptions['timeoutMillis'] = $remainingTimeMillis;
258+
}
259+
}
260+
}
223261
}

Bigtable/src/Table.php

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,16 @@ public function __construct(
120120
* @param array $rowMutations An associative array with the key being the
121121
* row key and the value being the
122122
* {@see \Google\Cloud\Bigtable\Mutations} to perform.
123-
* @param array $options [optional] Configuration options.
123+
* @param array $options {
124+
* Configuration options.
125+
*
126+
* @option RetrySettings|array $retrySettings {
127+
* Only $maxRetries and $totalTimeoutMillis work for RetrySettings in this API.
128+
*
129+
* @option int $maxRetries Number of times to retry. **Defaults to** `3`.
130+
* @option int $totalTimeoutMillis The max accumulative timeout in total.
131+
* }
132+
* }
124133
* @return void
125134
* @throws ApiException|BigtableDataOperationException If the remote call fails or operation fails.
126135
* @throws \InvalidArgumentException If rowMutations is a list instead of associative array indexed by row key.
@@ -155,13 +164,15 @@ public function mutateRows(array $rowMutations, array $options = [])
155164
*
156165
* @param string $rowKey The row key of the row to mutate.
157166
* @param Mutations $mutations Mutations to apply on row.
158-
* @param array $options [optional] {
167+
* @param array $options {
159168
* Configuration options.
160169
*
161-
* @param RetrySettings|array $retrySettings {
162-
* @option int $maxRetries Number of times to retry. **Defaults to** `3`.
163-
* Only maxRetries works for RetrySettings in this API.
164-
* }
170+
* @option RetrySettings|array $retrySettings {
171+
* Only $maxRetries and $totalTimeoutMillis work for RetrySettings in this API.
172+
*
173+
* @option int $maxRetries Number of times to retry. **Defaults to** `3`.
174+
* @option int $totalTimeoutMillis The max accumulative timeout in total.
175+
* }
165176
* }
166177
* @return void
167178
* @throws ApiException If the remote call fails.
@@ -201,10 +212,12 @@ public function mutateRow($rowKey, Mutations $mutations, array $options = [])
201212
* @param array $options [optional] {
202213
* Configuration options.
203214
*
204-
* @param RetrySettings|array $retrySettings {
205-
* @option int $maxRetries Number of times to retry. **Defaults to** `3`.
206-
* Only maxRetries works for RetrySettings in this API.
207-
* }
215+
* @type RetrySettings|array $retrySettings {
216+
* Only $maxRetries and $totalTimeoutMillis work for RetrySettings in this API.
217+
*
218+
* @type int $maxRetries Number of times to retry. **Defaults to** `3`.
219+
* @type int $totalTimeoutMillis The max accumulative timeout in total for the operation.
220+
* }
208221
* }
209222
* @return void
210223
* @throws ApiException|BigtableDataOperationException If the remote call fails or operation fails
@@ -274,10 +287,12 @@ public function upsert(array $rows, array $options = [])
274287
* To learn more please see {@see \Google\Cloud\Bigtable\Filter} which
275288
* provides static factory methods for the various filter types.
276289
* @type int $rowsLimit The number of rows to scan.
277-
* @param RetrySettings|array $retrySettings {
278-
* @option int $maxRetries Number of times to retry. **Defaults to** `3`.
279-
* Only maxRetries works for RetrySettings in this API.
280-
* }
290+
* @type RetrySettings|array $retrySettings {
291+
* Only $maxRetries and $totalTimeoutMillis work for RetrySettings in this API.
292+
*
293+
* @type int $maxRetries Number of times to retry. **Defaults to** `3`.
294+
* @type int $totalTimeoutMillis The max accumulative timeout in total for the operation.
295+
* }
281296
* }
282297
* @return ChunkFormatter
283298
*/
Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
_Retry_WithRoutingCookie
22
_Retry_WithRetryInfo
3-
TestMutateRows_Generic_DeadlineExceeded
4-
TestReadRow_Generic_DeadlineExceeded
53
TestReadRows_Retry_StreamReset
64
TestReadRows_Retry_LastScannedRow_Reverse
75
TestReadRows_ReverseScans_FeatureFlag_Enabled
8-
TestReadRows_Generic_DeadlineExceeded

Bigtable/tests/Conformance/proxy/src/ProxyService.php

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,23 @@ public function ReadRow(GRPC\ContextInterface $ctx, Testproxy\ReadRowRequest $in
133133
'appProfileId' => $config->getAppProfileId(),
134134
]);
135135

136-
$rowData = $table->readRow($in->getRowKey(), [
137-
'filter' => $in->getFilter(),
138-
'timeoutMillis' => $this->getTimeoutMillis($config->getPerOperationTimeout()),
139-
]);
136+
$timeoutMillis = $this->getTimeoutMillis($config->getPerOperationTimeout());
137+
try {
138+
$rowData = $table->readRow($in->getRowKey(), [
139+
'filter' => $in->getFilter(),
140+
'retrySettings' => [
141+
// set total timeout for the operation (including retries)
142+
'totalTimeoutMillis' => $timeoutMillis,
143+
],
144+
// the conformance tests check for this, but I'm not sure why
145+
'rowsLimit' => 1,
146+
]);
147+
} catch (\Google\ApiCore\ApiException $e) {
148+
return $out->setStatus(new Status([
149+
'code' => $e->getCode(),
150+
'message' => $e->getMessage(),
151+
]));
152+
}
140153

141154
if ($rowData) {
142155
$row = $this->arrayToRowProto($rowData);
@@ -195,13 +208,16 @@ public function ReadRows(GRPC\ContextInterface $ctx, Testproxy\ReadRowsRequest $
195208

196209
$this->logger->debug(json_encode($rowKeys));
197210

211+
$timeoutMillis = $this->getTimeoutMillis($config->getPerOperationTimeout());
198212
$stream = $table->readRows([
199213
'rowKeys' => $rowKeys,
200214
'rowRanges' => $ranges,
201215
'filter' => $request->getFilter(),
202216
'rowsLimit' => $request->getRowsLimit(),
203217
'reversed' => $request->getReversed(),
204-
'timeoutMillis' => $this->getTimeoutMillis($config->getPerOperationTimeout()),
218+
'retrySettings' => [
219+
'totalTimeoutMillis' => $timeoutMillis,
220+
],
205221
]);
206222

207223
$rows = [];
@@ -309,25 +325,31 @@ public function BulkMutateRows(GRPC\ContextInterface $ctx, Testproxy\MutateRowsR
309325
$mutations[$entry->getRowKey()] = $this->protoToMutations($entry->getMutations());
310326
}
311327

328+
$timeoutMillis = $this->getTimeoutMillis($config->getPerOperationTimeout());
312329
try {
313330
$table->mutateRows($mutations, [
314-
'timeoutMillis' => $this->getTimeoutMillis($config->getPerOperationTimeout())
331+
'retrySettings' => [
332+
'totalTimeoutMillis' => $timeoutMillis,
333+
]
315334
]);
316335
} catch (BigtableDataOperationException $e) {
317336
$failedEntries = [];
318337
foreach ($e->getMetadata() as $metadata) {
319-
$status = new Status([
320-
'code' => $metadata['statusCode'],
321-
'message' => $metadata['message'],
322-
]);
323338
$failedEntries[] = new Entry([
324-
'index' => $metadata['index'],
325-
'status' => $status,
339+
'index' => $metadata['index'] ?? 0,
340+
'status' => new Status([
341+
'code' => $metadata['statusCode'],
342+
'message' => $metadata['message'],
343+
]),
326344
]);
327345
}
328346
$out->setEntries($failedEntries);
347+
$out->setStatus(new Status([
348+
'code' => $e->getCode(),
349+
'message' => $e->getMessage(),
350+
]));
329351
} catch (\Google\ApiCore\ApiException $e) {
330-
$out = $out->setStatus(new Status([
352+
$out->setStatus(new Status([
331353
'code' => $e->getCode(),
332354
'message' => $e->getMessage(),
333355
]));

0 commit comments

Comments
 (0)