Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/EAS-2086 : CSRT's Authentication scanner flags static resources #72

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 84 additions & 55 deletions analyzers/descriptor_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,71 @@ def __init__(self, desc_scan: DescriptorResult, requirements: Requirements):
self.scan = desc_scan
self.reqs = requirements

def _get_index_len(self, link):
return len(self.scan.scan_results[link])

def _check_cache_headers(self) -> Tuple[bool, List[str]]:
passed = True
proof: List[str] = []
scan_res = self.scan.scan_results
for link in scan_res:
cache_headers = scan_res[link].cache_header.split(',')
cache_headers = [x.strip().lower() for x in cache_headers]
directives = all(item in cache_headers for item in REQ_CACHE_HEADERS)
if not directives:
proof.append(f"{link} | Cache header: {scan_res[link].cache_header}")
passed = passed and directives
for i in range(self._get_index_len(link)):
cache_headers = scan_res[link][i].cache_header.split(',')
cache_headers = [x.strip().lower() for x in cache_headers]
directives = all(item in cache_headers for item in REQ_CACHE_HEADERS)
if not directives:
proof.append(f"{link} | Cache header: {scan_res[link][i].cache_header}")
passed = passed and directives

return passed, proof

def _check_referrer_headers(self) -> Tuple[bool, List[str]]:
passed = True
proof: List[str] = []
scan_res = self.scan.scan_results

for link in scan_res:
ref_headers = scan_res[link].referrer_header.split(',')
ref_headers = [x.strip().lower() for x in ref_headers]
policy = ref_headers[0] not in REF_DENYLIST if ref_headers[0] != 'header missing' else False
if not policy:
proof.append(f"{link} | Referrer header: {scan_res[link].referrer_header}")
passed = passed and policy
for i in range(self._get_index_len(link)):
scan_res = self.scan.scan_results[link][i]
ref_headers = scan_res.referrer_header.split(',')
ref_headers = [x.strip().lower() for x in ref_headers]
policy = ref_headers[0] not in REF_DENYLIST if ref_headers[0] != 'header missing' else False
if not policy:
proof.append(f"{link} | Referrer header: {scan_res.referrer_header}")
passed = passed and policy

return passed, proof

def _check_cookie_headers(self) -> Tuple[bool, List[str]]:
passed = True
proof: List[str] = []
scan_res = self.scan.scan_results
scan_res = self.scan.scan_results
for link in scan_res:
cookies = scan_res[link].session_cookies
for cookie in cookies:
# Parsing the cookie string became messy, so we use a regex to match and tear
# the string apart into its relevant pieces
parsed = re.match(COOKIE_PARSE, cookie)
secure = bool(util.strtobool(parsed.group(3)))
httponly = bool(util.strtobool(parsed.group(4)))

if not secure or not httponly:
proof.append(f"{link} | Cookie: {cookie}")
passed = passed and secure and httponly
for i in range(self._get_index_len(link)):
cookies = scan_res[link][i].session_cookies
for cookie in cookies:
# Parsing the cookie string became messy, so we use a regex to match and tear
# the string apart into its relevant pieces
parsed = re.match(COOKIE_PARSE, cookie)
secure = bool(util.strtobool(parsed.group(3)))
httponly = bool(util.strtobool(parsed.group(4)))

if not secure or not httponly:
proof.append(f"{link} | Cookie: {cookie}")
passed = passed and secure and httponly

return passed, proof

def check_same_response(self, index_none, index_fake) -> bool:
# check that content and response code are the same
if index_none >= len(self.result_list) or index_fake >= len(self.result_list):
return False
res_none = self.result_list[index_none]
res_fake = self.result_list[index_fake]
if not res_fake or not res_none:
return False
return int(res_none.res_code) == 200 and int(res_fake.res_code) == 200 and res_fake.response == res_none.response

