Skip to content

Commit da18664

Browse files
feat(bulk): add error logging to bulk loader (#9601)
**Description** This PR adds error logging to the bulk loader. Prior to these changes, error handling in bulk loading allowed for only ignoring errors or stopping the load upon any error. These changes allow for errors to be logged to a file so that the entire bulk loading operation does not have to be an "all or nothing" exercise -- important for loads that can take hours to complete. The new flags look like this: ``` dgraph bulk .... --ignore_errors --log_errors --error_log=/tmp/bulktest/errors.log ``` The existing --ignore_errors flag is required. The location of the error log can be specified; the default is `bulk_errors.log`. **Checklist** - [x] The PR title follows the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/#summary) syntax, leading with `fix:`, `feat:`, `chore:`, `ci:`, etc. - [x] Code compiles correctly and linting (via trunk) passes locally - [ ] Tests added for new functionality, or regression tests for bug fixes added as applicable - [ ] For public APIs, new features, etc., a PR on the [docs repo](https://github.com/dgraph-io/dgraph-docs) staged and linked here. This process can be simplified by going to the [public docs site](https://docs.dgraph.io/) and clicking the "Edit this page" button at the bottom of page(s) relevant to your changes. Ensure that you indicate in the PR that this is an **unreleased** feature so that it does not get merged into the main docs prematurely.
1 parent 837bc14 commit da18664

File tree

3 files changed

+76
-6
lines changed

3 files changed

+76
-6
lines changed

dgraph/cmd/bulk/loader.go

Lines changed: 64 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type BulkOptions struct {
5959
ConnStr string
6060
HttpAddr string
6161
IgnoreErrors bool
62+
LogErrors bool
63+
ErrorLogPath string
6264
CustomTokenizers string
6365
NewUids bool
6466
ClientDir string
@@ -79,18 +81,59 @@ type BulkOptions struct {
7981
Badger badger.Options
8082
}
8183

84+
// chunkWithMeta wraps a chunk buffer with metadata about the source file.
85+
type chunkWithMeta struct {
86+
buf *bytes.Buffer
87+
filename string
88+
}
89+
8290
type state struct {
8391
opt *BulkOptions
8492
prog *progress
8593
xids *xidmap.XidMap
8694
schema *schemaStore
8795
shards *shardMap
88-
readerChunkCh chan *bytes.Buffer
96+
readerChunkCh chan *chunkWithMeta
8997
mapFileId uint32 // Used atomically to name the output files of the mappers.
9098
dbs []*badger.DB
9199
tmpDbs []*badger.DB // Temporary DB to write the split lists to avoid ordering issues.
92100
writeTs uint64 // All badger writes use this timestamp
93101
namespaces *sync.Map // To store the encountered namespaces.
102+
errorLog *errorLogger // Error logger for --log_errors
103+
}
104+
105+
// errorLogger provides thread-safe logging of parsing errors to a file.
106+
type errorLogger struct {
107+
mu sync.Mutex
108+
file *os.File
109+
}
110+
111+
func newErrorLogger(path string) (*errorLogger, error) {
112+
f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
113+
if err != nil {
114+
return nil, err
115+
}
116+
return &errorLogger{file: f}, nil
117+
}
118+
119+
func (e *errorLogger) Log(filename string, err error, extra string) {
120+
if e == nil || e.file == nil {
121+
return
122+
}
123+
e.mu.Lock()
124+
defer e.mu.Unlock()
125+
// Truncate very long extra info in the log
126+
if len(extra) > 500 {
127+
extra = extra[:500] + "..."
128+
}
129+
fmt.Fprintf(e.file, "file: %s\nerror: %v\n%s\n", filename, err, extra)
130+
}
131+
132+
func (e *errorLogger) Close() error {
133+
if e == nil || e.file == nil {
134+
return nil
135+
}
136+
return e.file.Close()
94137
}
95138

96139
type loader struct {
@@ -130,14 +173,26 @@ func newLoader(opt *BulkOptions) *loader {
130173
x.Checkf(err, "Unable to connect to alpha, Is it running at %s?", opt.ConnStr)
131174
}
132175

176+
var errLog *errorLogger
177+
if opt.LogErrors {
178+
if !opt.IgnoreErrors {
179+
fmt.Fprintln(os.Stderr, "Warning: --log_errors requires --ignore_errors to be set")
180+
}
181+
var err error
182+
errLog, err = newErrorLogger(opt.ErrorLogPath)
183+
x.Checkf(err, "Unable to create error log file at %s", opt.ErrorLogPath)
184+
fmt.Printf("Error logging enabled, writing to: %s\n", opt.ErrorLogPath)
185+
}
186+
133187
st := &state{
134188
opt: opt,
135189
prog: newProgress(),
136190
shards: newShardMap(opt.MapShards),
137191
// Lots of gz readers, so not much channel buffer needed.
138-
readerChunkCh: make(chan *bytes.Buffer, opt.NumGoroutines),
192+
readerChunkCh: make(chan *chunkWithMeta, opt.NumGoroutines),
139193
writeTs: getWriteTimestamp(zero, dg),
140194
namespaces: &sync.Map{},
195+
errorLog: errLog,
141196
}
142197
st.schema = newSchemaStore(readSchema(opt), opt, st)
143198
ld := &loader{
@@ -348,7 +403,7 @@ func (ld *loader) mapStage() {
348403
for {
349404
chunkBuf, err := chunk.Chunk(r)
350405
if chunkBuf != nil && chunkBuf.Len() > 0 {
351-
ld.readerChunkCh <- chunkBuf
406+
ld.readerChunkCh <- &chunkWithMeta{buf: chunkBuf, filename: file}
352407
}
353408
if err == io.EOF {
354409
break
@@ -453,7 +508,7 @@ func (ld *loader) processGqlSchema(loadType chunker.InputFormat) {
453508
_, err := fmt.Fprintf(gqlBuf, jsonSchema, ns, schema)
454509
x.Check(err)
455510
}
456-
ld.readerChunkCh <- gqlBuf
511+
ld.readerChunkCh <- &chunkWithMeta{buf: gqlBuf, filename: "<gql_schema>"}
457512
}
458513

459514
buf := readGqlSchema(ld.opt)
@@ -531,5 +586,10 @@ func (ld *loader) cleanup() {
531586
x.Check(db.Close())
532587
x.Check(os.RemoveAll(opts.Dir))
533588
}
589+
if ld.errorLog != nil {
590+
if err := ld.errorLog.Close(); err != nil {
591+
glog.Warningf("error closing error log: %v", err)
592+
}
593+
}
534594
ld.prog.endSummary()
535595
}

dgraph/cmd/bulk/mapper.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,9 +221,12 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
221221
chunk := chunker.NewChunker(inputFormat, 1000)
222222
nquads := chunk.NQuads()
223223
go func() {
224-
for chunkBuf := range m.readerChunkCh {
225-
if err := chunk.Parse(chunkBuf); err != nil {
224+
for chunkMeta := range m.readerChunkCh {
225+
if err := chunk.Parse(chunkMeta.buf); err != nil {
226226
atomic.AddInt64(&m.prog.errCount, 1)
227+
if m.errorLog != nil {
228+
m.errorLog.Log(chunkMeta.filename, err, "")
229+
}
227230
if !m.opt.IgnoreErrors {
228231
x.Check(err)
229232
}
@@ -250,6 +253,9 @@ func (m *mapper) run(inputFormat chunker.InputFormat) {
250253
for _, nq := range nqs {
251254
if err := facets.SortAndValidate(nq.Facets); err != nil {
252255
atomic.AddInt64(&m.prog.errCount, 1)
256+
if m.errorLog != nil {
257+
m.errorLog.Log("<facet_validation>", err, fmt.Sprintf("subject=%s predicate=%s", nq.Subject, nq.Predicate))
258+
}
253259
if !m.opt.IgnoreErrors {
254260
x.Check(err)
255261
}

dgraph/cmd/bulk/run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ func init() {
9191
// TODO: Potentially move http server to main.
9292
flag.String("http", "localhost:8080", "Address to serve http (pprof).")
9393
flag.Bool("ignore_errors", false, "ignore line parsing errors in rdf files")
94+
flag.Bool("log_errors", false, "log parsing errors to a file (requires --ignore_errors)")
95+
flag.String("error_log", "bulk_errors.log", "path to error log file when --log_errors is set")
9496
flag.Int("map_shards", 1,
9597
"Number of map output shards. Must be greater than or equal to the number of reduce "+
9698
"shards. Increasing allows more evenly sized reduce shards, at the expense of "+
@@ -155,6 +157,8 @@ func run() {
155157
ZeroAddr: Bulk.Conf.GetString("zero"),
156158
HttpAddr: Bulk.Conf.GetString("http"),
157159
IgnoreErrors: Bulk.Conf.GetBool("ignore_errors"),
160+
LogErrors: Bulk.Conf.GetBool("log_errors"),
161+
ErrorLogPath: Bulk.Conf.GetString("error_log"),
158162
MapShards: Bulk.Conf.GetInt("map_shards"),
159163
ReduceShards: Bulk.Conf.GetInt("reduce_shards"),
160164
CustomTokenizers: Bulk.Conf.GetString("custom_tokenizers"),

0 commit comments

Comments
 (0)