Skip to content

Commit

Permalink
Merge pull request #52 from OObasuyi/v.2.2-dev
Browse files Browse the repository at this point in the history
V.2.2 dev
  • Loading branch information
OObasuyi authored Sep 13, 2022
2 parents a7cbe06 + c89fd45 commit 3f54dfb
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 110 deletions.
147 changes: 111 additions & 36 deletions fw_cleanup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import datetime
from re import split, match

from fw_deploy import FireStick
from tqdm import tqdm
Expand All @@ -9,13 +10,34 @@


class FireBroom(FireStick):
def __init__(self, fmc_host: str, ftd_host: str, access_policy: str, rule_prepend_name: str,
zone_of_last_resort: str, same_cred=True, ruleset_type='ALLOW',domain='Global',**kwargs):
ippp_location = None
super().__init__(fmc_host, ftd_host, ippp_location, access_policy, rule_prepend_name,
zone_of_last_resort,same_cred=same_cred,ruleset_type=ruleset_type,domain=domain,**kwargs)
def __init__(self, configuration_data: dict, cred_file=None):
configuration_data['ippp_location'] = None
super().__init__(configuration_data=configuration_data, cred_file=cred_file)
self.rest_connection()

def is_prepend_naming_correct(self,name_of_obj):
# check if it's a exact name match of the group
name_check = split(r'[^a-zA-Z0-9\s]', name_of_obj)
# # UC1: name kinda similar
# if self.rule_prepend_name != name_check[0]:
# return False

try:
# UC1: name kinda similar
# UC2: if its anything other than our naming that follows then dont use.
if 'NetGroup' in name_check or ('net' and 'group' in name_check):
if match(f'^{self.rule_prepend_name}', name_of_obj).group() == self.rule_prepend_name:
return True
else:
return False
except Exception as error:
self.logfmc.debug(error)

if self.rule_prepend_name == name_check[0]:
return True
else:
return False

def del_fmc_objects(self, type_, obj_type):
"""PLEASE BE AS SPECIFIC AS POSSIBLE"""
# get the latest created objects
Expand All @@ -25,7 +47,7 @@ def del_fmc_objects(self, type_, obj_type):
self.utils.permission_check(f'Are you sure you want to delete {obj_type.upper()} ***{self.rule_prepend_name}*** {type_} objects?')
if type_ == 'network':
def net_delete():
del_list = [i[2] for i in self.net_data if self.rule_prepend_name in i[0]] if self.rule_prepend_name != 'all' else [i[2] for i in self.net_data]
del_list = [i[2] for i in self.net_data if self.is_prepend_naming_correct(i[0])] if self.rule_prepend_name != 'all' else [i[2] for i in self.net_data]
for obj_id in tqdm(del_list, total=len(del_list), desc=f'deleting {obj_type} objects'):
try:
if '/' in obj_id[1]:
Expand All @@ -35,8 +57,8 @@ def net_delete():
except Exception as error:
self.logfmc.error(f'Cannot delete {obj_id} from set {obj_type} of {type_} \n received code: {error}')

def net_port_delete():
del_list = [i[2] for i in self.net_group_object if self.rule_prepend_name in i[0]] if self.rule_prepend_name != 'all' else [i[2] for i in self.net_group_object]
def net_group_delete():
del_list = [i[2] for i in self.net_group_object if self.is_prepend_naming_correct(i[0])] if self.rule_prepend_name != 'all' else [i[2] for i in self.net_group_object]
for obj_id in tqdm(del_list, total=len(del_list), desc=f'deleting {obj_type} objects'):
try:
self.fmc.object.networkgroup.delete(obj_id)
Expand All @@ -46,9 +68,9 @@ def net_port_delete():
if obj_type == 'net':
net_delete()
elif obj_type == 'net_group':
net_port_delete()
net_group_delete()
elif obj_type == 'all':
net_port_delete()
net_group_delete()
net_delete()

elif type_ == 'port':
Expand Down Expand Up @@ -77,23 +99,23 @@ def del_port_group():
del_port()

