feat: subdomain matching
ppfeister committed Jul 27, 2024
1 parent 727ac99 commit e27d35e
Showing 2 changed files with 59 additions and 18 deletions.
24 changes: 24 additions & 0 deletions src/oculus/data/site_patterns.json
@@ -354,6 +354,30 @@
             }
         ]
     },
+    "ppy.sh": {
+        "wildcard_subdomain": false,
+        "friendly_name": "ppy.sh",
+        "subdomains": {
+            "osu": {
+                "friendly_name": "osu!",
+                "wildcard_subdomain": true,
+                "self": [
+                    "&quot;country&quot.+?name&quot;:&quot;(?P<country>.+?)&quot;}",
+                    "&quot;username&quot;:&quot;(?P<uid>.+?)&quot;"
+                ],
+                "patterns": [
+                    {
+                        "sequence": {
+                            "1": "&quot;previous_usernames&quot;:\\[(?P<next>.+?)\\]",
+                            "2": "&quot;(?P<uid>.+?)&quot;"
+                        },
+                        "validation_type": "social",
+                        "platform_name": "osu!"
+                    }
+                ]
+            }
+        }
+    },
     "tryhackme.com": {
         "wildcard_subdomain": true,
         "friendly_name": "TryHackMe",
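
The &quot; sequences in the new ppy.sh entry are intentional rather than encoding debris: osu! embeds profile JSON inside HTML attributes, so quotes arrive in the raw page body as HTML entities, and the expressions must match the escaped form. A minimal sketch of how the two "self" patterns behave, run against an invented body fragment (the username and country values are illustrative, not taken from this commit):

import re

# Invented fragment: JSON embedded in an HTML attribute, quotes escaped as &quot;
body = 'data-user="{&quot;username&quot;:&quot;example_user&quot;,&quot;country&quot;:{&quot;code&quot;:&quot;US&quot;,&quot;name&quot;:&quot;United States&quot;}}"'

self_patterns = [
    '&quot;country&quot.+?name&quot;:&quot;(?P<country>.+?)&quot;}',
    '&quot;username&quot;:&quot;(?P<uid>.+?)&quot;',
]

for pattern in self_patterns:
    captures = re.search(pattern, body, re.MULTILINE)
    if captures:
        print(captures.groupdict())
# {'country': 'United States'}
# {'uid': 'example_user'}
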
53 changes: 35 additions & 18 deletions src/oculus/helpers/pattern_match.py
@@ -62,16 +62,27 @@ def search(self, url:str, body:str=None, query:str=None, preexisting:pd.DataFrame
         url = re.sub(__url_normalization_pattern__, '', url)


-        def _search_patterns(pattern:str):
-            captures = re.search(pattern['pattern'], body)
+        def _search_patterns(pattern:Dict):
+            if 'sequence' in pattern:
+                steps = len(pattern['sequence'])
+                last_match = body
+                for step in range(1, steps):
+                    last_match = re.search(pattern['sequence'][f'{step}'], body)['next']
+                    if last_match is None or last_match == '':
+                        return
+                else:
+                    if last_match:
+                        captures = re.search(pattern['sequence'][f'{steps}'], last_match)
+
+            else:
+                captures = re.search(pattern['pattern'], body)
+
             if captures:
                 new_item:Dict = {}

                 if pattern['validation_type'] == 'social':
                     new_item['platform_name'] = pattern['platform_name']
                     if 'uid' in captures.groupdict():
-                        if captures.group('uid') == 'f':
-                            print(body)
                         new_item['username'] = captures.group('uid')
                     if 'url' in captures.groupdict():
                         normalized_url = re.sub(__url_normalization_pattern__, '', captures.group('url'))
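
The new sequence branch chains numbered expressions: every step except the last exposes a capture group named "next", and the text that group captures becomes the haystack for the final step. A standalone sketch of the idea against an invented, entity-escaped fragment; note the committed loop searches the full body at each intermediate step, while this sketch narrows the haystack instead:

import re

# Invented escaped fragment containing a previous_usernames array
body = '&quot;previous_usernames&quot;:[&quot;old_name&quot;,&quot;older_name&quot;]'

# Mirrors the ppy.sh sequence above; the JSON file stores \[ as \\[
sequence = {
    '1': r'&quot;previous_usernames&quot;:\[(?P<next>.+?)\]',
    '2': r'&quot;(?P<uid>.+?)&quot;',
}

steps = len(sequence)
last_match = body
for step in range(1, steps):                  # every step but the last
    match = re.search(sequence[f'{step}'], last_match)
    if match is None or match['next'] == '':  # a failed step aborts the sequence
        raise SystemExit('sequence step failed')
    last_match = match['next']

captures = re.search(sequence[f'{steps}'], last_match)
print(captures.group('uid'))                  # old_name
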
@@ -135,16 +146,18 @@ def _search_desirables(url:str) -> Dict[str, str]:

         if root_domain not in self.pattern_data:
             return pd.DataFrame()

+        current_pattern_data = self.pattern_data[root_domain]
+
-        if (not body and query) or 'custom_url' in self.pattern_data[root_domain]:
-            if 'custom_url' in self.pattern_data[root_domain]:
-                url = self.pattern_data[root_domain]['custom_url'].format(QUERY=query)
+        if (not body and query) or 'custom_url' in current_pattern_data:
+            if 'custom_url' in current_pattern_data:
+                url = current_pattern_data['custom_url'].format(QUERY=query)
             else:
                 url = url.format(query)

         headers: Dict[str, str] = {}
-        if 'headers' in self.pattern_data[root_domain]:
-            for header, value in self.pattern_data[root_domain]['headers'].items():
+        if 'headers' in current_pattern_data:
+            for header, value in current_pattern_data['headers'].items():
                 # Must iterate so as to not overwrite possibly pre-existing headers
                 headers[header] = value

@@ -153,20 +166,24 @@ def _search_desirables(url:str) -> Dict[str, str]:
                 return pd.DataFrame()
             body = response.text
         elif not body and not query:
-            raise RequestError(f'Not enough information for pattern matching {self.pattern_data[root_domain]["friendly_name"]}')
+            raise RequestError(f'Not enough information for pattern matching {current_pattern_data["friendly_name"]}')

-        if not self.pattern_data[root_domain]['wildcard_subdomain']:
-            # TODO add support for subdomain differentials
-            return pd.DataFrame()
+        if not current_pattern_data['wildcard_subdomain']:
+            if 'subdomains' not in current_pattern_data:
+                raise json.JSONDecodeError(f'Domain {root_domain} set as not wildcard but has no subdomains', '', 0)
+            if split_url.subdomain not in current_pattern_data['subdomains']:
+                pass # TODO Should we default to root patterns or just skip?
+            else:
+                current_pattern_data = current_pattern_data['subdomains'][split_url.subdomain]

         new_data:List[Dict] = []

-        if 'self' in self.pattern_data[root_domain]:
+        if 'self' in current_pattern_data:
             self_scrape_data:Dict = {}
-            for pattern in self.pattern_data[root_domain]['self']:
+            for pattern in current_pattern_data['self']:
                 captures = re.search(pattern, body, re.MULTILINE)
                 if captures:
-                    self_scrape_data['platform_name'] = self.pattern_data[root_domain]['friendly_name']
+                    self_scrape_data['platform_name'] = current_pattern_data['friendly_name']
                     self_scrape_data['platform_url'] = url
                     if 'uid' in captures.groupdict():
                         self_scrape_data['username'] = captures.group('uid')
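
With the non-wildcard branch above, a domain entry such as ppy.sh now resolves to its nested subdomains entry before any patterns run. A sketch of that resolution, assuming split_url comes from a tldextract-style split (an assumption; the diff does not show where split_url is built) and that pattern_data is the parsed site_patterns.json:

import json
import tldextract

with open('src/oculus/data/site_patterns.json') as f:
    pattern_data = json.load(f)

split_url = tldextract.extract('https://osu.ppy.sh/users/2')
root_domain = f'{split_url.domain}.{split_url.suffix}'  # 'ppy.sh'

current_pattern_data = pattern_data[root_domain]
if (not current_pattern_data['wildcard_subdomain']
        and split_url.subdomain in current_pattern_data.get('subdomains', {})):
    # 'osu' selects the nested entry added in site_patterns.json above
    current_pattern_data = current_pattern_data['subdomains'][split_url.subdomain]

print(current_pattern_data['friendly_name'])  # osu!
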
@@ -199,8 +216,8 @@ def _search_desirables(url:str) -> Dict[str, str]:
             if self_scrape_data:
                 new_data.append(self_scrape_data)

-        if 'patterns' in self.pattern_data[root_domain]:
-            for pattern in self.pattern_data[root_domain]['patterns']:
+        if 'patterns' in current_pattern_data:
+            for pattern in current_pattern_data['patterns']:
                 _search_patterns(pattern)

         soup = BeautifulSoup(body, 'html.parser')
