Skip to content

Commit ff633d5

Browse files
chore(spanner): add support for new cloud client test framework in google-cloud-spanner-executor (#7593)
* chore(spanner): add support for new cloud client test framework in google-cloud-spanner-executor * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 9554035 commit ff633d5

8 files changed

Lines changed: 886 additions & 4 deletions

File tree

.github/workflows/system-tests-against-emulator.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,4 @@
3131
# working-directory: handwritten/spanner
3232
# env:
3333
# SPANNER_EMULATOR_HOST: localhost:9010
34-
# GCLOUD_PROJECT: emulator-test-project
34+
# GCLOUD_PROJECT: emulator-test-project
Lines changed: 262 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,262 @@
1+
/*!
2+
* Copyright 2026 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import {ServerDuplexStream, status} from '@grpc/grpc-js';
18+
import {Spanner} from '../../src';
19+
import {trace, context, Tracer} from '@opentelemetry/api';
20+
import * as protos from '../../protos/protos';
21+
import {CloudUtil} from './cloud-util';
22+
import {OutcomeSender, ExecutionFlowContextInterface} from './cloud-executor';
23+
import spanner = protos.google.spanner;
24+
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
25+
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;
26+
import ISpannerAction = spanner.executor.v1.ISpannerAction;
27+
import IAdminAction = spanner.executor.v1.IAdminAction;
28+
import ICreateCloudInstanceAction = spanner.executor.v1.ICreateCloudInstanceAction;
29+
30+
/**
31+
* Context for a single stream connection.
32+
*/
33+
export class ExecutionFlowContext implements ExecutionFlowContextInterface {
34+
private call: ServerDuplexStream<
35+
SpannerAsyncActionRequest,
36+
SpannerAsyncActionResponse
37+
>;
38+
39+
constructor(
40+
call: ServerDuplexStream<
41+
SpannerAsyncActionRequest,
42+
SpannerAsyncActionResponse
43+
>,
44+
) {
45+
this.call = call;
46+
}
47+
48+
/**
49+
* Sends a response back to the client.
50+
*/
51+
public onNext(response: SpannerAsyncActionResponse): void {
52+
// Prevent writing if client cancelled the call, or the underlying Node stream is un-writable/destroyed
53+
if (
54+
this.call.cancelled ||
55+
this.call.destroyed ||
56+
this.call.writable === false
57+
) {
58+
console.warn('Attempted to write to a closed or cancelled stream.');
59+
return;
60+
}
61+
62+
this.call.write(response);
63+
}
64+
65+
/**
66+
* Sends an error back to the client.
67+
*/
68+
public onError(error: Error): void {
69+
const stream = this.call as any;
70+
71+
if (this.call.cancelled || stream.destroyed || stream.writable === false) {
72+
console.warn(
73+
'Attempted to emit error to a closed or cancelled stream.',
74+
error,
75+
);
76+
return;
77+
}
78+
79+
this.call.emit('error', error);
80+
}
81+
82+
/**
83+
* Clean up resources associated with the context.
84+
*/
85+
public cleanup(): void {
86+
console.log('Cleaning up ExecutionFlowContext');
87+
}
88+
}
89+
90+
type ActionHandler = (action: any, sender: OutcomeSender) => Promise<void>;
91+
92+
export class CloudClientExecutor {
93+
private spanner: Spanner;
94+
private tracer: Tracer;
95+
96+
private readonly adminActionRegistry: Record<string, ActionHandler> = {
97+
createCloudInstance: (action, sender) =>
98+
this.executeCreateCloudInstance(
99+
action as ICreateCloudInstanceAction,
100+
sender,
101+
),
102+
};
103+
104+
private readonly actionRegistry: Record<string, ActionHandler> = {
105+
admin: (action, sender) =>
106+
this.executeAdminAction(action as IAdminAction, sender),
107+
};
108+
109+
constructor() {
110+
const spannerOptions = CloudUtil.getSpannerOptions();
111+
this.spanner = new Spanner(spannerOptions);
112+
this.tracer = trace.getTracer(CloudClientExecutor.name);
113+
}
114+
115+
/**
116+
* Creates a new ExecutionFlowContext for a stream.
117+
*/
118+
public createExecutionFlowContext(
119+
call: ServerDuplexStream<
120+
SpannerAsyncActionRequest,
121+
SpannerAsyncActionResponse
122+
>,
123+
): ExecutionFlowContext {
124+
return new ExecutionFlowContext(call);
125+
}
126+
127+
/**
128+
* Starts handling a SpannerAsyncActionRequest.
129+
*/
130+
public startHandlingRequest(
131+
req: SpannerAsyncActionRequest,
132+
executionContext: ExecutionFlowContext,
133+
): {code: number; details: string} {
134+
const outcomeSender = new OutcomeSender(req.actionId!, executionContext);
135+
136+
if (!req.action) {
137+
return outcomeSender.finishWithError({
138+
code: status.INVALID_ARGUMENT,
139+
message: 'Invalid request: No action present',
140+
});
141+
}
142+
this.executeAction(outcomeSender, req.action).catch(err => {
143+
console.error('Unhandled exception in action execution:', err);
144+
outcomeSender.finishWithError(err);
145+
});
146+
147+
return {code: status.OK, details: ''};
148+
}
149+
150+
/**
151+
* Determines the specific Spanner action type and routes it to the appropriate handler.
152+
*/
153+
private async executeAction(
154+
outcomeSender: OutcomeSender,
155+
action: ISpannerAction,
156+
): Promise<void> {
157+
const actionType =
158+
Object.keys(action).find(
159+
k =>
160+
action[k as keyof typeof action] !== undefined &&
161+
!!this.actionRegistry[k],
162+
) || 'unknown';
163+
const span = this.tracer.startSpan(`performaction_${actionType}`);
164+
165+
return context.with(trace.setSpan(context.active(), span), async () => {
166+
try {
167+
const handler = this.actionRegistry[actionType];
168+
if (handler) {
169+
await handler(
170+
action[actionType as keyof typeof action],
171+
outcomeSender,
172+
);
173+
return;
174+
}
175+
176+
outcomeSender.finishWithError({
177+
code: status.UNIMPLEMENTED,
178+
message: `Action ${actionType} not implemented yet`,
179+
});
180+
} catch (e: any) {
181+
span.recordException(e);
182+
console.error('Unexpected error:', e);
183+
outcomeSender.finishWithError({
184+
code: status.INVALID_ARGUMENT,
185+
message: `Unexpected error: ${e.message}`,
186+
});
187+
} finally {
188+
span.end();
189+
}
190+
});
191+
}
192+
193+
private async executeAdminAction(
194+
action: IAdminAction,
195+
sender: OutcomeSender,
196+
): Promise<void> {
197+
try {
198+
const adminType = Object.keys(action).find(
199+
k =>
200+
action[k as keyof typeof action] !== undefined &&
201+
!!this.adminActionRegistry[k],
202+
);
203+
204+
if (adminType && this.adminActionRegistry[adminType]) {
205+
await this.adminActionRegistry[adminType](
206+
action[adminType as keyof typeof action],
207+
sender,
208+
);
209+
return;
210+
}
211+
212+
sender.finishWithError({
213+
code: status.UNIMPLEMENTED,
214+
message: `Admin action ${adminType || 'unknown'} not implemented`,
215+
});
216+
} catch (e: any) {
217+
sender.finishWithError(e);
218+
}
219+
}
220+
221+
private async executeCreateCloudInstance(
222+
action: ICreateCloudInstanceAction,
223+
sender: OutcomeSender,
224+
): Promise<void> {
225+
try {
226+
console.log(`Creating instance: \n${JSON.stringify(action, null, 2)}`);
227+
228+
const instanceId = action.instanceId!;
229+
const projectId = action.projectId!;
230+
const configId = action.instanceConfigId!;
231+
232+
const instanceAdminClient = this.spanner.getInstanceAdminClient();
233+
234+
const [operation] = await instanceAdminClient.createInstance({
235+
parent: instanceAdminClient.projectPath(projectId),
236+
instanceId: instanceId,
237+
instance: {
238+
config: instanceAdminClient.instanceConfigPath(projectId, configId),
239+
displayName: instanceId,
240+
nodeCount: action.nodeCount || 1,
241+
processingUnits: action.processingUnits,
242+
labels: action.labels || {},
243+
},
244+
});
245+
246+
console.log('Waiting for instance creation operation to complete...');
247+
await operation.promise();
248+
249+
console.log(`Instance ${instanceId} created successfully.`);
250+
251+
sender.finishWithOK();
252+
} catch (err: any) {
253+
if (err.code === status.ALREADY_EXISTS) {
254+
console.log('Instance already exists, returning OK.');
255+
sender.finishWithOK();
256+
return;
257+
}
258+
console.error('Failed to create instance:', err);
259+
sender.finishWithError(err);
260+
}
261+
}
262+
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*!
2+
* Copyright 2026 Google LLC. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import {ServerDuplexStream, status} from '@grpc/grpc-js';
18+
import {trace, context, Tracer} from '@opentelemetry/api';
19+
import {CloudClientExecutor} from './cloud-client-executor';
20+
import * as protos from '../../protos/protos';
21+
import spanner = protos.google.spanner;
22+
import SpannerAsyncActionRequest = spanner.executor.v1.SpannerAsyncActionRequest;
23+
import SpannerAsyncActionResponse = spanner.executor.v1.SpannerAsyncActionResponse;
24+
25+
/**
26+
* Implements the SpannerExecutorProxy service, which handles asynchronous
27+
* Spanner actions via a bidirectional gRPC stream.
28+
*/
29+
export class CloudExecutorImpl {
30+
private clientExecutor: CloudClientExecutor;
31+
private tracer: Tracer;
32+
33+
constructor() {
34+
this.clientExecutor = new CloudClientExecutor();
35+
36+
this.tracer = trace.getTracer(CloudClientExecutor.name);
37+
}
38+
39+
/**
40+
* Handles incoming SpannerAsyncActionRequest messages from the client.
41+
*/
42+
public executeActionAsync(
43+
call: ServerDuplexStream<
44+
SpannerAsyncActionRequest,
45+
SpannerAsyncActionResponse
46+
>,
47+
): void {
48+
// Create a top-level OpenTelemetry span for streaming request.
49+
const span = this.tracer.startSpan(
50+
'nodejs_systest_execute_actions_stream',
51+
{
52+
root: true,
53+
},
54+
);
55+
56+
const streamContext = trace.setSpan(context.active(), span);
57+
58+
// The executionContext manages the lifecycle and flow state for this specific gRPC stream context.
59+
const executionContext =
60+
this.clientExecutor.createExecutionFlowContext(call);
61+
62+
// Handle receiving requests on duplex stream
63+
// Handle incoming requests sequentially on the duplex stream.
64+
call.on('data', (request: SpannerAsyncActionRequest) => {
65+
context.with(streamContext, () => {
66+
console.log(`Receiving request: \n${JSON.stringify(request, null, 2)}`);
67+
// TODO: Set requestHasReadOrQueryAction flag here when Read/Query are implemented.
68+
try {
69+
const reqStatus = this.clientExecutor.startHandlingRequest(
70+
request,
71+
executionContext,
72+
);
73+
if (reqStatus.code !== status.OK) {
74+
console.error(
75+
`Failed to handle request, half closed: ${reqStatus.details}`,
76+
);
77+
}
78+
} catch (err) {
79+
console.error('Exception when handling request', err);
80+
}
81+
});
82+
});
83+
84+
// Handle stream errors
85+
call.on('error', (err: Error) => {
86+
context.with(streamContext, () => {
87+
console.error('Client ends the stream with error.', err);
88+
span.recordException(err);
89+
span.end();
90+
executionContext.cleanup();
91+
});
92+
});
93+
94+
// Handle the completion of the client stream
95+
call.on('end', () => {
96+
context.with(streamContext, () => {
97+
span.end();
98+
// TODO: Add End-to-End trace verification here once Read/Query actions are implemented.
99+
console.log('Client called Done, half closed');
100+
executionContext.cleanup();
101+
102+
call.end();
103+
});
104+
});
105+
}
106+
}

0 commit comments

Comments
 (0)