Skip to content

Commit

Permalink
Merge pull request #49 from OWASP/dev
Browse files Browse the repository at this point in the history
Dev RELEASE: v0.15.0
  • Loading branch information
dmdhrumilmistry authored Feb 3, 2024
2 parents 5fc2457 + adad0f0 commit 90e93f1
Show file tree
Hide file tree
Showing 18 changed files with 618 additions and 518 deletions.
4 changes: 2 additions & 2 deletions src/offat/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from .config_data_handler import validate_config_file_data
from .tester.tester_utils import generate_and_run_tests
from .openapi import OpenAPIParser
from .parsers import create_parser
from .utils import get_package_version, headers_list_to_dict, read_yaml


Expand Down Expand Up @@ -66,7 +66,7 @@ def start():
test_data_config = validate_config_file_data(test_data_config)

# parse args and run tests
api_parser = OpenAPIParser(args.fpath)
api_parser = create_parser(args.fpath)
generate_and_run_tests(
api_parser=api_parser,
regex_pattern=args.path_regex_pattern,
Expand Down
6 changes: 3 additions & 3 deletions src/offat/api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from offat.api.jobs import scan_api
from offat.api.models import CreateScanModel
from offat.logger import logger
from os import uname, environ
# from os import uname, environ


logger.info('Secret Key: %s', auth_secret_key)
Expand All @@ -17,8 +17,8 @@
async def root():
return {
"name": "OFFAT API",
"project": "https://github.com/dmdhrumilmistry/offat",
"license": "https://github.com/dmdhrumilmistry/offat/blob/main/LICENSE",
"project": "https://github.com/OWASP/offat",
"license": "https://github.com/OWASP/offat/blob/main/LICENSE",
}


Expand Down
5 changes: 2 additions & 3 deletions src/offat/api/jobs.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
from sys import exc_info
from offat.api.models import CreateScanModel
from offat.tester.tester_utils import generate_and_run_tests
from offat.openapi import OpenAPIParser
from offat.parsers import create_parser
from offat.logger import logger


def scan_api(body_data: CreateScanModel):
try:
api_parser = OpenAPIParser(fpath_or_url=None, spec=body_data.openAPI)
api_parser = create_parser(fpath_or_url=None, spec=body_data.openAPI)

results = generate_and_run_tests(
api_parser=api_parser,
regex_pattern=body_data.regex_pattern,
req_headers=body_data.req_headers,
rate_limit=body_data.rate_limit,
delay=body_data.delay,
test_data_config=body_data.test_data_config,
)
return results
Expand Down
157 changes: 0 additions & 157 deletions src/offat/openapi.py

This file was deleted.

12 changes: 12 additions & 0 deletions src/offat/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from .openapi import OpenAPIv3Parser
from .swagger import SwaggerParser
from .parser import BaseParser


def create_parser(fpath_or_url: str, spec: dict = None) -> SwaggerParser | OpenAPIv3Parser:
'''returns parser based on doc file'''
parser = BaseParser(file_or_url=fpath_or_url, spec=spec)
if parser.is_v3:
return OpenAPIv3Parser(file_or_url=fpath_or_url, spec=spec)

return SwaggerParser(fpath_or_url=fpath_or_url, spec=spec)
147 changes: 147 additions & 0 deletions src/offat/parsers/openapi.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
'''
module to parse OAS v3 documentation JSON/YAML files.
'''
from .parser import BaseParser


class InvalidOpenAPIv3File(Exception):
'''Exception to be raised when openAPI/OAS spec validation fails'''


class OpenAPIv3Parser(BaseParser):
'''OpenAPI v3 Spec File Parser'''
# while adding new method to this class, make sure same method is present in SwaggerParser class

def __init__(self, file_or_url: str, spec: dict | None = None) -> None:
super().__init__(file_or_url=file_or_url, spec=spec) # noqa
if not self.is_v3:
raise InvalidOpenAPIv3File("Invalid OAS v3 file")

self._populate_hosts()
self.http_scheme = self._get_scheme()
self.api_base_path = self.specification.get('basePath', '')
self.base_url = f"{self.http_scheme}://{self.host}"

self.request_response_params = self._get_request_response_params()

def _populate_hosts(self):
servers = self.specification.get('servers', [])
hosts = []
for server in servers:
host = server.get('url', '').removeprefix(
'https://').removeprefix('http://').removesuffix('/')
host = None if host == '' else host
hosts.append(host)

self.hosts = hosts
self.host = self.hosts[0]

def _get_scheme(self):
servers = self.specification.get('servers', [])
schemes = []
for server in servers:
schemes.append('https' if 'https://' in server.get('url', '') else 'http')

scheme = 'https' if 'https' in schemes else 'http'
return scheme

def _get_param_definition_schema(self, param: dict):
'''Returns Model defined schema for the passed param'''
param_schema = param.get('schema')

# replace schema $ref with model params
if param_schema:
param_schema_ref = param_schema.get('$ref')
if param_schema_ref:
model_slug = param_schema_ref.split('/')[-1]
param_schema = self.specification.get(
'components', {}).get('schemas', {}).get(model_slug, {}) # model schema

return param_schema

def _get_response_definition_schema(self, responses: dict):
'''returns schema of API response
Args:
responses (dict): responses from path http method json data
Returns:
dict:
'''
for status_code in responses.keys():
# below line could return: ["application/json", "application/xml"]
status_code_content_type_response = responses[status_code]['content'].keys()

for status_code_content_type in status_code_content_type_response:
status_code_content = responses[status_code]['content'][status_code_content_type].keys(
)
if 'parameters' in status_code_content:
# done
responses[status_code]['schema'] = responses[status_code]['content'][status_code_content_type]['parameters']
elif 'schema' in status_code_content:
responses[status_code]['schema'] = self._get_param_definition_schema(
responses[status_code]['content'][status_code_content_type])

return responses

def _get_request_response_params(self):
'''Returns Schema of requests and response params
Args:
None
Returns:
list:
'''
requests = []
paths = self.specification.get('paths', {})

# extract endpoints and supported params
for path in paths.keys():
path_params = paths[path].get('parameters', [])

for http_method in paths.get(path, {}).keys():
# consider only http methods
if http_method not in ['get', 'put', 'post', 'delete', 'options']:
continue

request_parameters = paths[path][http_method].get(
'parameters', [])

# create list of parameters: Fetch object schema from OAS file
body_params = []

body_parameter_keys = paths[path][http_method].get(
'requestBody', {}).get('content', {})

for body_parameter_key in body_parameter_keys:
body_parameters_dict = paths[path][http_method]['requestBody']['content'][body_parameter_key]

required = paths[path][http_method]['requestBody'].get('required')
description = paths[path][http_method]['requestBody'].get('description')
body_param = self._get_param_definition_schema(body_parameters_dict)

body_params.append({
'in': 'body',
'name': body_parameter_key,
'description': description,
'required': required,
'schema': body_param,
})

response_params = []
response_params = self._get_response_definition_schema(
paths[path][http_method].get('responses', {}))

# add body param to request param
request_parameters += body_params
requests.append({
'http_method': http_method,
'path': path,
'request_params': request_parameters,
'response_params': response_params,
'path_params': path_params,
'body_params': body_params,
})

return requests
Loading

0 comments on commit 90e93f1

Please sign in to comment.