Skip to content
10 changes: 10 additions & 0 deletions app/objects/secondclass/c_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def __getattr__(self, item):
def replace_cleanup(self, command, payload):
return command.replace(self.RESERVED['payload'], payload)

def __eq__(self, other):
"""Overrides the default eq implementation"""
return isinstance(other, Executor) and self.name == other.name and self.platform == other.platform and \
self.command == other.command and self.code == other.code and \
self.language == other.language and self.build_target == other.build_target and \
self.payloads == other.payloads and self.uploads == other.uploads and \
self.timeout == other.timeout and self.parsers == other.parsers and \
self.cleanup == other.cleanup and self.variations == other.variations and \
self.additional_info == other.additional_info


def get_variations(data):
variations = []
Expand Down
69 changes: 41 additions & 28 deletions app/service/data_svc.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,32 +152,41 @@ async def remove(self, object_name, match):
self.log.error('[!] REMOVE: %s' % e)

async def load_ability_file(self, filename, access):
for entries in self.strip_yml(filename):
for ab in entries:
ability_id = ab.pop('id', None)
name = ab.pop('name', '')
description = ab.pop('description', '')
tactic = ab.pop('tactic', None)
executors = await self.convert_v0_ability_executor(ab)
technique_id = self.convert_v0_ability_technique_id(ab)
technique_name = self.convert_v0_ability_technique_name(ab)
privilege = ab.pop('privilege', None)
repeatable = ab.pop('repeatable', False)
singleton = ab.pop('singleton', False)
requirements = await self.convert_v0_ability_requirements(ab.pop('requirements', []))
buckets = ab.pop('buckets', [tactic])
ab.pop('access', None)
plugin = self._get_plugin_name(filename)
ab.pop('plugin', plugin)

if tactic and tactic not in filename:
self.log.error('Ability=%s has wrong tactic' % ability_id)

await self._create_ability(ability_id=ability_id, name=name, description=description, tactic=tactic,
technique_id=technique_id, technique_name=technique_name,
executors=executors, requirements=requirements, privilege=privilege,
repeatable=repeatable, buckets=buckets, access=access, singleton=singleton, plugin=plugin,
**ab)
try:
for entries in self.strip_yml(filename):
for ab in entries:
if type(ab) is not dict:
self.log.error(f'Malformed ability file {filename}. Expected ability entry to be a dictionary, received {type(ab)} instead.')
continue
ability_id = ab.pop('id', None)
if ability_id is not None and type(ability_id) is not str:
ability_id = str(ability_id)
name = ab.pop('name', '')
description = ab.pop('description', '')
tactic = ab.pop('tactic', None)
executors = await self.convert_v0_ability_executor(ab)
technique_id = self.convert_v0_ability_technique_id(ab)
technique_name = self.convert_v0_ability_technique_name(ab)
privilege = ab.pop('privilege', None)
repeatable = ab.pop('repeatable', False)
singleton = ab.pop('singleton', False)
requirements = await self.convert_v0_ability_requirements(ab.pop('requirements', []))
buckets = ab.pop('buckets', [tactic])
ab.pop('access', None)
plugin = self._get_plugin_name(filename)
ab.pop('plugin', plugin)

if tactic and tactic not in filename:
self.log.warn(f'Tactic for ability={ability_id} is not in the ability file path {filename}.')
self.log.warn('Please check that the ability is labeled with the correct tactic and is in the correct location.')

await self._create_ability(ability_id=ability_id, name=name, description=description, tactic=tactic,
technique_id=technique_id, technique_name=technique_name,
executors=executors, requirements=requirements, privilege=privilege,
repeatable=repeatable, buckets=buckets, access=access, singleton=singleton, plugin=plugin,
**ab)
except Exception as e:
self.log.exception(f'Failed to load ability file {filename}: {e}')

