diff --git a/ansible/roles/authentik/files/create_recovery_flow.py b/ansible/roles/authentik/files/create_recovery_flow.py new file mode 100644 index 0000000..43db357 --- /dev/null +++ b/ansible/roles/authentik/files/create_recovery_flow.py @@ -0,0 +1,477 @@ +#!/usr/bin/env python3 +""" +Authentik Recovery Flow Automation Script + +This script creates a complete password recovery flow in Authentik with: +- Password complexity policy (12 chars, mixed case, digit, symbol) +- Recovery identification stage (username/email) +- Recovery email stage (sends recovery token) +- Password change stages (with validation) +- Integration with default authentication flow + +Usage: + python3 create_recovery_flow.py +""" + +import sys +import json +import urllib.request +import urllib.error + + +def api_request(base_url, token, path, method='GET', data=None): + """Make an API request to Authentik""" + url = f"{base_url}{path}" + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + + request_data = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=request_data, headers=headers, method=method) + + try: + with urllib.request.urlopen(req, timeout=30) as resp: + body = resp.read().decode() + if body: + return resp.status, json.loads(body) + return resp.status, {} + except urllib.error.HTTPError as e: + error_body = e.read().decode() + try: + error_data = json.loads(error_body) if error_body else {'error': 'Empty error response'} + except: + error_data = {'error': error_body or 'Unknown error'} + return e.code, error_data + except Exception as e: + return 0, {'error': str(e)} + + +def get_or_create_password_policy(base_url, token): + """Create password complexity policy""" + print("Checking for password complexity policy...") + + policy_data = { + "name": "password-complexity", + "password_field": "password", + "amount_digits": 1, + "amount_uppercase": 1, + "amount_lowercase": 1, + "amount_symbols": 1, + "length_min": 12, + "symbol_charset": "!\\\"#$%&'()*+,-./:;<=>?@[]^_`{|}~", + "error_message": "Enter a minimum of 12 characters, with at least 1 lowercase, uppercase, digit and symbol", + "check_static_rules": True, + "check_have_i_been_pwned": True, + "check_zxcvbn": True, + "hibp_allowed_count": 0, + "zxcvbn_score_threshold": 2 + } + + # Check if policy already exists + status, policies = api_request(base_url, token, '/api/v3/policies/password/') + print(f" Initial check status: {status}") + if status == 200: + results = policies.get('results', []) + print(f" Found {len(results)} existing policies") + for policy in results: + policy_name = policy.get('name') + print(f" - {policy_name}") + if policy_name == 'password-complexity': + print(f" ✓ Password policy already exists: {policy['pk']}") + return policy['pk'] + else: + print(f" Initial check failed: {policies}") + + # Create new policy + status, policy = api_request(base_url, token, '/api/v3/policies/password/', 'POST', policy_data) + if status == 201: + print(f" ✓ Created password policy: {policy['pk']}") + return policy['pk'] + elif status == 400 and 'name' in policy: + # Policy with same name already exists, search for it again + print(f" ! Policy name already exists, retrieving existing policy...") + status, policies = api_request(base_url, token, '/api/v3/policies/password/') + if status == 200: + for existing_policy in policies.get('results', []): + if existing_policy.get('name') == 'password-complexity': + print(f" ✓ Found existing password policy: {existing_policy['pk']}") + return existing_policy['pk'] + print(f" ✗ Failed to find existing policy after creation conflict") + return None + else: + print(f" ✗ Failed to create password policy: {policy}") + return None + + +def get_or_create_recovery_identification_stage(base_url, token): + """Create recovery identification stage""" + print("Creating recovery identification stage...") + + stage_data = { + "name": "recovery-authentication-identification", + "user_fields": ["username", "email"], + "password_stage": None, + "case_insensitive_matching": True, + "show_matched_user": True, + "pretend_user_exists": True, + "enable_remember_me": False + } + + # Check if stage already exists + status, stages = api_request(base_url, token, '/api/v3/stages/identification/') + if status == 200: + for stage in stages.get('results', []): + if stage.get('name') == 'recovery-authentication-identification': + print(f" ✓ Recovery identification stage already exists: {stage['pk']}") + return stage['pk'] + + # Create new stage + status, stage = api_request(base_url, token, '/api/v3/stages/identification/', 'POST', stage_data) + if status == 201: + print(f" ✓ Created recovery identification stage: {stage['pk']}") + return stage['pk'] + elif status == 400 and 'name' in stage: + # Stage with same name already exists + print(f" ! Stage name already exists, retrieving existing stage...") + status, stages = api_request(base_url, token, '/api/v3/stages/identification/') + if status == 200: + for existing_stage in stages.get('results', []): + if existing_stage.get('name') == 'recovery-authentication-identification': + print(f" ✓ Found existing recovery identification stage: {existing_stage['pk']}") + return existing_stage['pk'] + print(f" ✗ Failed to find existing stage after creation conflict") + return None + else: + print(f" ✗ Failed to create recovery identification stage: {stage}") + return None + + +def get_or_create_recovery_email_stage(base_url, token): + """Create recovery email stage""" + print("Creating recovery email stage...") + + stage_data = { + "name": "recovery-email", + "use_global_settings": True, + "token_expiry": "minutes=30", + "subject": "Password recovery", + "template": "email/password_reset.html", + "activate_user_on_success": True, + "recovery_max_attempts": 5, + "recovery_cache_timeout": "minutes=5" + } + + # Check if stage already exists + status, stages = api_request(base_url, token, '/api/v3/stages/email/') + if status == 200: + for stage in stages.get('results', []): + if stage.get('name') == 'recovery-email': + print(f" ✓ Recovery email stage already exists: {stage['pk']}") + return stage['pk'] + + # Create new stage + status, stage = api_request(base_url, token, '/api/v3/stages/email/', 'POST', stage_data) + if status == 201: + print(f" ✓ Created recovery email stage: {stage['pk']}") + return stage['pk'] + elif status == 400 and 'name' in stage: + # Stage with same name already exists + print(f" ! Stage name already exists, retrieving existing stage...") + status, stages = api_request(base_url, token, '/api/v3/stages/email/') + if status == 200: + for existing_stage in stages.get('results', []): + if existing_stage.get('name') == 'recovery-email': + print(f" ✓ Found existing recovery email stage: {existing_stage['pk']}") + return existing_stage['pk'] + print(f" ✗ Failed to find existing stage after creation conflict") + return None + else: + print(f" ✗ Failed to create recovery email stage: {stage}") + return None + + +def get_existing_stage_uuid(base_url, token, stage_name, stage_type): + """Get UUID of an existing stage""" + status, stages = api_request(base_url, token, f'/api/v3/stages/{stage_type}/') + if status == 200: + for stage in stages.get('results', []): + if stage.get('name') == stage_name: + return stage['pk'] + return None + + +def get_or_create_recovery_flow(base_url, token, stage_ids): + """Create recovery flow with stage bindings""" + print("Creating recovery flow...") + + flow_data = { + "name": "recovery", + "slug": "recovery", + "title": "Recovery", + "designation": "recovery", + "policy_engine_mode": "any", + "compatibility_mode": False, + "layout": "stacked", + "denied_action": "message_continue" + } + + # Check if flow already exists + status, flows = api_request(base_url, token, '/api/v3/flows/instances/') + if status == 200: + for flow in flows.get('results', []): + if flow.get('slug') == 'recovery': + print(f" ✓ Recovery flow already exists: {flow['pk']}") + return flow['pk'] + + # Create new flow + status, flow = api_request(base_url, token, '/api/v3/flows/instances/', 'POST', flow_data) + if status != 201: + print(f" ✗ Failed to create recovery flow: {flow}") + return None + + flow_uuid = flow['pk'] + print(f" ✓ Created recovery flow: {flow_uuid}") + + # Create stage bindings + bindings = [ + {"stage": stage_ids['recovery_identification'], "order": 0}, + {"stage": stage_ids['recovery_email'], "order": 10}, + {"stage": stage_ids['password_change_prompt'], "order": 20}, + {"stage": stage_ids['password_change_write'], "order": 30}, + ] + + for binding in bindings: + binding_data = { + "target": flow_uuid, + "stage": binding['stage'], + "order": binding['order'], + "evaluate_on_plan": False, + "re_evaluate_policies": True, + "policy_engine_mode": "any", + "invalid_response_action": "retry" + } + + status, result = api_request(base_url, token, '/api/v3/flows/bindings/', 'POST', binding_data) + if status == 201: + print(f" ✓ Bound stage {binding['stage']} at order {binding['order']}") + else: + print(f" ✗ Failed to bind stage: {result}") + + return flow_uuid + + +def update_password_change_prompt_stage(base_url, token, stage_uuid, password_complexity_uuid): + """Add password complexity policy to password change prompt stage""" + print("Updating password change prompt stage...") + + # Get current stage configuration + status, stage = api_request(base_url, token, f'/api/v3/stages/prompt/stages/{stage_uuid}/') + if status != 200: + print(f" ✗ Failed to get stage: {stage}") + return False + + # Add password complexity to validation policies + validation_policies = stage.get('validation_policies', []) + if password_complexity_uuid not in validation_policies: + validation_policies.append(password_complexity_uuid) + + update_data = { + "validation_policies": validation_policies + } + + status, result = api_request(base_url, token, f'/api/v3/stages/prompt/stages/{stage_uuid}/', 'PATCH', update_data) + if status == 200: + print(f" ✓ Added password complexity policy to validation") + return True + else: + print(f" ✗ Failed to update stage: {result}") + return False + else: + print(f" ✓ Password complexity policy already in validation") + return True + + +def remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, password_stage_uuid): + """Remove separate password stage from authentication flow if it exists""" + print("Checking for separate password stage in authentication flow...") + + # Get all flow bindings + status, bindings_data = api_request(base_url, token, '/api/v3/flows/bindings/') + if status != 200: + print(f" ✗ Failed to get flow bindings: {bindings_data}") + return False + + # Find password stage binding in auth flow + password_binding = None + for binding in bindings_data.get('results', []): + if binding.get('target') == auth_flow_uuid and binding.get('stage') == password_stage_uuid: + password_binding = binding + break + + if not password_binding: + print(f" ✓ No separate password stage found (already removed)") + return True + + # Delete the password stage binding + binding_uuid = password_binding.get('pk') + status, result = api_request(base_url, token, f'/api/v3/flows/bindings/{binding_uuid}/', 'DELETE') + if status == 204 or status == 200: + print(f" ✓ Removed separate password stage from authentication flow") + return True + else: + print(f" ✗ Failed to remove password stage: {result}") + return False + + +def update_authentication_identification_stage(base_url, token, stage_uuid, password_stage_uuid, recovery_flow_uuid): + """Update authentication identification stage with password field and recovery flow""" + print("Updating authentication identification stage...") + + # First get the current stage configuration + status, current_stage = api_request(base_url, token, f'/api/v3/stages/identification/{stage_uuid}/') + if status != 200: + print(f" ✗ Failed to get current stage: {current_stage}") + return False + + # Check if already configured + if current_stage.get('password_stage') == password_stage_uuid and current_stage.get('recovery_flow') == recovery_flow_uuid: + print(f" ✓ Authentication identification stage already configured") + return True + + # Update with new values while preserving existing configuration + update_data = { + "name": current_stage.get('name'), + "user_fields": current_stage.get('user_fields', ["username", "email"]), + "password_stage": password_stage_uuid, + "recovery_flow": recovery_flow_uuid, + "case_insensitive_matching": current_stage.get('case_insensitive_matching', True), + "show_matched_user": current_stage.get('show_matched_user', True), + "pretend_user_exists": current_stage.get('pretend_user_exists', True) + } + + status, result = api_request(base_url, token, f'/api/v3/stages/identification/{stage_uuid}/', 'PATCH', update_data) + if status == 200: + print(f" ✓ Updated authentication identification stage") + print(f" - Added password field on same page") + print(f" - Added recovery flow link") + return True + else: + print(f" ✗ Failed to update stage: {result}") + return False + + +def main(): + if len(sys.argv) < 3: + print("Usage: python3 create_recovery_flow.py ") + sys.exit(1) + + token = sys.argv[1] + authentik_domain = sys.argv[2] + + # Use internal localhost URL when running inside Authentik container + # This avoids SSL/DNS issues + base_url = "http://localhost:9000" + print(f"Using internal API endpoint: {base_url}") + print(f"External domain: https://{authentik_domain}\n") + + print("=" * 80) + print("Authentik Recovery Flow Automation") + print("=" * 80) + print(f"Target: {base_url}\n") + + # Step 1: Create password complexity policy + password_complexity_uuid = get_or_create_password_policy(base_url, token) + if not password_complexity_uuid: + print("\n✗ Failed to create password complexity policy") + sys.exit(1) + + # Step 2: Create recovery identification stage + recovery_identification_uuid = get_or_create_recovery_identification_stage(base_url, token) + if not recovery_identification_uuid: + print("\n✗ Failed to create recovery identification stage") + sys.exit(1) + + # Step 3: Create recovery email stage + recovery_email_uuid = get_or_create_recovery_email_stage(base_url, token) + if not recovery_email_uuid: + print("\n✗ Failed to create recovery email stage") + sys.exit(1) + + # Step 4: Get existing stage and flow UUIDs + print("\nGetting existing stage and flow UUIDs...") + password_change_prompt_uuid = get_existing_stage_uuid(base_url, token, 'default-password-change-prompt', 'prompt/stages') + password_change_write_uuid = get_existing_stage_uuid(base_url, token, 'default-password-change-write', 'user_write') + auth_identification_uuid = get_existing_stage_uuid(base_url, token, 'default-authentication-identification', 'identification') + auth_password_uuid = get_existing_stage_uuid(base_url, token, 'default-authentication-password', 'password') + + # Get default authentication flow UUID + status, flows = api_request(base_url, token, '/api/v3/flows/instances/') + auth_flow_uuid = None + if status == 200: + for flow in flows.get('results', []): + if flow.get('slug') == 'default-authentication-flow': + auth_flow_uuid = flow.get('pk') + break + + if not all([password_change_prompt_uuid, password_change_write_uuid, auth_identification_uuid, auth_password_uuid, auth_flow_uuid]): + print(" ✗ Failed to find all required existing stages and flows") + sys.exit(1) + + print(f" ✓ Found all existing stages and flows") + + # Step 5: Create recovery flow + stage_ids = { + 'recovery_identification': recovery_identification_uuid, + 'recovery_email': recovery_email_uuid, + 'password_change_prompt': password_change_prompt_uuid, + 'password_change_write': password_change_write_uuid + } + + recovery_flow_uuid = get_or_create_recovery_flow(base_url, token, stage_ids) + if not recovery_flow_uuid: + print("\n✗ Failed to create recovery flow") + sys.exit(1) + + # Step 6: Update password change prompt stage + if not update_password_change_prompt_stage(base_url, token, password_change_prompt_uuid, password_complexity_uuid): + print("\n⚠ Warning: Failed to update password change prompt stage") + + # Step 7: Update authentication identification stage + if not update_authentication_identification_stage(base_url, token, auth_identification_uuid, auth_password_uuid, recovery_flow_uuid): + print("\n⚠ Warning: Failed to update authentication identification stage") + + # Step 8: Remove separate password stage from authentication flow + if not remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, auth_password_uuid): + print("\n⚠ Warning: Failed to remove separate password stage (may not exist)") + + # Success! + print("\n" + "=" * 80) + print("✓ Recovery Flow Configuration Complete!") + print("=" * 80) + print(f"\nRecovery Flow UUID: {recovery_flow_uuid}") + print(f"Recovery URL: https://{authentik_domain}/if/flow/recovery/") + print(f"\nFeatures enabled:") + print(" ✓ Password complexity policy (12 chars, mixed case, digit, symbol)") + print(" ✓ Recovery email with 30-minute token") + print(" ✓ Password + username on same login page") + print(" ✓ 'Forgot password?' link on login page") + print("\nTest the recovery flow:") + print(f" 1. Visit: https://{authentik_domain}/if/flow/default-authentication-flow/") + print(" 2. Click 'Forgot password?' link") + print(" 3. Enter username or email") + print(" 4. Check email for recovery link") + print("=" * 80) + + # Output JSON for Ansible + result = { + "success": True, + "recovery_flow_uuid": recovery_flow_uuid, + "password_complexity_uuid": password_complexity_uuid, + "recovery_url": f"https://{authentik_domain}/if/flow/recovery/" + } + print("\n" + json.dumps(result)) + + +if __name__ == "__main__": + main()