Skip to content

Commit

Permalink
Merge pull request #49 from OObasuyi/v.2.1-dev
Browse files Browse the repository at this point in the history
V.2.1 dev
  • Loading branch information
OObasuyi authored Sep 6, 2022
2 parents f6e1dd8 + cd859e4 commit a7cbe06
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 34 deletions.
1 change: 1 addition & 0 deletions fw_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ def collapse_fmc_rules(self,comment:str=False,recover:bool=False):
rollback_acp = pickle.load(save_rule)
self.logfmc.debug(f'recovered {recovery_loc} file')
acp_rules = rollback_acp
# todo: need to let the user chose if they want to optimze the config are just insert the old config from the recover file
else:
# in case we fail our rule test or error happens while processing
rollback_acp = acp_rules.copy()
Expand Down
65 changes: 43 additions & 22 deletions fw_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,29 +357,47 @@ def zone_to_ip_information(self):
return route_zone_info

def find_nested_group_objects(self, object_item):
item_holder = []
try:
object_item = object_item.get('objects')
if not isinstance(object_item, list):
return object_item
item_holder = []
for obj_info in object_item:
if 'group' not in obj_info.get('type').lower():
item_holder.append(obj_info.get('name'))
elif 'group' in obj_info.get('type').lower():
if 'port' not in obj_info.get('type').lower():
for i in self.net_group_object:
if i[0] == obj_info['name']:
for v in i[1]:
for ip in self.net_data:
if v == ip[0]:
item_holder.append(ip[0])
# handle literals
for k in object_item.keys():
if k == 'objects':
if not isinstance(object_item[k], list):
item_holder.append(object_item[k])
else:
for i in self.port_group_object:
if obj_info.get('name') == i[0]:
for v in i[1]:
for ports in self.port_data:
if v[0] == ports[0]:
item_holder.append(ports[0])
for obj_info in object_item[k]:
if 'group' not in obj_info.get('type').lower():
if obj_info.get('name') is not None:
item_holder.append(obj_info['name'])

elif 'group' in obj_info.get('type').lower():
if 'port' not in obj_info.get('type').lower():
for i in self.net_group_object:
if i[0] == obj_info['name']:
for v in i[1]:
for ip in self.net_data:
if v == ip[0]:
item_holder.append(ip[0])
else:
for i in self.port_group_object:
if obj_info.get('name') == i[0]:
for v in i[1]:
for ports in self.port_data:
if v[0] == ports[0]:
item_holder.append(ports[0])
elif k == 'literals':
if not isinstance(object_item[k], list):
item_holder.append(object_item[k])
else:
for obj_info in object_item[k]:
if obj_info.get('value') is not None:
item_holder.append(obj_info['value'])
elif obj_info.get('port') is not None:
if obj_info.get('protocol') == '6':
item_holder.append(f'TCP:{obj_info.get("port")}')
elif obj_info.get('protocol') == '17':
item_holder.append(f'UDP:{obj_info.get("port")}')

if len(item_holder) == 1:
return item_holder[0]
sorted(item_holder)
Expand Down Expand Up @@ -465,8 +483,11 @@ def find_inter_dup_policies(self, ruleset):
# zone
if ruleset['source_zone'][idx] == cur_src_z and ruleset['destination_zone'][idx] == cur_dst_z:
quondam += 1
# action
if current_ruleset['action'][i] == self.ruleset_type:
quondam += 1

if quondam >= 4:
if quondam >= 5:
idx_collector.append(idx)

idx_collector = list(set(idx_collector))
Expand Down
6 changes: 3 additions & 3 deletions terminal_entry.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ def terminal_entry():
raise ValueError('RuleSet_type must be either allow or deny')

if config_file.get('rule_cleanup'):
fb = FireBroom(access_policy=config_file.get('access_policy'), ftd_host=config_file.get('firewall_sensor'),
fb = FireBroom(cred_file=args.cred_file,access_policy=config_file.get('access_policy'), ftd_host=config_file.get('firewall_sensor'),
fmc_host=config_file.get('management_center'), rule_prepend_name=config_file.get('rule_prepend_name'),
zone_of_last_resort=config_file.get('zone_of_last_resort'), same_cred=config_file.get('same_creds'),
strict_checkup=config_file.get('strict_checkup'))
strict_checkup=config_file.get('strict_checkup'),ruleset_type=config_file.get('ruleset_type'))

