Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix/EAS-2086 : CSRT's Authentication scanner flags static resources #72

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 90 additions & 41 deletions analyzers/descriptor_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,71 @@ def __init__(self, desc_scan: DescriptorResult, requirements: Requirements):
self.scan = desc_scan
self.reqs = requirements

def _get_index_len(self, link):
return len(self.scan.scan_results[link])

def _check_cache_headers(self) -> Tuple[bool, List[str]]:
passed = True
proof: List[str] = []
scan_res = self.scan.scan_results
for link in scan_res:
cache_headers = scan_res[link].cache_header.split(',')
cache_headers = [x.strip().lower() for x in cache_headers]
directives = all(item in cache_headers for item in REQ_CACHE_HEADERS)
if not directives:
proof.append(f"{link} | Cache header: {scan_res[link].cache_header}")
passed = passed and directives
for i in range(self._get_index_len(link)):
cache_headers = scan_res[link][i].cache_header.split(',')
cache_headers = [x.strip().lower() for x in cache_headers]
directives = all(item in cache_headers for item in REQ_CACHE_HEADERS)
if not directives:
proof.append(f"{link} | Cache header: {scan_res[link][i].cache_header}")
passed = passed and directives

return passed, proof

def _check_referrer_headers(self) -> Tuple[bool, List[str]]:
passed = True
proof: List[str] = []
scan_res = self.scan.scan_results

for link in scan_res:
ref_headers = scan_res[link].referrer_header.split(',')
ref_headers = [x.strip().lower() for x in ref_headers]
policy = ref_headers[0] not in REF_DENYLIST if ref_headers[0] != 'header missing' else False
if not policy:
proof.append(f"{link} | Referrer header: {scan_res[link].referrer_header}")
passed = passed and policy
for i in range(self._get_index_len(link)):
scan_res = self.scan.scan_results[link][i]
ref_headers = scan_res.referrer_header.split(',')
ref_headers = [x.strip().lower() for x in ref_headers]
policy = ref_headers[0] not in REF_DENYLIST if ref_headers[0] != 'header missing' else False
if not policy:
proof.append(f"{link} | Referrer header: {scan_res.referrer_header}")
passed = passed and policy

return passed, proof

def _check_cookie_headers(self) -> Tuple[bool, List[str]]:
passed = True
proof: List[str] = []
scan_res = self.scan.scan_results
scan_res = self.scan.scan_results
for link in scan_res:
cookies = scan_res[link].session_cookies
for cookie in cookies:
# Parsing the cookie string became messy, so we use a regex to match and tear
# the string apart into its relevant pieces
parsed = re.match(COOKIE_PARSE, cookie)
secure = bool(util.strtobool(parsed.group(3)))
httponly = bool(util.strtobool(parsed.group(4)))

if not secure or not httponly:
proof.append(f"{link} | Cookie: {cookie}")
passed = passed and secure and httponly
for i in range(self._get_index_len(link)):
cookies = scan_res[link][i].session_cookies
for cookie in cookies:
# Parsing the cookie string became messy, so we use a regex to match and tear
# the string apart into its relevant pieces
parsed = re.match(COOKIE_PARSE, cookie)
secure = bool(util.strtobool(parsed.group(3)))
httponly = bool(util.strtobool(parsed.group(4)))

if not secure or not httponly:
proof.append(f"{link} | Cookie: {cookie}")
passed = passed and secure and httponly

return passed, proof

def check_same_response(self, index_none, index_fake) -> bool:
# check that content and response code are the same
if index_none >= len(self.result_list) or index_fake >= len(self.result_list):
return False
res_none = self.result_list[index_none]
res_fake = self.result_list[index_fake]
if not res_fake or not res_none:
return False
return int(res_none.res_code) == 200 and int(res_fake.res_code) == 200 and res_fake.response == res_none.response

