Skip to content

Commit

Permalink
Restructure library into record and batch
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Dec 28, 2023
1 parent 2f30856 commit 9f03ce8
Show file tree
Hide file tree
Showing 23 changed files with 444 additions and 452 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added

- Restructure library into record and batch

## [0.2.1] - 2023-12-27

### Fixed
Expand Down
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ There is a simple command line interface.
The `isd record` command prints a single record in JSON format:

```shell
isd record 720538-00164-2021
isd record tests/data/720538-00164-2021
```

The Python API allows reading compressed and uncompressed ISD files:

```python
from isd import Batch

batch = Batch.from_path("isd-file")
for record in batch.records:
print(record)
```

Streaming is also supported:

```python
import isd.io

Expand Down
5 changes: 5 additions & 0 deletions docs/api/batch.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
batch
=====

.. automodule:: isd.batch
:members:
4 changes: 2 additions & 2 deletions docs/api/errors.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
isd.errors
==========
errors
======

.. automodule:: isd.errors
:members:
5 changes: 0 additions & 5 deletions docs/api/io.rst

This file was deleted.

3 changes: 1 addition & 2 deletions docs/api/mod.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ Most useful functions and classes are contained in submodules.
:caption: Submodules:

errors
io
pandas
record
batch

isd
---
Expand Down
5 changes: 0 additions & 5 deletions docs/api/pandas.rst

This file was deleted.

4 changes: 2 additions & 2 deletions docs/api/record.rst
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
isd.record
==========
record
======

.. automodule:: isd.record
:members:
64 changes: 39 additions & 25 deletions examples/check_timesteps.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,44 @@
"""Given a directory of ISD files, checks that all timesteps monotonically increase."""

import os
import logging
from pathlib import Path
import sys
from typing import Union

import tqdm

import isd.io

directory = sys.argv[1]
paths = [os.path.join(directory, file_name) for file_name in os.listdir(directory)]
all_monotonic = True
bad_paths = []
for path in tqdm.tqdm(paths):
data_frame = isd.io.read_to_data_frame(path)
min = data_frame.timestamp.min()
max = data_frame.timestamp.max()
is_monotonic = data_frame.timestamp.is_monotonic
if not is_monotonic:
all_monotonic = False
bad_paths.append(path)
tqdm.tqdm.write(f"{path}: min={min}, max={max}, is_monotonic={is_monotonic}")

if all_monotonic:
print("All files have monotonically increasing timestamps!")
else:
print("Not all files have monotonically increasing timestamps, here they are:")
for path in bad_paths:
print(f" - {path}")
sys.exit(1)
from isd import Batch

logging.basicConfig(level=logging.INFO)

log = logging.getLogger(__name__)


def main(path: Union[str, Path]) -> None:
path = Path(path)
file_names = list(path.glob("*"))
all_monotonic = True
bad_files = []
for file_name in tqdm.tqdm(file_names):
df = Batch.from_path(file_name).to_df()
ts_min = df.datetime.min()
ts_max = df.datetime.max()
is_monotonic = df.datetime.is_monotonic_increasing
if not is_monotonic:
all_monotonic = False
bad_files.append(file_name)
log.info(
f"{file_name}: min={ts_min}, max={ts_max}, is_monotonic={is_monotonic}"
)

if all_monotonic:
print("All files have monotonically increasing timestamps!")
else:
print("Not all files have monotonically increasing timestamps, here they are:")
for file_name in bad_files:
print(f" - {file_name}")
sys.exit(1)


if __name__ == "__main__":
directory = sys.argv[1]
main(directory)
3 changes: 2 additions & 1 deletion src/isd/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from isd.errors import IsdError
from isd.batch import Batch
from isd.record import Record

__all__ = ["IsdError", "Record"]
__all__ = ["IsdError", "Batch", "Record"]
87 changes: 87 additions & 0 deletions src/isd/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import gzip
import json
from io import BytesIO
from pathlib import Path
from dataclasses import dataclass
from typing import List, Union, Optional, Dict, Any
import datetime as dt

from isd.record import Record

import pandas as pd


@dataclass
class Batch:
records: List[Record]

def __len__(self) -> int:
return len(self.records)

def __getitem__(self, index: int) -> Record:
return self.records[index]

@classmethod
def from_path(cls, path: Union[str, Path]) -> "Batch":
"""Opens a local ISD file and returns an iterator over its records.
If the path has a .gz extension, this function will assume it has gzip
compression and will attempt to open it using `gzip.open`.
"""
path = Path(path)
if path.suffix == ".gz":
with gzip.open(path) as gzip_file:
return cls(
[
Record.from_string(gzip_line.decode("utf-8"))
for gzip_line in gzip_file
]
)
else:
with open(path) as uncompressed_file:
return cls(
[
Record.from_string(uncompressed_line)
for uncompressed_line in uncompressed_file
]
)

@classmethod
def from_string(cls, string: Union[str, BytesIO]) -> "Batch":
"""Reads records from a text io stream."""
if isinstance(string, BytesIO):
string = string.read().decode("utf-8")
return cls([Record.from_string(line) for line in string.splitlines()])

def filter_by_datetime(
self,
start_date: Optional[dt.datetime] = None,
end_date: Optional[dt.datetime] = None,
) -> "Batch":
"""Returns an iterator over records filtered by start and end datetimes (both optional)."""
return Batch(
[
record
for record in self.records
if (not start_date or record.datetime >= start_date)
and (not end_date or record.datetime < end_date)
]
)

def to_dict(self) -> List[Dict[str, Any]]:
"""Returns a list of dictionaries, one for each record."""
return [record.to_dict() for record in self.records]

def to_json(self, indent: int = 4) -> str:
"""Returns a JSON string of all records."""
data = []
for d in self.to_dict():
d["datetime"] = d["datetime"].isoformat()
data.append(d)
return json.dumps(data, indent=indent)

def to_df(self) -> pd.DataFrame:
"""Reads a local ISD file into a DataFrame."""
import pandas as pd

return pd.DataFrame([record.to_dict() for record in self.records])
18 changes: 7 additions & 11 deletions src/isd/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
# type: ignore

import dataclasses
import itertools
import json

import click
from click import ClickException

import isd.io
from isd.batch import Batch


@click.group()
Expand All @@ -20,9 +16,9 @@ def main() -> None:
@click.option("-i", "--index", default=0)
def record(infile: str, index: int) -> None:
"""Prints a single record to standard output in JSON format."""
with isd.io.open(infile) as records:
record = next(itertools.islice(records, index, None), None)
if record:
print(json.dumps(dataclasses.asdict(record), indent=4))
else:
raise ClickException(f"No record with index {index}")
batch = Batch.from_path(infile)
try:
record_ = batch[index]
print(record_.to_json())
except IndexError:
raise ClickException(f"No record with index {index}")
27 changes: 3 additions & 24 deletions src/isd/io.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
import datetime
import gzip
import os.path
from contextlib import contextmanager
from typing import Generator, Iterable, Iterator, Optional, TextIO
from typing import Generator, Iterable

from pandas import DataFrame

from . import pandas as isd_pandas
from .record import Record

builtin_open = open
Expand All @@ -21,28 +18,10 @@ def open(path: str) -> Generator[Iterable[Record], None, None]:
"""
if os.path.splitext(path)[1] == ".gz":
with gzip.open(path) as gzip_file:
yield (Record.parse(gzip_line.decode("utf-8")) for gzip_line in gzip_file)
yield (Record.from_string(gzip_line.decode("utf-8")) for gzip_line in gzip_file)
else:
with builtin_open(path) as uncompressed_file:
yield (
Record.parse(uncompressed_line)
Record.from_string(uncompressed_line)
for uncompressed_line in uncompressed_file
)


def from_text_io(text_io: TextIO) -> Iterator[Record]:
"""Reads records from a text io stream."""
while True:
line = text_io.readline()
if not line:
break
else:
yield Record.parse(line)


def read_to_data_frame(
path: str, since: Optional[datetime.datetime] = None
) -> DataFrame:
"""Reads a local ISD file into a DataFrame."""
with open(path) as file:
return isd_pandas.data_frame(file, since=since)
Loading

0 comments on commit 9f03ce8

Please sign in to comment.