Skip to content

Commit

Permalink
Restructure library into record and batch
Browse files Browse the repository at this point in the history
  • Loading branch information
gutzbenj committed Dec 22, 2023
1 parent db9e892 commit efe4347
Show file tree
Hide file tree
Showing 22 changed files with 365 additions and 541 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
src/isd/_version.py
isd/_version.py

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
56 changes: 32 additions & 24 deletions examples/check_timesteps.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,38 @@
"""Given a directory of ISD files, checks that all timesteps monotonically increase."""

import os
from pathlib import Path
import sys
from typing import Union

import tqdm

import isd.io

directory = sys.argv[1]
paths = [os.path.join(directory, file_name) for file_name in os.listdir(directory)]
all_monotonic = True
bad_paths = []
for path in tqdm.tqdm(paths):
    data_frame = isd.io.read_to_data_frame(path)
    # Use ts_-prefixed names: `min`/`max` previously shadowed the builtins.
    ts_min = data_frame.timestamp.min()
    ts_max = data_frame.timestamp.max()
    # `is_monotonic` is deprecated in pandas; `is_monotonic_increasing` is the
    # equivalent, non-deprecated property (also available in pandas 1.3).
    is_monotonic = data_frame.timestamp.is_monotonic_increasing
    if not is_monotonic:
        all_monotonic = False
        bad_paths.append(path)
    tqdm.tqdm.write(f"{path}: min={ts_min}, max={ts_max}, is_monotonic={is_monotonic}")

if all_monotonic:
    print("All files have monotonically increasing timestamps!")
else:
    print("Not all files have monotonically increasing timestamps, here they are:")
    for path in bad_paths:
        print(f"  - {path}")
    sys.exit(1)
from isd import Batch


def main(path: Union[str, Path]) -> None:
    """Checks that every ISD file in directory *path* has monotonically
    increasing timestamps.

    Prints a per-file summary; if any file is out of order, lists the
    offending files and exits with status 1.
    """
    path = Path(path)
    file_names = list(path.glob("*"))
    all_monotonic = True
    bad_files = []
    for file_name in tqdm.tqdm(file_names):
        df = Batch.from_path(file_name).to_df()
        ts_min = df.timestamp.min()
        ts_max = df.timestamp.max()
        # `is_monotonic` is deprecated in pandas; `is_monotonic_increasing`
        # is the equivalent property (also available in pandas 1.3).
        is_monotonic = df.timestamp.is_monotonic_increasing
        if not is_monotonic:
            all_monotonic = False
            bad_files.append(file_name)
        tqdm.tqdm.write(f"{file_name}: min={ts_min}, max={ts_max}, is_monotonic={is_monotonic}")

    if all_monotonic:
        print("All files have monotonically increasing timestamps!")
    else:
        print("Not all files have monotonically increasing timestamps, here they are:")
        for file_name in bad_files:
            # BUG FIX: previously printed the directory `path` for every entry
            # instead of the offending file name.
            print(f"  - {file_name}")
        sys.exit(1)


if __name__ == "__main__":
directory = sys.argv[1]
main(directory)
5 changes: 5 additions & 0 deletions isd/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from isd.errors import IsdError
from isd.record import Record
from isd.batch import Batch

__all__ = ["IsdError", "Record", "Batch"]
53 changes: 53 additions & 0 deletions isd/batch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import gzip
from io import BytesIO
from pathlib import Path
from dataclasses import dataclass
from typing import List, TYPE_CHECKING, Union, Optional
import datetime as dt

from isd.record import Record

if TYPE_CHECKING:
import pandas as pd


