From abd39f7aca669ea0a99e92a768875f5ee99d6771 Mon Sep 17 00:00:00 2001 From: del42 Date: Thu, 10 Oct 2024 16:13:33 -0400 Subject: [PATCH 1/3] zarr format updated --- waveform_benchmark/formats/zarr.py | 55 +++++++++++++++++++++++++----- 1 file changed, 47 insertions(+), 8 deletions(-) diff --git a/waveform_benchmark/formats/zarr.py b/waveform_benchmark/formats/zarr.py index c221600..1f46fa9 100644 --- a/waveform_benchmark/formats/zarr.py +++ b/waveform_benchmark/formats/zarr.py @@ -6,7 +6,7 @@ class Zarr(BaseFormat): """ - Example format using Zarr + Example format using Zarr with 16-bit integer waveforms. """ def write_waveforms(self, path, waveforms): # Initialize Zarr group @@ -14,8 +14,8 @@ def write_waveforms(self, path, waveforms): for name, waveform in waveforms.items(): length = waveform['chunks'][-1]['end_sample'] - samples = np.empty(length, dtype=np.float32) - samples[:] = np.nan + samples = np.empty(length, dtype=np.int16) + samples[:] = 0 for chunk in waveform['chunks']: start = chunk['start_sample'] @@ -23,7 +23,7 @@ def write_waveforms(self, path, waveforms): samples[start:end] = chunk['samples'] # Create a dataset for each waveform within the root group. - ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.float32) + ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16) ds.attrs['units'] = waveform['units'] ds.attrs['samples_per_second'] = waveform['samples_per_second'] @@ -43,10 +43,48 @@ def read_waveforms(self, path, start_time, end_time, signal_names): results[signal_name] = ds[start_sample:end_sample] return results + + def open_waveforms(self, path: str, signal_names: list, **kwargs): + """ + Open Zarr waveforms. + """ + output = {} + root_group = zarr.open_group(path, mode='r') + for signal_name in signal_names: + output[signal_name] = root_group[signal_name] + return output + + def read_opened_waveforms(self, opened_files: dict, start_time: float, end_time: float, + signal_names: list): + """ + Read the already opened Zarr waveforms between `start_time` and `end_time`. + """ + results = {} + for signal_name in signal_names: + ds = opened_files[signal_name] + + # Extract the sampling rate from the attributes of the Zarr dataset + fs = ds.attrs['samples_per_second'] + + start_sample = round(start_time * fs) + end_sample = round(end_time * fs) + + # Random access the Zarr array + samples = ds[start_sample:end_sample] + + results[signal_name] = samples + + return results + + def close_waveforms(self, opened_files: dict): + """ + Clear references to the opened Zarr files. + """ + opened_files.clear() class Zarr_compressed(BaseFormat): """ - Example format using Zarr with compression. + Example format using Zarr with compression and 16-bit integer waveforms. """ def write_waveforms(self, path, waveforms): @@ -55,7 +93,7 @@ def write_waveforms(self, path, waveforms): for name, waveform in waveforms.items(): length = waveform['chunks'][-1]['end_sample'] - samples = np.empty(length, dtype=np.float32) + samples = np.empty(length, dtype=np.int16) samples[:] = np.nan for chunk in waveform['chunks']: @@ -64,7 +102,8 @@ def write_waveforms(self, path, waveforms): samples[start:end] = chunk['samples'] # each waveform within the root group with compression. - ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.float32, compressor=zarr.Blosc(cname='zstd', clevel=9, shuffle=zarr.Blosc.BITSHUFFLE)) + ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16, + compressor=zarr.Blosc(cname='zstd', clevel=9, shuffle=zarr.Blosc.BITSHUFFLE)) ds.attrs['units'] = waveform['units'] ds.attrs['samples_per_second'] = waveform['samples_per_second'] @@ -80,7 +119,7 @@ def read_waveforms(self, path, start_time, end_time, signal_names): start_sample = round(start_time * samples_per_second) end_sample = round(end_time * samples_per_second) - + # Random access the Zarr array results[signal_name] = ds[start_sample:end_sample] From b7ab475278f009a4659a6ee5d0b77db093075daa Mon Sep 17 00:00:00 2001 From: del42 Date: Thu, 17 Oct 2024 12:58:43 -0400 Subject: [PATCH 2/3] Gain handled --- waveform_benchmark/formats/zarr.py | 74 ++++++++++++++---------------- 1 file changed, 35 insertions(+), 39 deletions(-) diff --git a/waveform_benchmark/formats/zarr.py b/waveform_benchmark/formats/zarr.py index 1f46fa9..ca339a1 100644 --- a/waveform_benchmark/formats/zarr.py +++ b/waveform_benchmark/formats/zarr.py @@ -11,21 +11,28 @@ class Zarr(BaseFormat): def write_waveforms(self, path, waveforms): # Initialize Zarr group root_group = zarr.open_group(path, mode='w') + nanval = -32768 # Sentinel value for NaN for name, waveform in waveforms.items(): length = waveform['chunks'][-1]['end_sample'] samples = np.empty(length, dtype=np.int16) - samples[:] = 0 + samples[:] = nanval + + max_gain = max(chunk['gain'] for chunk in waveform['chunks']) # Get max gain from the chunks for chunk in waveform['chunks']: start = chunk['start_sample'] end = chunk['end_sample'] - samples[start:end] = chunk['samples'] + # Replace NaN values in the chunk with sentinel value + cursamples = np.where(np.isnan(chunk['samples']), nanval, np.round(chunk['samples'] * chunk['gain']).astype(np.int16)) + samples[start:end] = cursamples # Create a dataset for each waveform within the root group. - ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16) + ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16) ds.attrs['units'] = waveform['units'] ds.attrs['samples_per_second'] = waveform['samples_per_second'] + ds.attrs['nanvalue'] = nanval # Store the sentinel value for NaN + ds.attrs['gain'] = max_gain # Store the gain def read_waveforms(self, path, start_time, end_time, signal_names): # Open the Zarr group @@ -35,12 +42,20 @@ def read_waveforms(self, path, start_time, end_time, signal_names): for signal_name in signal_names: ds = root_group[signal_name] samples_per_second = ds.attrs['samples_per_second'] + nanval = ds.attrs['nanvalue'] # Retrieve the sentinel value for NaN + gain = ds.attrs['gain'] # Retrieve the gain start_sample = round(start_time * samples_per_second) end_sample = round(end_time * samples_per_second) # Random access the Zarr array - results[signal_name] = ds[start_sample:end_sample] + sig_data = ds[start_sample:end_sample] + naninds = (sig_data == nanval) + sig_data = sig_data.astype(np.float32) + sig_data = sig_data / gain + sig_data[naninds] = np.nan + + results[signal_name] = sig_data return results @@ -54,33 +69,6 @@ def open_waveforms(self, path: str, signal_names: list, **kwargs): output[signal_name] = root_group[signal_name] return output - def read_opened_waveforms(self, opened_files: dict, start_time: float, end_time: float, - signal_names: list): - """ - Read the already opened Zarr waveforms between `start_time` and `end_time`. - """ - results = {} - for signal_name in signal_names: - ds = opened_files[signal_name] - - # Extract the sampling rate from the attributes of the Zarr dataset - fs = ds.attrs['samples_per_second'] - - start_sample = round(start_time * fs) - end_sample = round(end_time * fs) - - # Random access the Zarr array - samples = ds[start_sample:end_sample] - - results[signal_name] = samples - - return results - - def close_waveforms(self, opened_files: dict): - """ - Clear references to the opened Zarr files. - """ - opened_files.clear() class Zarr_compressed(BaseFormat): """ @@ -90,23 +78,25 @@ class Zarr_compressed(BaseFormat): def write_waveforms(self, path, waveforms): # Initialize Zarr group root_group = zarr.open_group(path, mode='w') + nanval = -32768 # Sentinel value for NaN for name, waveform in waveforms.items(): length = waveform['chunks'][-1]['end_sample'] samples = np.empty(length, dtype=np.int16) - samples[:] = np.nan + samples[:] = nanval for chunk in waveform['chunks']: start = chunk['start_sample'] end = chunk['end_sample'] - samples[start:end] = chunk['samples'] + cursamples = np.where(np.isnan(chunk['samples']), nanval, chunk['samples']) + samples[start:end] = cursamples - # each waveform within the root group with compression. - ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16, - compressor=zarr.Blosc(cname='zstd', clevel=9, shuffle=zarr.Blosc.BITSHUFFLE)) + # each wavefrom is stored as a dataset within the root group + ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16, + compressor=zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.BITSHUFFLE)) ds.attrs['units'] = waveform['units'] ds.attrs['samples_per_second'] = waveform['samples_per_second'] - + ds.attrs['nanvalue'] = nanval def read_waveforms(self, path, start_time, end_time, signal_names): # Open the Zarr group @@ -116,11 +106,17 @@ def read_waveforms(self, path, start_time, end_time, signal_names): for signal_name in signal_names: ds = root_group[signal_name] samples_per_second = ds.attrs['samples_per_second'] - + nanval = ds.attrs['nanvalue'] # Retrieve the sentinel value + start_sample = round(start_time * samples_per_second) end_sample = round(end_time * samples_per_second) # Random access the Zarr array - results[signal_name] = ds[start_sample:end_sample] + sig_data = ds[start_sample:end_sample] + naninds = (sig_data == nanval) + sig_data = sig_data.astype(float) + sig_data[naninds] = np.nan + + results[signal_name] = sig_data return results \ No newline at end of file From e52b458f15f6f06f53dfe3d68e3a795bc3f83e6c Mon Sep 17 00:00:00 2001 From: del42 Date: Fri, 18 Oct 2024 09:21:24 -0400 Subject: [PATCH 3/3] Compressed version handled with gain --- waveform_benchmark/formats/zarr.py | 79 ++++++------------------------ 1 file changed, 15 insertions(+), 64 deletions(-) diff --git a/waveform_benchmark/formats/zarr.py b/waveform_benchmark/formats/zarr.py index ca339a1..0b27114 100644 --- a/waveform_benchmark/formats/zarr.py +++ b/waveform_benchmark/formats/zarr.py @@ -1,9 +1,7 @@ import numpy as np import zarr - from waveform_benchmark.formats.base import BaseFormat - class Zarr(BaseFormat): """ Example format using Zarr with 16-bit integer waveforms. @@ -27,8 +25,17 @@ def write_waveforms(self, path, waveforms): cursamples = np.where(np.isnan(chunk['samples']), nanval, np.round(chunk['samples'] * chunk['gain']).astype(np.int16)) samples[start:end] = cursamples - # Create a dataset for each waveform within the root group. - ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16) + if self.fmt == 'Compressed': + ds = root_group.create_dataset( + name, + data=samples, + chunks=True, + dtype=np.int16, + compressor=zarr.Blosc(cname='zstd', clevel=3, shuffle=zarr.Blosc.BITSHUFFLE) + ) + else: + ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16) + ds.attrs['units'] = waveform['units'] ds.attrs['samples_per_second'] = waveform['samples_per_second'] ds.attrs['nanvalue'] = nanval # Store the sentinel value for NaN @@ -59,64 +66,8 @@ def read_waveforms(self, path, start_time, end_time, signal_names): return results - def open_waveforms(self, path: str, signal_names: list, **kwargs): - """ - Open Zarr waveforms. - """ - output = {} - root_group = zarr.open_group(path, mode='r') - for signal_name in signal_names: - output[signal_name] = root_group[signal_name] - return output - - -class Zarr_compressed(BaseFormat): - """ - Example format using Zarr with compression and 16-bit integer waveforms. - """ - - def write_waveforms(self, path, waveforms): - # Initialize Zarr group - root_group = zarr.open_group(path, mode='w') - nanval = -32768 # Sentinel value for NaN - - for name, waveform in waveforms.items(): - length = waveform['chunks'][-1]['end_sample'] - samples = np.empty(length, dtype=np.int16) - samples[:] = nanval - - for chunk in waveform['chunks']: - start = chunk['start_sample'] - end = chunk['end_sample'] - cursamples = np.where(np.isnan(chunk['samples']), nanval, chunk['samples']) - samples[start:end] = cursamples - - # each wavefrom is stored as a dataset within the root group - ds = root_group.create_dataset(name, data=samples, chunks=True, dtype=np.int16, - compressor=zarr.Blosc(cname='zstd', clevel=1, shuffle=zarr.Blosc.BITSHUFFLE)) - ds.attrs['units'] = waveform['units'] - ds.attrs['samples_per_second'] = waveform['samples_per_second'] - ds.attrs['nanvalue'] = nanval - - def read_waveforms(self, path, start_time, end_time, signal_names): - # Open the Zarr group - root_group = zarr.open_group(path, mode='r') - - results = {} - for signal_name in signal_names: - ds = root_group[signal_name] - samples_per_second = ds.attrs['samples_per_second'] - nanval = ds.attrs['nanvalue'] # Retrieve the sentinel value - - start_sample = round(start_time * samples_per_second) - end_sample = round(end_time * samples_per_second) - - # Random access the Zarr array - sig_data = ds[start_sample:end_sample] - naninds = (sig_data == nanval) - sig_data = sig_data.astype(float) - sig_data[naninds] = np.nan - - results[signal_name] = sig_data +class Zarr_Compressed(Zarr): + fmt = 'Compressed' - return results \ No newline at end of file +class Zarr_Uncompressed(Zarr): + fmt = 'Uncompressed' \ No newline at end of file