Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 64 additions & 6 deletions src/mapp/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import multiprocessing
import time
import shutil
import jwt

from pathlib import Path
from copy import deepcopy
Expand Down Expand Up @@ -127,6 +128,43 @@ def env_to_string(env:dict) -> str:
out += f'{key}={value}\n'
return out

def login_cached_user(cmd:list[str], ctx:dict, user_name:str, email:str, password:str='testpass123') -> dict:
"""Login to a cached user and return user data with env context"""
login_params = {'email': email, 'password': password}
login_cmd = cmd + ['auth', 'login-user', 'run', json.dumps(login_params), '--show', '--no-session']
result = subprocess.run(login_cmd, capture_output=True, text=True, env=ctx, timeout=10)

if result.returncode != 0:
raise RuntimeError(f'Error logging in cached user {user_name}:\n{result.stdout + result.stderr}')

login_response = json.loads(result.stdout)['result']
access_token = login_response['access_token']

# Decode JWT token to get user_id (without verification since we generated it locally)
# Signature verification is disabled because:
# 1. This is for test purposes only with locally generated tokens
# 2. We just need to extract the user_id from the payload
# 3. The token was just created by our own auth system
try:
token_payload = jwt.decode(access_token, options={'verify_signature': False})
user_id = token_payload['sub']
except (jwt.DecodeError, jwt.InvalidTokenError, KeyError) as e:
raise RuntimeError(f'Error decoding access token for cached user {user_name}: {e}')

user_data = {
'id': user_id,
'name': user_name,
'email': email,
'password': password,
'password_confirm': password
}

user_env = ctx.copy()
user_env['MAPP_CLI_ACCESS_TOKEN'] = access_token
user_env['Authorization'] = f'Bearer {access_token}'

return {'user': user_data, 'env': user_env}


class TestMTemplateApp(unittest.TestCase):

Expand Down Expand Up @@ -188,6 +226,11 @@ def setUpClass(cls):

os.makedirs(cls.test_dir, exist_ok=True)

# Always delete crud DB to avoid max models exceeded errors with cached data
# Only cache the expensive pagination DB
if cls.crud_db_file.exists():
cls.crud_db_file.unlink()

pagination_db_exists = cls.pagination_db_file.exists()

#
Expand Down Expand Up @@ -281,13 +324,11 @@ def setUpClass(cls):
except AssertionError as e:
raise RuntimeError(f'AssertionError {e} while creating table for crud db {module_name_kebab}.{model_name_kebab}: {crud_result.stdout + crud_result.stderr}')

# create 2 crud users #
# create crud users (always fresh since crud DB is recreated each time) #

if cls.use_cache:
raise RuntimeError('Need to fix cache with new auth system')

if not cls.use_cache and cls.spec['project']['use_builtin_modules']:
if cls.spec['project']['use_builtin_modules']:
crud_users = ['alice', 'bob', 'charlie', 'david', 'evelyn']
print(' :: Creating crud users ::')
for user_name in crud_users:

# create #
Expand Down Expand Up @@ -364,7 +405,24 @@ def setUpClass(cls):

if cls.use_cache and pagination_db_exists:
print(' :: Using cached pagination db ::')
else:

# login to cached pagination user #

if cls.spec['project']['use_builtin_modules']:
print(' :: Logging in to cached pagination user ::')
try:
cls.pagination_user = login_cached_user(
cls.cmd,
cls.pagination_ctx,
'pagination_tester',
'pagination_tester@example.com'
)
except RuntimeError as e:
print(f' :: Could not login to cached pagination user, will recreate: {e} ::')
# fall through to else block to recreate pagination db
cls.use_cache = False # disable cache for pagination db

if not cls.use_cache or not pagination_db_exists:
print(f' :: Seeding pagination db ::')
seed_jobs = []
for module in cls.spec['modules'].values():
Expand Down