feat: Add Python script for automated recovery flow creation
Add create_recovery_flow.py script that configures Authentik password recovery flow via REST API. This script is called by recovery.yml during deployment. The script creates: - Password complexity policy (12+ chars, mixed case, digit, symbol) - Recovery identification stage (username/email input) - Recovery email stage (sends recovery token with 30min expiry) - Recovery flow with proper stage bindings - Updates authentication flow to show "Forgot password?" link Uses internal Authentik API (localhost:9000) to avoid SSL/DNS issues during initial setup. Works entirely via API calls, replacing the unreliable blueprint-based approach. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
ecc09127ef
commit
f40acee0a3
1 changed files with 477 additions and 0 deletions
477
ansible/roles/authentik/files/create_recovery_flow.py
Normal file
477
ansible/roles/authentik/files/create_recovery_flow.py
Normal file
|
|
@ -0,0 +1,477 @@
|
|||
#!/usr/bin/env python3
|
||||
"""
|
||||
Authentik Recovery Flow Automation Script
|
||||
|
||||
This script creates a complete password recovery flow in Authentik with:
|
||||
- Password complexity policy (12 chars, mixed case, digit, symbol)
|
||||
- Recovery identification stage (username/email)
|
||||
- Recovery email stage (sends recovery token)
|
||||
- Password change stages (with validation)
|
||||
- Integration with default authentication flow
|
||||
|
||||
Usage:
|
||||
python3 create_recovery_flow.py <api_token> <authentik_domain>
|
||||
"""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
|
||||
|
||||
def api_request(base_url, token, path, method='GET', data=None):
|
||||
"""Make an API request to Authentik"""
|
||||
url = f"{base_url}{path}"
|
||||
headers = {
|
||||
'Authorization': f'Bearer {token}',
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
request_data = json.dumps(data).encode() if data else None
|
||||
req = urllib.request.Request(url, data=request_data, headers=headers, method=method)
|
||||
|
||||
try:
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
body = resp.read().decode()
|
||||
if body:
|
||||
return resp.status, json.loads(body)
|
||||
return resp.status, {}
|
||||
except urllib.error.HTTPError as e:
|
||||
error_body = e.read().decode()
|
||||
try:
|
||||
error_data = json.loads(error_body) if error_body else {'error': 'Empty error response'}
|
||||
except:
|
||||
error_data = {'error': error_body or 'Unknown error'}
|
||||
return e.code, error_data
|
||||
except Exception as e:
|
||||
return 0, {'error': str(e)}
|
||||
|
||||
|
||||
def get_or_create_password_policy(base_url, token):
|
||||
"""Create password complexity policy"""
|
||||
print("Checking for password complexity policy...")
|
||||
|
||||
policy_data = {
|
||||
"name": "password-complexity",
|
||||
"password_field": "password",
|
||||
"amount_digits": 1,
|
||||
"amount_uppercase": 1,
|
||||
"amount_lowercase": 1,
|
||||
"amount_symbols": 1,
|
||||
"length_min": 12,
|
||||
"symbol_charset": "!\\\"#$%&'()*+,-./:;<=>?@[]^_`{|}~",
|
||||
"error_message": "Enter a minimum of 12 characters, with at least 1 lowercase, uppercase, digit and symbol",
|
||||
"check_static_rules": True,
|
||||
"check_have_i_been_pwned": True,
|
||||
"check_zxcvbn": True,
|
||||
"hibp_allowed_count": 0,
|
||||
"zxcvbn_score_threshold": 2
|
||||
}
|
||||
|
||||
# Check if policy already exists
|
||||
status, policies = api_request(base_url, token, '/api/v3/policies/password/')
|
||||
print(f" Initial check status: {status}")
|
||||
if status == 200:
|
||||
results = policies.get('results', [])
|
||||
print(f" Found {len(results)} existing policies")
|
||||
for policy in results:
|
||||
policy_name = policy.get('name')
|
||||
print(f" - {policy_name}")
|
||||
if policy_name == 'password-complexity':
|
||||
print(f" ✓ Password policy already exists: {policy['pk']}")
|
||||
return policy['pk']
|
||||
else:
|
||||
print(f" Initial check failed: {policies}")
|
||||
|
||||
# Create new policy
|
||||
status, policy = api_request(base_url, token, '/api/v3/policies/password/', 'POST', policy_data)
|
||||
if status == 201:
|
||||
print(f" ✓ Created password policy: {policy['pk']}")
|
||||
return policy['pk']
|
||||
elif status == 400 and 'name' in policy:
|
||||
# Policy with same name already exists, search for it again
|
||||
print(f" ! Policy name already exists, retrieving existing policy...")
|
||||
status, policies = api_request(base_url, token, '/api/v3/policies/password/')
|
||||
if status == 200:
|
||||
for existing_policy in policies.get('results', []):
|
||||
if existing_policy.get('name') == 'password-complexity':
|
||||
print(f" ✓ Found existing password policy: {existing_policy['pk']}")
|
||||
return existing_policy['pk']
|
||||
print(f" ✗ Failed to find existing policy after creation conflict")
|
||||
return None
|
||||
else:
|
||||
print(f" ✗ Failed to create password policy: {policy}")
|
||||
return None
|
||||
|
||||
|
||||
def get_or_create_recovery_identification_stage(base_url, token):
|
||||
"""Create recovery identification stage"""
|
||||
print("Creating recovery identification stage...")
|
||||
|
||||
stage_data = {
|
||||
"name": "recovery-authentication-identification",
|
||||
"user_fields": ["username", "email"],
|
||||
"password_stage": None,
|
||||
"case_insensitive_matching": True,
|
||||
"show_matched_user": True,
|
||||
"pretend_user_exists": True,
|
||||
"enable_remember_me": False
|
||||
}
|
||||
|
||||
# Check if stage already exists
|
||||
status, stages = api_request(base_url, token, '/api/v3/stages/identification/')
|
||||
if status == 200:
|
||||
for stage in stages.get('results', []):
|
||||
if stage.get('name') == 'recovery-authentication-identification':
|
||||
print(f" ✓ Recovery identification stage already exists: {stage['pk']}")
|
||||
return stage['pk']
|
||||
|
||||
# Create new stage
|
||||
status, stage = api_request(base_url, token, '/api/v3/stages/identification/', 'POST', stage_data)
|
||||
if status == 201:
|
||||
print(f" ✓ Created recovery identification stage: {stage['pk']}")
|
||||
return stage['pk']
|
||||
elif status == 400 and 'name' in stage:
|
||||
# Stage with same name already exists
|
||||
print(f" ! Stage name already exists, retrieving existing stage...")
|
||||
status, stages = api_request(base_url, token, '/api/v3/stages/identification/')
|
||||
if status == 200:
|
||||
for existing_stage in stages.get('results', []):
|
||||
if existing_stage.get('name') == 'recovery-authentication-identification':
|
||||
print(f" ✓ Found existing recovery identification stage: {existing_stage['pk']}")
|
||||
return existing_stage['pk']
|
||||
print(f" ✗ Failed to find existing stage after creation conflict")
|
||||
return None
|
||||
else:
|
||||
print(f" ✗ Failed to create recovery identification stage: {stage}")
|
||||
return None
|
||||
|
||||
|
||||
def get_or_create_recovery_email_stage(base_url, token):
|
||||
"""Create recovery email stage"""
|
||||
print("Creating recovery email stage...")
|
||||
|
||||
stage_data = {
|
||||
"name": "recovery-email",
|
||||
"use_global_settings": True,
|
||||
"token_expiry": "minutes=30",
|
||||
"subject": "Password recovery",
|
||||
"template": "email/password_reset.html",
|
||||
"activate_user_on_success": True,
|
||||
"recovery_max_attempts": 5,
|
||||
"recovery_cache_timeout": "minutes=5"
|
||||
}
|
||||
|
||||
# Check if stage already exists
|
||||
status, stages = api_request(base_url, token, '/api/v3/stages/email/')
|
||||
if status == 200:
|
||||
for stage in stages.get('results', []):
|
||||
if stage.get('name') == 'recovery-email':
|
||||
print(f" ✓ Recovery email stage already exists: {stage['pk']}")
|
||||
return stage['pk']
|
||||
|
||||
# Create new stage
|
||||
status, stage = api_request(base_url, token, '/api/v3/stages/email/', 'POST', stage_data)
|
||||
if status == 201:
|
||||
print(f" ✓ Created recovery email stage: {stage['pk']}")
|
||||
return stage['pk']
|
||||
elif status == 400 and 'name' in stage:
|
||||
# Stage with same name already exists
|
||||
print(f" ! Stage name already exists, retrieving existing stage...")
|
||||
status, stages = api_request(base_url, token, '/api/v3/stages/email/')
|
||||
if status == 200:
|
||||
for existing_stage in stages.get('results', []):
|
||||
if existing_stage.get('name') == 'recovery-email':
|
||||
print(f" ✓ Found existing recovery email stage: {existing_stage['pk']}")
|
||||
return existing_stage['pk']
|
||||
print(f" ✗ Failed to find existing stage after creation conflict")
|
||||
return None
|
||||
else:
|
||||
print(f" ✗ Failed to create recovery email stage: {stage}")
|
||||
return None
|
||||
|
||||
|
||||
def get_existing_stage_uuid(base_url, token, stage_name, stage_type):
|
||||
"""Get UUID of an existing stage"""
|
||||
status, stages = api_request(base_url, token, f'/api/v3/stages/{stage_type}/')
|
||||
if status == 200:
|
||||
for stage in stages.get('results', []):
|
||||
if stage.get('name') == stage_name:
|
||||
return stage['pk']
|
||||
return None
|
||||
|
||||
|
||||
def get_or_create_recovery_flow(base_url, token, stage_ids):
|
||||
"""Create recovery flow with stage bindings"""
|
||||
print("Creating recovery flow...")
|
||||
|
||||
flow_data = {
|
||||
"name": "recovery",
|
||||
"slug": "recovery",
|
||||
"title": "Recovery",
|
||||
"designation": "recovery",
|
||||
"policy_engine_mode": "any",
|
||||
"compatibility_mode": False,
|
||||
"layout": "stacked",
|
||||
"denied_action": "message_continue"
|
||||
}
|
||||
|
||||
# Check if flow already exists
|
||||
status, flows = api_request(base_url, token, '/api/v3/flows/instances/')
|
||||
if status == 200:
|
||||
for flow in flows.get('results', []):
|
||||
if flow.get('slug') == 'recovery':
|
||||
print(f" ✓ Recovery flow already exists: {flow['pk']}")
|
||||
return flow['pk']
|
||||
|
||||
# Create new flow
|
||||
status, flow = api_request(base_url, token, '/api/v3/flows/instances/', 'POST', flow_data)
|
||||
if status != 201:
|
||||
print(f" ✗ Failed to create recovery flow: {flow}")
|
||||
return None
|
||||
|
||||
flow_uuid = flow['pk']
|
||||
print(f" ✓ Created recovery flow: {flow_uuid}")
|
||||
|
||||
# Create stage bindings
|
||||
bindings = [
|
||||
{"stage": stage_ids['recovery_identification'], "order": 0},
|
||||
{"stage": stage_ids['recovery_email'], "order": 10},
|
||||
{"stage": stage_ids['password_change_prompt'], "order": 20},
|
||||
{"stage": stage_ids['password_change_write'], "order": 30},
|
||||
]
|
||||
|
||||
for binding in bindings:
|
||||
binding_data = {
|
||||
"target": flow_uuid,
|
||||
"stage": binding['stage'],
|
||||
"order": binding['order'],
|
||||
"evaluate_on_plan": False,
|
||||
"re_evaluate_policies": True,
|
||||
"policy_engine_mode": "any",
|
||||
"invalid_response_action": "retry"
|
||||
}
|
||||
|
||||
status, result = api_request(base_url, token, '/api/v3/flows/bindings/', 'POST', binding_data)
|
||||
if status == 201:
|
||||
print(f" ✓ Bound stage {binding['stage']} at order {binding['order']}")
|
||||
else:
|
||||
print(f" ✗ Failed to bind stage: {result}")
|
||||
|
||||
return flow_uuid
|
||||
|
||||
|
||||
def update_password_change_prompt_stage(base_url, token, stage_uuid, password_complexity_uuid):
|
||||
"""Add password complexity policy to password change prompt stage"""
|
||||
print("Updating password change prompt stage...")
|
||||
|
||||
# Get current stage configuration
|
||||
status, stage = api_request(base_url, token, f'/api/v3/stages/prompt/stages/{stage_uuid}/')
|
||||
if status != 200:
|
||||
print(f" ✗ Failed to get stage: {stage}")
|
||||
return False
|
||||
|
||||
# Add password complexity to validation policies
|
||||
validation_policies = stage.get('validation_policies', [])
|
||||
if password_complexity_uuid not in validation_policies:
|
||||
validation_policies.append(password_complexity_uuid)
|
||||
|
||||
update_data = {
|
||||
"validation_policies": validation_policies
|
||||
}
|
||||
|
||||
status, result = api_request(base_url, token, f'/api/v3/stages/prompt/stages/{stage_uuid}/', 'PATCH', update_data)
|
||||
if status == 200:
|
||||
print(f" ✓ Added password complexity policy to validation")
|
||||
return True
|
||||
else:
|
||||
print(f" ✗ Failed to update stage: {result}")
|
||||
return False
|
||||
else:
|
||||
print(f" ✓ Password complexity policy already in validation")
|
||||
return True
|
||||
|
||||
|
||||
def remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, password_stage_uuid):
|
||||
"""Remove separate password stage from authentication flow if it exists"""
|
||||
print("Checking for separate password stage in authentication flow...")
|
||||
|
||||
# Get all flow bindings
|
||||
status, bindings_data = api_request(base_url, token, '/api/v3/flows/bindings/')
|
||||
if status != 200:
|
||||
print(f" ✗ Failed to get flow bindings: {bindings_data}")
|
||||
return False
|
||||
|
||||
# Find password stage binding in auth flow
|
||||
password_binding = None
|
||||
for binding in bindings_data.get('results', []):
|
||||
if binding.get('target') == auth_flow_uuid and binding.get('stage') == password_stage_uuid:
|
||||
password_binding = binding
|
||||
break
|
||||
|
||||
if not password_binding:
|
||||
print(f" ✓ No separate password stage found (already removed)")
|
||||
return True
|
||||
|
||||
# Delete the password stage binding
|
||||
binding_uuid = password_binding.get('pk')
|
||||
status, result = api_request(base_url, token, f'/api/v3/flows/bindings/{binding_uuid}/', 'DELETE')
|
||||
if status == 204 or status == 200:
|
||||
print(f" ✓ Removed separate password stage from authentication flow")
|
||||
return True
|
||||
else:
|
||||
print(f" ✗ Failed to remove password stage: {result}")
|
||||
return False
|
||||
|
||||
|
||||
def update_authentication_identification_stage(base_url, token, stage_uuid, password_stage_uuid, recovery_flow_uuid):
|
||||
"""Update authentication identification stage with password field and recovery flow"""
|
||||
print("Updating authentication identification stage...")
|
||||
|
||||
# First get the current stage configuration
|
||||
status, current_stage = api_request(base_url, token, f'/api/v3/stages/identification/{stage_uuid}/')
|
||||
if status != 200:
|
||||
print(f" ✗ Failed to get current stage: {current_stage}")
|
||||
return False
|
||||
|
||||
# Check if already configured
|
||||
if current_stage.get('password_stage') == password_stage_uuid and current_stage.get('recovery_flow') == recovery_flow_uuid:
|
||||
print(f" ✓ Authentication identification stage already configured")
|
||||
return True
|
||||
|
||||
# Update with new values while preserving existing configuration
|
||||
update_data = {
|
||||
"name": current_stage.get('name'),
|
||||
"user_fields": current_stage.get('user_fields', ["username", "email"]),
|
||||
"password_stage": password_stage_uuid,
|
||||
"recovery_flow": recovery_flow_uuid,
|
||||
"case_insensitive_matching": current_stage.get('case_insensitive_matching', True),
|
||||
"show_matched_user": current_stage.get('show_matched_user', True),
|
||||
"pretend_user_exists": current_stage.get('pretend_user_exists', True)
|
||||
}
|
||||
|
||||
status, result = api_request(base_url, token, f'/api/v3/stages/identification/{stage_uuid}/', 'PATCH', update_data)
|
||||
if status == 200:
|
||||
print(f" ✓ Updated authentication identification stage")
|
||||
print(f" - Added password field on same page")
|
||||
print(f" - Added recovery flow link")
|
||||
return True
|
||||
else:
|
||||
print(f" ✗ Failed to update stage: {result}")
|
||||
return False
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) < 3:
|
||||
print("Usage: python3 create_recovery_flow.py <api_token> <authentik_domain>")
|
||||
sys.exit(1)
|
||||
|
||||
token = sys.argv[1]
|
||||
authentik_domain = sys.argv[2]
|
||||
|
||||
# Use internal localhost URL when running inside Authentik container
|
||||
# This avoids SSL/DNS issues
|
||||
base_url = "http://localhost:9000"
|
||||
print(f"Using internal API endpoint: {base_url}")
|
||||
print(f"External domain: https://{authentik_domain}\n")
|
||||
|
||||
print("=" * 80)
|
||||
print("Authentik Recovery Flow Automation")
|
||||
print("=" * 80)
|
||||
print(f"Target: {base_url}\n")
|
||||
|
||||
# Step 1: Create password complexity policy
|
||||
password_complexity_uuid = get_or_create_password_policy(base_url, token)
|
||||
if not password_complexity_uuid:
|
||||
print("\n✗ Failed to create password complexity policy")
|
||||
sys.exit(1)
|
||||
|
||||
# Step 2: Create recovery identification stage
|
||||
recovery_identification_uuid = get_or_create_recovery_identification_stage(base_url, token)
|
||||
if not recovery_identification_uuid:
|
||||
print("\n✗ Failed to create recovery identification stage")
|
||||
sys.exit(1)
|
||||
|
||||
# Step 3: Create recovery email stage
|
||||
recovery_email_uuid = get_or_create_recovery_email_stage(base_url, token)
|
||||
if not recovery_email_uuid:
|
||||
print("\n✗ Failed to create recovery email stage")
|
||||
sys.exit(1)
|
||||
|
||||
# Step 4: Get existing stage and flow UUIDs
|
||||
print("\nGetting existing stage and flow UUIDs...")
|
||||
password_change_prompt_uuid = get_existing_stage_uuid(base_url, token, 'default-password-change-prompt', 'prompt/stages')
|
||||
password_change_write_uuid = get_existing_stage_uuid(base_url, token, 'default-password-change-write', 'user_write')
|
||||
auth_identification_uuid = get_existing_stage_uuid(base_url, token, 'default-authentication-identification', 'identification')
|
||||
auth_password_uuid = get_existing_stage_uuid(base_url, token, 'default-authentication-password', 'password')
|
||||
|
||||
# Get default authentication flow UUID
|
||||
status, flows = api_request(base_url, token, '/api/v3/flows/instances/')
|
||||
auth_flow_uuid = None
|
||||
if status == 200:
|
||||
for flow in flows.get('results', []):
|
||||
if flow.get('slug') == 'default-authentication-flow':
|
||||
auth_flow_uuid = flow.get('pk')
|
||||
break
|
||||
|
||||
if not all([password_change_prompt_uuid, password_change_write_uuid, auth_identification_uuid, auth_password_uuid, auth_flow_uuid]):
|
||||
print(" ✗ Failed to find all required existing stages and flows")
|
||||
sys.exit(1)
|
||||
|
||||
print(f" ✓ Found all existing stages and flows")
|
||||
|
||||
# Step 5: Create recovery flow
|
||||
stage_ids = {
|
||||
'recovery_identification': recovery_identification_uuid,
|
||||
'recovery_email': recovery_email_uuid,
|
||||
'password_change_prompt': password_change_prompt_uuid,
|
||||
'password_change_write': password_change_write_uuid
|
||||
}
|
||||
|
||||
recovery_flow_uuid = get_or_create_recovery_flow(base_url, token, stage_ids)
|
||||
if not recovery_flow_uuid:
|
||||
print("\n✗ Failed to create recovery flow")
|
||||
sys.exit(1)
|
||||
|
||||
# Step 6: Update password change prompt stage
|
||||
if not update_password_change_prompt_stage(base_url, token, password_change_prompt_uuid, password_complexity_uuid):
|
||||
print("\n⚠ Warning: Failed to update password change prompt stage")
|
||||
|
||||
# Step 7: Update authentication identification stage
|
||||
if not update_authentication_identification_stage(base_url, token, auth_identification_uuid, auth_password_uuid, recovery_flow_uuid):
|
||||
print("\n⚠ Warning: Failed to update authentication identification stage")
|
||||
|
||||
# Step 8: Remove separate password stage from authentication flow
|
||||
if not remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, auth_password_uuid):
|
||||
print("\n⚠ Warning: Failed to remove separate password stage (may not exist)")
|
||||
|
||||
# Success!
|
||||
print("\n" + "=" * 80)
|
||||
print("✓ Recovery Flow Configuration Complete!")
|
||||
print("=" * 80)
|
||||
print(f"\nRecovery Flow UUID: {recovery_flow_uuid}")
|
||||
print(f"Recovery URL: https://{authentik_domain}/if/flow/recovery/")
|
||||
print(f"\nFeatures enabled:")
|
||||
print(" ✓ Password complexity policy (12 chars, mixed case, digit, symbol)")
|
||||
print(" ✓ Recovery email with 30-minute token")
|
||||
print(" ✓ Password + username on same login page")
|
||||
print(" ✓ 'Forgot password?' link on login page")
|
||||
print("\nTest the recovery flow:")
|
||||
print(f" 1. Visit: https://{authentik_domain}/if/flow/default-authentication-flow/")
|
||||
print(" 2. Click 'Forgot password?' link")
|
||||
print(" 3. Enter username or email")
|
||||
print(" 4. Check email for recovery link")
|
||||
print("=" * 80)
|
||||
|
||||
# Output JSON for Ansible
|
||||
result = {
|
||||
"success": True,
|
||||
"recovery_flow_uuid": recovery_flow_uuid,
|
||||
"password_complexity_uuid": password_complexity_uuid,
|
||||
"recovery_url": f"https://{authentik_domain}/if/flow/recovery/"
|
||||
}
|
||||
print("\n" + json.dumps(result))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Reference in a new issue