Post-Tyranny-Tech-Infrastru.../ansible/roles/authentik/files/create_recovery_flow.py

523 lines
21 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python3
"""
Authentik Recovery Flow Automation Script
This script creates a complete password recovery flow in Authentik with:
- Password complexity policy (12 chars, mixed case, digit, symbol)
- Recovery identification stage (username/email)
- Recovery email stage (sends recovery token)
- Password change stages (with validation)
- Integration with default authentication flow
- Brand default recovery flow configuration
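Usage:
    python3 create_recovery_flow.py <api_token> <authentik_domain>

Note: the API is always contacted at http://localhost:9000; <authentik_domain>
is only used to build the URLs shown in the printed summary and JSON result.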
"""
import sys
import json
import urllib.request
import urllib.error
def api_request(base_url, token, path, method='GET', data=None):
    """Send one JSON request to the Authentik API and return ``(status, body)``.

    Args:
        base_url: Scheme/host prefix; ``path`` is appended to it verbatim.
        token: API token, sent as a ``Bearer`` Authorization header.
        path: Request path (callers pass it with a leading slash).
        method: HTTP method name.
        data: JSON-serialisable payload, or ``None`` to send no body.

    Returns:
        A ``(status, body)`` tuple:

        * HTTP success: the response status and the decoded JSON body
          (``{}`` when the body is empty).
        * HTTP error: the error status and the decoded JSON error body, or
          ``{'error': ...}`` when that body is empty or not valid JSON.
        * Anything else (connection failure, malformed URL, undecodable
          success body): ``(0, {'error': str(exc)})``.
    """
    headers = {
        'Authorization': f'Bearer {token}',
        'Content-Type': 'application/json'
    }
    # Compare against None so an explicitly empty payload ({} / []) is still
    # transmitted instead of being silently dropped.
    request_data = json.dumps(data).encode() if data is not None else None
    try:
        # Building the Request inside the try keeps the "(0, error)" contract
        # for malformed URLs, which raise ValueError in the constructor.
        req = urllib.request.Request(
            f"{base_url}{path}", data=request_data, headers=headers, method=method
        )
        with urllib.request.urlopen(req, timeout=30) as resp:
            body = resp.read().decode()
            return resp.status, (json.loads(body) if body else {})
    except urllib.error.HTTPError as e:
        # errors='replace' so a badly-encoded error page cannot raise from
        # inside this handler and escape the function.
        error_body = e.read().decode(errors='replace')
        if not error_body:
            return e.code, {'error': 'Empty error response'}
        try:
            return e.code, json.loads(error_body)
        except ValueError:
            # json.JSONDecodeError is a ValueError: the body was not JSON.
            return e.code, {'error': error_body}
    except Exception as e:
        # Script boundary: report any other failure as status 0.
        return 0, {'error': str(e)}
def get_or_create_password_policy(base_url, token):
    """Return the pk of the 'password-complexity' policy, creating it if absent.

    The existing password policies are listed first; if one carries the
    expected name its pk is returned. Otherwise the policy is POSTed. When the
    POST is rejected with a 400 that mentions ``name``, the list is queried
    once more to pick up the already-existing policy.

    Returns:
        The policy pk, or ``None`` when it could neither be found nor created.
    """
    endpoint = '/api/v3/policies/password/'
    wanted_name = "password-complexity"

    print("Checking for password complexity policy...")

    # First pass: reuse an existing policy with the expected name.
    code, listing = api_request(base_url, token, endpoint)
    print(f" Initial check status: {code}")
    if code != 200:
        print(f" Initial check failed: {listing}")
    else:
        found = listing.get('results', [])
        print(f" Found {len(found)} existing policies")
        for entry in found:
            entry_name = entry.get('name')
            print(f" - {entry_name}")
            if entry_name == wanted_name:
                print(f" ✓ Password policy already exists: {entry['pk']}")
                return entry['pk']

    # Nothing reusable was found: create the policy.
    payload = {
        "name": wanted_name,
        "password_field": "password",
        "amount_digits": 1,
        "amount_uppercase": 1,
        "amount_lowercase": 1,
        "amount_symbols": 1,
        "length_min": 12,
        "symbol_charset": "!\\\"#$%&'()*+,-./:;<=>?@[]^_`{|}~",
        "error_message": "Enter a minimum of 12 characters, with at least 1 lowercase, uppercase, digit and symbol",
        "check_static_rules": True,
        "check_have_i_been_pwned": True,
        "check_zxcvbn": True,
        "hibp_allowed_count": 0,
        "zxcvbn_score_threshold": 2
    }
    code, created = api_request(base_url, token, endpoint, 'POST', payload)
    if code == 201:
        print(f" ✓ Created password policy: {created['pk']}")
        return created['pk']

    name_conflict = code == 400 and 'name' in created
    if not name_conflict:
        print(f" ✗ Failed to create password policy: {created}")
        return None

    # The server rejected the name, so look the policy up a second time.
    print(" ! Policy name already exists, retrieving existing policy...")
    code, listing = api_request(base_url, token, endpoint)
    if code == 200:
        for entry in listing.get('results', []):
            if entry.get('name') == wanted_name:
                print(f" ✓ Found existing password policy: {entry['pk']}")
                return entry['pk']
    print(" ✗ Failed to find existing policy after creation conflict")
    return None
def get_or_create_recovery_identification_stage(base_url, token):
    """Return the pk of the 'recovery-authentication-identification' stage.

    An identification stage with that name is reused when the listing shows
    one; otherwise it is created. A 400 response mentioning ``name`` triggers
    one more lookup of the existing stage.

    Returns:
        The stage pk, or ``None`` when it could neither be found nor created.
    """
    endpoint = '/api/v3/stages/identification/'
    stage_name = "recovery-authentication-identification"

    def find_existing():
        # Returns the listed stage dict with the expected name, or None when
        # the listing fails or contains no such stage.
        code, listing = api_request(base_url, token, endpoint)
        if code != 200:
            return None
        return next(
            (item for item in listing.get('results', []) if item.get('name') == stage_name),
            None,
        )

    print("Creating recovery identification stage...")

    match = find_existing()
    if match is not None:
        print(f" ✓ Recovery identification stage already exists: {match['pk']}")
        return match['pk']

    payload = {
        "name": stage_name,
        "user_fields": ["username", "email"],
        "password_stage": None,
        "case_insensitive_matching": True,
        "show_matched_user": True,
        "pretend_user_exists": True,
        "enable_remember_me": False
    }
    code, created = api_request(base_url, token, endpoint, 'POST', payload)
    if code == 201:
        print(f" ✓ Created recovery identification stage: {created['pk']}")
        return created['pk']

    if code == 400 and 'name' in created:
        # Name rejected by the server: fetch the stage that already holds it.
        print(" ! Stage name already exists, retrieving existing stage...")
        match = find_existing()
        if match is not None:
            print(f" ✓ Found existing recovery identification stage: {match['pk']}")
            return match['pk']
        print(" ✗ Failed to find existing stage after creation conflict")
        return None

    print(f" ✗ Failed to create recovery identification stage: {created}")
    return None
def get_or_create_recovery_email_stage(base_url, token):
    """Return the pk of the 'recovery-email' email stage, creating it if absent.

    The email stages are listed and a stage with the expected name is reused.
    Otherwise the stage is POSTed; on a 400 that mentions ``name`` the list is
    consulted again for the already-existing stage.

    Returns:
        The stage pk, or ``None`` when it could neither be found nor created.
    """
    print("Creating recovery email stage...")
    endpoint = '/api/v3/stages/email/'
    target_name = "recovery-email"

    # Reuse an existing stage when the listing shows one.
    code, listing = api_request(base_url, token, endpoint)
    if code == 200:
        for candidate in listing.get('results', []):
            if candidate.get('name') != target_name:
                continue
            print(f" ✓ Recovery email stage already exists: {candidate['pk']}")
            return candidate['pk']

    payload = {
        "name": target_name,
        "use_global_settings": True,
        "token_expiry": "minutes=30",
        "subject": "Password recovery",
        "template": "email/password_reset.html",
        "activate_user_on_success": True,
        "recovery_max_attempts": 5,
        "recovery_cache_timeout": "minutes=5"
    }
    code, created = api_request(base_url, token, endpoint, 'POST', payload)
    if code == 201:
        print(f" ✓ Created recovery email stage: {created['pk']}")
        return created['pk']

    if not (code == 400 and 'name' in created):
        print(f" ✗ Failed to create recovery email stage: {created}")
        return None

    # Creation was refused because of the name: search the list once more.
    print(" ! Stage name already exists, retrieving existing stage...")
    code, listing = api_request(base_url, token, endpoint)
    if code == 200:
        for candidate in listing.get('results', []):
            if candidate.get('name') != target_name:
                continue
            print(f" ✓ Found existing recovery email stage: {candidate['pk']}")
            return candidate['pk']
    print(" ✗ Failed to find existing stage after creation conflict")
    return None
def get_existing_stage_uuid(base_url, token, stage_name, stage_type):
    """Look up a stage by name under ``/api/v3/stages/<stage_type>/``.

    Returns:
        The pk of the first listed stage whose name equals ``stage_name``, or
        ``None`` when the listing fails or holds no such stage.
    """
    code, listing = api_request(base_url, token, f'/api/v3/stages/{stage_type}/')
    if code != 200:
        return None
    matching_pks = (
        entry['pk']
        for entry in listing.get('results', [])
        if entry.get('name') == stage_name
    )
    return next(matching_pks, None)
def get_or_create_recovery_flow(base_url, token, stage_ids):
    """Return the pk of the flow with slug 'recovery', creating/binding as needed.

    The flow is reused when the flow listing already contains the slug,
    otherwise it is created. In both cases the four recovery stages are then
    bound to it; stages that the flow already has bound are skipped. This
    repairs a flow left half-configured by an earlier run that created the
    flow but failed while binding its stages.

    Args:
        base_url: API base URL handed to ``api_request``.
        token: API token handed to ``api_request``.
        stage_ids: Mapping with the keys ``recovery_identification``,
            ``recovery_email``, ``password_change_prompt`` and
            ``password_change_write`` holding the stage pks to bind.

    Returns:
        The flow pk, or ``None`` when the flow could not be created. A failed
        stage binding is reported on stdout but does not change the return
        value.
    """
    print("Creating recovery flow...")
    flow_data = {
        "name": "recovery",
        "slug": "recovery",
        "title": "Recovery",
        "designation": "recovery",
        "policy_engine_mode": "any",
        "compatibility_mode": False,
        "layout": "stacked",
        "denied_action": "message_continue"
    }

    # Check if flow already exists
    existing_flow = None
    status, flows = api_request(base_url, token, '/api/v3/flows/instances/')
    if status == 200:
        existing_flow = next(
            (flow for flow in flows.get('results', []) if flow.get('slug') == 'recovery'),
            None,
        )

    already_bound = set()
    if existing_flow is not None:
        flow_uuid = existing_flow['pk']
        print(f" ✓ Recovery flow already exists: {flow_uuid}")
        # The ?target= filter only narrows the listing; the client-side check
        # below is what decides, so the result is correct even if the server
        # ignores the parameter.
        status, listing = api_request(
            base_url, token, f'/api/v3/flows/bindings/?target={flow_uuid}'
        )
        if status != 200:
            # Without the current bindings we cannot tell what is missing;
            # leave the existing flow untouched rather than risk duplicates.
            print(f" ! Could not list bindings of existing flow, leaving it unchanged: {listing}")
            return flow_uuid
        already_bound = {
            binding.get('stage')
            for binding in listing.get('results', [])
            if binding.get('target') == flow_uuid
        }
    else:
        # Create new flow
        status, flow = api_request(base_url, token, '/api/v3/flows/instances/', 'POST', flow_data)
        if status != 201:
            print(f" ✗ Failed to create recovery flow: {flow}")
            return None
        flow_uuid = flow['pk']
        print(f" ✓ Created recovery flow: {flow_uuid}")

    # Desired stage bindings in execution order.
    desired = (
        ('recovery_identification', 0),
        ('recovery_email', 10),
        ('password_change_prompt', 20),
        ('password_change_write', 30),
    )
    for key, order in desired:
        stage = stage_ids[key]
        if stage in already_bound:
            print(f" ✓ Stage {stage} already bound")
            continue
        binding_data = {
            "target": flow_uuid,
            "stage": stage,
            "order": order,
            "evaluate_on_plan": False,
            "re_evaluate_policies": True,
            "policy_engine_mode": "any",
            "invalid_response_action": "retry"
        }
        status, result = api_request(base_url, token, '/api/v3/flows/bindings/', 'POST', binding_data)
        if status == 201:
            print(f" ✓ Bound stage {stage} at order {order}")
        else:
            print(f" ✗ Failed to bind stage: {result}")
    return flow_uuid
def update_password_change_prompt_stage(base_url, token, stage_uuid, password_complexity_uuid):
    """Ensure the prompt stage lists the password policy among its validation policies.

    The stage is fetched, and if ``password_complexity_uuid`` is not yet in
    its ``validation_policies`` the extended list is PATCHed back.

    Returns:
        ``True`` when the policy is present afterwards (already there or
        added), ``False`` when fetching or patching the stage failed.
    """
    print("Updating password change prompt stage...")
    stage_path = f'/api/v3/stages/prompt/stages/{stage_uuid}/'

    code, current = api_request(base_url, token, stage_path)
    if code != 200:
        print(f" ✗ Failed to get stage: {current}")
        return False

    attached = current.get('validation_policies', [])
    if password_complexity_uuid in attached:
        print(" ✓ Password complexity policy already in validation")
        return True

    # Send the existing policies plus the new one.
    patch = {"validation_policies": attached + [password_complexity_uuid]}
    code, outcome = api_request(base_url, token, stage_path, 'PATCH', patch)
    if code != 200:
        print(f" ✗ Failed to update stage: {outcome}")
        return False
    print(" ✓ Added password complexity policy to validation")
    return True
def remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, password_stage_uuid):
    """Delete the binding that attaches the password stage to the auth flow.

    The flow bindings are listed and the first one whose ``target`` and
    ``stage`` equal the given flow and stage is deleted.

    Returns:
        ``True`` when no such binding is listed or the delete answered 200 or
        204; ``False`` when listing or deleting failed.
    """
    print("Checking for separate password stage in authentication flow...")

    code, listing = api_request(base_url, token, '/api/v3/flows/bindings/')
    if code != 200:
        print(f" ✗ Failed to get flow bindings: {listing}")
        return False

    # First binding that ties the password stage to the authentication flow.
    match = next(
        (
            item
            for item in listing.get('results', [])
            if item.get('target') == auth_flow_uuid and item.get('stage') == password_stage_uuid
        ),
        None,
    )
    if not match:
        print(" ✓ No separate password stage found (already removed)")
        return True

    binding_pk = match.get('pk')
    code, outcome = api_request(base_url, token, f'/api/v3/flows/bindings/{binding_pk}/', 'DELETE')
    if code in (204, 200):
        print(" ✓ Removed separate password stage from authentication flow")
        return True
    print(f" ✗ Failed to remove password stage: {outcome}")
    return False
def update_authentication_identification_stage(base_url, token, stage_uuid, password_stage_uuid, recovery_flow_uuid):
    """Point an identification stage at a password stage and a recovery flow.

    The stage is fetched first. If its ``password_stage`` and
    ``recovery_flow`` already equal the requested values nothing is sent;
    otherwise a PATCH carries the two new values together with the stage's
    current name, user fields and matching options.

    Returns:
        ``True`` when the stage is configured as requested (already or after
        the PATCH), ``False`` when the fetch or the PATCH failed.
    """
    print("Updating authentication identification stage...")
    stage_path = f'/api/v3/stages/identification/{stage_uuid}/'

    code, existing = api_request(base_url, token, stage_path)
    if code != 200:
        print(f" ✗ Failed to get current stage: {existing}")
        return False

    password_ok = existing.get('password_stage') == password_stage_uuid
    recovery_ok = existing.get('recovery_flow') == recovery_flow_uuid
    if password_ok and recovery_ok:
        print(" ✓ Authentication identification stage already configured")
        return True

    # Carry the current settings over and overwrite only the two links.
    patch = {
        "name": existing.get('name'),
        "user_fields": existing.get('user_fields', ["username", "email"]),
        "password_stage": password_stage_uuid,
        "recovery_flow": recovery_flow_uuid,
        "case_insensitive_matching": existing.get('case_insensitive_matching', True),
        "show_matched_user": existing.get('show_matched_user', True),
        "pretend_user_exists": existing.get('pretend_user_exists', True)
    }
    code, outcome = api_request(base_url, token, stage_path, 'PATCH', patch)
    if code != 200:
        print(f" ✗ Failed to update stage: {outcome}")
        return False

    print(" ✓ Updated authentication identification stage")
    print(" - Added password field on same page")
    print(" - Added recovery flow link")
    return True
def update_brand_recovery_flow(base_url, token, recovery_flow_uuid):
    """Set ``flow_recovery`` on the default brand to the given recovery flow.

    The brand flagged ``default`` in the brand listing is used; when no
    listed brand carries that flag the first listed brand is used instead.
    Nothing is sent when the brand already references the flow.

    Returns:
        ``True`` when the brand references the flow afterwards, ``False``
        when the listing failed, was empty, or the PATCH was rejected.
    """
    print("Updating brand default recovery flow...")
    status, brands = api_request(base_url, token, '/api/v3/core/brands/')
    if status != 200:
        print(f" ✗ Failed to get brands: {brands}")
        return False
    results = brands.get('results', [])
    if not results:
        print(" ✗ No brands found")
        return False

    # Prefer the brand marked as default; with several brands the first
    # listed one is not necessarily it. Assumes brand objects expose a
    # boolean `default` field, per the Authentik API schema. Fall back to
    # the first brand so single-brand setups behave as before.
    brand = next((item for item in results if item.get('default')), results[0])
    brand_uuid = brand.get('brand_uuid')

    # Check if already configured
    if brand.get('flow_recovery') == recovery_flow_uuid:
        print(" ✓ Brand recovery flow already configured")
        return True

    # Update the brand with recovery flow
    update_data = {
        "domain": brand.get('domain'),
        "flow_recovery": recovery_flow_uuid
    }
    status, result = api_request(base_url, token, f'/api/v3/core/brands/{brand_uuid}/', 'PATCH', update_data)
    if status == 200:
        print(" ✓ Updated brand default recovery flow")
        return True
    print(f" ✗ Failed to update brand: {result}")
    return False
def main():
    """Run the recovery-flow setup end to end and print a JSON result line.

    Reads ``<api_token>`` and ``<authentik_domain>`` from ``sys.argv``. The
    API is always contacted at ``http://localhost:9000``; the domain is only
    used to build the URLs that are printed.

    Exits with status 1 when the arguments are missing, when the policy, a
    recovery stage or the recovery flow cannot be obtained, or when one of
    the pre-existing default stages/flows is not found (the missing ones are
    listed). Failures in the later update steps only print a warning.
    """
    if len(sys.argv) < 3:
        print("Usage: python3 create_recovery_flow.py <api_token> <authentik_domain>")
        sys.exit(1)
    token = sys.argv[1]
    authentik_domain = sys.argv[2]

    # Use internal localhost URL when running inside Authentik container
    # This avoids SSL/DNS issues
    base_url = "http://localhost:9000"
    print(f"Using internal API endpoint: {base_url}")
    print(f"External domain: https://{authentik_domain}\n")
    print("=" * 80)
    print("Authentik Recovery Flow Automation")
    print("=" * 80)
    print(f"Target: {base_url}\n")

    # Step 1: Create password complexity policy
    password_complexity_uuid = get_or_create_password_policy(base_url, token)
    if not password_complexity_uuid:
        print("\n✗ Failed to create password complexity policy")
        sys.exit(1)

    # Step 2: Create recovery identification stage
    recovery_identification_uuid = get_or_create_recovery_identification_stage(base_url, token)
    if not recovery_identification_uuid:
        print("\n✗ Failed to create recovery identification stage")
        sys.exit(1)

    # Step 3: Create recovery email stage
    recovery_email_uuid = get_or_create_recovery_email_stage(base_url, token)
    if not recovery_email_uuid:
        print("\n✗ Failed to create recovery email stage")
        sys.exit(1)

    # Step 4: Get existing stage and flow UUIDs
    print("\nGetting existing stage and flow UUIDs...")
    password_change_prompt_uuid = get_existing_stage_uuid(base_url, token, 'default-password-change-prompt', 'prompt/stages')
    password_change_write_uuid = get_existing_stage_uuid(base_url, token, 'default-password-change-write', 'user_write')
    auth_identification_uuid = get_existing_stage_uuid(base_url, token, 'default-authentication-identification', 'identification')
    auth_password_uuid = get_existing_stage_uuid(base_url, token, 'default-authentication-password', 'password')

    # Get default authentication flow UUID
    status, flows = api_request(base_url, token, '/api/v3/flows/instances/')
    auth_flow_uuid = None
    if status == 200:
        for flow in flows.get('results', []):
            if flow.get('slug') == 'default-authentication-flow':
                auth_flow_uuid = flow.get('pk')
                break

    # Name each prerequisite so a failure says exactly what was not found.
    prerequisites = [
        ('stage default-password-change-prompt', password_change_prompt_uuid),
        ('stage default-password-change-write', password_change_write_uuid),
        ('stage default-authentication-identification', auth_identification_uuid),
        ('stage default-authentication-password', auth_password_uuid),
        ('flow default-authentication-flow', auth_flow_uuid),
    ]
    missing = [label for label, value in prerequisites if not value]
    if missing:
        print(" ✗ Failed to find all required existing stages and flows")
        for label in missing:
            print(f" - missing: {label}")
        sys.exit(1)
    print(" ✓ Found all existing stages and flows")

    # Step 5: Create recovery flow
    stage_ids = {
        'recovery_identification': recovery_identification_uuid,
        'recovery_email': recovery_email_uuid,
        'password_change_prompt': password_change_prompt_uuid,
        'password_change_write': password_change_write_uuid
    }
    recovery_flow_uuid = get_or_create_recovery_flow(base_url, token, stage_ids)
    if not recovery_flow_uuid:
        print("\n✗ Failed to create recovery flow")
        sys.exit(1)

    # Steps 6-9 are best effort: a failure prints a warning and the run goes on.
    # Step 6: Update password change prompt stage
    if not update_password_change_prompt_stage(base_url, token, password_change_prompt_uuid, password_complexity_uuid):
        print("\n⚠ Warning: Failed to update password change prompt stage")

    # Step 7: Update authentication identification stage
    if not update_authentication_identification_stage(base_url, token, auth_identification_uuid, auth_password_uuid, recovery_flow_uuid):
        print("\n⚠ Warning: Failed to update authentication identification stage")

    # Step 8: Remove separate password stage from authentication flow
    if not remove_separate_password_stage_from_auth_flow(base_url, token, auth_flow_uuid, auth_password_uuid):
        print("\n⚠ Warning: Failed to remove separate password stage (may not exist)")

    # Step 9: Update brand default recovery flow
    if not update_brand_recovery_flow(base_url, token, recovery_flow_uuid):
        print("\n⚠ Warning: Failed to update brand recovery flow (non-critical)")

    # Success!
    print("\n" + "=" * 80)
    print("✓ Recovery Flow Configuration Complete!")
    print("=" * 80)
    print(f"\nRecovery Flow UUID: {recovery_flow_uuid}")
    print(f"Recovery URL: https://{authentik_domain}/if/flow/recovery/")
    print("\nFeatures enabled:")
    print(" ✓ Password complexity policy (12 chars, mixed case, digit, symbol)")
    print(" ✓ Recovery email with 30-minute token")
    print(" ✓ Password + username on same login page")
    # This entry previously lacked the " ✓ " marker its sibling lines carry.
    print(" ✓ 'Forgot password?' link on login page")
    print(" ✓ Brand default recovery flow configured")
    print("\nTest the recovery flow:")
    print(f" 1. Visit: https://{authentik_domain}/if/flow/default-authentication-flow/")
    print(" 2. Click 'Forgot password?' link")
    print(" 3. Enter username or email")
    print(" 4. Check email for recovery link")
    print("=" * 80)

    # Output JSON for Ansible
    result = {
        "success": True,
        "recovery_flow_uuid": recovery_flow_uuid,
        "password_complexity_uuid": password_complexity_uuid,
        "recovery_url": f"https://{authentik_domain}/if/flow/recovery/"
    }
    print("\n" + json.dumps(result))


if __name__ == "__main__":
    main()