@dataclass
class Batch:
    """An in-memory collection of parsed ISD records."""

    # The parsed records, in file order.
    records: List[Record]

    @classmethod
    def from_path(cls, path: Union[str, Path]) -> "Batch":
        """Reads a local ISD file into a Batch.

        If the path has a .gz extension, the file is assumed to be
        gzip-compressed and is opened with `gzip.open`.
        """
        path = Path(path)
        if path.suffix == ".gz":
            with gzip.open(path) as gzip_file:
                return cls([Record.from_string(line.decode("utf-8")) for line in gzip_file])
        else:
            # Decode explicitly as UTF-8 so both branches behave the same,
            # instead of depending on the locale's default encoding.
            with open(path, encoding="utf-8") as text_file:
                return cls([Record.from_string(line) for line in text_file])

    @classmethod
    def from_string(cls, string: Union[str, BytesIO]) -> "Batch":
        """Builds a Batch by parsing one record per line of `string`."""
        if isinstance(string, BytesIO):
            string = string.read().decode("utf-8")
        return cls([Record.from_string(line) for line in string.splitlines()])

    def filter_by_datetime(
        self,
        start_date: Optional[dt.datetime] = None,
        end_date: Optional[dt.datetime] = None,
    ) -> List[Record]:
        """Returns the records in [start_date, end_date); both bounds optional."""
        return [
            record
            for record in self.records
            if (not start_date or record.datetime() >= start_date)
            and (not end_date or record.datetime() < end_date)
        ]

    def to_df(self) -> "pd.DataFrame":
        """Converts this batch's records into a pandas DataFrame, one row per record."""
        import pandas as pd

        return pd.DataFrame([record.to_dict() for record in self.records])
18 changes: 7 additions & 11 deletions src/isd/cli.py → isd/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
# type: ignore

import dataclasses
import itertools
import json

import click
from click import ClickException

import isd.io
from isd.batch import Batch


@click.group()
Expand All @@ -20,9 +16,9 @@ def main() -> None:
@click.option("-i", "--index", default=0)
def record(infile: str, index: int) -> None:
"""Prints a single record to standard output in JSON format."""
with isd.io.open(infile) as records:
record = next(itertools.islice(records, index, None), None)
if record:
print(json.dumps(dataclasses.asdict(record), indent=4))
else:
raise ClickException(f"No record with index {index}")
batch = Batch.from_path(infile)
try:
record_ = batch.records[index]
print(record_.to_json())
except IndexError:
raise ClickException(f"No record with index {index}")
File renamed without changes.
223 changes: 223 additions & 0 deletions isd/record.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
import datetime
import json
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Callable, List, Optional, Tuple, Union, Dict

from isd.errors import IsdError

# An ISD line must carry at least the 105-character fixed-width mandatory
# section; anything after position 105 is optional (ADD/REM/EQD/QNN).
MIN_LINE_LENGTH = 105


