#!/usr/bin/env python3
"""
Authentik Recovery Flow Automation Script

This script creates a complete password recovery flow in Authentik with:
- Password complexity policy (12 chars, mixed case, digit, symbol)
- Recovery identification stage (username/email)
- Recovery email stage (sends recovery token)
- Password change stages (with validation)
- Integration with default authentication flow
- Brand default recovery flow configuration

Usage:
    python3 create_recovery_flow.py <api_token> <authentik_domain>
"""

import sys
import json
import urllib.request
import urllib.error


def api_request(base_url, token, path, method='GET', data=None):
    """Make an API request to Authentik.

    Args:
        base_url: Scheme and host prefix; ``path`` is appended verbatim.
        token: Bearer token sent in the ``Authorization`` header.
        path: Request path (including any query string).
        method: HTTP method name.
        data: JSON-serialisable payload, or ``None`` to send no body.

    Returns:
        A ``(status, payload)`` tuple. On success ``payload`` is the decoded
        JSON body (``{}`` when the body is empty). On an HTTP error ``status``
        is the error code and ``payload`` is the decoded JSON error body, or
        ``{'error': ...}`` when the body is empty or not JSON. Any other
        failure (connection error, unsupported URL, invalid JSON in a
        successful response, ...) yields ``(0, {'error': str(exc)})``.
    """
    url = f"{base_url}{path}"
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json',
    }
    # ``is not None`` so that an intentionally empty payload ({}) is still sent.
    request_data = json.dumps(data).encode() if data is not None else None
    req = urllib.request.Request(url, data=request_data, headers=headers, method=method)

    try:
        with urllib.request.urlopen(req, timeout=30) as resp:
            body = resp.read().decode()
            if body:
                return resp.status, json.loads(body)
            return resp.status, {}
    except urllib.error.HTTPError as e:
        # Error pages are not guaranteed to be valid UTF-8; never crash here.
        error_body = e.read().decode(errors='replace')
        if not error_body:
            return e.code, {'error': 'Empty error response'}
        try:
            return e.code, json.loads(error_body)
        except ValueError:
            # json.JSONDecodeError is a ValueError: body was not JSON.
            return e.code, {'error': error_body}
    except Exception as e:
        # Top-level boundary: report every other failure as status 0.
        return 0, {'error': str(e)}


def _find_pk_by_field(base_url, token, list_path, field, value):
    """Return the ``pk`` of the first listed object whose ``field`` equals ``value``.

    Only the ``results`` of a single list response are inspected. Returns
    ``None`` when the request does not answer 200 or nothing matches.
    """
    status, listing = api_request(base_url, token, list_path)
    if status != 200:
        return None
    for item in listing.get('results', []):
        if item.get(field) == value:
            return item.get('pk')
    return None


def _create_named_object(base_url, token, list_path, payload, label, kind):
    """POST ``payload`` to ``list_path``; on a name conflict look up the existing object.

    ``label`` (e.g. "recovery email stage") and ``kind`` ("stage" / "policy")
    are only used to build the progress messages. Returns the object's ``pk``
    or ``None`` on failure.
    """
    status, created = api_request(base_url, token, list_path, 'POST', payload)
    if status == 201:
        print(f" ✓ Created {label}: {created['pk']}")
        return created['pk']

    if status == 400 and 'name' in created:
        # An object with the same name already exists: search for it again.
        print(f" ! {kind.capitalize()} name already exists, retrieving existing {kind}...")
        existing_pk = _find_pk_by_field(base_url, token, list_path, 'name', payload['name'])
        if existing_pk is not None:
            print(f" ✓ Found existing {label}: {existing_pk}")
            return existing_pk
        print(f" ✗ Failed to find existing {kind} after creation conflict")
        return None

    print(f" ✗ Failed to create {label}: {created}")
    return None


def _get_or_create_named_stage(base_url, token, list_path, stage_data, label):
    """Return the pk of the stage named ``stage_data['name']``, creating it if absent."""
    existing_pk = _find_pk_by_field(base_url, token, list_path, 'name', stage_data['name'])
    if existing_pk is not None:
        print(f" ✓ {label[0].upper()}{label[1:]} already exists: {existing_pk}")
        return existing_pk
    return _create_named_object(base_url, token, list_path, stage_data, label, 'stage')


