Skip to content

Commit 08c5bb9

Browse files
Introduce hooks wrapper for worker-related operations
1 parent df3e9d9 commit 08c5bb9

4 files changed

Lines changed: 272 additions & 84 deletions

File tree

hooks/config.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package hooks
7+
8+
import (
9+
"context"
10+
"sync"
11+
"sync/atomic"
12+
13+
"github.com/dgraph-io/dgo/v250/protos/api"
14+
"github.com/dgraph-io/dgraph/v25/protos/pb"
15+
)
16+
17+
type ZeroHooks interface {
18+
AssignUIDs(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)
19+
AssignTimestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)
20+
AssignNsIDs(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)
21+
CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error)
22+
ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error)
23+
}
24+
25+
type ZeroHooksFns struct {
26+
AssignUIDsFn func(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)
27+
AssignTimestampsFn func(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)
28+
AssignNsIDsFn func(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)
29+
CommitOrAbortFn func(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error)
30+
ApplyMutationsFn func(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error)
31+
}
32+
33+
func (h ZeroHooksFns) AssignUIDs(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
34+
return h.AssignUIDsFn(ctx, num)
35+
}
36+
37+
func (h ZeroHooksFns) AssignTimestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
38+
return h.AssignTimestampsFn(ctx, num)
39+
}
40+
41+
func (h ZeroHooksFns) AssignNsIDs(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
42+
return h.AssignNsIDsFn(ctx, num)
43+
}
44+
45+
func (h ZeroHooksFns) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
46+
return h.CommitOrAbortFn(ctx, tc)
47+
}
48+
49+
func (h ZeroHooksFns) ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) {
50+
return h.ApplyMutationsFn(ctx, m)
51+
}
52+
53+
// Config holds the configuration for embedded mode operation.
54+
type Config struct {
55+
// Hooks for bypassing Zero operations
56+
ZeroHooks ZeroHooks
57+
58+
// DataDir is the directory where data files are stored
59+
DataDir string
60+
61+
// CacheSizeMB is the size of the in-memory cache in megabytes
62+
CacheSizeMB int64
63+
}
64+
65+
var (
66+
// globalConfig holds the current embedded configuration
67+
globalConfig atomic.Pointer[Config]
68+
69+
defaultZeroHooks atomic.Value
70+
71+
// enabled tracks whether embedded mode is active
72+
enabled atomic.Bool
73+
74+
// mu protects initialization
75+
mu sync.Mutex
76+
)
77+
78+
// Enable activates embedded mode with the given configuration.
79+
// This must be called before any Dgraph operations.
80+
func Enable(cfg *Config) {
81+
mu.Lock()
82+
defer mu.Unlock()
83+
84+
globalConfig.Store(cfg)
85+
enabled.Store(true)
86+
}
87+
88+
// Disable deactivates embedded mode.
89+
func Disable() {
90+
mu.Lock()
91+
defer mu.Unlock()
92+
93+
enabled.Store(false)
94+
globalConfig.Store(nil)
95+
}
96+
97+
// IsEnabled returns true if embedded mode is currently active.
98+
func IsEnabled() bool {
99+
return enabled.Load()
100+
}
101+
102+
// GetConfig returns the current embedded configuration, or nil if not enabled.
103+
func GetConfig() *Config {
104+
return globalConfig.Load()
105+
}
106+
107+
func SetDefaultZeroHooks(h ZeroHooks) {
108+
defaultZeroHooks.Store(h)
109+
}
110+
111+
// GetHooks returns the active Zero hooks.
112+
// If embedded mode is not enabled, it returns the default hooks implementation.
113+
func GetHooks() ZeroHooks {
114+
cfg := globalConfig.Load()
115+
if cfg != nil && cfg.ZeroHooks != nil {
116+
return cfg.ZeroHooks
117+
}
118+
if h := defaultZeroHooks.Load(); h != nil {
119+
return h.(ZeroHooks)
120+
}
121+
return nil
122+
}

