
Commit

update
Amit Jaiswal committed Sep 22, 2024
1 parent 9fd09be commit d2fca79
Showing 3 changed files with 21 additions and 13 deletions.
10 changes: 6 additions & 4 deletions python-packages/core/src/omigo_core/tsv.py
@@ -5187,7 +5187,9 @@ def from_df(df):
# return
return tsv.TSV("\t".join(header_fields), data).validate()

-def from_maps(mps, accepted_cols = None, excluded_cols = None, url_encoded_cols = None):
+def from_maps(mps, accepted_cols = None, excluded_cols = None, url_encoded_cols = None, dmsg = ""):
+dmsg = utils.extend_inherit_message(dmsg, "from_maps")

# validation
if (len(mps) == 0):
utils.warn_once("from_maps: empty list")
@@ -5201,9 +5203,9 @@ def from_maps(mps, accepted_cols = None, excluded_cols = None, url_encoded_cols

# use explode
result = merge_union(xtsvs) \
-.explode_json("json", prefix = "json", accepted_cols = accepted_cols, excluded_cols = excluded_cols, url_encoded_cols = url_encoded_cols) \
-.remove_prefix("json") \
-.drop_cols_if_exists(["__json_index__", "__explode_json_index__"])
+.explode_json("json", prefix = "json", accepted_cols = accepted_cols, excluded_cols = excluded_cols, url_encoded_cols = url_encoded_cols, collapse = True, dmsg = dmsg) \
+.remove_prefix("json", dmsg = dmsg) \
+.drop_cols_if_exists(["__json_index__", "__explode_json_index__"], dmsg = dmsg)

# return
return result
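
For context, here is a minimal usage sketch (not part of the commit) of the updated from_maps signature. It assumes from_maps is exposed at module level in omigo_core.tsv, as the surrounding diff suggests; the sample records and the dmsg value are made up for illustration.

from omigo_core import tsv

# hypothetical input records; from_maps builds a TSV from a list of dicts
records = [
    {"name": "alice", "age": "30"},
    {"name": "bob", "age": "25", "city": "sfo"}
]

# dmsg is a debug-message prefix that the new signature threads through
# explode_json, remove_prefix and drop_cols_if_exists for traceable logging
result = tsv.from_maps(records, dmsg = "example")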
18 changes: 13 additions & 5 deletions python-packages/core/src/omigo_core/utils.py
@@ -33,6 +33,8 @@
OMIGO_DEBUG_REPORT_PROGRESS_MIN_THRESH = "OMIGO_DEBUG_REPORT_PROGRESS_MIN_THRESH"
OMIGO_CODE_TODO_WARNING = "OMIGO_CODE_TODO_WARNING"
OMIGO_BIG_TSV_WARN_SIZE_THRESH = "OMIGO_BIG_TSV_WARN_SIZE_THRESH"
+OMIGO_RATE_LIMIT_N_WARNINGS = "OMIGO_RATE_LIMIT_N_WARNINGS"
+OMIGO_NOOP_N_WARNINGS = "OMIGO_NOOP_N_WARNINGS"

def is_critical():
return str(os.environ.get(OMIGO_CRITICAL, "1")) == "1"
@@ -446,35 +448,41 @@ def raise_exception_after_n_warnings(msg, num_warnings = 1000):
del EXCEPTION_AFTER_WARNINGS_MSG_CACHE[msg]
raise Exception(msg)

-def rate_limit_after_n_warnings(msg, num_warnings = 1000, sleep_secs = 10):
+def rate_limit_after_n_warnings(msg, sleep_secs = 10):
global RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE

+# resolve num_warnings
+num_warnings = int(resolve_default_parameter("num_warnings", os.getenv(OMIGO_RATE_LIMIT_N_WARNINGS), "10000", "rate_limit_after_n_warnings"))

# check if msg is new
if (msg not in RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE.keys()):
RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE[msg] = 1

# check the counter
-if (RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE[msg] < num_warnings):
+if (num_warnings == -1 or RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE[msg] < num_warnings):
RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE[msg] = RATE_LIMIT_AFTER_WARNINGS_MSG_CACHE[msg] + 1
warn_once(msg)
else:
trace("{}: Sleeping for {} seconds".format(msg, sleep_secs))
time.sleep(sleep_secs)

def noop_after_n_warnings(msg, func, *args, **kwargs):
global NOOP_AFTER_WARNINGS_MSG_CACHE
-num_warnings = 1000

+# resolve num_warnings
+num_warnings = int(resolve_default_parameter("num_warnings", os.getenv(OMIGO_NOOP_N_WARNINGS), "10000", "noop_after_n_warnings"))

# check if msg is new
if (msg not in NOOP_AFTER_WARNINGS_MSG_CACHE.keys()):
NOOP_AFTER_WARNINGS_MSG_CACHE[msg] = 1

# check the counter
-if (NOOP_AFTER_WARNINGS_MSG_CACHE[msg] < num_warnings):
+if (num_warnings == -1 or NOOP_AFTER_WARNINGS_MSG_CACHE[msg] < num_warnings):
NOOP_AFTER_WARNINGS_MSG_CACHE[msg] = NOOP_AFTER_WARNINGS_MSG_CACHE[msg] + 1
func(*args, **kwargs)
warn_once(msg)
else:
-pass
+trace("{}: noop".format(msg))

# Deprecated
def strip_spl_white_spaces(v):
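
A short sketch (not from the commit) of how the new environment-variable override might be exercised. It assumes the functions above are importable from omigo_core.utils; the variable value, message text, and loop are illustrative only.

import os
from omigo_core import utils

# hypothetical override; the in-code default resolves to "10000"
os.environ["OMIGO_RATE_LIMIT_N_WARNINGS"] = "100"

# the per-message counter increments on each call; once it crosses the threshold,
# the call sleeps for sleep_secs instead of calling warn_once. Setting the variable
# to "-1" keeps warning and never sleeps, per the new num_warnings == -1 check.
for i in range(200):
    utils.rate_limit_after_n_warnings("example: upstream service throttled", sleep_secs = 1)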
6 changes: 2 additions & 4 deletions python-packages/hydra/src/omigo_hydra/cluster_common.py
@@ -726,11 +726,9 @@ def dir_exists_with_wait(self, path, wait_sec = DEFAULT_WAIT_SEC, attempts = DEF
return False

def is_file(self, path):
-utils.debug("is_file : {}".format(path))
return self.fs.is_file(self.__makepath__(path))

def is_directory(self, path):
-utils.trace("is_directory: {}".format(path))
return self.fs.is_directory(self.__makepath__(path))

def is_non_empty_dir(self, path):
@@ -785,8 +783,8 @@ def read_json(self, path, retries = 5, wait_sec = 1):

# TODO: remove tsvutils dependency
def read_tsv(self, path_or_paths):
-paths = tsvutils.__get_argument_as_array__(path_or_paths)
-full_paths = list([self.__makepath__(self.__normalize_path__(p)) for p in tsvutils.__get_argument_as_array__(path_or_paths)])
+paths = utils.get_argument_as_array(path_or_paths)
+full_paths = list([self.__makepath__(self.__normalize_path__(p)) for p in utils.get_argument_as_array(path_or_paths)])
return tsv.read(full_paths)

# this is a special method that reads all the files in a directory. The format of the filename is <something>.TIMESTAMP.EXTENSION
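
For illustration only: the switch from tsvutils.__get_argument_as_array__ to utils.get_argument_as_array keeps read_tsv accepting either a single path or a list of paths. The snippet below is a sketch of that normalization; the return values are assumed from how the helper is used above, and the paths are made up.

from omigo_core import utils

# normalizes a scalar-or-list argument into a list (behavior assumed from usage)
print(utils.get_argument_as_array("a.tsv"))             # expected: ["a.tsv"]
print(utils.get_argument_as_array(["a.tsv", "b.tsv"]))  # expected: ["a.tsv", "b.tsv"]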