def _check_authn_authz(self) -> Tuple[bool, List[str], bool, List[str], bool, List[str]]:
passed = True
proof: List[str] = []
Expand All @@ -84,41 +101,53 @@ def _check_authn_authz(self) -> Tuple[bool, List[str], bool, List[str], bool, Li
proof.append(NO_AUTH_PROOF)
return passed, proof, signed_install_passed, signed_install_proof, authz_passed, authz_proof

invalid_response = False
authz_passed = True
for link in scan_res:
res_code = int(scan_res[link].res_code) if scan_res[link].res_code else 0
auth_header = scan_res[link].auth_header
req_method = scan_res[link].req_method
response = scan_res[link].response
authz_req_method = scan_res[link].authz_req_method
authz_code = int(scan_res[link].authz_code) if scan_res[link].authz_code else 0
authz_header = scan_res[link].authz_header
self.result_list = scan_res[link]

if self.check_same_response(0, 1) or self.check_same_response(2, 1) or \
gersbach marked this conversation as resolved.
Show resolved Hide resolved
self.check_same_response(3, 4) or self.check_same_response(5, 4):
continue

for res_index in range(len(self.result_list)):
result = self.result_list[res_index]
res_code = int(result.res_code) if result.res_code else 0
auth_header = result.auth_header
req_method = result.req_method
response = result.response
authz_req_method = result.authz_req_method
authz_code = int(result.authz_code) if result.authz_code else 0
authz_header = result.authz_header

# Check for invalid responses in the body before failing the authn check
invalid_responses = ['Invalid JWT', 'unauthorized', 'forbidden', 'error', 'unlicensed', 'not licensed',
'no license', 'invalid', '401', '403', '404', '500']
invalid_response = False
if any(str(x).lower() in str(response).lower() for x in invalid_responses):
invalid_response = True

# We shouldn't be able to visit this link if the app uses authentication.
if res_code >= 200 and res_code < 400 and not invalid_response:
if any(x in link for x in ('installed', 'install', 'uninstalled', 'uninstall')):
signed_install_passed = False
signed_install_proof_text = f"Lifecycle endpoint: {link} | Res Code: {res_code}" \
f" Auth Header: {auth_header}"
signed_install_proof.append(signed_install_proof_text)
invalid_responses = ['Invalid JWT', 'unauthorized', 'forbidden', 'error', 'unlicensed', 'not licensed',
'no license', 'invalid', '401', '403', '404', '500']

if any(str(x).lower() in str(response).lower() for x in invalid_responses):
invalid_response = True
else:
passed = False
proof_text = f"{link} | Res Code: {res_code} Req Method: {req_method} Auth Header: {auth_header}"
proof.append(proof_text)

# similarly check for authorization status codes for authorization bypass
if authz_code >= 200 and authz_code < 400:
authz_passed = False
authz_proof_text = (f"{link} | Authz Res Code: {authz_code} Req Method: {authz_req_method}"
f" Authz Header: {authz_header}")
authz_proof.append(authz_proof_text)
invalid_response = False

# We shouldn't be able to visit this link if the app uses authentication.
if res_code >= 200 and res_code < 400 and not invalid_response:
if any(x in link for x in ('installed', 'install', 'uninstalled', 'uninstall')):
signed_install_passed = False
signed_install_proof_text = f"Lifecycle endpoint: {link} | Res Code: {res_code}" \
f" Auth Header: {auth_header}"
signed_install_proof.append(signed_install_proof_text)

else:
passed = False
proof_text = f"{link} | Res Code: {res_code} Req Method: {req_method} Auth Header: {auth_header}"
proof.append(proof_text)

# similarly check for authorization status codes for authorization bypass
if authz_code >= 200 and authz_code < 400:
authz_passed = False
authz_proof_text = (f"{link} | Authz Res Code: {authz_code} Req Method: {authz_req_method}"
f" Authz Header: {authz_header}")
authz_proof.append(authz_proof_text)

if passed:
proof.append(VALID_AUTH_PROOF)
Expand All @@ -132,7 +161,7 @@ def analyze(self, authz_only=False) -> Requirements:
ref_passed, ref_proof = self._check_referrer_headers()
cookies_passed, cookies_proof = self._check_cookie_headers()
(auth_passed, auth_proof, signed_install_passed, signed_install_proof,
authz_passed, authz_proof) = self._check_authn_authz()
authz_passed, authz_proof) = self._check_authn_authz()

