-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathwavpack.pyx
More file actions
331 lines (271 loc) · 11.3 KB
/
wavpack.pyx
File metadata and controls
331 lines (271 loc) · 11.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
# cython: embedsignature=True
# cython: profile=False
# cython: linetrace=False
# cython: binding=False
# cython: language_level=3
from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE
from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING
from .compat_ext cimport Buffer
from .compat_ext import Buffer
from .globals import get_num_encoding_threads, get_num_decoding_threads
import warnings
import numpy as np
from pathlib import Path
from packaging.version import parse
import numcodecs
from numcodecs.compat import ensure_contiguous_ndarray
from numcodecs.abc import Codec
# Bindings to the installed WavPack C library plus the bundled C helper sources.
cdef extern from "wavpack/wavpack.h":
    const char* WavpackGetLibraryVersionString()

cdef extern from "src/encoder.c":
    # Returns the number of compressed bytes written to `destin`
    # (see the error handling in `compress` for the failure convention).
    size_t WavpackEncodeFile (void *source, size_t num_samples, size_t num_chans, int level, float bps, void *destin,
                              size_t destin_bytes, int dtype, int dynamic_noise_shaping, float shaping_weight,
                              int num_threads) nogil

cdef extern from "src/decoder.c":
    # Fills `num_chans` / `bytes_per_sample` from the stream header and returns
    # the number of decoded samples written to `destin`.
    size_t WavpackDecodeFile (void *source, size_t source_bytes, int *num_chans, int *bytes_per_sample, void *destin,
                              size_t destin_bytes, int num_threads) nogil

# Version string of the linked WavPack library (bytes from C, decoded as ASCII).
VERSION_STRING = WavpackGetLibraryVersionString()
VERSION_STRING = str(VERSION_STRING, 'ascii')
wavpack_version = VERSION_STRING

# Multi-threaded encode/decode is only available from WavPack 5.6.4 onwards.
SUPPORTS_PARALLEL = parse(wavpack_version) >= parse("5.6.4")

# Maps numpy dtype names to the integer codes understood by the C helpers.
dtype_enum = {
    "int8": 0,
    "int16": 1,
    "int32": 2,
    "float32": 3
}
def compress(source, int level, int num_samples, int num_chans, float bps, int dtype,
             int dynamic_noise_shaping, float shaping_weight, int num_encoding_threads):
    """Compress data with the WavPack encoder.

    Parameters
    ----------
    source : bytes-like
        Data to be compressed. Can be any object supporting the buffer
        protocol.
    level : int
        Compression level. The larger the level, the slower the algorithm, but also
        the higher the compression.
    num_samples : int
        Number of samples to compress.
    num_chans : int
        Number of channels to compress.
    bps : float
        Average bits per sample for hybrid (lossy) mode; 0 keeps compression lossless.
    dtype : int
        Integer to indicate which dtype the data is
        ("int8": 0, "int16": 1, "int32": 2, "float32": 3)
    dynamic_noise_shaping : int
        Whether to use dynamic noise shaping
    shaping_weight : float
        The shaping factor
    num_encoding_threads : int
        Number of threads to use for encoding

    Returns
    -------
    dest : bytes
        Compressed data.

    Raises
    ------
    RuntimeError
        If the C encoder reports a failure.
    """
    cdef:
        char *source_ptr
        char *dest_ptr
        char *dest_start  # NOTE(review): appears unused in this function
        Buffer source_buffer
        unsigned long source_size, dest_size, compressed_size
        bytes dest
        # Copy the Python-level arguments into plain C locals so the nogil
        # encoder call below only touches C types.
        int level_c = level
        int num_samples_c = num_samples
        int num_chans_c = num_chans
        float bps_c = bps
        int dynamic_noise_shaping_c = dynamic_noise_shaping
        float shaping_weight_c = shaping_weight
        int dtype_c = dtype
        int num_threads_c = num_encoding_threads
    # setup source buffer (acquires the buffer; released in `finally` below)
    source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
    source_ptr = source_buffer.ptr
    source_size = source_buffer.nbytes
    try:
        # setup destination: sized to the uncompressed input on the assumption
        # that the compressed stream never exceeds it — TODO confirm the C
        # encoder bounds its output by `destin_bytes`.
        dest = PyBytes_FromStringAndSize(NULL, source_size)
        dest_ptr = PyBytes_AS_STRING(dest)
        dest_size = source_size
        compressed_size = WavpackEncodeFile(source_ptr, num_samples_c, num_chans_c, level_c, bps_c,
                                            dest_ptr, dest_size, dtype_c, dynamic_noise_shaping_c,
                                            shaping_weight_c, num_threads_c)
    finally:
        # release buffers
        source_buffer.release()
    # check compression was successful
    # NOTE(review): `compressed_size` is unsigned, so `== -1` relies on C's
    # usual arithmetic conversion of -1 to ULONG_MAX; presumably this matches
    # the size_t error value returned by the encoder — confirm against encoder.c.
    if compressed_size == -1:
        raise RuntimeError(f'WavPack compression error: {compressed_size}')
    # resize after compression: keep only the bytes actually written
    dest = dest[:compressed_size]
    return dest
