Skip to content

Commit 67402e3

Browse files
authored
Merge pull request #33 from tpiperatgod/dev
Add innerEvent for carrying events in the functions-framework
2 parents ea84c96 + 338793c commit 67402e3

5 files changed

Lines changed: 521 additions & 44 deletions

File tree

context/context.go

Lines changed: 82 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"os"
1010
"reflect"
1111
"strconv"
12+
"strings"
1213
"sync"
1314

1415
cloudevents "github.com/cloudevents/sdk-go/v2"
@@ -19,7 +20,18 @@ import (
1920
)
2021

2122
var (
22-
clientGRPCPort string
23+
clientGRPCPort string
24+
bindingQueueComponents = map[string]bool{
25+
"bindings.kafka": true,
26+
"bindings.rabbitmq": true,
27+
"bindings.aws.sqs": true,
28+
"bindings.aws.kinesis": true,
29+
"bindings.gcp.pubsub": true,
30+
"bindings.azure.eventgrid": true,
31+
"bindings.azure.eventhubs": true,
32+
"bindings.azure.servicebusqueues": true,
33+
"bindings.azure.storagequeues": true,
34+
}
2335
)
2436

2537
const (
@@ -41,6 +53,7 @@ const (
4153
KubernetesMode = "kubernetes"
4254
SelfHostMode = "self-host"
4355
TestModeOn = "on"
56+
innerEventTypePrefix = "io.openfunction.function"
4457
)
4558

4659
type Runtime string
@@ -51,6 +64,9 @@ type RuntimeContext interface {
5164
// GetName returns the function's name.
5265
GetName() string
5366

67+
// GetMode returns the operating environment mode of the function.
68+
GetMode() string
69+
5470
// GetContext returns the pointer of raw OpenFunction FunctionContext object.
5571
GetContext() *FunctionContext
5672

@@ -97,12 +113,6 @@ type RuntimeContext interface {
97113
// SetEvent sets the name of the input source and the native event when an event request is received.
98114
SetEvent(inputName string, event interface{})
99115

100-
// SetEventMetadata sets the metadata of the EventRequest.
101-
SetEventMetadata(key string, value string)
102-
103-
// GetEventMetadata returns the metadata of the EventRequest.
104-
GetEventMetadata() map[string]string
105-
106116
// GetInputs returns the mapping relationship of *Input.
107117
GetInputs() map[string]*Input
108118

@@ -121,6 +131,9 @@ type RuntimeContext interface {
121131
// GetCloudEvent returns the pointer of v2.Event.
122132
GetCloudEvent() *cloudevents.Event
123133

134+
// GetInnerEvent returns the InnerEvent.
135+
GetInnerEvent() InnerEvent
136+
124137
// WithOut adds the FunctionOut object to the RuntimeContext.
125138
WithOut(out *FunctionOut) RuntimeContext
126139

@@ -159,6 +172,9 @@ type Context interface {
159172

160173
// GetCloudEvent returns the pointer of v2.Event.
161174
GetCloudEvent() *cloudevents.Event
175+
176+
// GetInnerEvent returns the InnerEvent.
177+
GetInnerEvent() InnerEvent
162178
}
163179

164180
type Out interface {
@@ -230,7 +246,7 @@ type EventRequest struct {
230246
BindingEvent *common.BindingEvent `json:"bindingEvent,omitempty"`
231247
TopicEvent *common.TopicEvent `json:"topicEvent,omitempty"`
232248
CloudEvent *cloudevents.Event `json:"cloudEventnt,omitempty"`
233-
Metadata map[string]string `json:"metadata,omitempty"`
249+
innerEvent InnerEvent
234250
}
235251

236252
type SyncRequest struct {
@@ -239,18 +255,20 @@ type SyncRequest struct {
239255
}
240256

241257
type Input struct {
242-
Uri string `json:"uri,omitempty"`
243-
Component string `json:"component,omitempty"`
244-
Type ResourceType `json:"type"`
245-
Metadata map[string]string `json:"metadata,omitempty"`
258+
Uri string `json:"uri,omitempty"`
259+
Component string `json:"component"`
260+
ComponentType string `json:"componentType"`
261+
Type ResourceType `json:"type"`
262+
Metadata map[string]string `json:"metadata,omitempty"`
246263
}
247264

248265
type Output struct {
249-
Uri string `json:"uri,omitempty"`
250-
Component string `json:"component,omitempty"`
251-
Type ResourceType `json:"type"`
252-
Metadata map[string]string `json:"metadata,omitempty"`
253-
Operation string `json:"operation,omitempty"`
266+
Uri string `json:"uri,omitempty"`
267+
Component string `json:"component"`
268+
ComponentType string `json:"componentType"`
269+
Type ResourceType `json:"type"`
270+
Metadata map[string]string `json:"metadata,omitempty"`
271+
Operation string `json:"operation,omitempty"`
254272
}
255273

256274
type FunctionOut struct {
@@ -310,21 +328,34 @@ func (ctx *FunctionContext) Send(outputName string, data []byte) ([]byte, error)
310328
var err error
311329
var output *Output
312330
var response *dapr.BindingEvent
331+
var payload interface{}
332+
var payloadBytes []byte
313333

314334
if v, ok := ctx.Outputs[outputName]; ok {
315335
output = v
316336
} else {
317337
return nil, fmt.Errorf("output %s not found", outputName)
318338
}
319339

340+
payload = data
341+
payloadBytes = data
342+
343+
if traceable(output.ComponentType) {
344+
ie := NewInnerEvent(ctx)
345+
ie.MergeMetadata(ctx.GetInnerEvent())
346+
ie.SetUserData(data)
347+
payload = ie.GetCloudEvent()
348+
payloadBytes = ie.GetCloudEventJSON()
349+
}
350+
320351
switch output.Type {
321352
case OpenFuncTopic:
322-
err = ctx.daprClient.PublishEvent(context.Background(), output.Component, output.Uri, data)
353+
err = ctx.daprClient.PublishEvent(context.Background(), output.Component, output.Uri, payload)
323354
case OpenFuncBinding:
324355
in := &dapr.InvokeBindingRequest{
325356
Name: output.Component,
326357
Operation: output.Operation,
327-
Data: data,
358+
Data: payloadBytes,
328359
Metadata: output.Metadata,
329360
}
330361
response, err = ctx.daprClient.InvokeBinding(context.Background(), in)
@@ -437,32 +468,32 @@ func (ctx *FunctionContext) SetSyncRequest(w http.ResponseWriter, r *http.Reques
437468
}
438469

439470
func (ctx *FunctionContext) SetEvent(inputName string, event interface{}) {
440-
ctx.mu.Lock()
441-
defer ctx.mu.Unlock()
442471
switch t := event.(type) {
443472
case *common.BindingEvent:
444-
ctx.Event.BindingEvent = event.(*common.BindingEvent)
473+
be := event.(*common.BindingEvent)
474+
ie := convertEvent(ctx, inputName, be.Data)
475+
ctx.setEvent(inputName, be, nil, nil, ie)
445476
case *common.TopicEvent:
446-
ctx.Event.TopicEvent = event.(*common.TopicEvent)
477+
te := event.(*common.TopicEvent)
478+
ie := convertEvent(ctx, inputName, te.Data)
479+
ctx.setEvent(inputName, nil, te, nil, ie)
447480
case *cloudevents.Event:
448-
ctx.Event.CloudEvent = event.(*cloudevents.Event)
481+
ce := event.(*cloudevents.Event)
482+
ie := convertEvent(ctx, inputName, ce.Data())
483+
ctx.setEvent(inputName, nil, nil, ce, ie)
449484
default:
450485
klog.Errorf("failed to resolve event type: %v", t)
451486
}
452-
ctx.Event.InputName = inputName
453487
}
454488

455-
func (ctx *FunctionContext) SetEventMetadata(key string, value string) {
489+
func (ctx *FunctionContext) setEvent(name string, be *common.BindingEvent, te *common.TopicEvent, ce *cloudevents.Event, ie InnerEvent) {
456490
ctx.mu.Lock()
457491
defer ctx.mu.Unlock()
458-
if ctx.Event.Metadata == nil {
459-
ctx.Event.Metadata = map[string]string{}
460-
}
461-
ctx.Event.Metadata[key] = value
462-
}
463-
464-
func (ctx *FunctionContext) GetEventMetadata() map[string]string {
465-
return ctx.Event.Metadata
492+
ctx.Event.InputName = name
493+
ctx.Event.BindingEvent = be
494+
ctx.Event.TopicEvent = te
495+
ctx.Event.CloudEvent = ce
496+
ctx.Event.innerEvent = ie
466497
}
467498

468499
func (ctx *FunctionContext) GetName() string {
@@ -505,6 +536,10 @@ func (ctx *FunctionContext) GetCloudEvent() *cloudevents.Event {
505536
return ctx.Event.CloudEvent
506537
}
507538

539+
func (ctx *FunctionContext) GetInnerEvent() InnerEvent {
540+
return ctx.Event.innerEvent
541+
}
542+
508543
func (ctx *FunctionContext) GetPluginsTracingCfg() TracingConfig {
509544
return ctx.PluginsTracing
510545
}
@@ -736,3 +771,16 @@ func parseContext() (*FunctionContext, error) {
736771
func NewFunctionOut() *FunctionOut {
737772
return &FunctionOut{}
738773
}
774+
775+
// Convert queue binding event into cloud event format to add tracing metadata in the cloud event context.
776+
func traceable(t string) bool {
777+
778+
// All events sent to dapr pubsub components need to be encapsulated
779+
if strings.HasPrefix(t, "pubsub") {
780+
return true
781+
}
782+
783+
// For dapr binding components, let the mapping conditions of the bindingQueueComponents
784+
// determine if the tracing metadata can be added.
785+
return bindingQueueComponents[t]
786+
}

0 commit comments

Comments
 (0)