Skip to content

Commit b0879ce

Browse files
committed
adjust the structure of Input and Output
Signed-off-by: laminar <fangtian@kubesphere.io>
1 parent 67402e3 commit b0879ce

4 files changed

Lines changed: 57 additions & 38 deletions

File tree

context/context.go

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -256,21 +256,33 @@ type SyncRequest struct {
256256

257257
type Input struct {
258258
Uri string `json:"uri,omitempty"`
259-
Component string `json:"component"`
259+
ComponentName string `json:"componentName"`
260260
ComponentType string `json:"componentType"`
261-
Type ResourceType `json:"type"`
262261
Metadata map[string]string `json:"metadata,omitempty"`
263262
}
264263

264+
// GetType will be called after the context has been parsed correctly,
265+
// therefore we do not have to handle the error return of getBuildingBlockType()
266+
func (i *Input) GetType() ResourceType {
267+
bbt, _ := getBuildingBlockType(i.ComponentType)
268+
return bbt
269+
}
270+
265271
type Output struct {
266272
Uri string `json:"uri,omitempty"`
267-
Component string `json:"component"`
273+
ComponentName string `json:"componentName"`
268274
ComponentType string `json:"componentType"`
269-
Type ResourceType `json:"type"`
270275
Metadata map[string]string `json:"metadata,omitempty"`
271276
Operation string `json:"operation,omitempty"`
272277
}
273278

279+
// GetType will be called after the context has been parsed correctly,
280+
// therefore we do not have to handle the error return of getBuildingBlockType()
281+
func (o *Output) GetType() ResourceType {
282+
bbt, _ := getBuildingBlockType(o.ComponentType)
283+
return bbt
284+
}
285+
274286
type FunctionOut struct {
275287
mu sync.Mutex
276288
Code int `json:"code"`
@@ -348,12 +360,12 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
348360
payloadBytes = ie.GetCloudEventJSON()
349361
}
350362

351-
switch output.Type {
363+
switch output.GetType() {
352364
case OpenFuncTopic:
353-
err = ctx.daprClient.PublishEvent(context.Background(), output.Component, output.Uri, payload)
365+
err = ctx.daprClient.PublishEvent(context.Background(), output.ComponentName, output.Uri, payload)
354366
case OpenFuncBinding:
355367
in := &dapr.InvokeBindingRequest{
356-
Name: output.Component,
368+
Name: output.ComponentName,
357369
Operation: output.Operation,
358370
Data: payloadBytes,
359371
Metadata: output.Metadata,
@@ -684,22 +696,18 @@ func parseContext() (*FunctionContext, error) {
684696

685697
if !ctx.HasInputs() {
686698
for name, in := range ctx.Inputs {
687-
switch in.Type {
688-
case OpenFuncBinding, OpenFuncTopic:
689-
break
690-
default:
691-
return nil, fmt.Errorf("invalid input type %s: %s", name, in.Type)
699+
if _, err := getBuildingBlockType(in.ComponentType); err != nil {
700+
klog.Errorf("failed to get building block type for input %s: %v", name, err)
701+
return nil, err
692702
}
693703
}
694704
}
695705

696706
if !ctx.HasOutputs() {
697707
for name, out := range ctx.Outputs {
698-
switch out.Type {
699-
case OpenFuncBinding, OpenFuncTopic:
700-
break
701-
default:
702-
return nil, fmt.Errorf("invalid output type %s: %s", name, out.Type)
708+
if _, err := getBuildingBlockType(out.ComponentType); err != nil {
709+
klog.Errorf("failed to get building block type for output %s: %v", name, err)
710+
return nil, err
703711
}
704712
}
705713
}
@@ -784,3 +792,17 @@ func traceable(t string) bool {
784792
// determine if the tracing metadata can be added.
785793
return bindingQueueComponents[t]
786794
}
795+
796+
func getBuildingBlockType(componentType string) (ResourceType, error) {
797+
typeSplit := strings.Split(componentType, ".")
798+
if len(typeSplit) > 1 {
799+
t := typeSplit[0]
800+
switch ResourceType(t) {
801+
case OpenFuncBinding, OpenFuncTopic:
802+
return ResourceType(t), nil
803+
default:
804+
return "", fmt.Errorf("unknown component type: %s", t)
805+
}
806+
}
807+
return "", errors.New("invalid component type")
808+
}

context/innerevent_test.go

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,36 @@ var funcCtx = `{
1717
"inputs": {
1818
"cron": {
1919
"uri": "cron_input",
20-
"type": "bindings",
21-
"component": "cron_input"
20+
"componentName": "cron_input",
21+
"componentType": "bindings.cron"
2222
},
2323
"eventbus": {
2424
"uri": "default",
25-
"type": "pubsub",
26-
"component": "nats_eventbus"
25+
"componentName": "nats_eventbus",
26+
"componentType": "pubsub.natsstreaming"
2727
}
2828
},
2929
"outputs": {
3030
"echo": {
3131
"uri": "echo",
3232
"operation": "create",
33-
"component": "echo",
33+
"componentName": "echo",
3434
"componentType": "bindings.kafka",
3535
"metadata": {
3636
"path": "echo",
3737
"Content-Type": "application/json; charset=utf-8"
38-
},
39-
"type": "bindings"
38+
}
4039
},
4140
"target": {
4241
"uri": "sample",
4342
"operation": "create",
44-
"component": "kafka-server",
45-
"componentType": "pubsub.kafka",
46-
"type": "pubsub"
43+
"componentName": "kafka-server",
44+
"componentType": "pubsub.kafka"
4745
},
4846
"target2": {
4947
"uri": "cron_output",
50-
"component": "cron_output",
51-
"componentType": "bindings.cron",
52-
"type": "bindings"
48+
"componentName": "cron_output",
49+
"componentType": "bindings.cron"
5350
}
5451
}
5552
}`

framework/framework_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ func TestAsyncBindingsFunction(t *testing.T) {
176176
"inputs": {
177177
"cron": {
178178
"uri": "test",
179-
"type": "bindings",
180-
"component": "test"
179+
"componentName": "test",
180+
"componentType": "bindings.Kafka"
181181
}
182182
}
183183
}`
@@ -259,8 +259,8 @@ func TestAsyncPubsubTopic(t *testing.T) {
259259
"inputs": {
260260
"sub": {
261261
"uri": "my_topic",
262-
"type": "pubsub",
263-
"component": "msg"
262+
"componentName": "msg",
263+
"componentType": "pubsub.kafka"
264264
}
265265
}
266266
}`

runtime/async/async.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,9 @@ func (r *Runtime) RegisterOpenFunction(
9090
// Serving function with inputs
9191
if ctx.HasInputs() {
9292
for name, input := range ctx.GetInputs() {
93-
switch input.Type {
93+
switch input.GetType() {
9494
case ofctx.OpenFuncBinding:
95-
input.Uri = input.Component
95+
input.Uri = input.ComponentName
9696
funcErr = r.handler.AddBindingInvocationHandler(input.Uri, func(c context.Context, in *dapr.BindingEvent) (out []byte, err error) {
9797
rm := runtime.NewRuntimeManager(ctx, prePlugins, postPlugins)
9898
rm.FuncContext.SetEvent(name, in)
@@ -109,7 +109,7 @@ func (r *Runtime) RegisterOpenFunction(
109109
})
110110
case ofctx.OpenFuncTopic:
111111
sub := &dapr.Subscription{
112-
PubsubName: input.Component,
112+
PubsubName: input.ComponentName,
113113
Topic: input.Uri,
114114
}
115115
funcErr = r.handler.AddTopicEventHandler(sub, func(c context.Context, e *dapr.TopicEvent) (retry bool, err error) {
@@ -137,7 +137,7 @@ func (r *Runtime) RegisterOpenFunction(
137137
}
138138
})
139139
default:
140-
return fmt.Errorf("invalid input type: %s", input.Type)
140+
return fmt.Errorf("invalid input type: %s", input.GetType())
141141
}
142142
if funcErr != nil {
143143
// When the function throws an exception,

0 commit comments

Comments
 (0)