Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions Spanner/src/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
use Google\ApiCore\Call;
use Google\ApiCore\CredentialsWrapper;
use Google\ApiCore\Serializer;
use Google\ApiCore\ValidationException;
use Google\Auth\GetUniverseDomainInterface;
use Google\Cloud\Core\EmulatorTrait;
use Google\Cloud\Core\GrpcRequestWrapper;
Expand Down Expand Up @@ -1104,6 +1105,12 @@ public function beginTransaction(array $args)
$readWrite = new ReadWrite();
$options->setReadWrite($readWrite);
$args = $this->addLarHeader($args, $this->larEnabled);

if (isset($transactionOptions['readWrite']['readLockMode'])) {
// Nested option `readLockMode` inside `readWrite` transactions
$readLockModeOption = $transactionOptions['readWrite']['readLockMode'];
$options->getReadWrite()->setReadLockMode($readLockModeOption);
}
} elseif (isset($transactionOptions['partitionedDml'])) {
$pdml = new PartitionedDml();
$options->setPartitionedDml($pdml);
Expand Down Expand Up @@ -1573,6 +1580,32 @@ private function formatTransactionOptions(array $transactionOptions)
$transactionOptions['readOnly'] = $ro;
}

if (isset($transactionOptions['readLockMode'])) {
if (isset($transactionOptions['readOnly'])) {
throw new ValidationException(
'The readLockMode option is only valid for readWrite transactions.'
);
}

if (!isset($transactionOptions['readWrite'])) {
$transactionOptions['readWrite'] = [];
}
}

if (isset($transactionOptions['readWrite'])) {
$rw = $transactionOptions['readWrite'];

// Format nested options inside readWrite transaction
if (isset($transactionOptions['readLockMode'])) {
$rw['readLockMode'] = $transactionOptions['readLockMode'];

// Unset the readLockMode key on the base options array.
// If we don't do this it causes issues in the serializer for TransactionOptions
unset($transactionOptions['readLockMode']);
}

$transactionOptions['readWrite'] = $rw;
}
return $transactionOptions;
}