hooks/init.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
/*
2+
* SPDX-FileCopyrightText: © Hypermode Inc. <hello@hypermode.com>
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package hooks
7+
8+
// This package provides configuration and hooks for running Dgraph in embedded mode.
9+
// The actual initialization functions that would create import cycles are intentionally
10+
// left to be called directly by the host application (e.g., modusGraph) using the
11+
// individual packages (edgraph, worker, posting, schema, x).
12+
//
13+
// Usage:
14+
// 1. Call hooks.Enable() with your ZeroHooks configuration
15+
// 2. Initialize packages directly: edgraph.Init(), worker.State.InitStorage(), etc.
16+
// 3. The hooks in this package will be called automatically by worker functions
17+
// 4. Call hooks.Disable() when shutting down

worker/default_zero_hooks.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* SPDX-FileCopyrightText: © 2017-2025 Istari Digital, Inc.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package worker
7+
8+
import (
9+
"context"
10+
11+
"google.golang.org/grpc/metadata"
12+
13+
"github.com/dgraph-io/dgo/v250/protos/api"
14+
"github.com/dgraph-io/dgraph/v25/conn"
15+
"github.com/dgraph-io/dgraph/v25/hooks"
16+
"github.com/dgraph-io/dgraph/v25/protos/pb"
17+
"github.com/dgraph-io/dgraph/v25/x"
18+
)
19+
20+
type defaultZeroHooks struct{}
21+
22+
func init() {
23+
hooks.SetDefaultZeroHooks(defaultZeroHooks{})
24+
}
25+
26+
func (defaultZeroHooks) AssignUIDs(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
27+
if num.Type == 0 {
28+
num.Type = pb.Num_UID
29+
}
30+
31+
// Pass on the incoming metadata to the zero. Namespace from the metadata is required by zero.
32+
if md, ok := metadata.FromIncomingContext(ctx); ok {
33+
ctx = metadata.NewOutgoingContext(ctx, md)
34+
}
35+
36+
pl := groups().Leader(0)
37+
if pl == nil {
38+
return nil, conn.ErrNoConnection
39+
}
40+
41+
c := pb.NewZeroClient(pl.Get())
42+
return c.AssignIds(ctx, num)
43+
}
44+
45+
func (defaultZeroHooks) AssignTimestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
46+
pl := groups().connToZeroLeader()
47+
if pl == nil {
48+
return nil, conn.ErrNoConnection
49+
}
50+
51+
c := pb.NewZeroClient(pl.Get())
52+
return c.Timestamps(ctx, num)
53+
}
54+
55+
func (defaultZeroHooks) AssignNsIDs(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
56+
num.Type = pb.Num_NS_ID
57+
58+
pl := groups().Leader(0)
59+
if pl == nil {
60+
return nil, conn.ErrNoConnection
61+
}
62+
63+
c := pb.NewZeroClient(pl.Get())
64+
return c.AssignIds(ctx, num)
65+
}
66+
67+
func (defaultZeroHooks) CommitOrAbort(ctx context.Context, tc *api.TxnContext) (*api.TxnContext, error) {
68+
pl := groups().Leader(0)
69+
if pl == nil {
70+
return nil, conn.ErrNoConnection
71+
}
72+
73+
// Do de-duplication before sending the request to zero.
74+
tc.Keys = x.Unique(tc.Keys)
75+
tc.Preds = x.Unique(tc.Preds)
76+
77+
zc := pb.NewZeroClient(pl.Get())
78+
return zc.CommitOrAbort(ctx, tc)
79+
}
80+
81+
func (defaultZeroHooks) ApplyMutations(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error) {
82+
if groups().ServesGroup(m.GroupId) {
83+
txnCtx := &api.TxnContext{}
84+
return txnCtx, (&grpcWorker{}).proposeAndWait(ctx, txnCtx, m)
85+
}
86+
87+
pl := groups().Leader(m.GroupId)
88+
if pl == nil {
89+
return nil, conn.ErrNoConnection
90+
}
91+
92+
var tc *api.TxnContext
93+
c := pb.NewWorkerClient(pl.Get())
94+
95+
ch := make(chan error, 1)
96+
go func() {
97+
var err error
98+
tc, err = c.Mutate(ctx, m)
99+
ch <- err
100+
}()
101+
102+
select {
103+
case <-ctx.Done():
104+
return nil, ctx.Err()
105+
case err := <-ch:
106+
return tc, err
107+
}
108+
}

0 commit comments

Comments
 (0)