def get_or_create_password_policy(base_url, token):
    """Create the password complexity policy, or reuse the existing one.

    Returns:
        The policy ``pk``, or ``None`` if it could neither be found nor created.
    """
    print("Checking for password complexity policy...")

    list_path = '/api/v3/policies/password/'
    policy_data = {
        "name": "password-complexity",
        "password_field": "password",
        "amount_digits": 1,
        "amount_uppercase": 1,
        "amount_lowercase": 1,
        "amount_symbols": 1,
        "length_min": 12,
        "symbol_charset": "!\\\"#$%&'()*+,-./:;<=>?@[]^_`{|}~",
        "error_message": "Enter a minimum of 12 characters, with at least 1 lowercase, uppercase, digit and symbol",
        "check_static_rules": True,
        "check_have_i_been_pwned": True,
        "check_zxcvbn": True,
        "hibp_allowed_count": 0,
        "zxcvbn_score_threshold": 2,
    }

    # Check if policy already exists (verbose on purpose: lists what was found)
    status, policies = api_request(base_url, token, list_path)
    print(f" Initial check status: {status}")
    if status == 200:
        results = policies.get('results', [])
        print(f" Found {len(results)} existing policies")
        for policy in results:
            policy_name = policy.get('name')
            print(f" - {policy_name}")
            if policy_name == policy_data['name']:
                print(f" ✓ Password policy already exists: {policy['pk']}")
                return policy['pk']
    else:
        print(f" Initial check failed: {policies}")

    # Create new policy
    return _create_named_object(
        base_url, token, list_path, policy_data, 'password policy', 'policy'
    )


def get_or_create_recovery_identification_stage(base_url, token):
    """Create the recovery identification stage, or reuse the existing one.

    Returns:
        The stage ``pk``, or ``None`` on failure.
    """
    print("Creating recovery identification stage...")

    stage_data = {
        "name": "recovery-authentication-identification",
        "user_fields": ["username", "email"],
        "password_stage": None,
        "case_insensitive_matching": True,
        "show_matched_user": True,
        "pretend_user_exists": True,
        "enable_remember_me": False,
    }
    return _get_or_create_named_stage(
        base_url, token, '/api/v3/stages/identification/', stage_data,
        'recovery identification stage',
    )


def get_or_create_recovery_email_stage(base_url, token):
    """Create the recovery email stage, or reuse the existing one.

    Returns:
        The stage ``pk``, or ``None`` on failure.
    """
    print("Creating recovery email stage...")

    stage_data = {
        "name": "recovery-email",
        "use_global_settings": True,
        "token_expiry": "minutes=30",
        "subject": "Password recovery",
        "template": "email/password_reset.html",
        "activate_user_on_success": True,
        "recovery_max_attempts": 5,
        "recovery_cache_timeout": "minutes=5",
    }
    return _get_or_create_named_stage(
        base_url, token, '/api/v3/stages/email/', stage_data,
        'recovery email stage',
    )


def get_existing_stage_uuid(base_url, token, stage_name, stage_type):
    """Get the UUID of an existing stage.

    Args:
        stage_name: Exact stage name to look for.
        stage_type: Path segment under ``/api/v3/stages/`` (e.g. ``'password'``
            or ``'prompt/stages'``).

    Returns:
        The stage ``pk``, or ``None`` if the lookup fails or nothing matches.
    """
    return _find_pk_by_field(
        base_url, token, f'/api/v3/stages/{stage_type}/', 'name', stage_name
    )


