Skip to content

Commit c2c4f2d

Browse files
fix(txn): for lossy indexes, change comparison function to first check the txn cache (#9567)
**Description** Fixes #9556 This PR fixes an issue where comparison functions invoked in transactions would miss un-committed values. This change ensure the transaction cache is first searched, before resuming the old flow through posting.GetNoStore and fetchValue. A test was also added. **Checklist** - [x] The PR title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/#summary) syntax, leading with `fix:`, `feat:`, `chore:`, `ci:`, etc. - [x] Code compiles correctly and linting (via trunk) passes locally - [x] Tests added for new functionality, or regression tests for bug fixes added as applicable
1 parent 4a5187d commit c2c4f2d

2 files changed

Lines changed: 82 additions & 6 deletions

File tree

query/query0_test.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,15 @@ package query
1010

1111
import (
1212
"context"
13+
"encoding/json"
1314
"fmt"
1415
"strings"
1516
"testing"
17+
"time"
1618

1719
"github.com/stretchr/testify/require"
1820

21+
"github.com/dgraph-io/dgo/v250/protos/api"
1922
"github.com/dgraph-io/dgraph/v25/dgraphapi"
2023
"github.com/dgraph-io/dgraph/v25/dgraphtest"
2124
"github.com/dgraph-io/dgraph/v25/dql"
@@ -2886,6 +2889,70 @@ func TestDateTimeQuery(t *testing.T) {
28862889
processQueryNoErr(t, query))
28872890
}
28882891

2892+
// TestLossyIndexInUncommittedTxn tests that queries within an uncommitted
2893+
// transaction can find data that was mutated in that same transaction when using
2894+
// a lossy index like @index(hour).
2895+
//
2896+
// Issue #9556: The bug occurs because lossy indexes require a two-step query:
2897+
// 1. Index lookup - finds candidate UIDs
2898+
// 2. Value verification - re-checks actual values since hour granularity is imprecise
2899+
func TestLossyIndexInUncommittedTxn(t *testing.T) {
2900+
ctx := context.Background()
2901+
2902+
// Use a unique datetime value to avoid conflicts with existing test data
2903+
testTime := time.Now().UTC().Truncate(time.Second)
2904+
testTimeStr := testTime.Format(time.RFC3339)
2905+
2906+
// Create a new transaction - DO NOT commit yet
2907+
txn := client.NewTxn()
2908+
defer func() {
2909+
if err := txn.Discard(ctx); err != nil {
2910+
t.Logf("error discarding txn: %v", err)
2911+
}
2912+
}()
2913+
2914+
mutationJSON := fmt.Sprintf(`{
2915+
"uid": "_:newnode",
2916+
"dgraph.type": "TestNode",
2917+
"created_at": "%s"
2918+
}`, testTimeStr)
2919+
2920+
resp, err := txn.Mutate(ctx, &api.Mutation{
2921+
SetJson: []byte(mutationJSON),
2922+
})
2923+
require.NoError(t, err, "mutation should succeed")
2924+
require.NotEmpty(t, resp.Uids["newnode"], "should get a UID for the new node")
2925+
2926+
newUID := resp.Uids["newnode"]
2927+
t.Logf("Created node with UID %s and created_at=%s", newUID, testTimeStr)
2928+
2929+
// Query for the same data within the SAME uncommitted transaction
2930+
// This query uses the lossy @index(hour) on created_at
2931+
query := fmt.Sprintf(`{
2932+
q(func: eq(created_at, "%s")) {
2933+
uid
2934+
created_at
2935+
}
2936+
}`, testTimeStr)
2937+
2938+
queryResp, err := txn.Query(ctx, query)
2939+
require.NoError(t, err, "query should succeed")
2940+
2941+
var result struct {
2942+
Q []struct {
2943+
UID string `json:"uid"`
2944+
CreatedAt string `json:"created_at"`
2945+
} `json:"q"`
2946+
}
2947+
err = json.Unmarshal(queryResp.Json, &result)
2948+
require.NoError(t, err, "should be able to parse response")
2949+
2950+
t.Logf("Query response: %s", string(queryResp.Json))
2951+
2952+
require.Len(t, result.Q, 1, "should find exactly 1 node with the matching created_at")
2953+
require.Equal(t, newUID, result.Q[0].UID, "should find the node we just created")
2954+
}
2955+
28892956
func TestCountUidWithAlias(t *testing.T) {
28902957
query := `
28912958
{

worker/task.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,7 +1396,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
13961396
switch lang {
13971397
case "":
13981398
if isList {
1399-
pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
1399+
pl, err := qs.cache.Get(x.DataKey(attr, uid))
14001400
if err != nil {
14011401
filterErr = err
14021402
return false
@@ -1418,7 +1418,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
14181418
return false
14191419
}
14201420

1421-
pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
1421+
pl, err := qs.cache.Get(x.DataKey(attr, uid))
14221422
if err != nil {
14231423
filterErr = err
14241424
return false
@@ -1433,7 +1433,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
14331433
dst, err := types.Convert(sv, typ)
14341434
return err == nil && compareFunc(dst)
14351435
case ".":
1436-
pl, err := posting.GetNoStore(x.DataKey(attr, uid), arg.q.ReadTs)
1436+
pl, err := qs.cache.Get(x.DataKey(attr, uid))
14371437
if err != nil {
14381438
filterErr = err
14391439
return false
@@ -1451,17 +1451,26 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e
14511451
}
14521452
return false
14531453
default:
1454-
sv, err := fetchValue(uid, attr, arg.q.Langs, typ, arg.q.ReadTs)
1454+
pl, err := qs.cache.Get(x.DataKey(attr, uid))
1455+
if err != nil {
1456+
filterErr = err
1457+
return false
1458+
}
1459+
src, err := pl.ValueFor(arg.q.ReadTs, arg.q.Langs)
14551460
if err != nil {
14561461
if err != posting.ErrNoValue {
14571462
filterErr = err
14581463
}
14591464
return false
14601465
}
1461-
if sv.Value == nil {
1466+
dst, err := types.Convert(src, typ)
1467+
if err != nil {
1468+
return false
1469+
}
1470+
if dst.Value == nil {
14621471
return false
14631472
}
1464-
return compareFunc(sv)
1473+
return compareFunc(dst)
14651474
}
14661475
})
14671476
if filterErr != nil {

0 commit comments

Comments
 (0)