Skip to content

Commit e38a4bf

Browse files
authored
fix(core): add map locking and ensure resource cleanup (#9483)
**Description** This PR addresses several stability issues: - Added read/write locks for concurrent access to maps to prevent race conditions. - Ensured transactions created via `NewTransactionAt` are explicitly discarded to free `readTs` usage. - Ensured file handles are explicitly closed after use to prevent file descriptor leaks. These changes improve thread safety, avoid stale transactions blocking version GC, and prevent potential resource exhaustion. **Checklist** - [x] Code compiles correctly and linting passes locally - [ ] For all _code_ changes, an entry added to the `CHANGELOG.md` file describing and linking to this PR - [ ] Tests added for new functionality, or regression tests for bug fixes added as applicable
1 parent d9a9da5 commit e38a4bf

4 files changed

Lines changed: 7 additions & 1 deletion

File tree

dgraph/cmd/root.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ func initCmds() {
150150
if err != nil {
151151
x.Fatalf("unable to open config file for reading: %v", err)
152152
}
153+
defer cfgFile.Close()
153154
cfgData, err := io.ReadAll(cfgFile)
154155
if err != nil {
155156
x.Fatalf("unable to read config file: %v", err)

posting/list.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2242,6 +2242,7 @@ func (l *List) readListPart(startUid uint64) (*pb.PostingList, error) {
22422242
hex.EncodeToString(l.key), startUid)
22432243
}
22442244
txn := pstore.NewTransactionAt(l.minTs, false)
2245+
defer txn.Discard()
22452246
item, err := txn.Get(key)
22462247
if err != nil {
22472248
return nil, errors.Wrapf(err, "could not read list part with key %s",

posting/mvcc.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,9 @@ func NewCachePL() *CachePL {
554554
}
555555

556556
func checkForRollup(key []byte, l *List) {
557+
l.RLock()
557558
deltaCount := l.mutationMap.len()
559+
l.RUnlock()
558560
// If deltaCount is high, send it to high priority channel instead.
559561
if deltaCount > 500 {
560562
IncrRollup.addKeyToBatch(key, 0)

schema/schema.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ func (s *state) Delete(attr string, ts uint64) error {
9595

9696
glog.Infof("Deleting schema for predicate: [%s]", attr)
9797
txn := pstore.NewTransactionAt(ts, true)
98+
defer txn.Discard()
9899
if err := txn.Delete(x.SchemaKey(attr)); err != nil {
99100
return err
100101
}
@@ -119,6 +120,7 @@ func (s *state) DeleteType(typeName string, ts uint64) error {
119120

120121
glog.Infof("Deleting type definition for type: [%s]", typeName)
121122
txn := pstore.NewTransactionAt(ts, true)
123+
defer txn.Discard()
122124
if err := txn.Delete(x.TypeKey(typeName)); err != nil {
123125
return err
124126
}
@@ -530,7 +532,7 @@ func Load(predicate string) error {
530532
if len(predicate) == 0 {
531533
return errors.Errorf("Empty predicate")
532534
}
533-
delete(State().mutSchema, predicate)
535+
State().DeleteMutSchema(predicate)
534536
key := x.SchemaKey(predicate)
535537
txn := pstore.NewTransactionAt(math.MaxUint64, false)
536538
defer txn.Discard()

0 commit comments

Comments
 (0)