Expand Down
17 changes: 6 additions & 11 deletions Spanner/src/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -775,10 +775,8 @@ public function snapshot(array $options = [])
* If you wish Google Cloud PHP to handle retry logic for you (recommended
* for most cases), use {@see \Google\Cloud\Spanner\Database::runTransaction()}.
*
* Please note that once a transaction reads data, it will lock the read
* data, preventing other users from modifying that data. For this reason,
* it is important that every transaction commits or rolls back as early as
* possible. Do not hold transactions open longer than necessary.
* Please note for locking semantics and defaults for the transactions
* use {@see \Google\Cloud\Spanner\V1\TransactionOptions\ReadWrite\ReadLockMode}
*
* Example:
* ```
Expand Down Expand Up @@ -812,8 +810,8 @@ public function transaction(array $options = [])
throw new \BadMethodCallException('Nested transactions are not supported by this client.');
}

// There isn't anything configurable here.
$options['transactionOptions'] = $this->configureTransactionOptions();
// Configure readWrite options here. Any nested options for readWrite should be added to this call
$options['transactionOptions'] = $this->configureTransactionOptions($options['transactionOptions'] ?? []);

$session = $this->selectSession(
SessionPoolInterface::CONTEXT_READWRITE,
Expand All @@ -840,10 +838,8 @@ public function transaction(array $options = [])
* exception types will immediately bubble up and will interrupt the retry
* operation.
*
* Please note that once a transaction reads data, it will lock the read
* data, preventing other users from modifying that data. For this reason,
* it is important that every transaction commits or rolls back as early as
* possible. Do not hold transactions open longer than necessary.
* Please note for locking semantics and defaults for the transactions
* use {@see \Google\Cloud\Spanner\V1\TransactionOptions\ReadWrite\ReadLockMode}
*
* Please also note that nested transactions are NOT supported by this client.
* Attempting to call `runTransaction` inside a transaction callable will
Expand Down Expand Up @@ -920,7 +916,6 @@ public function runTransaction(callable $operation, array $options = [])
'maxRetries' => self::MAX_RETRIES,
];

// There isn't anything configurable here.
$options['transactionOptions'] = $this->configureTransactionOptions($options['transactionOptions'] ?? []);

$session = $this->selectSession(
Expand Down
10 changes: 10 additions & 0 deletions Spanner/src/TransactionConfigurationTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use Google\Cloud\Core\ArrayTrait;
use Google\Cloud\Spanner\Session\SessionPoolInterface;
use Google\Cloud\Spanner\V1\TransactionOptions\ReadWrite\ReadLockMode as ReadLockMode;

/**
* Configure transaction selection for read, executeSql, rollback and commit.
Expand Down Expand Up @@ -152,6 +153,15 @@ private function configureTransactionOptions(array $options = [])
$transactionOptions['excludeTxnFromChangeStreams'] = $options['excludeTxnFromChangeStreams'];
}

// Allow for proper configuring of the `readLockMode` if it's set as a base or nested option
if (isset($options['readLockMode'])) {
$transactionOptions['readWrite']['readLockMode'] = $options['readLockMode'];
}

if (isset($options['readWrite']['readLockMode'])) {
$transactionOptions['readWrite']['readLockMode'] = $options['readWrite']['readLockMode'];
}

return $transactionOptions;
}

Expand Down
99 changes: 91 additions & 8 deletions Spanner/tests/Unit/DatabaseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
use Google\Cloud\Core\Exception\AbortedException;
use Google\Cloud\Core\Exception\NotFoundException;
use Google\Cloud\Core\Exception\ServerException;
use Google\Cloud\Core\Exception\ServiceException;
use Google\Cloud\Core\Iam\Iam;
use Google\Cloud\Core\Iterator\ItemIterator;
use Google\Cloud\Core\LongRunning\LongRunningConnectionInterface;
Expand All @@ -45,18 +46,18 @@
use Google\Cloud\Spanner\Tests\StubCreationTrait;
use Google\Cloud\Spanner\Timestamp;
use Google\Cloud\Spanner\Transaction;
use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type as ReplicaType;
use Google\Cloud\Spanner\V1\ResultSet;
use Google\Cloud\Spanner\V1\ResultSetStats;
use Google\Cloud\Spanner\V1\DirectedReadOptions\ReplicaSelection\Type as ReplicaType;
use Google\Cloud\Spanner\V1\Session as SessionProto;
use Google\Cloud\Spanner\V1\SpannerClient;
use Google\Cloud\Spanner\V1\Transaction as TransactionProto;
use Google\Cloud\Spanner\V1\TransactionOptions;
use Google\Cloud\Spanner\V1\TransactionOptions\ReadWrite\ReadLockMode as ReadLockMode;
use Google\Rpc\Code;
use PHPUnit\Framework\TestCase;
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
use Google\Cloud\Core\Exception\ServiceException;
use Google\Cloud\Spanner\V1\ReadRequest\LockHint;
use Google\Cloud\Spanner\V1\ReadRequest\OrderBy;

Expand Down Expand Up @@ -94,7 +95,6 @@ class DatabaseTest extends TestCase
private $directedReadOptionsIncludeReplicas;
private $directedReadOptionsExcludeReplicas;


public function setUp(): void
{
$this->checkAndSkipGrpcTests();
Expand Down Expand Up @@ -253,7 +253,7 @@ public function testBackups()
]
];

$expectedFilter = "database:".$this->database->name();
$expectedFilter = 'database:' . $this->database->name();
$this->connection->listBackups(Argument::withEntry('filter', $expectedFilter))
->shouldBeCalled()
->willReturn(['backups' => $backups]);
Expand Down Expand Up @@ -281,8 +281,8 @@ public function testBackupsWithCustomFilter()
'name' => DatabaseAdminClient::backupName(self::PROJECT, self::INSTANCE, 'backup2'),
]
];
$defaultFilter = "database:" . $this->database->name();
$customFilter = "customFilter";
$defaultFilter = 'database:' . $this->database->name();
$customFilter = 'customFilter';
$expectedFilter = sprintf('(%1$s) AND (%2$s)', $defaultFilter, $customFilter);

$this->connection->listBackups(Argument::withEntry('filter', $expectedFilter))
Expand Down Expand Up @@ -412,7 +412,7 @@ public function testCreatePostgresDialect()
$this->database->___setProperty('connection', $this->connection->reveal());

$op = $this->database->create([
'databaseDialect'=> DatabaseDialect::POSTGRESQL
'databaseDialect' => DatabaseDialect::POSTGRESQL
]);

$this->assertInstanceOf(LongRunningOperation::class, $op);
Expand Down Expand Up @@ -710,7 +710,6 @@ public function testBatchWrite()
Argument::withEntry('mutationGroups', [$expectedMutationGroup])
))->shouldBeCalled()->willReturn(['foo result']);


$mutationGroups = [
($this->database->mutationGroup(false))
->insertOrUpdate(
Expand Down Expand Up @@ -2193,6 +2192,90 @@ public function testBatchWriteWithExcludeTxnFromChangeStreams()
]);
}

public function testRunTransactionWithReadLockMode()
{
$expectedReadLockMode = ReadLockMode::OPTIMISTIC;

$gapic = $this->prophesize(SpannerClient::class);

$sessName = SpannerClient::sessionName(self::PROJECT, self::INSTANCE, self::DATABASE, self::SESSION);
$session = new SessionProto(['name' => $sessName]);
$resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]);
$gapic->createSession(Argument::cetera())->shouldBeCalled()->willReturn($session);
$gapic->deleteSession(Argument::cetera())->shouldBeCalled();

$sql = 'SELECT example FROM sql_query';
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()->shouldBeCalledOnce()->willReturn([$resultSet]);
$gapic->executeStreamingSql(
$sessName,
$sql,
Argument::that(function (array $options) use ($expectedReadLockMode) {
$this->assertArrayHasKey('transaction', $options);
$this->assertNotNull($transactionOptions = $options['transaction']->getBegin());
$this->assertNotNull($readWriteTxnOptions = $transactionOptions->getReadWrite());
$this->assertNotNull($readLockModeOption = $readWriteTxnOptions->getReadLockMode());
$this->assertEquals(
$expectedReadLockMode,
$readLockModeOption
);
return true;
})
)
->shouldBeCalledOnce()
->willReturn($stream->reveal());

$database = new Database(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
$this->instance,
$this->lro->reveal(),
$this->lroCallables,
self::PROJECT,
self::DATABASE
);

// Test TransactionOption array format with base level property set for readLockMode
// This helps test proper formating by the library to the format expected by Spanner backend
// (i.e. readLockMode should be inside readWrite)
$database->runTransaction(
function (Transaction $t) use ($sql) {
// Run a fake query
$t->executeUpdate($sql);

// Simulate calling Transaction::commmit()
$prop = new \ReflectionProperty($t, 'state');
$prop->setAccessible(true);
$prop->setValue($t, Transaction::STATE_COMMITTED);
},
['transactionOptions' => ['readLockMode' => $expectedReadLockMode, ] ]
);
}

public function testTransactionWithReadLockMode()
{
$expectedReadLockMode = ReadLockMode::OPTIMISTIC;

$this->connection->beginTransaction(
Argument::that(function (array $args) use ($expectedReadLockMode) {
$this->assertArrayHasKey('transactionOptions', $args);
$this->assertArrayHasKey('readWrite', $args['transactionOptions']);
$this->assertArrayHasKey('readLockMode', $args['transactionOptions']['readWrite']);
$this->assertEquals(
$expectedReadLockMode,
$args['transactionOptions']['readWrite']['readLockMode'],
"The read lock mode received was {$args['transactionOptions']['readWrite']['readLockMode']} " .
"does not match expected {$expectedReadLockMode}"
);
return true;
})
)
->shouldBeCalled()
->willReturn(['id' => self::TRANSACTION]);

$t = $this->database->transaction(['transactionOptions' => ['readLockMode' => $expectedReadLockMode, ]]);
$this->assertInstanceOf(Transaction::class, $t);
}

private function createStreamingAPIArgs()
{
$row = ['id' => 1];
Expand Down
83 changes: 81 additions & 2 deletions Spanner/tests/Unit/OperationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
use Google\ApiCore\ServerStream;
use Google\Cloud\Core\Testing\GrpcTestTrait;
use Google\Cloud\Core\Testing\TestHelpers;
use Google\Cloud\Spanner\Admin\Database\V1\DatabaseAdminClient;
use Google\Cloud\Spanner\Batch\QueryPartition;
use Google\Cloud\Spanner\Batch\ReadPartition;
use Google\Cloud\Spanner\Connection\Grpc;
Expand All @@ -36,12 +35,12 @@
use Google\Cloud\Spanner\Tests\StubCreationTrait;
use Google\Cloud\Spanner\Timestamp;
use Google\Cloud\Spanner\Transaction;
use Google\Cloud\Spanner\V1\CommitResponse;
use Google\Cloud\Spanner\V1\ResultSet;
use Google\Cloud\Spanner\V1\ResultSetStats;
use Google\Cloud\Spanner\V1\SpannerClient;
use Google\Cloud\Spanner\V1\Transaction as TransactionProto;
use Google\Cloud\Spanner\V1\TransactionOptions;
use Google\Cloud\Spanner\V1\TransactionOptions\ReadWrite\ReadLockMode as ReadLockMode;
use PHPUnit\Framework\TestCase;
use Prophecy\Argument;
use Prophecy\PhpUnit\ProphecyTrait;
Expand Down Expand Up @@ -421,6 +420,86 @@ public function testExecuteAndExecuteUpdateWithExcludeTxnFromChangeStreams()
]);
}

public function testTransactionWithReadLockMode()
{
$expectedReadLockMode = ReadLockMode::OPTIMISTIC;
$gapic = $this->prophesize(SpannerClient::class);
$gapic->beginTransaction(
self::SESSION,
Argument::that(function (TransactionOptions $options) use ($expectedReadLockMode) {
$this->assertNotNull($readWriteTxnOptions = $options->getReadWrite());
$this->assertNotNull($readLockModeOption = $readWriteTxnOptions->getReadLockMode());
$this->assertEquals(
$expectedReadLockMode,
$readLockModeOption
);
return true;
}),
Argument::type('array')
)
->shouldBeCalled()
->willReturn(new TransactionProto(['id' => 'foo']));

$operation = new Operation(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
true
);

$transaction = $operation->transaction($this->session, [
'transactionOptions' => ['readLockMode' => $expectedReadLockMode, ]
]);

$this->assertEquals('foo', $transaction->id());
}

public function testExecuteAndExecuteUpdateWithReadLockMode()
{
$expectedReadLockMode = ReadLockMode::OPTIMISTIC;
$sql = 'SELECT example FROM sql_query';

$resultSet = new ResultSet(['stats' => new ResultSetStats(['row_count_exact' => 0])]);
$stream = $this->prophesize(ServerStream::class);
$stream->readAll()->shouldBeCalledTimes(2)->willReturn([$resultSet]);

$gapic = $this->prophesize(SpannerClient::class);
$gapic->executeStreamingSql(
self::SESSION,
$sql,
Argument::that(function (array $options) use ($expectedReadLockMode) {
$this->assertArrayHasKey('transaction', $options);
$this->assertNotNull($transactionOptions = $options['transaction']->getBegin());
$this->assertNotNull($readWriteTxnOptions = $transactionOptions->getReadWrite());
$this->assertNotNull($readLockModeOption = $readWriteTxnOptions->getReadLockMode());
$this->assertEquals(
$expectedReadLockMode,
$readLockModeOption
);
return true;
})
)
->shouldBeCalledTimes(2)
->willReturn($stream->reveal());

$operation = new Operation(
new Grpc(['gapicSpannerClient' => $gapic->reveal()]),
true
);

$lockModeOptions = [
'transaction' => [
'begin' => [
'readLockMode' => $expectedReadLockMode,
]
]
];

$operation->execute($this->session, $sql, $lockModeOptions);

$transaction = $this->prophesize(Transaction::class)->reveal();

$operation->executeUpdate($this->session, $transaction, $sql, $lockModeOptions);
}

public function testSnapshot()
{
$this->connection->beginTransaction(Argument::allOf(
Expand Down
Loading