Skip to content

Commit

Permalink
Merge pull request #66 from OWASP/dev
Browse files Browse the repository at this point in the history
Dev RELEASE: v0.15.5
  • Loading branch information
dmdhrumilmistry authored Mar 20, 2024
2 parents a0990d8 + 6d6446a commit 1eaadfc
Show file tree
Hide file tree
Showing 8 changed files with 110 additions and 53 deletions.
5 changes: 4 additions & 1 deletion src/offat/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
load_dotenv()

app = FastAPI(
title='OFFAT - API'
title='OFFAT - API',
servers=[{
'url':'http://localhost:8000',
}],
)

auth_secret_key = environ.get(
Expand Down
51 changes: 37 additions & 14 deletions src/offat/parsers/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class OpenAPIv3Parser(BaseParser):
'''OpenAPI v3 Spec File Parser'''
# while adding new method to this class, make sure same method is present in SwaggerParser class


def __init__(self, file_or_url: str, spec: dict | None = None) -> None:
super().__init__(file_or_url=file_or_url, spec=spec) # noqa
if not self.is_v3:
Expand All @@ -25,6 +26,7 @@ def __init__(self, file_or_url: str, spec: dict | None = None) -> None:

self.request_response_params = self._get_request_response_params()


def _populate_hosts(self):
servers = self.specification.get('servers', [])
hosts = []
Expand All @@ -41,6 +43,7 @@ def _populate_hosts(self):
self.hosts = hosts
self.host = self.hosts[0]


def _get_scheme(self):
servers = self.specification.get('servers', [])
schemes = []
Expand All @@ -50,6 +53,21 @@ def _get_scheme(self):
scheme = 'https' if 'https' in schemes else 'http'
return scheme


def _fetch_schema_from_spec(self, param_schema_ref:str) -> dict:
schema_spec_path = param_schema_ref.split('/')[1:]

if len(schema_spec_path) > 3:
logger.error('Schema spec $ref path should not be greater than 3 (excluding #)')
return {}

schema_data:dict = self.specification
for child_ele in schema_spec_path:
schema_data:dict = schema_data.get(child_ele, {})

return schema_data


def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
param_schema = param.get('schema')
Expand All @@ -58,9 +76,7 @@ def _get_param_definition_schema(self, param: dict):
if param_schema:
param_schema_ref = param_schema.get('$ref')
if param_schema_ref:
model_slug = param_schema_ref.split('/')[-1]
param_schema = self.specification.get(
'components', {}).get('schemas', {}).get(model_slug, {}) # model schema
param_schema = self._fetch_schema_from_spec(param_schema_ref)

return param_schema

Expand All @@ -75,20 +91,27 @@ def _get_response_definition_schema(self, responses: dict):
'''
for status_code in responses.keys():
# below line could return: ["application/json", "application/xml"]
status_code_content_type_response = responses[status_code]['content'].keys()

for status_code_content_type in status_code_content_type_response:
status_code_content = responses[status_code]['content'][status_code_content_type].keys(
)
if 'parameters' in status_code_content:
# done
responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type])
content = responses[status_code].get('content', None)

if content:
status_code_content_type_responses = content.keys()
for status_code_content_type in status_code_content_type_responses:
status_code_content = responses[status_code]['content'][status_code_content_type].keys()
if 'parameters' in status_code_content:
responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type])

else:
# Fetch $ref schema directly
ref = responses[status_code].get('$ref', None)
if ref:
responses[status_code]['schema'] = self._fetch_schema_from_spec(ref)

return responses


def _get_request_response_params(self):
'''Returns Schema of requests and response params
Expand Down
2 changes: 1 addition & 1 deletion src/offat/parsers/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class InvalidSpecVersion(Exception):
class BaseParser:
def __init__(self, file_or_url: str, spec: dict = None) -> None:
if spec:
self.specification = spec
self.specification:dict = spec
base_uri = ""
else:
self.specification, base_uri = read_from_filename(file_or_url)
Expand Down
71 changes: 51 additions & 20 deletions src/offat/report/generator.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,31 @@
"""
Report Generator utils
"""
from copy import deepcopy
from html import escape
from json import dumps as json_dumps
from offat.report import templates
from os.path import dirname, join as path_join
from os import makedirs
from rich.table import Table
from yaml import dump as yaml_dump

from .templates.table import TestResultTable
from ..logger import logger, console
from ..report import templates


class ReportGenerator:
"""can generate report in json,yaml,table and html formats"""

@staticmethod
def generate_html_report(results: list[dict]):
"""generates html report from OFFAT results"""
html_report_template_file_name = 'report.html'
html_report_file_path = path_join(
dirname(templates.__file__), html_report_template_file_name)
dirname(templates.__file__), html_report_template_file_name
)