elif type_ == 'rule':
acp_id,acp_rules = self.retrieve_rule_objects()
acp_id, acp_rules = self.retrieve_rule_objects()
del_list = [i['name'] for i in acp_rules if self.rule_prepend_name in i['name']] if self.rule_prepend_name != 'all' else acp_rules
for obj_id in tqdm(del_list, total=len(del_list), desc=f'deleting {self.rule_prepend_name} rules'):
try:
self.fmc.policy.accesspolicy.accessrule.delete(container_uuid=acp_id, name=obj_id)
except Exception as error:
self.logfmc.error(f'Cannot delete {obj_id} from set {self.rule_prepend_name} for rules \n received code: {error}')
else:
raise NotImplementedError(f'type_ not found please select rule, port, or network. you passed {type_}')
raise ValueError(f'type_ not found please select rule, port, or network. you passed {type_}')

def collapse_fmc_rules(self,comment:str=False,recover:bool=False):
def collapse_fmc_rules(self, comment: str = False, recover: bool = False):
temp_dir = 'temp_rules'
dt_now = datetime.now().replace(microsecond=0).strftime("%Y%m%d%H%M%S")
save_ext = 'rulbk'
recovery_fname = f'{self.rule_prepend_name}_save_{dt_now}.{save_ext}'
recovery_loc = self.utils.create_file_path(temp_dir,recovery_fname)
if not isinstance(comment,str):
recovery_loc = self.utils.create_file_path(temp_dir, recovery_fname)
if not isinstance(comment, str):
raise ValueError('COMMENT VALUE MUST BE PASSED')
acp_id, acp_rules = self.retrieve_rule_objects()
self.fmc_net_port_info()
Expand Down Expand Up @@ -122,17 +144,17 @@ def collapse_fmc_rules(self,comment:str=False,recover:bool=False):
for col in acp_rules.columns:
acp_rules[col] = acp_rules[col].apply(lambda x: tuple(v for v in x) if isinstance(x, list) else x)
# fill in vals that are really any
acp_rules.replace({None:'any'},inplace=True)
acp_rules.replace({None: 'any'}, inplace=True)
# collapse FW rules by zone
grouped_rules = acp_rules.groupby(['src_z','dst_z'])
grouped_rules = acp_rules.groupby(['src_z', 'dst_z'])
gpl = grouped_rules.size()[grouped_rules.size() > 0].index.values.tolist()
collapsed_rules = []
for grules in gpl:
grules = grouped_rules.get_group(grules)
# separate rules with "any" values
grules_any = grules[(grules['source'] == 'any') | (grules['destination'] == 'any') | (grules['port'] == 'any')]
grules = grules[(grules['source'] != 'any') & (grules['destination'] != 'any') & (grules['port'] != 'any')]
for sep_rules in [grules_any,grules]:
for sep_rules in [grules_any, grules]:
if not sep_rules.empty:
# group frame attributes
agg_src_net = []
Expand Down Expand Up @@ -167,12 +189,12 @@ def collapse_fmc_rules(self,comment:str=False,recover:bool=False):
collapsed_rules.append(group.to_dict())
collapsed_rules = pd.DataFrame(collapsed_rules)
# remove tuples from multi-zoned rows
for rule_pd in [collapsed_rules,acp_rules]:
for rule_pd in [collapsed_rules, acp_rules]:
for col in rule_pd.columns:
rule_pd[col] = rule_pd[col].apply(lambda x: list(v for v in x) if isinstance(x, tuple) else x)

collapsed_rules.rename(columns={'src_z':'source_zone','dst_z':'destination_zone'},inplace=True)
collapsed_rules.drop(columns=['policy_name','source','destination'],inplace=True)
collapsed_rules.rename(columns={'src_z': 'source_zone', 'dst_z': 'destination_zone'}, inplace=True)
collapsed_rules.drop(columns=['policy_name', 'source', 'destination'], inplace=True)
if comment:
collapsed_rules['comment'] = comment

Expand All @@ -185,9 +207,9 @@ def collapse_fmc_rules(self,comment:str=False,recover:bool=False):
return

