-
Notifications
You must be signed in to change notification settings - Fork 1.3k
improving unit test coverage (WIP) #3246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5100c8b
cd1c7a5
b4bdc89
40ad380
bfbef86
f72a7bc
c7f459a
433f257
ce8d68a
f5aadfc
38fb9c5
5f39102
874e594
da29139
153052c
1731eb0
0798080
207e2a1
3075f57
c15f77e
4ced90e
e5457ec
54a6cc2
bf3ae72
865bb88
4d736ba
47698ed
a6d02a7
7489f88
7092955
f2c56fe
bb44cbc
7eab86e
1ccd44c
a6fa1e0
f4831b0
a202757
db69116
e69b354
d1b1013
e9b3254
1e6c801
62aadaf
423ad3f
c831656
62ef268
70a37d8
a094f9b
ba8d98b
45aaaa5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -137,9 +137,9 @@ | |||||
| ('FILENAME', display_name)]) | ||||||
| return web.Response(body=content, headers=headers) | ||||||
| except FileNotFoundError: | ||||||
| return web.HTTPNotFound(body='File not found') | ||||||
| raise web.HTTPNotFound(text='File not found') | ||||||
| except Exception as e: | ||||||
| return web.HTTPNotFound(body=str(e)) | ||||||
| raise web.HTTPNotFound(text=str(e)) | ||||||
Check warningCode scanning / CodeQL Information exposure through an exception Medium Stack trace information Error loading related location Loading |
||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
revert for now |
||||||
|
|
||||||
| @check_authorization | ||||||
| async def download_exfil_file(self, request): | ||||||
|
|
@@ -153,20 +153,17 @@ | |||||
| file = base64.b64decode(request.query.get('file')).decode('ascii') | ||||||
| file = os.path.normpath(file) # normalize path to remove all directory traversal attempts then check for presence in exfil dir | ||||||
| if not is_in_exfil_dir(file): | ||||||
| return web.HTTPNotFound(body="File not found in exfil dir") | ||||||
| raise web.HTTPNotFound(text="File not found in exfil dir") | ||||||
| filename = file.split(os.sep)[-1] | ||||||
| path = os.sep.join(file.split(os.sep)[:-1]) | ||||||
| _, content = await self.file_svc.read_file(filename, location=path) | ||||||
| headers = dict([('CONTENT-DISPOSITION', 'attachment; filename="%s"' % filename), | ||||||
| ('FILENAME', filename)]) | ||||||
| return web.Response(body=content, headers=headers) | ||||||
| except web.HTTPNotFound as e: | ||||||
| raise e | ||||||
| except FileNotFoundError: | ||||||
| return web.HTTPNotFound(body='File not found') | ||||||
| raise web.HTTPNotFound(text='File not found') | ||||||
| except Exception as e: | ||||||
| return web.HTTPNotFound(body=str(e)) | ||||||
| return web.HTTPBadRequest(body='A file needs to be specified for download') | ||||||
|
|
||||||
| @staticmethod | ||||||
| def _request_errors(request): | ||||||
| errors = [] | ||||||
| return errors | ||||||
| raise web.HTTPNotFound(text=str(e)) | ||||||
Check warningCode scanning / CodeQL Information exposure through an exception Medium Stack trace information Error loading related location Loading |
||||||
|
|
||||||
| raise web.HTTPBadRequest(text='A file needs to be specified for download') | ||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -109,4 +109,4 @@ async def update_ability(self, request: web.Request): | |||||
| description='HTTP 204 Status Code (No Content)') | ||||||
| async def delete_ability(self, request: web.Request): | ||||||
| await self.delete_on_disk_object(request) | ||||||
| return web.HTTPNoContent() | ||||||
| raise web.HTTPNoContent() | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
revert for now |
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -136,7 +136,7 @@ async def delete_payloads(self, request: web.Request): | |||||
| response = web.HTTPNotFound() | ||||||
| except PermissionError: | ||||||
| response = web.HTTPForbidden(reason="Permission denied.") | ||||||
| return response | ||||||
| raise response | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
revert for now |
||||||
|
|
||||||
| @classmethod | ||||||
| async def __generate_file_name_and_path(cls, sanitized_filename: str) -> [str, str]: | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |||||
| def api_access(func): | ||||||
| async def process(*args, **kwargs): | ||||||
| async with aiohttp.ClientSession(headers=dict(Authorization='token {}'.format(args[0].token)), | ||||||
| connector=aiohttp.TCPConnector(verify_ssl=False)) as session: | ||||||
| connector=aiohttp.TCPConnector(ssl=False)) as session: | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
revert for now |
||||||
| kwargs['session'] = session | ||||||
| return await func(*args, **kwargs) | ||||||
| return process | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -14,7 +14,7 @@ | |||||
| def api_access(func): | ||||||
| async def process(*args, **kwargs): | ||||||
| async with aiohttp.ClientSession(headers=dict(Authorization='Bearer {}'.format(args[0].key)), | ||||||
| connector=aiohttp.TCPConnector(verify_ssl=False)) as session: | ||||||
| connector=aiohttp.TCPConnector(ssl=False)) as session: | ||||||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
revert for now |
||||||
| kwargs['session'] = session | ||||||
| return await func(*args, **kwargs) | ||||||
| return process | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| import pytest | ||
|
|
||
| from unittest import mock | ||
|
|
||
| from app.api.v2.managers.config_api_manager import ConfigApiManager, SENSITIVE_CONFIG_PROPS | ||
| from app.utility.base_world import BaseWorld | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def base_world(app_config, agent_config): | ||
| BaseWorld.clear_config() | ||
| BaseWorld.apply_config('main', app_config) | ||
| BaseWorld.apply_config('agents', agent_config) | ||
|
|
||
| yield BaseWorld | ||
|
|
||
| BaseWorld.clear_config() | ||
|
|
||
|
|
||
| @pytest.fixture | ||
| def filter_config(): | ||
| def _filter_config(config): | ||
| to_filter = config | ||
| for sensitive_prop in SENSITIVE_CONFIG_PROPS: | ||
| to_filter.pop(sensitive_prop, None) | ||
| return to_filter | ||
| return _filter_config | ||
|
|
||
|
|
||
| class TestConfigApi: | ||
| async def test_get_config_with_name(self, api_v2_client, api_cookies, base_world, filter_config, app_config, agent_config): | ||
| resp = await api_v2_client.get('/api/v2/config/main', cookies=api_cookies) | ||
| config_json = await resp.json() | ||
| want = filter_config(app_config) | ||
| assert config_json == want | ||
|
|
||
| resp = await api_v2_client.get('/api/v2/config/agents', cookies=api_cookies) | ||
| config_json = await resp.json() | ||
| assert config_json == agent_config | ||
|
|
||
| # Test nonexistent config | ||
| resp = await api_v2_client.get('/api/v2/config/doesnotexist', cookies=api_cookies) | ||
| resp_dict = await resp.json() | ||
| want = dict(error='Config not found: doesnotexist') | ||
| assert resp_dict == want | ||
|
|
||
| async def test_get_update_main_config(self, api_v2_client, api_cookies, base_world, filter_config, app_config): | ||
| data = dict(prop='app.contact.html', value='/newhtmlcontact') | ||
| resp = await api_v2_client.patch('/api/v2/config/main', json=data, cookies=api_cookies) | ||
| config_json = await resp.json() | ||
| want = filter_config(app_config) | ||
| want['app.contact.html'] = '/newhtmlcontact' | ||
| assert config_json == want | ||
|
|
||
| # Test sensitive field | ||
| data = dict(prop='host', value='127.0.0.3') | ||
| resp = await api_v2_client.patch('/api/v2/config/main', json=data, cookies=api_cookies) | ||
| resp_dict = await resp.json() | ||
| want = dict( | ||
| error='Update not allowed', | ||
| details=dict(property='host') | ||
| ) | ||
| assert resp_dict == want | ||
|
|
||
| async def test_get_update_agents_config(self, api_v2_client, api_cookies, base_world, agent_config): | ||
| data = dict(watchdog='1', implant_name='newname', bootstrap_abilities=['abil1', '', 'DNE', 'abil2']) | ||
| with mock.patch.object(ConfigApiManager, '_get_loaded_ability_ids', return_value={'abil1', 'abil2'}): | ||
| resp = await api_v2_client.patch('/api/v2/config/agents', json=data, cookies=api_cookies) | ||
| config_json = await resp.json() | ||
| want = { | ||
| 'sleep_min': 30, | ||
| 'sleep_max': 60, | ||
| 'untrusted_timer': 90, | ||
| 'watchdog': 1, | ||
| 'implant_name': 'newname', | ||
| 'deadman_abilities': [ | ||
| 'this-is-a-fake-ability' | ||
| ], | ||
| 'bootstrap_abilities': [ | ||
| 'abil1', 'abil2' | ||
| ] | ||
| } | ||
| assert config_json == want |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
revert for now