diff --git a/ansible/roles/authentik/files/configure_2fa_enforcement.py b/ansible/roles/authentik/files/configure_2fa_enforcement.py new file mode 100644 index 0000000..18b2c5d --- /dev/null +++ b/ansible/roles/authentik/files/configure_2fa_enforcement.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Configure 2FA enforcement in Authentik. +Modifies the default-authentication-mfa-validation stage to force users to configure MFA. +""" +import sys +import json +import urllib.request +import urllib.error + +def api_request(base_url, token, path, method='GET', data=None): + """Make API request to Authentik""" + url = f"{base_url}{path}" + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + + request_data = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=request_data, headers=headers, method=method) + + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status, json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + error_body = e.read().decode() + try: + error_data = json.loads(error_body) + except: + error_data = {'error': error_body} + return e.code, error_data + +def main(): + if len(sys.argv) != 3: + print(json.dumps({'error': 'Usage: configure_2fa_enforcement.py '}), file=sys.stderr) + sys.exit(1) + + base_url = sys.argv[1] + token = sys.argv[2] + + # Step 1: Find the default MFA validation stage + status, stages_response = api_request(base_url, token, '/api/v3/stages/authenticator/validate/') + if status != 200: + print(json.dumps({'error': 'Failed to list authenticator validate stages', 'details': stages_response}), file=sys.stderr) + sys.exit(1) + + mfa_stage = next((s for s in stages_response.get('results', []) + if 'default-authentication-mfa-validation' in s.get('name', '').lower()), None) + + if not mfa_stage: + print(json.dumps({'error': 'default-authentication-mfa-validation stage not found'}), file=sys.stderr) + sys.exit(1) + + stage_pk = mfa_stage['pk'] + + # Step 2: Find the default TOTP setup stage to use as configuration stage + status, totp_stages_response = api_request(base_url, token, '/api/v3/stages/authenticator/totp/') + if status != 200: + print(json.dumps({'error': 'Failed to list TOTP setup stages', 'details': totp_stages_response}), file=sys.stderr) + sys.exit(1) + + totp_setup_stage = next((s for s in totp_stages_response.get('results', []) + if 'setup' in s.get('name', '').lower()), None) + + if not totp_setup_stage: + print(json.dumps({'error': 'TOTP setup stage not found'}), file=sys.stderr) + sys.exit(1) + + totp_setup_pk = totp_setup_stage['pk'] + + # Step 3: Update the MFA validation stage to force configuration + update_data = { + 'name': mfa_stage['name'], + 'not_configured_action': 'configure', # Force user to configure + 'configuration_stages': [totp_setup_pk] # Use TOTP setup stage + } + + status, updated_stage = api_request(base_url, token, f'/api/v3/stages/authenticator/validate/{stage_pk}/', 'PATCH', update_data) + if status not in [200, 201]: + print(json.dumps({'error': 'Failed to update MFA validation stage', 'details': updated_stage}), file=sys.stderr) + sys.exit(1) + + print(json.dumps({ + 'success': True, + 'message': '2FA enforcement configured', + 'stage_name': mfa_stage['name'], + 'stage_pk': stage_pk, + 'note': 'Users will be forced to configure TOTP on login' + })) + +if __name__ == '__main__': + main() diff --git a/ansible/roles/authentik/files/configure_invitation_flow.py b/ansible/roles/authentik/files/configure_invitation_flow.py new file mode 100644 index 0000000..624a737 --- /dev/null +++ b/ansible/roles/authentik/files/configure_invitation_flow.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +""" +Configure Authentik invitation flow. +Creates an invitation stage and binds it to the default enrollment flow. +""" +import sys +import json +import urllib.request +import urllib.error + +def api_request(base_url, token, path, method='GET', data=None): + """Make API request to Authentik""" + url = f"{base_url}{path}" + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + + request_data = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=request_data, headers=headers, method=method) + + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status, json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + error_body = e.read().decode() + try: + error_data = json.loads(error_body) + except: + error_data = {'error': error_body} + return e.code, error_data + +def main(): + if len(sys.argv) != 3: + print(json.dumps({'error': 'Usage: configure_invitation_flow.py '}), file=sys.stderr) + sys.exit(1) + + base_url = sys.argv[1] + token = sys.argv[2] + + # Step 1: Get the default enrollment flow + status, flows_response = api_request(base_url, token, '/api/v3/flows/instances/') + if status != 200: + print(json.dumps({'error': 'Failed to list flows', 'details': flows_response}), file=sys.stderr) + sys.exit(1) + + enrollment_flow = next((f for f in flows_response.get('results', []) + if f.get('designation') == 'enrollment'), None) + + if not enrollment_flow: + print(json.dumps({'error': 'No enrollment flow found'}), file=sys.stderr) + sys.exit(1) + + flow_slug = enrollment_flow['slug'] + flow_pk = enrollment_flow['pk'] + + # Step 2: Check if invitation stage already exists + status, stages_response = api_request(base_url, token, '/api/v3/stages/invitation/') + if status != 200: + print(json.dumps({'error': 'Failed to list invitation stages', 'details': stages_response}), file=sys.stderr) + sys.exit(1) + + invitation_stage = next((s for s in stages_response.get('results', []) + if s.get('name') == 'default-enrollment-invitation'), None) + + # Step 3: Create invitation stage if it doesn't exist + if not invitation_stage: + stage_data = { + 'name': 'default-enrollment-invitation', + 'continue_flow_without_invitation': True + } + status, invitation_stage = api_request(base_url, token, '/api/v3/stages/invitation/', 'POST', stage_data) + if status not in [200, 201]: + print(json.dumps({'error': 'Failed to create invitation stage', 'details': invitation_stage}), file=sys.stderr) + sys.exit(1) + + stage_pk = invitation_stage['pk'] + + # Step 4: Check if the stage is already bound to the enrollment flow + status, bindings_response = api_request(base_url, token, f'/api/v3/flows/bindings/?target={flow_pk}') + if status != 200: + print(json.dumps({'error': 'Failed to list flow bindings', 'details': bindings_response}), file=sys.stderr) + sys.exit(1) + + # Check if invitation stage is already bound + invitation_binding = next((b for b in bindings_response.get('results', []) + if b.get('stage') == stage_pk), None) + + # Step 5: Bind the invitation stage to the enrollment flow if not already bound + if not invitation_binding: + # Find the highest order number to insert at the beginning + max_order = max([b.get('order', 0) for b in bindings_response.get('results', [])], default=0) + + binding_data = { + 'target': flow_pk, + 'stage': stage_pk, + 'order': 0, # Put invitation stage first + 'evaluate_on_plan': True, + 're_evaluate_policies': False + } + status, binding = api_request(base_url, token, '/api/v3/flows/bindings/', 'POST', binding_data) + if status not in [200, 201]: + print(json.dumps({'error': 'Failed to bind invitation stage to flow', 'details': binding}), file=sys.stderr) + sys.exit(1) + + print(json.dumps({ + 'success': True, + 'message': 'Invitation flow configured', + 'flow_slug': flow_slug, + 'stage_pk': stage_pk, + 'note': 'Invitation stage bound to enrollment flow' + })) + +if __name__ == '__main__': + main() diff --git a/ansible/roles/authentik/files/configure_recovery_flow.py b/ansible/roles/authentik/files/configure_recovery_flow.py new file mode 100644 index 0000000..3b200df --- /dev/null +++ b/ansible/roles/authentik/files/configure_recovery_flow.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python3 +""" +Configure Authentik recovery flow. +Verifies that the default recovery flow exists (Authentik creates it by default). +The recovery flow is used when clicking "Create recovery link" in the UI. +""" +import sys +import json +import urllib.request +import urllib.error + +def api_request(base_url, token, path, method='GET', data=None): + """Make API request to Authentik""" + url = f"{base_url}{path}" + headers = { + 'Authorization': f'Bearer {token}', + 'Content-Type': 'application/json' + } + + request_data = json.dumps(data).encode() if data else None + req = urllib.request.Request(url, data=request_data, headers=headers, method=method) + + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.status, json.loads(resp.read().decode()) + except urllib.error.HTTPError as e: + error_body = e.read().decode() + try: + error_data = json.loads(error_body) + except: + error_data = {'error': error_body} + return e.code, error_data + +def main(): + if len(sys.argv) != 3: + print(json.dumps({'error': 'Usage: configure_recovery_flow.py '}), file=sys.stderr) + sys.exit(1) + + base_url = sys.argv[1] + token = sys.argv[2] + + # Get the default recovery flow (created by Authentik by default) + status, flows_response = api_request(base_url, token, '/api/v3/flows/instances/') + if status != 200: + print(json.dumps({'error': 'Failed to list flows', 'details': flows_response}), file=sys.stderr) + sys.exit(1) + + recovery_flow = next((f for f in flows_response.get('results', []) + if f.get('designation') == 'recovery'), None) + + if not recovery_flow: + print(json.dumps({'error': 'No recovery flow found - Authentik should create one by default'}), file=sys.stderr) + sys.exit(1) + + flow_slug = recovery_flow['slug'] + flow_pk = recovery_flow['pk'] + + print(json.dumps({ + 'success': True, + 'message': 'Recovery flow configured', + 'flow_slug': flow_slug, + 'flow_pk': flow_pk, + 'note': 'Using Authentik default recovery flow' + })) + +if __name__ == '__main__': + main() diff --git a/ansible/roles/authentik/files/create_invitation_flow.py b/ansible/roles/authentik/files/create_invitation_flow.py deleted file mode 100644 index 131aa16..0000000 --- a/ansible/roles/authentik/files/create_invitation_flow.py +++ /dev/null @@ -1,74 +0,0 @@ -#!/usr/bin/env python3 -""" -Create user invitation flow in Authentik -Allows admins to send invitation emails to new users -""" -import sys -import json -import urllib.request -import urllib.error - -def api_request(base_url, token, path, method='GET', data=None): - """Make API request to Authentik""" - url = f"{base_url}{path}" - headers = { - 'Authorization': f'Bearer {token}', - 'Content-Type': 'application/json' - } - - request_data = json.dumps(data).encode() if data else None - req = urllib.request.Request(url, data=request_data, headers=headers, method=method) - - try: - with urllib.request.urlopen(req, timeout=30) as resp: - return resp.status, json.loads(resp.read().decode()) - except urllib.error.HTTPError as e: - error_body = e.read().decode() - try: - error_data = json.loads(error_body) - except: - error_data = {'error': error_body} - return e.code, error_data - -def main(): - if len(sys.argv) != 3: - print(json.dumps({'error': 'Usage: create_invitation_flow.py '})) - sys.exit(1) - - base_url = sys.argv[1] - token = sys.argv[2] - - # Check if invitation flow already exists - status, flows = api_request(base_url, token, '/api/v3/flows/instances/') - if status != 200: - print(json.dumps({'error': 'Failed to list flows', 'details': flows}), file=sys.stderr) - sys.exit(1) - - existing_invitation = next((f for f in flows.get('results', []) - if 'invitation' in f.get('slug', '').lower()), None) - - if existing_invitation: - print(json.dumps({ - 'success': True, - 'message': 'Invitation flow already exists', - 'flow_id': existing_invitation['pk'] - })) - sys.exit(0) - - # Get enrollment flow to use for invitations - enrollment_flow = next((f for f in flows.get('results', []) - if f.get('designation') == 'enrollment'), None) - - if enrollment_flow: - print(json.dumps({ - 'success': True, - 'message': 'Using enrollment flow for invitations', - 'flow_id': enrollment_flow['pk'], - 'flow_slug': enrollment_flow['slug'] - })) - else: - print(json.dumps({'error': 'No enrollment flow found'}), file=sys.stderr) - sys.exit(1) - -if __name__ == '__main__': - main() diff --git a/ansible/roles/authentik/files/create_recovery_flow.py b/ansible/roles/authentik/files/create_recovery_flow.py deleted file mode 100644 index 9ab0612..0000000 --- a/ansible/roles/authentik/files/create_recovery_flow.py +++ /dev/null @@ -1,70 +0,0 @@ -#!/usr/bin/env python3 -""" -Create password recovery flow in Authentik -Allows users to reset their password via email -""" -import sys -import json -import urllib.request -import urllib.error - -def api_request(base_url, token, path, method='GET', data=None): - """Make API request to Authentik""" - url = f"{base_url}{path}" - headers = { - 'Authorization': f'Bearer {token}', - 'Content-Type': 'application/json' - } - - request_data = json.dumps(data).encode() if data else None - req = urllib.request.Request(url, data=request_data, headers=headers, method=method) - - try: - with urllib.request.urlopen(req, timeout=30) as resp: - return resp.status, json.loads(resp.read().decode()) - except urllib.error.HTTPError as e: - error_body = e.read().decode() - try: - error_data = json.loads(error_body) - except: - error_data = {'error': error_body} - return e.code, error_data - -def main(): - if len(sys.argv) != 3: - print(json.dumps({'error': 'Usage: create_recovery_flow.py '})) - sys.exit(1) - - base_url = sys.argv[1] - token = sys.argv[2] - - # Check if recovery flow already exists with slug 'recovery-flow' - status, flows = api_request(base_url, token, '/api/v3/flows/instances/') - if status != 200: - print(json.dumps({'error': 'Failed to list flows', 'details': flows}), file=sys.stderr) - sys.exit(1) - - # Check if we already have a recovery flow configured - existing_recovery = next((f for f in flows.get('results', []) - if f.get('slug') == 'recovery-flow' or f.get('designation') == 'recovery'), None) - - if existing_recovery: - print(json.dumps({ - 'success': True, - 'message': 'Recovery flow already exists', - 'flow_id': existing_recovery['pk'], - 'flow_slug': existing_recovery['slug'] - })) - sys.exit(0) - - # Create a simple recovery flow - # Note: In production Authentik, you would import flows via blueprints or UI - # For initial deployment, we just configure email settings and rely on manual flow setup - print(json.dumps({ - 'success': True, - 'message': 'No recovery flow found - will use default Authentik flow after manual setup', - 'note': 'Admin should configure recovery flow in Authentik UI: Flows & Stages' - })) - -if __name__ == '__main__': - main() diff --git a/ansible/roles/authentik/tasks/flows.yml b/ansible/roles/authentik/tasks/flows.yml index c047cbd..3aa1c46 100644 --- a/ansible/roles/authentik/tasks/flows.yml +++ b/ansible/roles/authentik/tasks/flows.yml @@ -1,55 +1,67 @@ --- -# Configure Authentik flows (recovery, invitation) +# Configure Authentik flows (invitation, recovery, 2FA) via API - name: Use bootstrap token for API access set_fact: authentik_api_token: "{{ client_secrets.authentik_bootstrap_token }}" -- name: Copy recovery flow script to server +- name: Copy invitation flow configuration script to server copy: - src: create_recovery_flow.py - dest: /tmp/create_recovery_flow.py + src: configure_invitation_flow.py + dest: /tmp/configure_invitation_flow.py mode: '0755' -- name: Copy invitation flow script to server +- name: Copy recovery flow configuration script to server copy: - src: create_invitation_flow.py - dest: /tmp/create_invitation_flow.py + src: configure_recovery_flow.py + dest: /tmp/configure_recovery_flow.py mode: '0755' -- name: Copy flow scripts into container +- name: Copy 2FA enforcement configuration script to server + copy: + src: configure_2fa_enforcement.py + dest: /tmp/configure_2fa_enforcement.py + mode: '0755' + +- name: Copy scripts into container shell: | - docker cp /tmp/create_recovery_flow.py authentik-server:/tmp/ - docker cp /tmp/create_invitation_flow.py authentik-server:/tmp/ + docker cp /tmp/configure_invitation_flow.py authentik-server:/tmp/ + docker cp /tmp/configure_recovery_flow.py authentik-server:/tmp/ + docker cp /tmp/configure_2fa_enforcement.py authentik-server:/tmp/ changed_when: false -- name: Create/verify recovery flow +- name: Configure invitation flow shell: | - docker exec -i authentik-server python3 /tmp/create_recovery_flow.py \ + docker exec authentik-server python3 /tmp/configure_invitation_flow.py \ "http://localhost:9000" \ "{{ authentik_api_token }}" - register: recovery_flow - changed_when: "'already exists' not in recovery_flow.stdout" - failed_when: false - ignore_errors: true + register: invitation_result + changed_when: "'success' in invitation_result.stdout" -- name: Create/verify invitation flow +- name: Configure recovery flow shell: | - docker exec -i authentik-server python3 /tmp/create_invitation_flow.py \ + docker exec authentik-server python3 /tmp/configure_recovery_flow.py \ "http://localhost:9000" \ "{{ authentik_api_token }}" - register: invitation_flow - changed_when: "'already exists' not in invitation_flow.stdout" - failed_when: false - ignore_errors: true + register: recovery_result + changed_when: "'success' in recovery_result.stdout" -- name: Cleanup flow scripts from host +- name: Configure 2FA enforcement + shell: | + docker exec authentik-server python3 /tmp/configure_2fa_enforcement.py \ + "http://localhost:9000" \ + "{{ authentik_api_token }}" + register: twofa_result + changed_when: "'success' in twofa_result.stdout" + +- name: Cleanup configuration scripts from host file: path: "{{ item }}" state: absent loop: - - /tmp/create_recovery_flow.py - - /tmp/create_invitation_flow.py + - /tmp/configure_invitation_flow.py + - /tmp/configure_recovery_flow.py + - /tmp/configure_2fa_enforcement.py - name: Display flows configuration status debug: @@ -58,11 +70,14 @@ Authentik Flows Configuration ======================================== - ✓ Recovery Flow: Configured - Users can reset passwords via email + ✓ Invitation Flow: {{ 'Configured' if invitation_result.rc == 0 else 'Failed' }} + {{ (invitation_result.stdout | from_json).message | default('') }} - ✓ Invitation Flow: Configured - Admins can invite users via email + ✓ Recovery Flow: {{ 'Configured' if recovery_result.rc == 0 else 'Failed' }} + {{ (recovery_result.stdout | from_json).message | default('') }} + + ✓ 2FA Enforcement: {{ 'Configured' if twofa_result.rc == 0 else 'Failed' }} + {{ (twofa_result.stdout | from_json).message | default('') }} Email configuration is active and flows will send emails via Mailgun SMTP.