def get_or_create_recovery_flow(base_url, token, stage_ids):
    """Create the recovery flow with its stage bindings, or reuse the existing flow.

    Args:
        stage_ids: Mapping with the keys ``recovery_identification``,
            ``recovery_email``, ``password_change_prompt`` and
            ``password_change_write``; only read when a new flow is created.

    Returns:
        The flow ``pk``, or ``None`` if the flow could not be created. An
        already existing flow is returned as-is (its bindings are not touched),
        and a failed binding is reported but does not abort the function.
    """
    print("Creating recovery flow...")

    flows_path = '/api/v3/flows/instances/'
    flow_data = {
        "name": "recovery",
        "slug": "recovery",
        "title": "Recovery",
        "designation": "recovery",
        "policy_engine_mode": "any",
        "compatibility_mode": False,
        "layout": "stacked",
        "denied_action": "message_continue",
    }

    # Check if flow already exists
    existing_pk = _find_pk_by_field(base_url, token, flows_path, 'slug', flow_data['slug'])
    if existing_pk is not None:
        print(f" ✓ Recovery flow already exists: {existing_pk}")
        return existing_pk

    # Create new flow
    status, flow = api_request(base_url, token, flows_path, 'POST', flow_data)
    if status != 201:
        print(f" ✗ Failed to create recovery flow: {flow}")
        return None

    flow_uuid = flow['pk']
    print(f" ✓ Created recovery flow: {flow_uuid}")

    # Create stage bindings, in execution order
    ordered_stages = [
        (stage_ids['recovery_identification'], 0),
        (stage_ids['recovery_email'], 10),
        (stage_ids['password_change_prompt'], 20),
        (stage_ids['password_change_write'], 30),
    ]
    for stage_pk, order in ordered_stages:
        binding_data = {
            "target": flow_uuid,
            "stage": stage_pk,
            "order": order,
            "evaluate_on_plan": False,
            "re_evaluate_policies": True,
            "policy_engine_mode": "any",
            "invalid_response_action": "retry",
        }
        status, result = api_request(
            base_url, token, '/api/v3/flows/bindings/', 'POST', binding_data
        )
        if status == 201:
            print(f" ✓ Bound stage {stage_pk} at order {order}")
        else:
            print(f" ✗ Failed to bind stage: {result}")

    return flow_uuid


def update_password_change_prompt_stage(base_url, token, stage_uuid, password_complexity_uuid):
    """Add the password complexity policy to the password change prompt stage.

    Returns:
        ``True`` if the policy is (now) part of the stage's validation
        policies, ``False`` if the stage could not be read or updated.
    """
    print("Updating password change prompt stage...")

    stage_path = f'/api/v3/stages/prompt/stages/{stage_uuid}/'

    # Get current stage configuration
    status, stage = api_request(base_url, token, stage_path)
    if status != 200:
        print(f" ✗ Failed to get stage: {stage}")
        return False

    # ``or []`` also covers a present-but-null field.
    validation_policies = list(stage.get('validation_policies') or [])
    if password_complexity_uuid in validation_policies:
        print(" ✓ Password complexity policy already in validation")
        return True

    # Add password complexity to validation policies
    validation_policies.append(password_complexity_uuid)
    update_data = {"validation_policies": validation_policies}
    status, result = api_request(base_url, token, stage_path, 'PATCH', update_data)
    if status == 200:
        print(" ✓ Added password complexity policy to validation")
        return True

    print(f" ✗ Failed to update stage: {result}")
    return False


def remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, password_stage_uuid):
    """Remove the separate password stage from the authentication flow if it exists.

    Returns:
        ``True`` if no such binding exists or it was deleted, ``False`` if the
        bindings could not be listed or the delete failed.
    """
    print("Checking for separate password stage in authentication flow...")

    # Get all flow bindings
    status, bindings_data = api_request(base_url, token, '/api/v3/flows/bindings/')
    if status != 200:
        print(f" ✗ Failed to get flow bindings: {bindings_data}")
        return False

    # Find password stage binding in auth flow
    password_binding = next(
        (
            binding
            for binding in bindings_data.get('results', [])
            if binding.get('target') == auth_flow_uuid
            and binding.get('stage') == password_stage_uuid
        ),
        None,
    )
    if not password_binding:
        print(" ✓ No separate password stage found (already removed)")
        return True

    # Delete the password stage binding
    binding_uuid = password_binding.get('pk')
    status, result = api_request(
        base_url, token, f'/api/v3/flows/bindings/{binding_uuid}/', 'DELETE'
    )
    if status in (200, 204):
        print(" ✓ Removed separate password stage from authentication flow")
        return True

    print(f" ✗ Failed to remove password stage: {result}")
    return False