async def convert_v0_ability_executor(self, ability_data: dict):
"""Checks if ability file follows v0 executor format, otherwise assumes v1 ability formatting."""
Expand Down Expand Up @@ -502,8 +511,12 @@ async def _verify_adversary_profiles(self):
adv.verify(log=self.log, abilities=self.ram['abilities'], objectives=self.ram['objectives'])

def _get_plugin_name(self, filename):
plugin_path = pathlib.PurePath(filename).parts
return plugin_path[1] if 'plugins' in plugin_path else ''
path_components = pathlib.PurePath(filename).parts
num_parts = len(path_components)
for i, part in enumerate(path_components):
if part == 'plugins' and i < num_parts - 1:
return path_components[i + 1]
return ''

async def get_facts_from_source(self, fact_source_id):
fact_sources = await self.locate('sources', match=dict(id=fact_source_id))
Expand Down
143 changes: 143 additions & 0 deletions tests/services/test_data_svc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import glob
import json
import logging
import yaml

from unittest import mock
Expand Down Expand Up @@ -76,10 +77,81 @@
}


ABILITY_YAMLS = {
'plugins/testing/data/discovery/764efa883dda1e11db47671c4a3bbd9e.yml': [yaml.safe_load('''
---

- id: 764efa883dda1e11db47671c4a3bbd9e
name: Find deletable dirs (per user)
description: Discover all directories containing deletable files by user
tactic: discovery
technique:
attack_id: T1082
name: System Information Discovery
platforms:
darwin:
sh:
command: |
testcommand
linux:
sh:
command: |
testcommand
''')],
'plugins/testing/data/discovery/848aa201-4b00-4f08-ae3a-3e84dfb5065c.yml': [yaml.safe_load('''
---

- id: 848aa201-4b00-4f08-ae3a-3e84dfb5065c
name: Find deletable dirs (per user)
description: Discover all directories containing deletable files by user
tactic: discovery
technique:
attack_id: T1082
name: System Information Discovery
platforms:
darwin:
sh:
command: |
testcommand
linux:
sh:
command: |
testcommand
''')],
'plugins/testing/data/discovery/101.yml': [yaml.safe_load('''
---

- id: 101
name: Find deletable dirs (per user)
description: Discover all directories containing deletable files by user
tactic: purposefullywrongtactic
technique:
attack_id: T1082
name: System Information Discovery
platforms:
darwin:
sh:
command: |
testcommand
linux:
sh:
command: |
testcommand
''')],
'plugins/testing/data/discovery/102.yml': [yaml.safe_load('''
malformed
''')],
}


def strip_payload_yaml(path):
return PAYLOAD_CONFIG_YAMLS.get(path, [])


def strip_ability_yaml(path):
return ABILITY_YAMLS.get(path, [])


class TestDataService:
mock_payload_config = dict()

Expand Down Expand Up @@ -234,3 +306,74 @@ def _mock_apply_payload_config(config=None, **_):
}
}
mock_apply_config2.assert_called_once_with(name='payloads', config=expected_config_part2)

@mock.patch.object(logging.Logger, 'warn')
@mock.patch.object(BaseWorld, 'strip_yml', wraps=strip_ability_yaml)
async def test_load_ability_file(self, mock_strip_yml, mock_warn, data_svc):
want_executors = [
Executor(name='sh', platform='darwin', command='testcommand',
code=None, language=None, build_target=None,
payloads=None, uploads=None, timeout=60,
parsers=[], cleanup=None, variations=[]),
Executor(name='sh', platform='linux', command='testcommand',
code=None, language=None, build_target=None,
payloads=None, uploads=None, timeout=60,
parsers=[], cleanup=None, variations=[])
]
with patch.object(DataService, '_create_ability', return_value=None) as mock_create_ability:
await data_svc.load_ability_file('plugins/testing/data/discovery/764efa883dda1e11db47671c4a3bbd9e.yml', BaseWorld.Access.RED)
mock_create_ability.assert_called_once_with(ability_id='764efa883dda1e11db47671c4a3bbd9e', name='Find deletable dirs (per user)',
description='Discover all directories containing deletable files by user',
tactic='discovery', technique_id='T1082', technique_name='System Information Discovery',
executors=want_executors, requirements=[], privilege=None,
repeatable=False, buckets=['discovery'], access=BaseWorld.Access.RED, singleton=False, plugin='testing')

