Skip to content

Commit

Permalink
Parquet refs: nan or missing references (#1738)
Browse files Browse the repository at this point in the history
  • Loading branch information
martindurant authored Oct 23, 2024
1 parent 4517882 commit 199ee82
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
12 changes: 9 additions & 3 deletions fsspec/implementations/reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def _first(d):

def _prot_in_references(path, references):
ref = references.get(path)
if isinstance(ref, (list, tuple)):
if isinstance(ref, (list, tuple)) and isinstance(ref[0], str):
return split_protocol(ref[0])[0] if ref[0] else ref[0]


Expand Down Expand Up @@ -173,8 +173,11 @@ def open_refs(field, record):
"""cached parquet file loader"""
path = self.url.format(field=field, record=record)
data = io.BytesIO(self.fs.cat_file(path))
df = self.pd.read_parquet(data, engine=self.engine)
refs = {c: df[c].to_numpy() for c in df.columns}
try:
df = self.pd.read_parquet(data, engine=self.engine)
refs = {c: df[c].to_numpy() for c in df.columns}
except IOError:
refs = None
return refs

self.open_refs = open_refs
Expand Down Expand Up @@ -871,6 +874,9 @@ def cat(self, path, recursive=False, on_error="raise", **kwargs):
# found and on_error is "raise"
try:
u, s, e = self._cat_common(p)
if not isinstance(u, (bytes, str)):
# nan/None from parquet
continue
except FileNotFoundError as err:
if on_error == "raise":
raise
Expand Down
41 changes: 41 additions & 0 deletions fsspec/implementations/tests/test_reference.py
Original file line number Diff line number Diff line change
Expand Up @@ -792,3 +792,44 @@ def test_deep_parq(m, engine):
"instant/one/.zarray",
"instant/one/0",
]


def test_parquet_no_data(m):
zarr = pytest.importorskip("zarr")
lz = fsspec.implementations.reference.LazyReferenceMapper.create(
"memory://out.parq", fs=m
)

g = zarr.open_group(lz, mode="w")
arr = g.create_dataset(
name="one",
dtype="int32",
shape=(10,),
chunks=(5,),
compression=None,
fill_value=1,
)
lz.flush()

assert (arr[:] == 1).all()


def test_parquet_no_references(m):
zarr = pytest.importorskip("zarr")
lz = fsspec.implementations.reference.LazyReferenceMapper.create(
"memory://out.parq", fs=m
)

g = zarr.open_group(lz, mode="w")
arr = g.create_dataset(
name="one",
dtype="int32",
shape=(),
chunks=(),
compression=None,
fill_value=1,
)
lz.flush()
arr[...]

assert arr[...].tolist() == 1 # scalar, equal to fill value
2 changes: 1 addition & 1 deletion fsspec/mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def getitems(self, keys, on_error="raise"):
for k, v in out.items()
}
return {
key: out[k2]
key: out[k2] if on_error == "raise" else out.get(k2, KeyError(k2))
for key, k2 in zip(keys, keys2)
if on_error == "return" or not isinstance(out[k2], BaseException)
}
Expand Down

0 comments on commit 199ee82

Please sign in to comment.