fb.collapse_fmc_rules(comment=config_file.get('comment'), recover=config_file.get('recovery_mode'))
fb.collapse_fmc_rules(comment=config_file.get('rule_comment'), recover=config_file.get('recovery_mode'))
else:
fm = FireStick(cred_file=args.cred_file, ippp_location=config_file.get('ippp_location'), access_policy=config_file.get('access_policy'),
rule_prepend_name=config_file.get('rule_prepend_name'), fmc_host=config_file.get('management_center'), ftd_host=config_file.get('firewall_sensor'),
Expand Down
57 changes: 48 additions & 9 deletions utilites.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,21 +126,60 @@ def transform_acp(current_ruleset,self_instance):
for i in current_ruleset:
subset_rule = {}
subset_rule['policy_name'] = i.get('name')
subset_rule['action'] = i.get('action')
subset_rule['src_z'] = self_instance.find_nested_group_objects(i.get('sourceZones'))
subset_rule['dst_z'] = self_instance.find_nested_group_objects(i.get('destinationZones'))
subset_rule['source'] = self_instance.find_nested_group_objects(i.get('sourceNetworks'))
subset_rule['destination'] = self_instance.find_nested_group_objects(i.get('destinationNetworks'))
subset_rule['port'] = self_instance.find_nested_group_objects(i.get('destinationPorts'))
if 'strict_checkup' in self_instance.pass_thru_commands and self_instance.pass_thru_commands.get('strict_checkup'):
if i.get('destinationPorts')['objects'][0]['type'] == 'ProtocolPortObject':
for port_item in self_instance.port_data:
if port_item[0] == i.get('destinationPorts')['objects'][0]['name']:
subset_rule['real_port'] = [f'{port_item[1]}:{port_item[2]}']
elif i.get('destinationPorts')['objects'][0]['type'] == 'PortObjectGroup':
for port_item in self_instance.port_group_object:
if port_item[0] == i.get('destinationPorts')['objects'][0]['name']:
# recurvsly look through the port objects for its names and get real port mapping from the port_data,
subset_rule['real_port'] = [f'{port_item[1]}:{port_item[2]}' for port_list_item in port_item[1] for port_item in self_instance.port_data if port_item[0] == port_list_item[0]]
strict_holder = []
# changed to get since port can be NONE value AKA 'any' in the Rules

if i.get('destinationPorts') is not None:
real_dst_ports = i.get('destinationPorts')
for k in real_dst_ports.keys():
if k == 'literals':
for port_item in real_dst_ports[k]:
if port_item.get('port') is not None:
if port_item.get('protocol') == '6':
real_port = f'TCP:{port_item.get("port")}'
strict_holder.append(real_port)
elif port_item.get('protocol') == '17':
real_port = f'UDP:{port_item.get("port")}'
strict_holder.append(real_port)
elif k == 'objects':
for obj_item in real_dst_ports[k]:
if obj_item.get('type') == 'ProtocolPortObject':
for port_item in self_instance.port_data:
if port_item[0] == obj_item['name']:
real_port = [f'{port_item[1]}:{port_item[2]}']
strict_holder.append(real_port)
elif obj_item.get('type') == 'PortObjectGroup':
for port_item in self_instance.port_group_object:
if port_item[0] == obj_item['name']:
# recurvsly look through the port objects for its names and get real port mapping from the port_data
for port_list_item in port_item[1]:
for port_item in self_instance.port_data:
if port_item[0] == port_list_item[0]:
real_port = [f'{port_item[1]}:{port_item[2]}']
strict_holder.append(real_port)
if len(strict_holder) == 1:
if not isinstance(next(iter(strict_holder)),list):
subset_rule['real_port'] = strict_holder[0]
else:
subset_rule['real_port'] = [i for i in strict_holder[0]]
else:
save_list = []
for i in strict_holder:
if isinstance(i,list):
for inner_i in i:
save_list.append(inner_i)
else:
save_list.append(i)
subset_rule['real_port'] = save_list
else:
subset_rule['real_port'] = None

changed_ruleset.append(subset_rule)
current_ruleset = changed_ruleset
Expand Down

0 comments on commit a7cbe06

Please sign in to comment.