req1_1 = RequirementsResult(
passed=auth_passed,
Expand Down
2 changes: 1 addition & 1 deletion models/descriptor_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ class DescriptorResult(JsonObject):
scopes = ListProperty(StringProperty())
links = ListProperty(StringProperty())
link_errors = DictProperty()
scan_results = DictProperty(DescriptorLink)
scan_results = DictProperty(ListProperty(DescriptorLink))
response = StringProperty()
41 changes: 22 additions & 19 deletions scans/descriptor_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import string
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from typing import List, Optional, Tuple, Union
from typing import Optional, Tuple, Union, List
from urllib.parse import urlparse

import jwt
Expand Down Expand Up @@ -314,8 +314,8 @@ def _visit_link(self, link: str) -> Optional[requests.Response]:
]

res: Optional[requests.Response] = None
all_res: List[Optional[requests.Response]] = []
for task in tasks:

# Gracefully handle links that result in an exception, report them via warning, and skip any further tests
try:
# If we are requesting a lifecycle event, ensure we perform signed-install authentication check
Expand All @@ -334,7 +334,7 @@ def _visit_link(self, link: str) -> Optional[requests.Response]:
logging.debug(f"Requesting {link} via {task['method']} with auth: {task['headers']=}")
res = self.session.request(task['method'], link, headers=task['headers'])
if res.status_code < 400:
break
all_res.append(res)
if res.status_code == 503:
logging.warning(
f"{link} caused a 503 status. Run with --debug for more information. Skipping endpoint...",
Expand All @@ -358,7 +358,7 @@ def _visit_link(self, link: str) -> Optional[requests.Response]:
self.link_errors['exceptions'] += [f"{link}"]
return None

return res
return all_res

def _get_session_cookies(self, cookiejar: requests.cookies.RequestsCookieJar) -> List[str]:
res: List[str] = []
Expand All @@ -383,8 +383,10 @@ def scan(self, user_jwt: str = None) -> DescriptorResult:
scan_results={}
)
scan_res = defaultdict()

for link in self.links:
r = self._visit_link(link)
all_analysis = []
scan_results = self._visit_link(link)

# If we are testing an admin restricted link, perform Authorization check
authz_res = None
Expand All @@ -393,22 +395,23 @@ def scan(self, user_jwt: str = None) -> DescriptorResult:
logging.debug(f"Found and tested admin link for Authorization issue: {link} |"
f" Result: {authz_res.status_code if authz_res else None}")

if r or authz_res:
scan_res[link] = DescriptorLink(
cache_header=r.headers.get('Cache-Control', 'Header missing'),
referrer_header=r.headers.get('Referrer-Policy', 'Header missing'),
session_cookies=self._get_session_cookies(r.cookies),
auth_header=r.request.headers.get('Authorization', None),
req_method=r.request.method,
res_code=str(r.status_code),
response=str(r.text),
authz_req_method=authz_res.request.method if authz_res else None,
authz_code=str(authz_res.status_code) if authz_res else None,
authz_header=str(authz_res.request.headers.get('Authorization', None)) if authz_res else None,
)
for r in scan_results:
if r or authz_res:
all_analysis.append(DescriptorLink(
cache_header=r.headers.get('Cache-Control', 'Header missing'),
referrer_header=r.headers.get('Referrer-Policy', 'Header missing'),
session_cookies=self._get_session_cookies(r.cookies),
auth_header=r.request.headers.get('Authorization', None),
req_method=r.request.method,
res_code=str(r.status_code),
response=str(r.text),
authz_req_method=authz_res.request.method if authz_res else None,
authz_code=str(authz_res.status_code) if authz_res else None,
authz_header=str(authz_res.request.headers.get('Authorization', None)) if authz_res else None,
))
scan_res[link] = all_analysis

res.scan_results = scan_res
res.link_errors = self.link_errors

logging.info(f"Descriptor scan complete, found and visited {len(self.links)} links")
return res
Loading
Loading