Skip to content

Commit 9f2ac64

Browse files
committed
feat: implement input compactors for various registries
- Added input compactor functionality to ModelRegistry, KnowledgeBaseRegistry, TabularStorageRegistry, TaskRegistry, McpServerRegistry, and CredentialStoreRegistry to convert resolved instances back to their string IDs. - Introduced a new InputCompactor module to handle compaction logic. - Updated InputResolver and Task classes to support the new compaction feature. - Added tests for the InputCompactor to ensure correct behavior across different schemas and formats.
1 parent 54ad587 commit 9f2ac64

14 files changed

Lines changed: 797 additions & 7 deletions

File tree

packages/ai/src/model/ModelRegistry.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import {
88
createServiceToken,
99
globalServiceRegistry,
10+
registerInputCompactor,
1011
registerInputResolver,
1112
ServiceRegistry,
1213
} from "@workglow/util";
@@ -66,3 +67,19 @@ async function resolveModelFromRegistry(
6667

6768
// Register the model resolver for format: "model" and "model:*"
6869
registerInputResolver("model", resolveModelFromRegistry);
70+
71+
// Register the model compactor — extracts model_id from a ModelConfig
72+
registerInputCompactor("model", async (value, _format, registry) => {
73+
if (typeof value === "object" && value !== null && "model_id" in value) {
74+
const id = (value as Record<string, unknown>).model_id;
75+
if (typeof id !== "string") return undefined;
76+
const modelRepo = registry.has(MODEL_REPOSITORY)
77+
? registry.get<ModelRepository>(MODEL_REPOSITORY)
78+
: getGlobalModelRepository();
79+
80+
const model = await modelRepo.findByName(id);
81+
if (!model) return undefined;
82+
return id;
83+
}
84+
return undefined;
85+
});

packages/knowledge-base/src/knowledge-base/KnowledgeBaseRegistry.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import {
88
createServiceToken,
99
globalServiceRegistry,
10+
registerInputCompactor,
1011
registerInputResolver,
1112
ServiceRegistry,
1213
} from "@workglow/util";
@@ -125,3 +126,15 @@ async function resolveKnowledgeBaseFromRegistry(
125126

126127
// Register the resolver for format: "knowledge-base"
127128
registerInputResolver("knowledge-base", resolveKnowledgeBaseFromRegistry);
129+
130+
// Register the compactor — reverse map lookup by identity
131+
registerInputCompactor("knowledge-base", (value, _format, registry) => {
132+
const kbs = registry.has(KNOWLEDGE_BASES)
133+
? registry.get<Map<string, KnowledgeBase>>(KNOWLEDGE_BASES)
134+
: getGlobalKnowledgeBases();
135+
136+
for (const [id, kb] of kbs) {
137+
if (kb === value) return id;
138+
}
139+
return undefined;
140+
});

packages/storage/src/tabular/TabularStorageRegistry.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import {
88
createServiceToken,
99
globalServiceRegistry,
10+
registerInputCompactor,
1011
registerInputResolver,
1112
ServiceRegistry,
1213
} from "@workglow/util";
@@ -77,3 +78,15 @@ function resolveRepositoryFromRegistry(
7778

7879
// Register the repository resolver for format: "storage:tabular"
7980
registerInputResolver("storage:tabular", resolveRepositoryFromRegistry);
81+
82+
// Register the compactor — reverse map lookup by identity
83+
registerInputCompactor("storage:tabular", (value, _format, registry) => {
84+
const repos = registry.has(TABULAR_REPOSITORIES)
85+
? registry.get(TABULAR_REPOSITORIES)
86+
: getGlobalTabularRepositories();
87+
88+
for (const [id, repo] of repos) {
89+
if (repo === value) return id;
90+
}
91+
return undefined;
92+
});

packages/task-graph/src/task-graph/GraphToWorkflowCode.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -453,12 +453,28 @@ function extractLoopConfig(task: ITask): Record<string, unknown> {
453453
/**
454454
* Builds the argument string for a prototype method call.
455455
*/
456+
/**
457+
* Strips entries whose value is `undefined` — they carry no information
458+
* and would emit noisy `key: undefined` literals in the generated code.
459+
*/
460+
function stripUndefined(
461+
obj: Record<string, unknown> | undefined
462+
): Record<string, unknown> | undefined {
463+
if (!obj) return obj;
464+
const result: Record<string, unknown> = {};
465+
for (const [k, v] of Object.entries(obj)) {
466+
if (v !== undefined) result[k] = v;
467+
}
468+
return Object.keys(result).length > 0 ? result : undefined;
469+
}
470+
456471
function buildMethodArgs(
457472
defaults: Record<string, unknown> | undefined,
458473
config: Record<string, unknown>,
459474
baseIndent: string = "",
460475
columnOffset: number = 0
461476
): string {
477+
defaults = stripUndefined(defaults);
462478
const hasDefaults = defaults && Object.keys(defaults).length > 0;
463479
const hasConfig = Object.keys(config).length > 0;
464480

packages/task-graph/src/task-graph/Workflow.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,11 +361,19 @@ export class Workflow<
361361
* @param cache - Optional repository for task outputs
362362
* @param parent - Optional parent workflow (for loop builder mode)
363363
* @param iteratorTask - Optional iterator task being configured (for loop builder mode)
364+
* @param registry - Optional service registry to use for this workflow run
365+
* @returns A new Workflow instance
364366
*/
365-
constructor(cache?: TaskOutputRepository, parent?: Workflow, iteratorTask?: GraphAsTask) {
367+
constructor(
368+
cache?: TaskOutputRepository,
369+
parent?: Workflow,
370+
iteratorTask?: GraphAsTask,
371+
registry?: ServiceRegistry
372+
) {
366373
this._outputCache = cache;
367374
this._parentWorkflow = parent;
368375
this._iteratorTask = iteratorTask;
376+
this._registry = registry ?? parent?._registry;
369377
this._graph = new TaskGraph({ outputCache: this._outputCache });
370378

371379
if (!parent) {
@@ -379,6 +387,7 @@ export class Workflow<
379387
private _dataFlows: Dataflow[] = [];
380388
private _error: string = "";
381389
private _outputCache?: TaskOutputRepository;
390+
/** @internal */ private _registry?: ServiceRegistry;
382391

383392
// Abort controller for cancelling task execution
384393
private _abortController?: AbortController;
@@ -601,7 +610,7 @@ export class Workflow<
601610
const output = await this.graph.run<Output>(input, {
602611
parentSignal: this._abortController.signal,
603612
outputCache: this._outputCache,
604-
registry: config?.registry,
613+
registry: config?.registry ?? this._registry,
605614
});
606615
const results = this.graph.mergeExecuteOutputsToRunOutput<Output, typeof PROPERTY_ARRAY>(
607616
output,
@@ -970,7 +979,11 @@ export class Workflow<
970979
O extends DataPorts,
971980
C extends TaskConfig = TaskConfig,
972981
>(taskClass: ITaskConstructor<I, O, C>, input: I, config: C): ITask<I, O, C> {
973-
const task = new taskClass(input, config);
982+
const task = new taskClass(
983+
input,
984+
config,
985+
this._registry ? { registry: this._registry } : undefined
986+
);
974987
const id = this.graph.addTask(task);
975988
this.events.emit("changed", id);
976989
return task;
@@ -1044,7 +1057,8 @@ export class Workflow<
10441057
const loopBuilder = new Workflow(
10451058
this.outputCache(),
10461059
this,
1047-
task as unknown as GraphAsTask
1060+
task as unknown as GraphAsTask,
1061+
this._registry
10481062
) as unknown as Workflow<I, O>;
10491063
if (parent) {
10501064
loopBuilder._pendingLoopConnect = { parent, iteratorTask: task };
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
/**
2+
* @license
3+
* Copyright 2025 Steven Roussey <sroussey@gmail.com>
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
import type { DataPortSchema } from "@workglow/util/schema";
8+
import type { ServiceRegistry } from "@workglow/util";
9+
import { getInputCompactors } from "@workglow/util";
10+
import { getSchemaFormat, getFormatPrefix, getObjectSchema } from "./InputResolver";
11+
12+
/**
13+
* Configuration for the input compactor
14+
*/
15+
export interface InputCompactorConfig {
16+
readonly registry: ServiceRegistry;
17+
}
18+
19+
/**
20+
* Checks if a schema allows a string variant, recursively checking
21+
* through oneOf/anyOf nesting (e.g., TypeSingleOrArray(TypeModel(...))).
22+
*/
23+
function schemaAllowsString(schema: unknown): boolean {
24+
if (typeof schema !== "object" || schema === null) return false;
25+
const s = schema as Record<string, unknown>;
26+
27+
if (s.type === "string") return true;
28+
29+
const variants = (s.oneOf ?? s.anyOf) as unknown[] | undefined;
30+
if (Array.isArray(variants)) {
31+
for (const variant of variants) {
32+
if (schemaAllowsString(variant)) return true;
33+
}
34+
}
35+
36+
return false;
37+
}
38+
39+
/**
40+
* Compacts resolved inputs by converting instances back to their string IDs.
41+
* This is the reverse of `resolveSchemaInputs()` — objects with registered
42+
* compactors are replaced with their string identifier when the schema
43+
* allows a string variant (oneOf/anyOf with type: "string").
44+
*
45+
* @param input The task input object with resolved values
46+
* @param schema The task's input/config schema
47+
* @param config Configuration including the service registry
48+
* @returns The input with compacted values (objects replaced with string IDs)
49+
*
50+
* @example
51+
* ```typescript
52+
* // Compact a resolved model config back to its ID
53+
* const compacted = await compactSchemaInputs(
54+
* { model: { model_id: "gpt-4", provider: "openai", ... } },
55+
* taskSchema,
56+
* { registry: globalServiceRegistry }
57+
* );
58+
* // compacted.model === "gpt-4"
59+
* ```
60+
*/
61+
export async function compactSchemaInputs<T extends Record<string, unknown>>(
62+
input: T,
63+
schema: DataPortSchema,
64+
config: InputCompactorConfig
65+
): Promise<T> {
66+
if (typeof schema === "boolean") return input;
67+
68+
const properties = schema.properties;
69+
if (!properties || typeof properties !== "object") return input;
70+
71+
const compactors = getInputCompactors();
72+
const compacted: Record<string, unknown> = { ...input };
73+
74+
for (const [key, propSchema] of Object.entries(properties)) {
75+
let value = compacted[key];
76+
77+
const format = getSchemaFormat(propSchema);
78+
if (format) {
79+
let compactor = compactors.get(format);
80+
if (!compactor) {
81+
const prefix = getFormatPrefix(format);
82+
compactor = compactors.get(prefix);
83+
}
84+
85+
if (compactor) {
86+
// Handle object values: attempt to compact to string ID
87+
// Only compact if the schema allows a string variant (oneOf/anyOf with type: "string")
88+
if (
89+
value !== null &&
90+
value !== undefined &&
91+
typeof value === "object" &&
92+
!Array.isArray(value) &&
93+
schemaAllowsString(propSchema)
94+
) {
95+
const id = await compactor(value, format, config.registry);
96+
if (id !== undefined) {
97+
compacted[key] = id;
98+
continue; // Replaced with string — skip recursion
99+
}
100+
}
101+
// Handle arrays: compact object elements to strings where possible
102+
else if (Array.isArray(value)) {
103+
compacted[key] = await Promise.all(
104+
value.map(async (item) => {
105+
if (
106+
item !== null &&
107+
item !== undefined &&
108+
typeof item === "object" &&
109+
!Array.isArray(item)
110+
) {
111+
const id = await compactor(item, format, config.registry);
112+
return id !== undefined ? id : item;
113+
}
114+
return item;
115+
})
116+
);
117+
continue;
118+
}
119+
// String values are already compact — pass through
120+
}
121+
}
122+
123+
// Recurse into object values that have nested properties in schema
124+
if (
125+
value !== null &&
126+
value !== undefined &&
127+
typeof value === "object" &&
128+
!Array.isArray(value)
129+
) {
130+
const objectSchema = getObjectSchema(propSchema);
131+
if (objectSchema) {
132+
compacted[key] = await compactSchemaInputs(
133+
value as Record<string, unknown>,
134+
objectSchema as DataPortSchema,
135+
config
136+
);
137+
}
138+
}
139+
}
140+
141+
return compacted as T;
142+
}

packages/task-graph/src/task/InputResolver.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export interface InputResolverConfig {
1818
/**
1919
* Extracts the format string from a schema, handling oneOf/anyOf wrappers.
2020
*/
21-
function getSchemaFormat(schema: unknown): string | undefined {
21+
export function getSchemaFormat(schema: unknown): string | undefined {
2222
if (typeof schema !== "object" || schema === null) return undefined;
2323

2424
const s = schema as Record<string, unknown>;
@@ -45,7 +45,7 @@ function getSchemaFormat(schema: unknown): string | undefined {
4545
* This is needed for patterns like `oneOf: [{ type: "string" }, { type: "object", properties: {...} }]`
4646
* where the model can be either a string ID or an inline config object.
4747
*/
48-
function getObjectSchema(
48+
export function getObjectSchema(
4949
schema: unknown
5050
): (Record<string, unknown> & { properties: Record<string, unknown> }) | undefined {
5151
if (typeof schema !== "object" || schema === null) return undefined;
@@ -78,7 +78,7 @@ function getObjectSchema(
7878
* For "model:TextEmbedding" returns "model"
7979
* For "storage:tabular" returns "storage"
8080
*/
81-
function getFormatPrefix(format: string): string {
81+
export function getFormatPrefix(format: string): string {
8282
const colonIndex = format.indexOf(":");
8383
return colonIndex >= 0 ? format.substring(0, colonIndex) : format;
8484
}

packages/task-graph/src/task/TaskRegistry.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import {
88
createServiceToken,
99
globalServiceRegistry,
10+
registerInputCompactor,
1011
registerInputResolver,
1112
ServiceRegistry,
1213
} from "@workglow/util";
@@ -145,3 +146,15 @@ function resolveTaskFromRegistry(
145146

146147
// Register the tasks resolver for format: "tasks"
147148
registerInputResolver("tasks", resolveTaskFromRegistry);
149+
150+
// Register the tasks compactor — extracts name from a resolved task definition
151+
registerInputCompactor("tasks", (value, _format, registry) => {
152+
if (typeof value === "object" && value !== null && "name" in value) {
153+
const name = (value as Record<string, unknown>).name;
154+
if (typeof name !== "string") return undefined;
155+
const constructors = getTaskConstructors(registry);
156+
const ctor = constructors.get(name);
157+
return ctor ? name : undefined;
158+
}
159+
return undefined;
160+
});

packages/task-graph/src/task/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export * from "./FallbackTask";
1010
export * from "./FallbackTaskRunner";
1111
export * from "./GraphAsTask";
1212
export * from "./GraphAsTaskRunner";
13+
export * from "./InputCompactor";
1314
export * from "./InputResolver";
1415
export * from "./ITask";
1516
export * from "./iterationSchema";

packages/tasks/src/mcp-server/McpServerRegistry.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import {
88
createServiceToken,
99
globalServiceRegistry,
10+
registerInputCompactor,
1011
registerInputResolver,
1112
ServiceRegistry,
1213
} from "@workglow/util";
@@ -92,3 +93,12 @@ async function resolveServerFromRegistry(
9293
}
9394

9495
registerInputResolver("mcp-server", resolveServerFromRegistry);
96+
97+
// Register the compactor — extracts server_id from an McpServerRecord
98+
registerInputCompactor("mcp-server", (value) => {
99+
if (typeof value === "object" && value !== null && "server_id" in value) {
100+
const id = (value as Record<string, unknown>).server_id;
101+
return typeof id === "string" ? id : undefined;
102+
}
103+
return undefined;
104+
});

0 commit comments

Comments
 (0)