73 changes: 71 additions & 2 deletions README.md
@@ -11,7 +11,8 @@ Simple streaming JSON parser and encoder.
When [reading](#reading) JSON data, `json-stream` can decode JSON data in
a streaming manner, providing a pythonic dict/list-like interface, or a
[visitor-based interface](#visitor). It can stream from files, [URLs](#urls)
or [iterators](#iterators). It can process [multiple JSON documents](#multiple) in a single stream.
or [iterators](#iterators). It can process [multiple JSON documents](#multiple)
in a single stream, and can read JSON [mixed with other non-JSON data](#reading-mixed-data).

When [writing](#writing) JSON data, `json-stream` can stream JSON objects
as you generate them.
@@ -495,13 +496,81 @@ significant parsing speedup compared to pure python implementation.
`json-stream` will fall back to its pure python tokenizer implementation
if `json-stream-rs-tokenizer` is not available.
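
If you want to confirm which implementation was selected, a minimal check is to
inspect `default_tokenizer` (this relies on the internal `json_stream.select_tokenizer`
module shown in the `loader.py` diff below, so treat it as an informal sketch rather
than a documented API):

```python
from json_stream.select_tokenizer import default_tokenizer

# default_tokenizer is whichever tokenizer json-stream selected at import time:
# the Rust implementation if available, otherwise the pure python one
print(default_tokenizer)
```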

#### <a id="reading-mixed-data"></a> Reading mixed data

When using the Rust tokenizer, you can also use `json-stream` to parse mixed
data, for example a file containing a JSON document followed by binary data.

To do this, pass `correct_cursor=True` to `load()`. This ensures the
Rust tokenizer keeps track of the exact stream position it has read up to. This
comes with a **significant performance cost** for unseekable streams.

After processing the JSON, call `read_all()` on the top-level object returned
by `load()` to ensure the whole JSON document has been consumed, and then call
`.tokenizer.park_cursor()` to "park" the underlying file cursor at the correct
position.

```python
import json_stream

with open('test.bin', 'rb') as f:
    # read JSON header
    header = json_stream.load(f, correct_cursor=True)
    # ... process JSON header ...
    header.read_all()

    # ensure the tokenizer has "parked" the file
    # cursor at the end of the JSON data
    header.tokenizer.park_cursor()

    # now we can read binary data from the same file
    binary_start = f.tell()
    data = f.read()
```

#### <a id="mixed-scenarios"></a> Other mixed data scenarios

`json-stream` can also handle streams that start with binary data, or have binary
data between multiple JSON documents.

##### Binary then JSON

You can simply read the binary data from the file before calling `load()`.

```python
with open('test.bin', 'rb') as f:
    binary_data = f.read(1024)
    data = json_stream.load(f)
    # ... process JSON ...
```

##### JSON then binary then JSON

You must use `correct_cursor=True` for any JSON document that is followed by
binary data.

```python
with open('test.bin', 'rb') as f:
    # 1. Read first JSON
    data1 = json_stream.load(f, correct_cursor=True)
    # ... process data1 ...
    data1.read_all()
    data1.tokenizer.park_cursor()

    # 2. Read binary data
    binary_data = f.read(1024)

    # 3. Read second JSON
    data2 = json_stream.load(f)
    # ... process data2 ...
```
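
##### Multiple JSON documents then binary

The same pattern works with `load_many()` when several JSON documents are followed
by trailing binary data: pass `correct_cursor=True` to `load_many()`, then park the
cursor after the last document has been read. A minimal sketch, mirroring the
`test_load_many_then_binary` test added in this PR (the file name is illustrative):

```python
with open('test.bin', 'rb') as f:
    loader = json_stream.load_many(f, correct_cursor=True)

    # read each JSON document to the end
    doc1 = next(loader)
    doc1.read_all()
    doc2 = next(loader)
    doc2.read_all()

    # park the cursor after the last document, then read the binary tail
    doc2.tokenizer.park_cursor()
    trailing_binary = f.read()
```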

### Custom tokenizer

You can supply an alternative JSON tokenizer implementation. Simply pass
a tokenizer to the `load()` or `visit()` methods.

```python
json_stream.load(f, tokenizer=some_tokenizer)
json_stream.load(f, tokenizer=some_tokenizer, **tokenizer_kwargs)
```

The requests methods also accept a custom tokenizer parameter.
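
For example (a minimal sketch; the URL is a placeholder and `some_tokenizer` is
assumed to be defined as above):

```python
import requests
import json_stream.requests

with requests.get('http://example.com/data.json', stream=True) as response:
    data = json_stream.requests.load(response, tokenizer=some_tokenizer)
```
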
4 changes: 4 additions & 0 deletions src/json_stream/base.py
@@ -36,6 +36,10 @@ def __init__(self, token_stream):
        self._stream = token_stream
        self._child: Optional[StreamingJSONBase] = None

    @property
    def tokenizer(self):
        return self._stream

    def _clear_child(self):
        if self._child is not None:
            self._child.read_all()
8 changes: 4 additions & 4 deletions src/json_stream/loader.py
@@ -3,13 +3,13 @@
from json_stream.select_tokenizer import default_tokenizer


def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
    return next(load_many(fp_or_iterable, persistent, tokenizer))
def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer, **tokenizer_kwargs):
    return next(load_many(fp_or_iterable, persistent, tokenizer, **tokenizer_kwargs))


def load_many(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
def load_many(fp_or_iterable, persistent=False, tokenizer=default_tokenizer, **tokenizer_kwargs):
    fp = ensure_file(fp_or_iterable)
    token_stream = tokenizer(fp)
    token_stream = tokenizer(fp, **tokenizer_kwargs)
    for token_type, token in token_stream:
        if token_type == TokenType.OPERATOR:
            data = StreamingJSONBase.factory(token, token_stream, persistent)
97 changes: 97 additions & 0 deletions src/json_stream/tests/test_binary_resumption.py
@@ -0,0 +1,97 @@
import io
import json
from unittest import TestCase, skipUnless
import json_stream

try:
    import json_stream_rs_tokenizer
    HAS_RS_TOKENIZER = hasattr(json_stream_rs_tokenizer, 'RustTokenizer')
except ImportError:
    HAS_RS_TOKENIZER = False

class TestBinaryResumption(TestCase):
    @skipUnless(HAS_RS_TOKENIZER, 'Rust tokenizer not available')
    def test_json_then_binary(self):
        json_header = json.dumps({"header": "info"})
        binary_data = b'\x00\x01\x02\x03'
        test_data = json_header.encode('utf-8') + binary_data
        test_file = io.BytesIO(test_data)

        # Load with correct_cursor=True
        header = json_stream.load(test_file, correct_cursor=True)

        # Consume all data from header
        header.read_all()

        # Signal that we are done with JSON and want to resume binary read
        header.tokenizer.park_cursor()

        # Verify file cursor position
        self.assertEqual(test_file.tell(), len(json_header))

        # Verify binary data
        remaining = test_file.read()
        self.assertEqual(remaining, binary_data)

    @skipUnless(HAS_RS_TOKENIZER, 'Rust tokenizer not available')
    def test_binary_then_json(self):
        binary_data = b'binary_start'
        json_data = b'{"a": 1}'
        test_data = binary_data + json_data
        test_file = io.BytesIO(test_data)

        # Read binary
        read_binary = test_file.read(len(binary_data))
        self.assertEqual(read_binary, binary_data)

        # Load JSON
        data = json_stream.load(test_file)
        self.assertEqual(dict(data.items()), {"a": 1})

    @skipUnless(HAS_RS_TOKENIZER, 'Rust tokenizer not available')
    def test_json_then_binary_then_json(self):
        json_1 = b'{"first": true}'
        binary_middle = b'middle_binary'
        json_2 = b'{"second": false}'
        test_data = json_1 + binary_middle + json_2
        test_file = io.BytesIO(test_data)

        # Load first JSON
        data1 = json_stream.load(test_file, correct_cursor=True)
        self.assertEqual(dict(data1.items()), {"first": True})
        data1.read_all()
        data1.tokenizer.park_cursor()
        self.assertEqual(test_file.tell(), len(json_1))

        # Read middle binary
        read_middle = test_file.read(len(binary_middle))
        self.assertEqual(read_middle, binary_middle)

        # Load second JSON
        data2 = json_stream.load(test_file)
        self.assertEqual(dict(data2.items()), {"second": False})

    @skipUnless(HAS_RS_TOKENIZER, 'Rust tokenizer not available')
    def test_load_many_then_binary(self):
        json_1 = '{"a": 1}'
        json_2 = '{"b": 2}'
        binary_data = b'binary'
        test_data = json_1.encode('utf-8') + json_2.encode('utf-8') + binary_data

        test_file = io.BytesIO(test_data)

        loader = json_stream.load_many(test_file, correct_cursor=True)

        # Read first JSON
        doc1 = next(loader)
        doc1.read_all()

        # Read second JSON
        doc2 = next(loader)
        doc2.read_all()

        # Now park cursor
        doc2.tokenizer.park_cursor()

        self.assertEqual(test_file.tell(), len(json_1) + len(json_2))
        self.assertEqual(test_file.read(), binary_data)
8 changes: 4 additions & 4 deletions src/json_stream/visitor.py
@@ -20,9 +20,9 @@ def _visit(obj, visitor, path):
        visitor(obj, path)


def visit_many(fp_or_iterator, visitor, tokenizer=default_tokenizer):
def visit_many(fp_or_iterator, visitor, tokenizer=default_tokenizer, **tokenizer_kwargs):
    fp = ensure_file(fp_or_iterator)
    token_stream = tokenizer(fp)
    token_stream = tokenizer(fp, **tokenizer_kwargs)
    for token_type, token in token_stream:
        if token_type == TokenType.OPERATOR:
            obj = StreamingJSONBase.factory(token, token_stream, persistent=False)
@@ -33,5 +33,5 @@ def visit_many(fp_or_iterator, visitor, tokenizer=default_tokenizer):
        yield


def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer):
    next(visit_many(fp_or_iterator, visitor, tokenizer))
def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer, **tokenizer_kwargs):
    next(visit_many(fp_or_iterator, visitor, tokenizer, **tokenizer_kwargs))