def _check_authn_authz(self) -> Tuple[bool, List[str], bool, List[str], bool, List[str]]:
passed = True
proof: List[str] = []
Expand All @@ -82,23 +99,61 @@ def _check_authn_authz(self) -> Tuple[bool, List[str], bool, List[str], bool, Li
use_authentication = (False if authentication_method is None else authentication_method.get("type") == "jwt")
if not use_authentication:
proof.append(NO_AUTH_PROOF)
return passed, proof, signed_install_passed, signed_install_proof, authz_passed, authz_proof
return passed, proof, signed_install_passed, \
signed_install_proof, authz_passed, authz_proof

invalid_response = False
authz_passed = True
for link in scan_res:
res_code = int(scan_res[link].res_code) if scan_res[link].res_code else 0
auth_header = scan_res[link].auth_header
req_method = scan_res[link].req_method
response = scan_res[link].response
authz_req_method = scan_res[link].authz_req_method
authz_code = int(scan_res[link].authz_code) if scan_res[link].authz_code else 0
authz_header = scan_res[link].authz_header
self.result_list = scan_res[link]
signed_install_passed_get, authz_passed_get, invalid_response_get, passed_get \
= self.check_in_range(link, signed_install_proof, authz_proof, proof, invalid_response, 0, 3)
invalid_response = invalid_response and invalid_response_get
passed = passed and passed_get
signed_install_passed_post, authz_passed_post, invalid_response_post, passed_post \
= self.check_in_range(link, signed_install_proof, authz_proof, proof, invalid_response, 3, 6)
invalid_response = invalid_response and invalid_response_post
passed = passed and passed_post

signed_install_passed = signed_install_passed_post and signed_install_passed_get
authz_passed = authz_passed_get and authz_passed_post

if passed:
proof.append(VALID_AUTH_PROOF)
if authz_passed:
authz_proof.append(VALID_AUTHZ_PROOF)

return passed, proof, signed_install_passed, signed_install_proof, authz_passed, authz_proof

# checks for invalid responses, verifies signed install, and checks authz passes, in a range of responses
def check_in_range(self, link: str, signed_install_proof: List, authz_proof: List, proof: List,
invalid_response: bool, start_index: int, end_index: int) -> Tuple[bool, bool, bool, bool]:
signed_install_passed, authz_passed, passed = True, True, True

if self.check_same_response(start_index, start_index + 1) \
or self.check_same_response(start_index + 2, start_index + 1):
return True, True, invalid_response, passed

for res_index in range(start_index, end_index):
if res_index >= len(self.result_list):
break
result = self.result_list[res_index]
res_code = int(result.res_code) if result.res_code else 0
auth_header = result.auth_header
req_method = result.req_method
response = result.response
authz_req_method = result.authz_req_method
authz_code = int(result.authz_code) if result.authz_code else 0
authz_header = result.authz_header

# Check for invalid responses in the body before failing the authn check
invalid_responses = ['Invalid JWT', 'unauthorized', 'forbidden', 'error', 'unlicensed', 'not licensed',
'no license', 'invalid', '401', '403', '404', '500']
invalid_response = False

if any(str(x).lower() in str(response).lower() for x in invalid_responses):
invalid_response = True
else:
invalid_response = False

# We shouldn't be able to visit this link if the app uses authentication.
if res_code >= 200 and res_code < 400 and not invalid_response:
Expand All @@ -119,20 +174,14 @@ def _check_authn_authz(self) -> Tuple[bool, List[str], bool, List[str], bool, Li
authz_proof_text = (f"{link} | Authz Res Code: {authz_code} Req Method: {authz_req_method}"
f" Authz Header: {authz_header}")
authz_proof.append(authz_proof_text)

if passed:
proof.append(VALID_AUTH_PROOF)
if authz_passed:
authz_proof.append(VALID_AUTHZ_PROOF)

return passed, proof, signed_install_passed, signed_install_proof, authz_passed, authz_proof
return signed_install_passed, authz_passed, invalid_response, passed

def analyze(self, authz_only=False) -> Requirements:
cache_passed, cache_proof = self._check_cache_headers()
ref_passed, ref_proof = self._check_referrer_headers()
cookies_passed, cookies_proof = self._check_cookie_headers()
(auth_passed, auth_proof, signed_install_passed, signed_install_proof,
authz_passed, authz_proof) = self._check_authn_authz()
authz_passed, authz_proof) = self._check_authn_authz()

req1_1 = RequirementsResult(
passed=auth_passed,
Expand Down
2 changes: 1 addition & 1 deletion models/descriptor_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ class DescriptorResult(JsonObject):
scopes = ListProperty(StringProperty())
links = ListProperty(StringProperty())
link_errors = DictProperty()
scan_results = DictProperty(DescriptorLink)
scan_results = DictProperty(ListProperty(DescriptorLink))
response = StringProperty()
41 changes: 22 additions & 19 deletions scans/descriptor_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import string
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.backends import default_backend
from typing import List, Optional, Tuple, Union
from typing import Optional, Tuple, Union, List
from urllib.parse import urlparse

import jwt
Expand Down Expand Up @@ -314,8 +314,8 @@ def _visit_link(self, link: str) -> Optional[requests.Response]:
]

res: Optional[requests.Response] = None
all_res: List[Optional[requests.Response]] = []
for task in tasks:

# Gracefully handle links that result in an exception, report them via warning, and skip any further tests
try:
# If we are requesting a lifecycle event, ensure we perform signed-install authentication check
Expand All @@ -334,7 +334,7 @@ def _visit_link(self, link: str) -> Optional[requests.Response]:
logging.debug(f"Requesting {link} via {task['method']} with auth: {task['headers']=}")
res = self.session.request(task['method'], link, headers=task['headers'])
if res.status_code < 400:
break
all_res.append(res)
if res.status_code == 503:
logging.warning(
f"{link} caused a 503 status. Run with --debug for more information. Skipping endpoint...",
Expand All @@ -358,7 +358,7 @@ def _visit_link(self, link: str) -> Optional[requests.Response]:
self.link_errors['exceptions'] += [f"{link}"]
return None

return res
return all_res

def _get_session_cookies(self, cookiejar: requests.cookies.RequestsCookieJar) -> List[str]:
res: List[str] = []
Expand All @@ -383,8 +383,10 @@ def scan(self, user_jwt: str = None) -> DescriptorResult:
scan_results={}
)
scan_res = defaultdict()

for link in self.links:
r = self._visit_link(link)
all_analysis = []
scan_results = self._visit_link(link)

# If we are testing an admin restricted link, perform Authorization check
authz_res = None
Expand All @@ -393,22 +395,23 @@ def scan(self, user_jwt: str = None) -> DescriptorResult:
logging.debug(f"Found and tested admin link for Authorization issue: {link} |"
f" Result: {authz_res.status_code if authz_res else None}")

if r or authz_res:
scan_res[link] = DescriptorLink(
cache_header=r.headers.get('Cache-Control', 'Header missing'),
referrer_header=r.headers.get('Referrer-Policy', 'Header missing'),
session_cookies=self._get_session_cookies(r.cookies),
auth_header=r.request.headers.get('Authorization', None),
req_method=r.request.method,
res_code=str(r.status_code),
response=str(r.text),
authz_req_method=authz_res.request.method if authz_res else None,
authz_code=str(authz_res.status_code) if authz_res else None,
authz_header=str(authz_res.request.headers.get('Authorization', None)) if authz_res else None,
)
for r in scan_results:
if r or authz_res:
all_analysis.append(DescriptorLink(
cache_header=r.headers.get('Cache-Control', 'Header missing'),
referrer_header=r.headers.get('Referrer-Policy', 'Header missing'),
session_cookies=self._get_session_cookies(r.cookies),
auth_header=r.request.headers.get('Authorization', None),
req_method=r.request.method,
res_code=str(r.status_code),
response=str(r.text),
authz_req_method=authz_res.request.method if authz_res else None,
authz_code=str(authz_res.status_code) if authz_res else None,
authz_header=str(authz_res.request.headers.get('Authorization', None)) if authz_res else None,
))
scan_res[link] = all_analysis

res.scan_results = scan_res
res.link_errors = self.link_errors

logging.info(f"Descriptor scan complete, found and visited {len(self.links)} links")
return res
Loading
Loading