Skip to content

Commit

Permalink
adding multiprocessing
Browse files Browse the repository at this point in the history
  • Loading branch information
linkodm1 committed Aug 9, 2024
1 parent 92295e2 commit a8cc8fd
Showing 1 changed file with 46 additions and 33 deletions.
79 changes: 46 additions & 33 deletions anms-core/anms/routes/ARIs/reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from sqlalchemy.engine import Result
import re
from datetime import datetime
import multiprocessing as mp

from anms.components.schemas import ARIs
from anms.models.relational import get_async_session, get_session
Expand Down Expand Up @@ -102,6 +103,43 @@ async def find_var_type(obj_metadata):

return data_type_id

async def _process_report_entries(x):
entry, ac_types_and_id = x
# for entry in entries:
curr_values = []
time = datetime.fromtimestamp(int(entry.time)).strftime('%Y-%m-%d %H:%M:%S')

string_values = list(filter(None, re.split(r",|'(.*?)'", entry.string_values))) if entry.string_values else []
uint_values = entry.uint_values.split(',') if entry.uint_values else []
int_values = entry.int_values.split(',') if entry.int_values else []
real32_values = entry.real32_values.split(',') if entry.real32_values else []
real64_values = entry.real64_values.split(',') if entry.real64_values else []
uvast_values = entry.uvast_values.split(',') if entry.uvast_values else []
vast_values = entry.vast_values.split(',') if entry.vast_values else []
value_matchup = {18: string_values, 19: int_values, 20: uint_values, 21: vast_values, 22: uvast_values,
23: real32_values, 24: real64_values}
curr_values.append(time)
for type_id, obj_id in ac_types_and_id:
# find the type of ari
if type_id == 2:
curr_type = await find_edd_type(obj_id)
elif type_id == 12:
curr_type = await find_var_type(obj_id)
else:
curr_type = type_id
if value_matchup[curr_type]:
curr_values.append(value_matchup[curr_type].pop(0))
if ac_types_and_id is []:
curr_values.append(','.join(string_values))
curr_values.append(','.join(uint_values))
curr_values.append(','.join(int_values))
curr_values.append(','.join(real32_values))
curr_values.append(','.join(real64_values))
curr_values.append(','.join(uvast_values))
curr_values.append(','.join(vast_values))

return curr_values


# entries tabulated returns header and values in correct order
@router.get("/entries/table/{agent_id}/{adm}/{report_name}", status_code=status.HTTP_200_OK,
Expand Down Expand Up @@ -154,45 +192,20 @@ async def report_ac(agent_id: str, adm: str, report_name: str):

stmt = select(Report).where(Report.agent_id == agent_id , Report.ADM == adm_name
, Report.report_name == report_name)
# find the type of ari
type_matchup = {2: find_edd_type, 12: find_var_type, }

final_values = []
final_values.append(ac_names)
async with get_async_session() as session:
result: Result = await session.scalars(stmt)
entries = result.all()

pool = mp.Pool(mp.cpu_count())
args_to_use = []
for entry in entries:
curr_values = []
time = datetime.fromtimestamp(int(entry.time)).strftime('%Y-%m-%d %H:%M:%S')

string_values = list(filter(None, re.split(r",|'(.*?)'", entry.string_values))) if entry.string_values else []
uint_values = entry.uint_values.split(',') if entry.uint_values else []
int_values = entry.int_values.split(',') if entry.int_values else []
real32_values = entry.real32_values.split(',') if entry.real32_values else []
real64_values = entry.real64_values.split(',') if entry.real64_values else []
uvast_values = entry.uvast_values.split(',') if entry.uvast_values else []
vast_values = entry.vast_values.split(',') if entry.vast_values else []
value_matchup = {18: string_values, 19: int_values, 20: uint_values, 21: vast_values, 22: uvast_values,
23: real32_values, 24: real64_values}
curr_values.append(time)
for type_id, obj_id in ac_types_and_id:
if type_id in type_matchup:
curr_type = await type_matchup[type_id](obj_id)
else:
curr_type = type_id
if value_matchup[curr_type]:
curr_values.append(value_matchup[curr_type].pop(0))
if ac_types_and_id is []:
curr_values.append(','.join(string_values))
curr_values.append(','.join(uint_values))
curr_values.append(','.join(int_values))
curr_values.append(','.join(real32_values))
curr_values.append(','.join(real64_values))
curr_values.append(','.join(uvast_values))
curr_values.append(','.join(vast_values))

final_values.append(curr_values)
args_to_use.append([entry, ac_types_and_id])
res = pool.map_async(_process_report_entries, args_to_use)
for result in res.get():
final_values.append(result)

logger.info(f"{final_values}")
return final_values

0 comments on commit a8cc8fd

Please sign in to comment.