with open(html_report_file_path, 'r') as f:
with open(html_report_file_path, 'r', encoding='utf-8') as f:
report_file_content = f.read()

# TODO: validate report data to avoid HTML injection attacks.
Expand All @@ -27,7 +34,7 @@ def generate_html_report(results: list[dict]):

# HTML escape data
escaped_results = []
escape_keys = ["response_body"]
escape_keys = ['response_body']
for result_dict in results:
escaped_result_dict = {}
for key, value in result_dict.items():
Expand All @@ -40,64 +47,88 @@ def generate_html_report(results: list[dict]):
escaped_results.append(escaped_result_dict)

report_file_content = report_file_content.replace(
'{ results }', json_dumps(escaped_results))
'{ results }', json_dumps(escaped_results)
)

return report_file_content

@staticmethod
def handle_report_format(results: list[dict], report_format: str | None) -> str | Table:
def handle_report_format(
results: list[dict], report_format: str | None
) -> str | Table:
"""returns report in specified format"""
result = None

match report_format:
case 'html':
logger.warning('HTML output format displays only basic data.')
result = ReportGenerator.generate_html_report(results=results)
case 'yaml':
logger.warning('YAML output format needs to be sanitized before using it further.')
result = yaml_dump({
'results': results,
})
logger.warning(
'YAML output format needs to be sanitized before using it further.'
)
result = yaml_dump(
{
'results': results,
}
)
case 'json':
report_format = 'json'
result = json_dumps({
'results': results,
})
result = json_dumps(
{
'results': results,
}
)
case _: # default: CLI table
# TODO: filter failed requests first and then create new table for failed requests
report_format = 'table'
results_table = TestResultTable().generate_result_table(
deepcopy(results))
deepcopy(results)
)
result = results_table

logger.info('Generated %s format report.', report_format.upper())
return result

@staticmethod
def save_report(report_path: str | None, report_file_content: str | Table | None):
"""saves/prints report to console"""
if report_path != '/' and report_path:
dir_name = dirname(report_path)
if dir_name != '' and report_path:
makedirs(dir_name, exist_ok=True)

# print to cli if report path and file content as absent else write to file location.
if report_path and report_file_content and not isinstance(report_file_content, Table):
with open(report_path, 'w') as f:
if (
report_path
and report_file_content
and not isinstance(report_file_content, Table)
):
with open(report_path, 'w', encoding='utf-8') as f:
logger.info('Writing report to file: %s', report_path)
f.write(report_file_content)
else:
if isinstance(report_file_content, Table) and report_file_content.columns:
TestResultTable().print_table(report_file_content)
elif isinstance(report_file_content, Table) and not report_file_content.columns:
elif (
isinstance(report_file_content, Table)
and not report_file_content.columns
):
logger.warning('No Columns found in Table.')
else:
console.print(report_file_content)

@staticmethod
def generate_report(results: list[dict], report_format: str | None, report_path: str | None):
def generate_report(
results: list[dict], report_format: str | None, report_path: str | None
):
"""main function used to generate report"""
if report_path:
report_format = report_path.split('.')[-1]

formatted_results = ReportGenerator.handle_report_format(
results=results, report_format=report_format)
results=results, report_format=report_format
)
ReportGenerator.save_report(
report_path=report_path, report_file_content=formatted_results)
report_path=report_path, report_file_content=formatted_results
)
2 changes: 1 addition & 1 deletion src/offat/tester/tester_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def is_host_up(openapi_parser: SwaggerParser | OpenAPIv3Parser) -> bool:
case _:
proto = http_client.HTTPConnection

logger.info("Checking whether host %s:%d is available", host, port)
logger.info("Checking whether host %s:%s is available", host, port)
try:
conn = proto(host=host, port=port, timeout=5)
conn.request("GET", "/")
Expand Down
2 changes: 1 addition & 1 deletion src/offat/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ def is_valid_url(url: str) -> bool:
Any exception occurred during operation
'''
url_regex = re_compile(
r'https?://(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)'
r'https?:\/\/[a-z.-]+(:\d+)?.*'
)
return bool(match(url_regex, url))
28 changes: 14 additions & 14 deletions src/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "offat"
version = "0.15.4"
version = "0.15.5"
description = "Offensive API tester tool automates checks for common API vulnerabilities"
authors = ["Dhrumil Mistry <dhrumil.mistry@owasp.org>"]
license = "MIT"
Expand Down

0 comments on commit 1eaadfc

Please sign in to comment.