@dataclass
class Record:
    """A single line of an ISD file, parsed into its fixed-width fields."""

    usaf_id: str
    ncei_id: str
    year: int
    month: int
    day: int
    hour: int
    minute: int
    data_source: str
    latitude: Optional[float]
    longitude: Optional[float]
    report_type: Optional[str]
    elevation: Optional[float]
    call_letters: Optional[str]
    quality_control_process: str
    wind_direction: Optional[int]
    wind_direction_quality_code: str
    wind_observation_type: Optional[str]
    wind_speed: Optional[float]
    wind_speed_quality_code: str
    ceiling: Optional[int]
    ceiling_quality_code: str
    ceiling_determination_code: Optional[str]
    cavok_code: Optional[str]
    visibility: Optional[int]
    visibility_quality_code: str
    visibility_variability_code: Optional[str]
    visibility_variability_quality_code: str
    air_temperature: Optional[float]
    air_temperature_quality_code: str
    dew_point_temperature: Optional[float]
    dew_point_temperature_quality_code: str
    sea_level_pressure: Optional[float]
    sea_level_pressure_quality_code: str
    additional_data: str
    remarks: str
    element_quality_data: str
    original_observation_data: str

    @classmethod
    def from_string(cls, string: Union[str, BytesIO]) -> "Record":
        """Parses one ISD fixed-width line into a Record.

        Raises:
            IsdError: if the line is shorter than MIN_LINE_LENGTH or has
                unparseable trailing data after the optional sections.
        """
        if isinstance(string, BytesIO):
            string = string.read().decode("utf-8")
        # BUG FIX: strip *before* the length check. Previously a 104-character
        # line plus a trailing newline passed the raw-length check and then
        # crashed with IndexError on the fixed-position slices below.
        string = string.strip()
        if len(string) < MIN_LINE_LENGTH:
            raise IsdError(f"Invalid ISD string (too short): {string}")
        usaf_id = string[4:10]
        ncei_id = string[10:15]
        year = int(string[15:19])
        month = int(string[19:21])
        day = int(string[21:23])
        hour = int(string[23:25])
        minute = int(string[25:27])
        data_source = string[27]
        # TODO test missing latitudes and longitudes
        latitude = cls._transform_value(string[28:34], "+99999", lambda s: float(s) / 1000)
        longitude = cls._transform_value(string[34:41], "+999999", lambda s: float(s) / 1000)
        report_type = cls._transform_value(string[41:46], "99999")
        elevation = cls._transform_value(string[46:51], "+9999", lambda s: float(s))
        call_letters = cls._transform_value(string[51:56], "99999")
        quality_control_process = string[56:60]
        wind_direction = cls._transform_value(string[60:63], "999", lambda s: int(s))
        wind_direction_quality_code = string[63]
        wind_observation_type = cls._transform_value(string[64], "9")
        wind_speed = cls._transform_value(string[65:69], "9999", lambda s: float(s) / 10)
        wind_speed_quality_code = string[69]
        ceiling = cls._transform_value(string[70:75], "99999", lambda s: int(s))
        ceiling_quality_code = string[75]
        ceiling_determination_code = cls._transform_value(string[76], "9")
        cavok_code = cls._transform_value(string[77], "9")
        visibility = cls._transform_value(string[78:84], "999999", lambda s: int(s))
        visibility_quality_code = string[84]
        visibility_variability_code = cls._transform_value(string[85], "9")
        visibility_variability_quality_code = string[86]
        air_temperature = cls._transform_value(string[87:92], "+9999", lambda s: float(s) / 10)
        air_temperature_quality_code = string[92]
        dew_point_temperature = cls._transform_value(string[93:98], "+9999", lambda s: float(s) / 10)
        dew_point_temperature_quality_code = string[98]
        sea_level_pressure = cls._transform_value(string[99:104], "99999", lambda s: float(s) / 10)
        sea_level_pressure_quality_code = string[104]
        # The optional sections appear in this order: ADD, REM, EQD, QNN.
        additional_data, remainder = cls._extract_data(
            string[105:], "ADD", ["REM", "EQD", "QNN"]
        )
        remarks, remainder = cls._extract_data(remainder, "REM", ["EQD", "QNN"])
        element_quality_data, remainder = cls._extract_data(remainder, "EQD", ["QNN"])
        original_observation_data, remainder = cls._extract_data(remainder, "QNN", [])

        if remainder:
            raise IsdError(f"Invalid remainder after parsing: {remainder}")

        return cls(
            usaf_id=usaf_id,
            ncei_id=ncei_id,
            year=year,
            month=month,
            day=day,
            hour=hour,
            minute=minute,
            data_source=data_source,
            latitude=latitude,
            longitude=longitude,
            report_type=report_type,
            elevation=elevation,
            call_letters=call_letters,
            quality_control_process=quality_control_process,
            wind_direction=wind_direction,
            wind_direction_quality_code=wind_direction_quality_code,
            wind_observation_type=wind_observation_type,
            wind_speed=wind_speed,
            wind_speed_quality_code=wind_speed_quality_code,
            ceiling=ceiling,
            ceiling_quality_code=ceiling_quality_code,
            ceiling_determination_code=ceiling_determination_code,
            cavok_code=cavok_code,
            visibility=visibility,
            visibility_quality_code=visibility_quality_code,
            visibility_variability_code=visibility_variability_code,
            visibility_variability_quality_code=visibility_variability_quality_code,
            air_temperature=air_temperature,
            air_temperature_quality_code=air_temperature_quality_code,
            dew_point_temperature=dew_point_temperature,
            dew_point_temperature_quality_code=dew_point_temperature_quality_code,
            sea_level_pressure=sea_level_pressure,
            sea_level_pressure_quality_code=sea_level_pressure_quality_code,
            additional_data=additional_data,
            remarks=remarks,
            element_quality_data=element_quality_data,
            original_observation_data=original_observation_data,
        )

    @classmethod
    def _extract_data(cls, message: str, tag: str, later_tags: List[str]) -> Tuple[str, str]:
        """Splits off the section introduced by `tag`.

        Returns `(section_body, remainder)` where the remainder starts at the
        earliest of the `later_tags`, or `("", message)` when `message` does
        not start with `tag`.
        """
        if not message.startswith(tag):
            return "", message
        # BUG FIX: `str.find` returns -1 and never raises ValueError, so the
        # previous try/except with an unconditional break only ever consulted
        # the *first* later tag. If that tag was absent but a later one was
        # present, the later section was wrongly swallowed into this one.
        # Collect every tag that is actually present and cut at the earliest.
        positions = []
        for other_tag in later_tags:
            position = message.find(other_tag)
            if position != -1:
                positions.append(position)
        if positions:
            cut = min(positions)
            return message[len(tag):cut], message[cut:]
        return message[len(tag):], ""

    @classmethod
    def _transform_value(
        cls, string: str, missing_value: str, transform: Optional[Callable[[str], Any]] = None
    ) -> Any:
        """Returns None for the sentinel `missing_value`, otherwise the
        (optionally transformed) raw field value."""
        if string == missing_value:
            return None
        elif transform:
            return transform(string)
        else:
            return string

    def datetime(self) -> datetime.datetime:
        """Returns this record's (naive) observation datetime."""
        return datetime.datetime(
            self.year, self.month, self.day, self.hour, self.minute
        )

    def to_dict(self) -> Dict[str, Any]:
        """Returns a dictionary representation of this record."""
        return {
            "usaf_id": self.usaf_id,
            "ncei_id": self.ncei_id,
            # use datetime instead of year, month, day, hour, minute
            "datetime": self.datetime(),
            "data_source": self.data_source,
            "latitude": self.latitude,
            "longitude": self.longitude,
            "report_type": self.report_type,
            "elevation": self.elevation,
            "call_letters": self.call_letters,
            "quality_control_process": self.quality_control_process,
            "wind_direction": self.wind_direction,
            "wind_direction_quality_code": self.wind_direction_quality_code,
            "wind_observation_type": self.wind_observation_type,
            "wind_speed": self.wind_speed,
            "wind_speed_quality_code": self.wind_speed_quality_code,
            "ceiling": self.ceiling,
            "ceiling_quality_code": self.ceiling_quality_code,
            "ceiling_determination_code": self.ceiling_determination_code,
            "cavok_code": self.cavok_code,
            "visibility": self.visibility,
            "visibility_quality_code": self.visibility_quality_code,
            "visibility_variability_code": self.visibility_variability_code,
            "visibility_variability_quality_code": self.visibility_variability_quality_code,
            "air_temperature": self.air_temperature,
            "air_temperature_quality_code": self.air_temperature_quality_code,
            "dew_point_temperature": self.dew_point_temperature,
            "dew_point_temperature_quality_code": self.dew_point_temperature_quality_code,
            "sea_level_pressure": self.sea_level_pressure,
            "sea_level_pressure_quality_code": self.sea_level_pressure_quality_code,
            "additional_data": self.additional_data,
            "remarks": self.remarks,
            "element_quality_data": self.element_quality_data,
            "original_observation_data": self.original_observation_data,
        }

    def to_json(self, indent: int = 4) -> str:
        """Returns a JSON representation of this record."""
        data = self.to_dict()
        # BUG FIX: datetime objects are not JSON-serializable, so json.dumps
        # on to_dict() raised TypeError. Emit the datetime in ISO-8601 form.
        data["datetime"] = data["datetime"].isoformat()
        return json.dumps(data, indent=indent)
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ requires = [
build-backend = "setuptools.build_meta"

[tool.setuptools_scm]
write_to = "src/isd/_version.py"
write_to = "isd/_version.py"
local_scheme = "no-local-version"
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mypy
pandas~=1.3
pandas-stubs
pre-commit
pytest
Expand Down
Loading

0 comments on commit efe4347

Please sign in to comment.