def update_authentication_identification_stage(base_url, token, stage_uuid, password_stage_uuid, recovery_flow_uuid):
    """Update the authentication identification stage with password field and recovery flow.

    Returns:
        ``True`` if the stage already had, or now has, the given password
        stage and recovery flow; ``False`` if it could not be read or updated.
    """
    print("Updating authentication identification stage...")

    stage_path = f'/api/v3/stages/identification/{stage_uuid}/'

    # First get the current stage configuration
    status, current_stage = api_request(base_url, token, stage_path)
    if status != 200:
        print(f" ✗ Failed to get current stage: {current_stage}")
        return False

    # Check if already configured
    already_configured = (
        current_stage.get('password_stage') == password_stage_uuid
        and current_stage.get('recovery_flow') == recovery_flow_uuid
    )
    if already_configured:
        print(" ✓ Authentication identification stage already configured")
        return True

    # Update with new values while preserving existing configuration
    update_data = {
        "name": current_stage.get('name'),
        "user_fields": current_stage.get('user_fields', ["username", "email"]),
        "password_stage": password_stage_uuid,
        "recovery_flow": recovery_flow_uuid,
        "case_insensitive_matching": current_stage.get('case_insensitive_matching', True),
        "show_matched_user": current_stage.get('show_matched_user', True),
        "pretend_user_exists": current_stage.get('pretend_user_exists', True),
    }

    status, result = api_request(base_url, token, stage_path, 'PATCH', update_data)
    if status == 200:
        print(" ✓ Updated authentication identification stage")
        print(" - Added password field on same page")
        print(" - Added recovery flow link")
        return True

    print(f" ✗ Failed to update stage: {result}")
    return False


def update_brand_recovery_flow(base_url, token, recovery_flow_uuid):
    """Update the default brand to use the recovery flow.

    The brand flagged ``default`` in the list response is used; when no brand
    carries that flag the first listed brand is used instead.

    Returns:
        ``True`` if the brand already used, or now uses, the recovery flow;
        ``False`` if no brand was found or the update failed.
    """
    print("Updating brand default recovery flow...")

    status, brands = api_request(base_url, token, '/api/v3/core/brands/')
    if status != 200:
        print(f" ✗ Failed to get brands: {brands}")
        return False

    results = brands.get('results', [])
    if not results:
        print(" ✗ No brands found")
        return False

    # Prefer the brand explicitly marked as default; fall back to the first.
    brand = next((b for b in results if b.get('default')), results[0])
    brand_uuid = brand.get('brand_uuid')

    # Check if already configured
    if brand.get('flow_recovery') == recovery_flow_uuid:
        print(" ✓ Brand recovery flow already configured")
        return True

    # Update the brand with recovery flow
    update_data = {
        "domain": brand.get('domain'),
        "flow_recovery": recovery_flow_uuid,
    }
    status, result = api_request(
        base_url, token, f'/api/v3/core/brands/{brand_uuid}/', 'PATCH', update_data
    )
    if status == 200:
        print(" ✓ Updated brand default recovery flow")
        return True

    print(f" ✗ Failed to update brand: {result}")
    return False


