Skip to content

Commit cec95ef

Browse files
added comments
1 parent 9c177fd commit cec95ef

3 files changed

Lines changed: 44 additions & 10 deletions

File tree

posting/index.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,6 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
148148
}
149149
}
150150

151-
wg := &sync.WaitGroup{}
152-
153151
strings := make([]string, 0, len(values))
154152
for i := range values {
155153
strings = append(strings, i)
@@ -158,6 +156,8 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
158156
//fmt.Println("START")
159157

160158
f := func(numGo int) *types.LockedShardedMap[string, *pb.PostingList] {
159+
wg := &sync.WaitGroup{}
160+
161161
globalMap := types.NewLockedShardedMap[string, *pb.PostingList]()
162162
process := func(start int) {
163163
defer wg.Done()
@@ -220,7 +220,10 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
220220
return globalMap
221221
}
222222

223-
globalMapI := f(1)
223+
globalMapI := f(100)
224+
mp.txn.cache.Lock()
225+
defer mp.txn.cache.Unlock()
226+
mp.txn.cache.globalMap[pipeline.attr] = globalMapI
224227
// parallelGlobalMap := f(100)
225228

226229
// parallelGlobalMap.ParallelIterate(func (key string, val *pb.PostingList) error {
@@ -240,13 +243,13 @@ func (mp *MutationPipeline) InsertTokenizerIndexes(ctx context.Context, pipeline
240243
// return nil
241244
// })
242245

243-
globalMapI.ParallelIterate(func(key string, val *pb.PostingList) error {
244-
if _, err := mp.txn.AddDelta(key, *val); err != nil {
245-
pipeline.errCh <- err
246-
return err
247-
}
248-
return nil
249-
})
246+
// globalMapI.ParallelIterate(func(key string, val *pb.PostingList) error {
247+
// if _, err := mp.txn.AddDelta(key, *val); err != nil {
248+
// pipeline.errCh <- err
249+
// return err
250+
// }
251+
// return nil
252+
// })
250253
}
251254

252255
func (mp *MutationPipeline) ProcessList(ctx context.Context, pipeline *PredicatePipeline, index bool, reverse bool, count bool) {

posting/lists.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ type LocalCache struct {
7575
// during commit.
7676
deltas *types.LockedShardedMap[string, []byte]
7777

78+
globalMap map[string]*types.LockedShardedMap[string, *pb.PostingList]
79+
7880
// max committed timestamp of the read posting lists.
7981
maxVersions map[string]uint64
8082

posting/mvcc.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,35 @@ func (txn *Txn) CommitToDisk(writer *TxnWriter, commitTs uint64) error {
292292
}
293293
}()
294294

295+
for _, i := range cache.globalMap {
296+
i.Iterate(func(key string, data *pb.PostingList) error {
297+
return writer.update(commitTs, func(btxn *badger.Txn) error {
298+
if len(data.Postings) == 0{
299+
return nil
300+
}
301+
dataBytes, err := proto.Marshal(data)
302+
if err != nil {
303+
return err
304+
}
305+
if ts := cache.maxVersions[key]; ts >= commitTs {
306+
// Skip write because we already have a write at a higher ts.
307+
// Logging here can cause a lot of output when doing Raft log replay. So, let's
308+
// not output anything here.
309+
return nil
310+
}
311+
err = btxn.SetEntry(&badger.Entry{
312+
Key: []byte(key),
313+
Value: dataBytes,
314+
UserMeta: BitDeltaPosting,
315+
})
316+
if err != nil {
317+
return err
318+
}
319+
return nil
320+
})
321+
})
322+
}
323+
295324
var idx int
296325
for idx < len(keys) {
297326
// writer.update can return early from the loop in case we encounter badger.ErrTxnTooBig. On

0 commit comments

Comments
 (0)