27 commits
be0797c
initial commit to add travis ci and a mock unittest
clegaspi Jan 30, 2021
f2b3cc9
Skip failing test (intentionally)
clegaspi Jan 30, 2021
b0a7ef4
Add coveralls support for coverage checking
clegaspi Jan 30, 2021
330de63
Change script to support coverage
clegaspi Jan 30, 2021
c309a0c
install coverage packages
clegaspi Jan 30, 2021
7e85ecf
Add badges to readme
clegaspi Jan 30, 2021
1ab0266
Tests for cert.py
clegaspi Mar 21, 2021
e062cc6
Move comment
clegaspi Mar 21, 2021
3b89e6d
Reading cert from alternate location (#35)
clegaspi Mar 28, 2021
8c32879
Adding start of har tests, testing travis encrypted file
clegaspi Mar 28, 2021
da8cc0a
Fix data path
clegaspi Mar 28, 2021
b3549f6
Remove path
clegaspi Mar 28, 2021
3f70283
Putting path back again since it worked before
clegaspi Mar 28, 2021
48cae0a
Uploading re-encrypted file
clegaspi Mar 28, 2021
3586028
Change cert test setup to class setup
clegaspi Mar 28, 2021
7556066
Rewriting HarParser to parse in constructor (#36) and other improveme…
clegaspi Mar 28, 2021
5813c5f
Update redacted HAR test data and include script for producing it
clegaspi Mar 28, 2021
94dd175
Remove todo line that is done
clegaspi Mar 28, 2021
7ad3647
Add raises information to docs
clegaspi Mar 29, 2021
638d61a
Update redaction script to output multiple HAR files
clegaspi Mar 29, 2021
e0192c3
Update har test data
clegaspi Mar 29, 2021
be68beb
Update har test data again
clegaspi Mar 29, 2021
983e0bf
Update travis file
clegaspi Mar 29, 2021
b16c24b
Update har data and tests
clegaspi Mar 29, 2021
ace8a7d
Add additional test file with only one response
clegaspi Mar 29, 2021
2a85d11
Update har data
clegaspi Mar 29, 2021
55b56cb
Create more har.py tests
clegaspi Mar 29, 2021
6 changes: 6 additions & 0 deletions .gitignore
@@ -0,0 +1,6 @@
/tests/data/redacted_nodata.har
/tests/data/redacted_requests.har
/tests/data/redacted_responses.har
/tests/data/redacted_saml.har
/tests/data/redacted_har_data.tar.gz
/tests/data/redacted_oneresponse.har
18 changes: 18 additions & 0 deletions .travis.yml
@@ -0,0 +1,18 @@
language: python
python:
- "3.6"
# decrypt and unpack the encrypted HAR test data
before_install:
- >
openssl aes-256-cbc -K $encrypted_b6a0e19775d3_key -iv $encrypted_b6a0e19775d3_iv
-in tests/data/redacted_har_data.tar.gz.enc -out tests/data/redacted_har_data.tar.gz -d
- tar -zxvf tests/data/redacted_har_data.tar.gz -C tests/data
install:
- sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl
- pip install .
- pip install coverage coveralls
# command to run tests
script:
- coverage run --source=saml_reader -m pytest --verbose
after_success:
- coveralls
3 changes: 3 additions & 0 deletions README.md
@@ -1,5 +1,8 @@
# SAML Reader

[![Build Status](https://travis-ci.com/clegaspi/saml_reader.svg?branch=master)](https://travis-ci.com/clegaspi/saml_reader)
[![Coverage Status](https://coveralls.io/repos/github/clegaspi/saml_reader/badge.svg?branch=add_travis)](https://coveralls.io/github/clegaspi/saml_reader?branch=add_travis)

## **IMPORTANT**
Please **DO NOT** add any personally identifiable information (PII) when reporting an issue.

90 changes: 76 additions & 14 deletions saml_reader/har.py
@@ -5,6 +5,8 @@

import json
from urllib.parse import unquote
from datetime import datetime
from enum import Enum

import haralyzer

@@ -33,44 +35,75 @@ def __init__(self, data):

Args:
data (basestring): Raw HAR data as JSON-string

Raises:
(HarParsingError) If there is an error reading the HAR data
"""
# TODO: Consider parsing this upon creation and writing a getter for SAML response(s)
# to wrap the haralyzer package more thoroughly
try:
self.data = json.loads(data)
except json.JSONDecodeError:
raise HarParsingError("Problem reading HAR JSON data")
self.parsed_data = None
self.errors = []

def parse(self):
self.responses = self.__parse(self.data)
# Sort responses newest to oldest
self.responses.sort(reverse=True, key=lambda x: x.date)

@staticmethod
def __parse(raw_json):
"""
Parses the raw HAR data and stores it in the object.
Parses the raw HAR data and returns SAML response data.

Args:
raw_json (dict): HAR JSON as a dictionary

Returns:
(basestring): SAML response as base64 string
(`list` of `RawSamlData`): SAML response data found in HAR file

Raises:
(HarParsingError) If the HAR parsing class could not parse JSON as HAR data
"""
try:
parsed_har = haralyzer.HarParser(self.data)
parsed_har = haralyzer.HarParser(raw_json)
except Exception:
# This is a wide catch-all
raise HarParsingError("Could not parse the HAR data")

responses = []
for page in parsed_har.pages:
for post in page.post_requests:
timestamp = post['startedDateTime']
url = post.get('request', {}).get('url', "")
for param in post.get('request', {}).get('postData', {}).get('params', []):
if param['name'] == 'SAMLResponse':
responses.append(param['value'])

if len(responses) > 1:
self.errors.append("Multiple SAML responses found. Using the first one.")
unencoded_response = unquote(param['value'])
responses.append(
RawSamlData('response', unencoded_response, timestamp, url)
)

if not responses:
raise NoSAMLResponseFound("No SAML response found in the HAR file")

self.parsed_data = unquote(responses[0])
return self.parsed_data
return responses

def get_raw_saml_response(self):
"""
Returns the most recent SAML response in the HAR data (if there are multiple) as a
URL-decoded, base64-encoded string.

Returns:
(basestring) Raw SAML data as base64-encoded string

"""
return self.responses[0].saml_string

def contains_multiple_responses(self):
"""
Checks if the HAR data contained multiple SAML responses.

Returns:
(bool) True if the HAR data contained more than one SAML response. False otherwise.
"""
return len(self.responses) > 1

@classmethod
def from_file(cls, filename):
@@ -85,3 +118,32 @@ def from_file(cls, filename):
"""
with open(filename, 'r') as f:
return cls(f.read())


class RawSamlData:
"""
Simple data structure for holding raw SAML data and some metadata
"""
class _SamlDataType(Enum):
"""
Enumeration of possible SAML data types
"""
RESPONSE = 0
REQUEST = 1

def __init__(self, data_type, saml_string, date, url):
"""
Create data structure object

Args:
data_type (basestring): Type of SAML data ('request' or 'response')
saml_string (basestring): URL-decoded base64 string containing SAML data
date (basestring): timestamp of request from HAR file, formatted "2019-11-04T10:00:00.000-08:00"
url (basestring): destination URL of the SAML data
"""
self.data_type = self._SamlDataType[data_type.upper()]
self.saml_string = saml_string
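# Python's strptime only accepts a colon in the %z UTC offset ("-08:00")
# from 3.7 onward; strip it so this parses on the Python 3.6 used in CI.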
if ":" == date[-3:-2]:
date = date[:-3] + date[-2:]
self.date = datetime.strptime(date, "%Y-%m-%dT%H:%M:%S.%f%z")
self.url = url
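
With parsing moved into the constructor (#36), HarParser no longer needs an explicit parse() call. A minimal usage sketch based on the interface in this diff (the capture filename is hypothetical, and importing the exception classes from saml_reader.har is an assumption):

from saml_reader.har import HarParser, HarParsingError, NoSAMLResponseFound

try:
    # The constructor parses immediately; from_file only reads the file first.
    har = HarParser.from_file("login_capture.har")  # hypothetical capture file
except (HarParsingError, NoSAMLResponseFound) as err:
    raise SystemExit(err)

if har.contains_multiple_responses():
    print("Multiple SAML responses found; using the most recent")

# URL-decoded, base64-encoded SAML response (responses sorted newest first)
saml_b64 = har.get_raw_saml_response()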
45 changes: 34 additions & 11 deletions saml_reader/saml/parser.py
@@ -59,21 +59,14 @@ def _parse_saml_values(self):
"""

value_by_field = {
'certificate': self._saml.query_assertion(
'/ds:Signature/ds:KeyInfo/ds:X509Data/ds:X509Certificate'
),
'certificate': self.__get_certificate(),
'name_id': self._saml.query_assertion(
'/saml:Subject/saml:NameID'
),
'name_id_format': self._saml.query_assertion(
'/saml:Subject/saml:NameID'
),
'acs': [
self._saml.query('/samlp:Response'),
self._saml.query_assertion(
'/saml:Subject/saml:SubjectConfirmation/saml:SubjectConfirmationData'
)
],
'acs': self.__get_acs(),
'encryption':
self._saml.query_assertion('/ds:Signature/ds:SignedInfo/ds:SignatureMethod') or
self._saml.query('/samlp:Response/ds:Signature/ds:SignedInfo/ds:SignatureMethod'),
@@ -83,10 +76,10 @@
}

transform_by_field = {
'certificate': lambda x: x[0].text if x else None,
'certificate': lambda x: x,
'name_id': lambda x: x[0].text if x else None,
'name_id_format': lambda x: x[0].attrib.get('Format') if x else None,
'acs': lambda x: x[0][0].attrib.get('Destination') or x[1][0].attrib.get('Recipient') or None,
'acs': lambda x: x,
'encryption': self.__parse_encryption,
'audience': lambda x: x[0].text if x else None,
'issuer': lambda x: x[0].text if x else None,
@@ -96,6 +89,36 @@
for field, value in value_by_field.items():
self._saml_values[field] = transform_by_field[field](value)

def __get_acs(self):
"""
Return the Assertion Consumer Service URL, if it exists in the SAML data

Returns:
(`basestring` or `None`) ACS URL, or None if it doesn't exist
"""
value = self._saml.query('/samlp:Response')
if value:
return value[0].attrib.get('Destination')
value = self._saml.query_assertion(
'/saml:Subject/saml:SubjectConfirmation/saml:SubjectConfirmationData'
)
if value:
return value[0].attrib.get('Recipient')
return None

def __get_certificate(self):
"""
Return the certificate data, if it exists in the SAML data

Returns:
(`basestring` or `None`) Certificate data, or None if it doesn't exist
"""
value = self._saml.query_assertion('/ds:Signature/ds:KeyInfo/ds:X509Data/ds:X509Certificate') or \
self._saml.query('/samlp:Response/ds:Signature/ds:KeyInfo/ds:X509Data/ds:X509Certificate')
if not value:
return None
return value[0].text

@staticmethod
def __parse_attributes(attribute_data):
"""
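
One motivation for the new getters is visible in the removed `acs` transform: it indexed x[0][0] and x[1][0] unconditionally, so a query that matched nothing raised IndexError instead of yielding None. A toy illustration of the failure mode (Node is a hypothetical stand-in for a matched XML element, not the real query result type):

class Node:
    """Toy stand-in for a matched XML element."""
    def __init__(self, **attrib):
        self.attrib = attrib

# The pre-PR transform for 'acs', as removed above
old_acs = lambda x: x[0][0].attrib.get('Destination') or x[1][0].attrib.get('Recipient') or None

# Both queries matched something: fine either way
old_acs([[Node(Destination='https://acs.example.com')], [Node()]])

# The /samlp:Response query matched nothing: x[0][0] raises IndexError,
# where __get_acs now falls through to the Recipient lookup instead
try:
    old_acs([[], [Node(Recipient='https://acs.example.com')]])
except IndexError:
    print("old transform crashed on a missing Response element")

Similarly, __get_certificate gains a second lookup location, the Response-level signature, which the old inline query did not check.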
11 changes: 7 additions & 4 deletions saml_reader/text_reader.py
@@ -206,13 +206,16 @@ def _parse_raw_data(self, input_type, data, parser=StandardSamlParser):
return parser.from_xml(data)
if input_type == 'har':
try:
# TODO: Do the HAR parsing in the constructor?
har_parser = HarParser(data)
data = har_parser.parse()
except HarParsingError as e:
raise DataTypeInvalid(*e.args)
self._errors.extend(har_parser.errors)
return parser.from_base64(data)
if har_parser.contains_multiple_responses():
# TODO: This is a place where some optimization could happen, such as prompting
# the user to select one of the responses, or trying to analyze the destination
# URL to see which one is "probably" right by matching the ACS pattern
self._errors.append("Multiple SAML responses found! Using the most recent.")
raw_saml_data = har_parser.get_raw_saml_response()
return parser.from_base64(raw_saml_data)
raise DataTypeInvalid(f"Invalid data type specified: {input_type}")

def get_saml(self):
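
The TODO above sketches one option: when several responses are present, prefer the one whose destination URL matches the expected ACS pattern rather than always taking the newest. A hypothetical sketch building on the RawSamlData fields from har.py (the regex is a placeholder, not the application's real ACS pattern):

import re

# Placeholder pattern; the real ACS URL format is not specified in this PR
ACS_PATTERN = re.compile(r"^https://example\.com/sso/saml2/")

def pick_likely_response(responses):
    """Prefer the newest response whose destination URL looks like the ACS.

    `responses` is HarParser's newest-first list of RawSamlData objects,
    each carrying the destination `url` captured from its HAR entry.
    """
    for response in responses:  # already sorted newest to oldest
        if ACS_PATTERN.match(response.url):
            return response
    return responses[0]  # fall back to the newest overall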
116 changes: 116 additions & 0 deletions tests/data/redact_har_data.py
@@ -0,0 +1,116 @@
"""
This script will strip a HAR file to include only SAML request and response data,
and redact all cookie and header data.

Running from the command line, the first argument is the path to the source file, which
should contain at least one SAML Request and one SAML Response. The second argument is the output path
for the redacted files. The third argument is a template for the output filenames.
"""

import json
from datetime import datetime, timedelta
from copy import deepcopy
import sys
import os


def redact_har_file(source_file, destination_path, filename_template):
# Read source file
with open(source_file, 'r') as f:
har = json.load(f)

# Find only SAML Request and Response data in the HAR file
entries = [e for e in har['log']['entries']
if e['request']['method'] == 'POST' and \
any(p['name'].startswith('SAML') for p in e['request'].get('postData', {}).get('params', []))]

# Redact header and cookie values
for entry in entries:
for t in ('request', 'response'):
for category in ('cookies', 'headers'):
for values_to_edit in entry[t][category]:
values_to_edit['value'] = "redacted"

# Collect pages that match the entries found
page_nums = {p['pageref'] for p in entries}
pages = [p for p in har['log']['pages'] if p['id'] in page_nums]

# Create a second set of SAML data entries which are one day in the future.
# This is to test having multiple entries in the file.
second_response_entries = []

for entry in entries:
raw_timestamp = entry['startedDateTime']

# HAR timestamps have a colon in the timezone offset. Removing it here.
if ":" == raw_timestamp[-3:-2]:
raw_timestamp = raw_timestamp[:-3] + raw_timestamp[-2:]
timestamp = datetime.strptime(raw_timestamp, "%Y-%m-%dT%H:%M:%S.%f%z")
new_timestamp = timestamp + timedelta(days=1)

new_raw_timestamp = new_timestamp.strftime("%Y-%m-%dT%H:%M:%S")
# HAR timestamps have milliseconds instead of microseconds. Removing excess digits.
new_raw_timestamp += new_timestamp.strftime(".%f")[:4]
# HAR timestamps have a colon in the timezone offset. Adding it in here.
new_raw_timestamp += new_timestamp.strftime("%z")
new_raw_timestamp = new_raw_timestamp[:-2] + ":" + new_raw_timestamp[-2:]

new_entry = deepcopy(entry)
new_entry['startedDateTime'] = new_raw_timestamp
second_response_entries.append(new_entry)

responses = []
requests = []

for entry in entries + second_response_entries:
if any(p['name'] == 'SAMLRequest' for p in entry['request']['postData']['params']):
requests.append(entry)
else:
responses.append(entry)

both_types_out = {'log': {
'pages': pages,
'entries': entries + second_response_entries}
}

responses_out = {'log': {
'pages': pages,
'entries': responses}
}

one_response_out = {'log': {
'pages': [p for p in pages if p['id'] == responses[0]['pageref']],
'entries': [responses[0]]}
}

requests_out = {'log': {
'pages': pages,
'entries': requests}
}

no_saml_data_out = {'log': {
'pages': pages,
'entries': []}
}

with open(os.path.join(destination_path, filename_template + "_saml.har"), 'w') as f:
json.dump(both_types_out, f)

with open(os.path.join(destination_path, filename_template + "_requests.har"), 'w') as f:
json.dump(requests_out, f)

with open(os.path.join(destination_path, filename_template + "_responses.har"), 'w') as f:
json.dump(responses_out, f)

with open(os.path.join(destination_path, filename_template + "_oneresponse.har"), 'w') as f:
json.dump(one_response_out, f)

with open(os.path.join(destination_path, filename_template + "_nodata.har"), 'w') as f:
json.dump(no_saml_data_out, f)


if __name__ == '__main__':
if len(sys.argv) != 4:
raise ValueError("Incorrect number of arguments specified! "
"Need source file, destination path, filename template")
redact_har_file(*sys.argv[1:])
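
For reference, an example invocation that produces the filenames listed in the new .gitignore entries (the source capture path is hypothetical):

# From the command line (run from the repo root):
#   python tests/data/redact_har_data.py capture.har tests/data redacted
# or, equivalently, from Python with tests/data on the path:
from redact_har_data import redact_har_file

redact_har_file("capture.har", "tests/data", "redacted")
# Writes redacted_saml.har, redacted_requests.har, redacted_responses.har,
# redacted_oneresponse.har, and redacted_nodata.har under tests/data/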
Binary file added tests/data/redacted_har_data.tar.gz.enc
Binary file not shown.