Skip to content

Commit

Permalink
enable local reading in Bloomfilter (Fixes #83)
Browse files Browse the repository at this point in the history
  • Loading branch information
mauriceweber committed Nov 22, 2023
1 parent f43906a commit 04270d9
Showing 1 changed file with 28 additions and 2 deletions.
30 changes: 28 additions & 2 deletions app/src/bloomfilter.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def __parse_listings(self):
return uris

@staticmethod
def _load_file(uri, client) -> Tuple[ReadStatus, io.BytesIO]:
def __load_from_s3(uri, client):
try:
streaming_body = client.get_object(
Bucket=uri.netloc, Key=uri.path.lstrip("/")
Expand All @@ -193,6 +193,33 @@ def _load_file(uri, client) -> Tuple[ReadStatus, io.BytesIO]:
buffer = None
is_success = False

return is_success, msg, buffer

@staticmethod
def __load_from_disk(uri):
    """Read the local file addressed by ``uri`` fully into memory.

    Args:
        uri: Object exposing a ``path`` attribute that names the file to
            open (presumably a ``urllib.parse`` result for a ``file://``
            URI -- confirm against the caller).

    Returns:
        A ``(is_success, msg, buffer)`` tuple. On success ``buffer`` is an
        ``io.BytesIO`` holding the file's bytes. If anything raises while
        opening or reading, ``is_success`` is ``False``, ``buffer`` is
        ``None`` and ``msg`` names the exception class and its message.
    """
    try:
        with open(uri.path, "rb") as handle:
            payload = io.BytesIO(handle.read())
        outcome = (
            True,
            f"__DISK_URI_READ_SUCCESS__ success reading {uri.path}",
            payload,
        )
    except Exception as err:
        # Failures are reported through the status tuple rather than
        # propagated to the caller.
        outcome = (
            False,
            f"__DISK_URI_READ_ERROR__ failed reading {uri.path}: "
            f"caught exception {err.__class__.__name__}: {err}",
            None,
        )

    return outcome

def _load_file(self, uri, client) -> Tuple[ReadStatus, io.BytesIO]:
if uri.scheme == "s3":
is_success, msg, buffer = self.__load_from_s3(uri, client)
elif uri.scheme == "file":
is_success, msg, buffer = self.__load_from_disk(uri)
else:
raise ValueError(f"Unknown scheme {uri.scheme}")

read_status = ReadStatus(
is_success=is_success, msg=msg, uri=uri.geturl()
)
Expand Down Expand Up @@ -394,7 +421,6 @@ def __parallel_run(self, input_uris):
def run(self):
start_time = dt.now()
print(f"start @ {start_time.strftime('%Y-%m-%d %H:%M:%S')}")
# self.__threaded_run(input_uris=self.__parse_listings())
self.__parallel_run(input_uris=self.__parse_listings())
end_time = dt.now()
print(f"end @ {end_time.strftime('%Y-%m-%d %H:%M:%S')}")
Expand Down

0 comments on commit 04270d9

Please sign in to comment.