def decompress(source, dest=None, num_decoding_threads=1):
    """Decompress data with the WavPack decoder.

    Parameters
    ----------
    source : bytes-like
        Compressed data. Can be any object supporting the buffer protocol.
    dest : array-like, optional
        Object to decompress into. If None, a new bytes object of the exact
        decompressed size is allocated.
    num_decoding_threads : int, optional
        Number of threads to use for decoding, by default 1

    Returns
    -------
    dest : bytes
        Object containing decompressed data.

    Raises
    ------
    RuntimeError
        If the C decoder reports a failure.
    """
    cdef:
        char *source_ptr
        char *source_start
        char *dest_ptr
        Buffer source_buffer
        Buffer dest_buffer = None
        unsigned long source_size, dest_size, decompressed_samples
        # Filled in by the decoder from the WavPack stream header.
        int num_chans
        int *num_chans_ptr = &num_chans
        int bytes_per_sample
        int *bytes_per_sample_ptr = &bytes_per_sample
        int max_bytes_per_sample = 4
        int num_threads_c = num_decoding_threads
    # setup source buffer
    source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS)
    source_ptr = source_buffer.ptr
    source_size = source_buffer.nbytes
    # determine number of samples, num_channels, and bytes_per_sample
    # NOTE(review): `dest_ptr` is uninitialized here; with destin_bytes=0 the
    # C decoder presumably treats this as a header-only probe and never writes
    # through the pointer — confirm against decoder.c.
    n_decompressed_samples = WavpackDecodeFile(source_ptr, source_size, num_chans_ptr,
                                               bytes_per_sample_ptr, dest_ptr, 0,
                                               num_threads_c)
    try:
        # setup destination
        if dest is None:
            # allocate memory for the exact decompressed size reported by the probe
            dest_size = n_decompressed_samples * num_chans * bytes_per_sample
            dest = PyBytes_FromStringAndSize(NULL, dest_size)
            dest_ptr = PyBytes_AS_STRING(dest)
        else:
            # decode straight into the caller-provided, writeable buffer
            arr = ensure_contiguous_ndarray(dest)
            dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE)
            dest_ptr = dest_buffer.ptr
            dest_size = dest_buffer.nbytes
        # second call performs the actual decode, without the GIL
        with nogil:
            decompressed_samples = WavpackDecodeFile(source_ptr, source_size, num_chans_ptr,
                                                     bytes_per_sample_ptr, dest_ptr, dest_size,
                                                     num_threads_c)
    finally:
        # release buffers
        source_buffer.release()
        if dest_buffer is not None:
            dest_buffer.release()
    # check decompression was successful
    # NOTE(review): `decompressed_samples` is unsigned, so `<= 0` can only be
    # true at exactly 0; a negative C error code would wrap to a large positive
    # value and pass this check — confirm decoder.c's failure convention.
    if decompressed_samples <= 0:
        raise RuntimeError(f'WavPack decompression error: {decompressed_samples}')
    return dest