def main():
    """Run all configuration steps; exit with status 1 on a critical failure.

    Reads the API token from ``sys.argv[1]`` and the external Authentik domain
    (used only in printed URLs) from ``sys.argv[2]``. The last line printed on
    success is a JSON summary.
    """
    if len(sys.argv) < 3:
        print("Usage: python3 create_recovery_flow.py <api_token> <authentik_domain>")
        sys.exit(1)

    token = sys.argv[1]
    authentik_domain = sys.argv[2]

    # Use internal localhost URL when running inside Authentik container
    # This avoids SSL/DNS issues
    base_url = "http://localhost:9000"

    print(f"Using internal API endpoint: {base_url}")
    print(f"External domain: https://{authentik_domain}\n")

    print("=" * 80)
    print("Authentik Recovery Flow Automation")
    print("=" * 80)
    print(f"Target: {base_url}\n")

    # Step 1: Create password complexity policy
    password_complexity_uuid = get_or_create_password_policy(base_url, token)
    if not password_complexity_uuid:
        print("\n✗ Failed to create password complexity policy")
        sys.exit(1)

    # Step 2: Create recovery identification stage
    recovery_identification_uuid = get_or_create_recovery_identification_stage(base_url, token)
    if not recovery_identification_uuid:
        print("\n✗ Failed to create recovery identification stage")
        sys.exit(1)

    # Step 3: Create recovery email stage
    recovery_email_uuid = get_or_create_recovery_email_stage(base_url, token)
    if not recovery_email_uuid:
        print("\n✗ Failed to create recovery email stage")
        sys.exit(1)

    # Step 4: Get existing stage and flow UUIDs
    print("\nGetting existing stage and flow UUIDs...")
    password_change_prompt_uuid = get_existing_stage_uuid(
        base_url, token, 'default-password-change-prompt', 'prompt/stages')
    password_change_write_uuid = get_existing_stage_uuid(
        base_url, token, 'default-password-change-write', 'user_write')
    auth_identification_uuid = get_existing_stage_uuid(
        base_url, token, 'default-authentication-identification', 'identification')
    auth_password_uuid = get_existing_stage_uuid(
        base_url, token, 'default-authentication-password', 'password')

    # Get default authentication flow UUID
    auth_flow_uuid = _find_pk_by_field(
        base_url, token, '/api/v3/flows/instances/', 'slug', 'default-authentication-flow')

    required = [
        password_change_prompt_uuid,
        password_change_write_uuid,
        auth_identification_uuid,
        auth_password_uuid,
        auth_flow_uuid,
    ]
    if not all(required):
        print(" ✗ Failed to find all required existing stages and flows")
        sys.exit(1)

    print(" ✓ Found all existing stages and flows")

    # Step 5: Create recovery flow
    stage_ids = {
        'recovery_identification': recovery_identification_uuid,
        'recovery_email': recovery_email_uuid,
        'password_change_prompt': password_change_prompt_uuid,
        'password_change_write': password_change_write_uuid,
    }
    recovery_flow_uuid = get_or_create_recovery_flow(base_url, token, stage_ids)
    if not recovery_flow_uuid:
        print("\n✗ Failed to create recovery flow")
        sys.exit(1)

    # Steps 6-9 are best-effort: a failure is reported but does not abort.

    # Step 6: Update password change prompt stage
    if not update_password_change_prompt_stage(
            base_url, token, password_change_prompt_uuid, password_complexity_uuid):
        print("\n⚠ Warning: Failed to update password change prompt stage")

    # Step 7: Update authentication identification stage
    if not update_authentication_identification_stage(
            base_url, token, auth_identification_uuid, auth_password_uuid, recovery_flow_uuid):
        print("\n⚠ Warning: Failed to update authentication identification stage")

    # Step 8: Remove separate password stage from authentication flow
    if not remove_separate_password_stage_from_auth_flow(
            base_url, token, auth_flow_uuid, auth_password_uuid):
        print("\n⚠ Warning: Failed to remove separate password stage (may not exist)")

    # Step 9: Update brand default recovery flow
    if not update_brand_recovery_flow(base_url, token, recovery_flow_uuid):
        print("\n⚠ Warning: Failed to update brand recovery flow (non-critical)")

    # Success!
    print("\n" + "=" * 80)
    print("✓ Recovery Flow Configuration Complete!")
    print("=" * 80)
    print(f"\nRecovery Flow UUID: {recovery_flow_uuid}")
    print(f"Recovery URL: https://{authentik_domain}/if/flow/recovery/")
    print("\nFeatures enabled:")
    print(" ✓ Password complexity policy (12 chars, mixed case, digit, symbol)")
    print(" ✓ Recovery email with 30-minute token")
    print(" ✓ Password + username on same login page")
    print(" ✓ 'Forgot password?' link on login page")
    print(" ✓ Brand default recovery flow configured")
    print("\nTest the recovery flow:")
    print(f" 1. Visit: https://{authentik_domain}/if/flow/default-authentication-flow/")
    print(" 2. Click 'Forgot password?' link")
    print(" 3. Enter username or email")
    print(" 4. Check email for recovery link")
    print("=" * 80)

    # Output JSON for Ansible
    result = {
        "success": True,
        "recovery_flow_uuid": recovery_flow_uuid,
        "password_complexity_uuid": password_complexity_uuid,
        "recovery_url": f"https://{authentik_domain}/if/flow/recovery/",
    }
    print("\n" + json.dumps(result))


if __name__ == "__main__":
    main()