with patch.object(DataService, '_create_ability', return_value=None) as mock_create_ability:
await data_svc.load_ability_file('plugins/testing/data/discovery/848aa201-4b00-4f08-ae3a-3e84dfb5065c.yml', BaseWorld.Access.RED)
mock_create_ability.assert_called_once_with(ability_id='848aa201-4b00-4f08-ae3a-3e84dfb5065c', name='Find deletable dirs (per user)',
description='Discover all directories containing deletable files by user',
tactic='discovery', technique_id='T1082', technique_name='System Information Discovery',
executors=want_executors, requirements=[], privilege=None,
repeatable=False, buckets=['discovery'], access=BaseWorld.Access.RED, singleton=False, plugin='testing')

with patch.object(DataService, '_create_ability', return_value=None) as mock_create_ability:
await data_svc.load_ability_file('plugins/testing/data/discovery/101.yml', BaseWorld.Access.RED)
mock_warn.assert_any_call('Tactic for ability=101 is not in the ability file path plugins/testing/data/discovery/101.yml.')
mock_warn.assert_called_with('Please check that the ability is labeled with the correct tactic and is in the correct location.')
mock_create_ability.assert_called_once_with(ability_id='101', name='Find deletable dirs (per user)',
description='Discover all directories containing deletable files by user',
tactic='purposefullywrongtactic', technique_id='T1082', technique_name='System Information Discovery',
executors=want_executors, requirements=[], privilege=None,
repeatable=False, buckets=['purposefullywrongtactic'], access=BaseWorld.Access.RED, singleton=False, plugin='testing')

with patch.object(DataService, '_create_ability', return_value=None) as mock_create_ability:
with patch.object(logging.Logger, 'error') as mock_error:
await data_svc.load_ability_file('plugins/testing/data/discovery/102.yml', BaseWorld.Access.RED)
mock_create_ability.assert_not_called()
assert mock_error.called

# Test exception
with patch.object(DataService, '_create_ability', side_effect=Exception('mockexception')):
with patch.object(logging.Logger, 'exception') as mock_exception:
await data_svc.load_ability_file('plugins/testing/data/discovery/101.yml', BaseWorld.Access.RED)
mock_exception.assert_called_once_with(mock.ANY)
assert 'Failed to load ability file plugins/testing/data/discovery/101.yml' in mock_exception.call_args.args[0]

def test_get_plugin_name(self, data_svc):
assert 'test' == data_svc._get_plugin_name('plugins/test')
assert 'test' == data_svc._get_plugin_name('plugins/test/')
assert 'test' == data_svc._get_plugin_name('plugins/test/data')
assert 'test' == data_svc._get_plugin_name('plugins/test/data/abilities')
assert 'test' == data_svc._get_plugin_name('plugins/test/data/abilities/collection/123.yml')
assert 'test' == data_svc._get_plugin_name('/full/path/to/plugins/test/data/abilities/collection/123.yml')
assert '' == data_svc._get_plugin_name('test')
assert '' == data_svc._get_plugin_name('plugins')
assert '' == data_svc._get_plugin_name('plugins/')
assert '' == data_svc._get_plugin_name('/full/path/to/plugins')
assert '' == data_svc._get_plugin_name('/full/path/to/plugins/')
assert '' == data_svc._get_plugin_name('plugin/test')
assert '' == data_svc._get_plugin_name('plugin/test/')
assert '' == data_svc._get_plugin_name('plugin/test/data')
assert '' == data_svc._get_plugin_name('plugin/test/data/abilities')
assert '' == data_svc._get_plugin_name('plugin/test/data/abilities/collection/123.yml')
assert '' == data_svc._get_plugin_name('/full/path/to/plugin/test/data/abilities/collection/123.yml')
Loading