# Delete old rules
self.del_fmc_objects(type_='rule',obj_type='all')
self.del_fmc_objects(type_='rule', obj_type='all')
# send new rules
success = self.deploy_rules(collapsed_rules,acp_id)
success,_ = self.deploy_rules(collapsed_rules, acp_id)
if not success:
self.logfmc.critical('Couldnt push new configs. Rolling Back!')
self.rollback_acp_op(acp_rules, acp_id, comment=comment)
Expand All @@ -207,24 +229,77 @@ def collapse_fmc_rules(self,comment:str=False,recover:bool=False):
# reinstall old ones
self.rollback_acp_op(acp_rules, acp_id, comment=comment)
else:
self.del_fmc_objects(type_='port',obj_type='all')
self.del_fmc_objects(type_='network',obj_type='all')
self.del_fmc_objects(type_='port', obj_type='all')
self.del_fmc_objects(type_='network', obj_type='all')
self.logfmc.warning(f'completed firewall cleanup for ***{self.rule_prepend_name}***')
# move old temp to archive
archive_dir = self.utils.create_file_path('archive', recovery_fname)
replace(recovery_loc,archive_dir)
replace(recovery_loc, archive_dir)

def clean_object_store(self, clean_type):
self.fmc_net_port_info()

if clean_type == 'resolve':
# delete unused
if self.config_data.get('remove_unused'):
self.del_fmc_objects(type_='network', obj_type='net')
self.fmc_net_port_info()

objects_ = [tuple([str(x['name']), str(x['value']), str(x['id'])]) for x in self.fmc.object.host.get()]
for obj in tqdm(objects_, total=len(objects_), desc=f'updating {clean_type} for objects'):
obj_ip = obj[1]
try:
new_name = self.retrieve_hostname(obj_ip)
if new_name != obj_ip:
# update_objects.append({'name': new_name, 'value': obj[1],'id':obj[2]})
update_obj = {'name': new_name, 'value': obj[1], 'id': obj[2]}
self.fmc.object.host.update(update_obj)
except Exception as error:
self.logfmc.debug(error)
continue

elif clean_type == 'group':
new_name = self.config_data.get('group_clean_name') if self.config_data.get('group_clean_name') else '_NetGroup_'
# delete unused
if self.config_data.get('remove_unused'):
self.del_fmc_objects(type_='network',obj_type='net_group')
self.fmc_net_port_info()

target_change_list = []
# get objects that need to be changed
for ngo in self.net_group_object:
name_of_group = ngo[0]
if self.rule_prepend_name in name_of_group:
correct_name = self.is_prepend_naming_correct(name_of_group)
if not correct_name:
continue

def rollback_acp_op(self,rollback_pd,acp_id,comment:str=False):
rollback_pd.rename(columns={'src_z': 'source_zone', 'dst_z': 'destination_zone','source': 'source_network', 'destination': 'destination_network'}, inplace=True)
if new_name not in name_of_group:
target_change_list.append(ngo)
# change the name based on if this is the default or not
whole_name_change = False if new_name == '_NetGroup_' else True
counter = 0
for grouped_obj in tqdm(target_change_list, total=len(target_change_list), desc=f'updating {clean_type} objects name'):
existing_obj = self.fmc.object.networkgroup.get(grouped_obj[2])
# error looper
while True:
try:
repl_name = f"{self.rule_prepend_name}{new_name}{counter + 1}" if not whole_name_change else f"{new_name}_NetGroup_{counter + 1}"
existing_obj['name'] = repl_name
self.fmc.object.networkgroup.update(existing_obj)
counter += 1
break
except Exception as error:
self.logfmc.error(error)
# if the name exist already. up the counter and try again
if 'already exists' in str(error):
counter += 1
else:
raise NotImplementedError('grouped cleaned issue')

def rollback_acp_op(self, rollback_pd, acp_id, comment: str = False):
rollback_pd.rename(columns={'src_z': 'source_zone', 'dst_z': 'destination_zone', 'source': 'source_network', 'destination': 'destination_network'}, inplace=True)
rollback_pd.drop(columns=['policy_name'], inplace=True)
if comment:
rollback_pd['comment'] = comment
self.deploy_rules(rollback_pd, acp_id)


if __name__ == "__main__":
weeper = FireBroom(access_policy='test12', ftd_host='10.11.6.191', fmc_host='10.11.6.60', rule_prepend_name='test_st_beta_2', zone_of_last_resort='outside_zone',same_cred=False)
weeper.collapse_fmc_rules(comment='tester123')
# augWork.del_fmc_objects(type_='port',obj_type='all')
# augWork.del_fmc_objects(type_='network',,obj_type='all')
# augWork.del_fmc_objects(type_='network',obj_type='all')
Loading

0 comments on commit 3f54dfb

Please sign in to comment.