class WavPack(Codec):
    """Numcodecs Codec implementation for WavPack (https://www.wavpack.com/).

    2D buffers whose second dimension exceeds the supported channel count,
    and buffers with more than 2 dimensions, are flattened before compression.
    """

    codec_id = "wavpack"
    max_block_size = 131072
    supported_dtypes = ["int8", "int16", "int32", "float32"]
    max_channels = 4096
    max_buffer_size = 0x7E000000

    def __init__(self, level=1, bps=None,
                 dynamic_noise_shaping=True,
                 shaping_weight=0.0,
                 num_encoding_threads=1,
                 num_decoding_threads=8):
        """Instantiate the WavPack codec.

        Parameters
        ----------
        level : int, optional
            The wavpack compression level (from low to high: 1, 2, 3, 4), by default 1
        bps : float or None, optional
            If not None and > 0, WavPack hybrid mode is used and compression is lossy.
            The value is the average number of bits per sample (clamped to a
            minimum of 2.25; decimals such as 3.5 are allowed), by default None
        dynamic_noise_shaping : bool, optional
            If True, dynamic noise shaping is enabled in hybrid mode (when `bps`
            is set): the noise is moved up or down in frequency depending on the
            input spectrum, by default True
        shaping_weight : float, optional
            The shaping factor [-1, 1], used if `dynamic_noise_shaping` is False.
            Negative values push noise toward lower frequencies, positive toward
            higher ones, by default 0.0
        num_encoding_threads : int, optional
            Threads used during encoding; keep at 1 when parallelizing
            externally, by default 1
        num_decoding_threads : int, optional
            Threads used during decoding, by default 8
        """
        self.level = int(level)
        assert self.level in (1, 2, 3, 4)
        # Hybrid (lossy) mode only for a strictly positive bps; WavPack's
        # minimum is 2.25 bits per sample.
        self.bps = max(bps, 2.25) if (bps is not None and bps > 0) else 0
        self.dynamic_noise_shaping = dynamic_noise_shaping
        self.shaping_weight = shaping_weight

        # Globally-configured thread counts (if any) override the arguments.
        global_encoding = get_num_encoding_threads()
        if global_encoding is not None:
            num_encoding_threads = global_encoding
        else:
            assert num_encoding_threads >= 0, "num_encoding_threads must be positive!"
        global_decoding = get_num_decoding_threads()
        if global_decoding is not None:
            num_decoding_threads = global_decoding
        else:
            assert num_decoding_threads >= 0, "num_decoding_threads must be positive!"

        # Warn (don't fail) if the linked WavPack is too old for multi-threading.
        if not SUPPORTS_PARALLEL:
            if num_encoding_threads > 1:
                warnings.warn(
                    f"Multi-threading is supported for wavpack version>=5.6.4, "
                    f"but current version is {wavpack_version}. Parallel encoding will not be available."
                )
            if num_decoding_threads > 1:
                warnings.warn(
                    f"Multi-threading is supported for wavpack version>=5.6.4, "
                    f"but current version is {wavpack_version}. Parallel decoding will not be available."
                )

        # WavPack supports at most 15 worker threads.
        self.num_encoding_threads = int(min(num_encoding_threads, 15))
        self.num_decoding_threads = int(min(num_decoding_threads, 15))

    def get_config(self):
        """Return the JSON-serializable configuration of this codec."""
        # bps is cast to float so the config round-trips through JSON cleanly.
        config = {
            "id": self.codec_id,
            "level": self.level,
            "bps": float(self.bps),
            "dynamic_noise_shaping": self.dynamic_noise_shaping,
            "shaping_weight": self.shaping_weight,
            "num_encoding_threads": self.num_encoding_threads,
            "num_decoding_threads": self.num_decoding_threads,
        }
        return config

    def _prepare_data(self, buf):
        """Coerce buf to the 2D (samples, channels) layout expected by compress()."""
        assert str(buf.dtype) in self.supported_dtypes, f"Unsupported dtype {buf.dtype}"
        if buf.ndim == 2 and buf.shape[1] <= self.max_channels:
            return buf
        if buf.ndim == 1:
            # A 1D buffer is treated as a single channel.
            return buf[:, None]
        # Too many channels, or more than 2 dims: compress as one flat channel.
        return buf.flatten()[:, None]

    def encode(self, buf):
        """Compress buf with WavPack and return the compressed bytes."""
        data = self._prepare_data(buf)
        n_samples, n_channels = data.shape
        return compress(
            data,
            self.level,
            n_samples,
            n_channels,
            self.bps,
            dtype_enum[str(data.dtype)],
            self.dynamic_noise_shaping,
            self.shaping_weight,
            self.num_encoding_threads,
        )

    def decode(self, buf, out=None):
        """Decompress buf; write into out if given, else return new bytes."""
        contiguous = ensure_contiguous_ndarray(buf, self.max_buffer_size)
        return decompress(contiguous, out, self.num_decoding_threads)