Skip to content

Commit a8e7f63

Browse files
authored
Support for async runtime trace (#36)
* support async runtime
1 parent 506e656 commit a8e7f63

27 files changed

Lines changed: 1185 additions & 19 deletions

File tree

.github/workflows/plugin_test.yaml

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,29 @@ on:
77
- main
88

99
jobs:
10-
Plugins-Test:
11-
name: SkyWalking
10+
Plugin-SkyWalking:
11+
name: Plugin With Apache SkyWalking - ${{matrix.case.name}}
1212
runs-on: ubuntu-latest
13-
env:
14-
e2e_file: "plugin/skywalking/test/sync-request/e2e.yaml"
13+
strategy:
14+
matrix:
15+
case:
16+
- name: Sync Request
17+
e2e: "plugin/skywalking/test/sync-request/e2e.yaml"
18+
env: compose
19+
- name: Binding Event
20+
e2e: "plugin/skywalking/test/binding-event/e2e.yaml"
21+
env: kind
22+
- name: Topic Event
23+
e2e: "plugin/skywalking/test/topic-event/e2e.yaml"
24+
env: kind
1525
steps:
1626
- uses: actions/checkout@v2
17-
with:
18-
submodules: true
1927
- uses: apache/skywalking-infra-e2e@main
2028
with:
21-
e2e-file: ${e2e_file}
29+
e2e-file: ${{matrix.case.e2e}}
2230
- name: Show Container Logs
23-
if: ${{ failure() }}
31+
if: ${{ failure() }} && matrix.case.e2e == 'compose'
2432
run: docker ps -a | grep -v CONTAINER | awk '{print $1}' | xargs -i docker logs {}
2533
- name: Cleanup
2634
if: ${{ failure() }}
27-
run: e2e cleanup -c ${e2e_file}
35+
run: e2e cleanup -c ${{matrix.case.e2e}}

context/context.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,16 @@ const (
5858
type Runtime string
5959
type ResourceType string
6060

61+
type NativeContext interface {
62+
// GetNativeContext returns the Go native context object.
63+
GetNativeContext() context.Context
64+
65+
// SetNativeContext set the Go native context object.
66+
SetNativeContext(context.Context)
67+
}
68+
6169
type RuntimeContext interface {
70+
NativeContext
6271

6372
// GetName returns the function's name.
6473
GetName() string
@@ -69,9 +78,6 @@ type RuntimeContext interface {
6978
// GetContext returns the pointer of raw OpenFunction FunctionContext object.
7079
GetContext() *FunctionContext
7180

72-
// GetNativeContext returns the Go native context object.
73-
GetNativeContext() context.Context
74-
7581
// GetOut returns the pointer of raw OpenFunction FunctionOut object.
7682
GetOut() Out
7783

@@ -150,6 +156,7 @@ type RuntimeContext interface {
150156
}
151157

152158
type Context interface {
159+
NativeContext
153160

154161
// Send provides the ability to allow the user to send data to a specified output target.
155162
Send(outputName string, data []byte) ([]byte, error)
@@ -466,6 +473,10 @@ func (ctx *FunctionContext) GetNativeContext() context.Context {
466473
return ctx.Ctx
467474
}
468475

476+
func (ctx *FunctionContext) SetNativeContext(c context.Context) {
477+
ctx.Ctx = c
478+
}
479+
469480
func (ctx *FunctionContext) SetSyncRequest(w http.ResponseWriter, r *http.Request) {
470481
ctx.mu.Lock()
471482
defer ctx.mu.Unlock()
@@ -805,6 +816,9 @@ func ConvertUserDataToBytes(data interface{}) []byte {
805816
if d, ok := data.([]byte); ok {
806817
return d
807818
}
819+
if d, ok := data.(string); ok {
820+
return []byte(d)
821+
}
808822
if d, err := json.Marshal(data); err != nil {
809823
return nil
810824
} else {

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/OpenFunction/functions-framework-go
33
go 1.15
44

55
require (
6-
github.com/SkyAPM/go2sky v1.4.0
6+
github.com/SkyAPM/go2sky v1.4.1-0.20220302064553-acee2ee29345
77
github.com/cloudevents/sdk-go/v2 v2.4.1
88
github.com/dapr/dapr v1.6.0
99
github.com/dapr/go-sdk v1.3.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,8 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdko
131131
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
132132
github.com/Shopify/sarama v1.23.1/go.mod h1:XLH1GYJnLVE0XCr6KdJGVJRTwY30moWNJ4sERjXX6fs=
133133
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
134-
github.com/SkyAPM/go2sky v1.4.0 h1:4425zOGAd6TQ2DXdUqyFmebxLdqkOFmdtLRQvDsZM7c=
135-
github.com/SkyAPM/go2sky v1.4.0/go.mod h1:O31qs9zF/NYcIqb2ZgAbGloOfhVLvhrxc0qNTqfzErM=
134+
github.com/SkyAPM/go2sky v1.4.1-0.20220302064553-acee2ee29345 h1:Hze7WbR05KIg2U4vojf9iGZ9L5EGZsWq1+IvK//mQ68=
135+
github.com/SkyAPM/go2sky v1.4.1-0.20220302064553-acee2ee29345/go.mod h1:O31qs9zF/NYcIqb2ZgAbGloOfhVLvhrxc0qNTqfzErM=
136136
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
137137
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
138138
github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g=

plugin/skywalking/async-request.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package skywalking
2+
3+
import (
4+
"time"
5+
6+
ofctx "github.com/OpenFunction/functions-framework-go/context"
7+
"github.com/SkyAPM/go2sky"
8+
)
9+
10+
func preAsyncRequestCommonLogic(ofCtx ofctx.RuntimeContext, tracer *go2sky.Tracer) (go2sky.Span, error) {
11+
event := ofCtx.GetInnerEvent()
12+
13+
span, nCtx, err := tracer.CreateEntrySpan(ofCtx.GetNativeContext(), ofCtx.GetName(), func(headerKey string) (string, error) {
14+
value, _ := event.GetMetadata()[headerKey]
15+
return value, nil
16+
})
17+
if err != nil {
18+
return nil, err
19+
}
20+
ofCtx.SetNativeContext(nCtx)
21+
span.Tag(tagRuntime, string(ofctx.Async))
22+
setPublicAttrs(nCtx, ofCtx, span)
23+
24+
return span, err
25+
}
26+
27+
func preTopicEventLogic(ofCtx ofctx.RuntimeContext, tracer *go2sky.Tracer) error {
28+
span, err := preAsyncRequestCommonLogic(ofCtx, tracer)
29+
if err != nil {
30+
return err
31+
}
32+
span.Tag(tagComponentType, string(ofctx.OpenFuncTopic))
33+
return nil
34+
}
35+
36+
func preBindingEventLogic(ofCtx ofctx.RuntimeContext, tracer *go2sky.Tracer) error {
37+
span, err := preAsyncRequestCommonLogic(ofCtx, tracer)
38+
if err != nil {
39+
return err
40+
}
41+
span.Tag(tagComponentType, string(ofctx.OpenFuncBinding))
42+
return nil
43+
}
44+
45+
func postAsyncRequestLogic(ctx ofctx.RuntimeContext) error {
46+
span := go2sky.ActiveSpan(ctx.GetNativeContext())
47+
if span == nil {
48+
return nil
49+
}
50+
defer span.End()
51+
52+
if ofctx.InternalError == ctx.GetOut().GetCode() {
53+
span.Error(time.Now(), "Error on binding event")
54+
}
55+
56+
if ctx.GetError() != nil {
57+
span.Error(time.Now(), ctx.GetError().Error())
58+
}
59+
return nil
60+
}

plugin/skywalking/plugin-skywalking.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ const (
2323

2424
var (
2525
initGo2skyOnce sync.Once
26+
27+
tagComponentType go2sky.Tag = "component.type"
28+
tagRuntime go2sky.Tag = "runtime"
2629
)
2730

2831
type klogWrapper struct {
@@ -96,8 +99,11 @@ func (p *PluginSkywalking) ExecPreHook(ctx ofctx.RuntimeContext, plugins map[str
9699
}
97100

98101
if ctx.GetSyncRequest().Request != nil {
99-
// SyncRequest
100102
return preSyncRequestLogic(ctx, p.tracer)
103+
} else if ctx.GetBindingEvent() != nil {
104+
return preBindingEventLogic(ctx, p.tracer)
105+
} else if ctx.GetTopicEvent() != nil {
106+
return preTopicEventLogic(ctx, p.tracer)
101107
}
102108
return nil
103109
}
@@ -106,8 +112,11 @@ func (p *PluginSkywalking) ExecPostHook(ctx ofctx.RuntimeContext, plugins map[st
106112
if p.tracer == nil {
107113
return nil
108114
}
115+
109116
if ctx.GetSyncRequest().Request != nil {
110117
return postSyncRequestLogic(ctx)
118+
} else if ctx.GetBindingEvent() != nil || ctx.GetTopicEvent() != nil {
119+
return postAsyncRequestLogic(ctx)
111120
}
112121
return nil
113122
}

plugin/skywalking/sync-request.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,22 @@ func preSyncRequestLogic(ofCtx ofctx.RuntimeContext, tracer *go2sky.Tracer) erro
1818
if err != nil {
1919
return err
2020
}
21-
ofCtx.GetSyncRequest().Request = request.WithContext(nCtx)
21+
ofCtx.GetSyncRequest().Request = request.WithContext(nCtx) // HTTPFunction
22+
ofCtx.SetNativeContext(go2sky.WithSpan(ofCtx.GetNativeContext(), span)) // OpenFunction
2223

2324
span.Tag(go2sky.TagHTTPMethod, request.Method)
2425
span.Tag(go2sky.TagURL, fmt.Sprintf("%s%s", request.Host, request.URL.Path))
26+
span.Tag(tagRuntime, string(ofctx.Knative))
2527
setPublicAttrs(nCtx, ofCtx, span)
2628
return nil
2729
}
2830

2931
func postSyncRequestLogic(ctx ofctx.RuntimeContext) error {
30-
request := ctx.GetSyncRequest().Request
31-
span := go2sky.ActiveSpan(request.Context())
32+
span := go2sky.ActiveSpan(ctx.GetNativeContext())
3233
if span == nil {
3334
return nil
3435
}
36+
3537
defer span.End()
3638

3739
if ofctx.InternalError == ctx.GetOut().GetCode() {
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM golang:1.16
2+
3+
ADD . /functions-framework-go
4+
WORKDIR /functions-framework-go
5+
6+
EXPOSE 12345
7+
8+
ENTRYPOINT ["go"]
9+
10+
CMD ["run", "plugin/skywalking/test/binding-event/of/of.go"]
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
FROM golang:1.16
2+
3+
ADD . /functions-framework-go
4+
WORKDIR /functions-framework-go
5+
6+
EXPOSE 12345
7+
8+
ENTRYPOINT ["go"]
9+
10+
CMD ["run", "plugin/skywalking/test/binding-event/provider/provider.go"]
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
setup:
2+
env: kind
3+
file: kind.yaml
4+
steps:
5+
- name: install dapr
6+
command: |
7+
dapr -v || (wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash)
8+
dapr init -k --log-as-json
9+
- name: install helm
10+
command: |
11+
curl https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3 | bash
12+
- name: install kafka
13+
command: |
14+
helm repo add bitnami https://charts.bitnami.com/bitnami
15+
helm repo update
16+
helm install dapr-kafka bitnami/kafka --wait -n kafka --create-namespace -f ./plugin/skywalking/test/script/kafka/kafka-non-persistence.yaml
17+
- name: install mock collector
18+
path: ../script/oap-mock/manifests.yaml
19+
wait:
20+
- namespace: default
21+
resource: pod
22+
for: condition=Ready
23+
- name: build image
24+
command: |
25+
docker build -t skywalking_e2e_provider:latest -f plugin/skywalking/test/binding-event/docker/Dockerfile.provider .
26+
kind load docker-image skywalking_e2e_provider:latest
27+
docker build -t skywalking_e2e_of:latest -f plugin/skywalking/test/binding-event/docker/Dockerfile.of .
28+
kind load docker-image skywalking_e2e_of:latest
29+
- name: setup manifests
30+
path: manifests
31+
wait:
32+
- namespace: default
33+
resource: pod
34+
for: condition=Ready
35+
kind:
36+
expose-ports:
37+
- namespace: default
38+
resource: service/collector
39+
port: 12800
40+
timeout: 20m
41+
42+
cleanup:
43+
# always never success failure
44+
on: success
45+
46+
trigger:
47+
action: ""
48+
49+
verify:
50+
# verify with retry strategy
51+
retry:
52+
# max retry count
53+
count: 10
54+
# the interval between two attempts, e.g. 10s, 1m.
55+
interval: 10s
56+
cases:
57+
- query: curl http://${service_collector_host}:${service_collector_12800}/receiveData
58+
expected: expected.data.yml

0 commit comments

Comments
 (0)