feat(auth): add Casdoor SSO integration via django-allauth
Integrate OIDC-based SSO authentication through Casdoor using django-allauth. Adds configuration for enabling SSO, custom account adapters, and an optional SSL verification bypass for sandbox environments with self-signed certificates. - Add CASDOOR_* and ALLOW_LOCAL_LOGIN env vars to .env.example and docker-compose (app service only) - Configure allauth with openid_connect provider for Casdoor - Register custom adapters (CasdoorAccountAdapter, LocalAccountAdapter) - Apply SSL patch early in settings when CASDOOR_SSL_VERIFY=false
This commit is contained in:
@@ -10,6 +10,11 @@ from pathlib import Path
|
||||
|
||||
import environ
|
||||
|
||||
# Apply SSL bypass before any HTTP-making imports when CASDOOR_SSL_VERIFY=false.
|
||||
# This must run before allauth or requests make outbound calls.
|
||||
if os.environ.get('CASDOOR_SSL_VERIFY', 'true').lower() == 'false':
|
||||
import themis.ssl_patch # noqa: F401
|
||||
|
||||
# Build paths inside the project like this: BASE_DIR / 'subdir'.
|
||||
BASE_DIR = Path(__file__).resolve().parent.parent
|
||||
|
||||
@@ -34,6 +39,15 @@ SECURE_PROXY_SSL_HEADER = ("HTTP_X_FORWARDED_PROTO", "https")
|
||||
USE_X_FORWARDED_HOST = True
|
||||
USE_X_FORWARDED_PORT = True
|
||||
|
||||
# --- SSO (Casdoor / Allauth) ---
|
||||
CASDOOR_ENABLED = env.bool("CASDOOR_ENABLED", default=False)
|
||||
CASDOOR_ORIGIN = env("CASDOOR_ORIGIN", default="")
|
||||
CASDOOR_ORIGIN_FRONTEND = env("CASDOOR_ORIGIN_FRONTEND", default="")
|
||||
CASDOOR_CLIENT_ID = env("CASDOOR_CLIENT_ID", default="")
|
||||
CASDOOR_CLIENT_SECRET = env("CASDOOR_CLIENT_SECRET", default="")
|
||||
CASDOOR_ORG_NAME = env("CASDOOR_ORG_NAME", default="")
|
||||
ALLOW_LOCAL_LOGIN = env.bool("ALLOW_LOCAL_LOGIN", default=False)
|
||||
|
||||
# --- LLM API Encryption ---
|
||||
LLM_API_SECRETS_ENCRYPTION_KEY = env(
|
||||
"LLM_API_SECRETS_ENCRYPTION_KEY", default=""
|
||||
@@ -54,6 +68,10 @@ INSTALLED_APPS = [
|
||||
"storages",
|
||||
"django_neomodel",
|
||||
"django_prometheus",
|
||||
"allauth",
|
||||
"allauth.account",
|
||||
"allauth.socialaccount",
|
||||
"allauth.socialaccount.providers.openid_connect",
|
||||
# Mnemosyne apps
|
||||
"themis",
|
||||
"library",
|
||||
@@ -64,10 +82,52 @@ INSTALLED_APPS = [
|
||||
# --- MCP Server ---
|
||||
MCP_REQUIRE_AUTH = env.bool("MCP_REQUIRE_AUTH", default=True)
|
||||
|
||||
# --- Authentication backends ---
|
||||
AUTHENTICATION_BACKENDS = [
|
||||
"django.contrib.auth.backends.ModelBackend",
|
||||
"allauth.account.auth_backends.AuthenticationBackend",
|
||||
]
|
||||
|
||||
# --- Allauth ---
|
||||
ACCOUNT_LOGIN_METHODS = {"email"}
|
||||
ACCOUNT_SIGNUP_FIELDS = ["email*", "password1*", "password2*"]
|
||||
ACCOUNT_EMAIL_VERIFICATION = "optional"
|
||||
ACCOUNT_SESSION_REMEMBER = True
|
||||
ACCOUNT_LOGIN_ON_PASSWORD_RESET = True
|
||||
ACCOUNT_UNIQUE_EMAIL = True
|
||||
SOCIALACCOUNT_AUTO_SIGNUP = True
|
||||
SOCIALACCOUNT_EMAIL_VERIFICATION = "none"
|
||||
SOCIALACCOUNT_QUERY_EMAIL = True
|
||||
SOCIALACCOUNT_STORE_TOKENS = True
|
||||
SOCIALACCOUNT_ADAPTER = "themis.adapters.CasdoorAccountAdapter"
|
||||
ACCOUNT_ADAPTER = "themis.adapters.LocalAccountAdapter"
|
||||
SOCIALACCOUNT_EMAIL_AUTHENTICATION_AUTO_CONNECT = True
|
||||
SESSION_COOKIE_AGE = 28800
|
||||
SESSION_SAVE_EVERY_REQUEST = True
|
||||
|
||||
if CASDOOR_ENABLED:
|
||||
SOCIALACCOUNT_PROVIDERS = {
|
||||
"openid_connect": {
|
||||
"APPS": [
|
||||
{
|
||||
"provider_id": "casdoor",
|
||||
"name": "Casdoor SSO",
|
||||
"client_id": CASDOOR_CLIENT_ID,
|
||||
"secret": CASDOOR_CLIENT_SECRET,
|
||||
"settings": {
|
||||
"server_url": f"{CASDOOR_ORIGIN}/.well-known/openid-configuration",
|
||||
},
|
||||
}
|
||||
],
|
||||
"OAUTH_PKCE_ENABLED": True,
|
||||
}
|
||||
}
|
||||
|
||||
MIDDLEWARE = [
|
||||
"django_prometheus.middleware.PrometheusBeforeMiddleware",
|
||||
"django.middleware.security.SecurityMiddleware",
|
||||
"django.contrib.sessions.middleware.SessionMiddleware",
|
||||
"allauth.account.middleware.AccountMiddleware",
|
||||
"django.middleware.common.CommonMiddleware",
|
||||
"django.middleware.csrf.CsrfViewMiddleware",
|
||||
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
||||
@@ -257,9 +317,9 @@ REST_FRAMEWORK = {
|
||||
DEFAULT_AUTO_FIELD = "django.db.models.BigAutoField"
|
||||
|
||||
# --- Login URLs ---
|
||||
LOGIN_URL = "/login/"
|
||||
LOGIN_REDIRECT_URL = "/"
|
||||
LOGOUT_REDIRECT_URL = "/"
|
||||
LOGIN_URL = "/accounts/login/"
|
||||
LOGIN_REDIRECT_URL = "/dashboard/"
|
||||
ACCOUNT_LOGOUT_REDIRECT_URL = "/"
|
||||
|
||||
# --- Embedding Pipeline (Phase 2) ---
|
||||
EMBEDDING_BATCH_SIZE = env.int("EMBEDDING_BATCH_SIZE", default=8)
|
||||
|
||||
92
mnemosyne/mnemosyne/templates/account/login.html
Normal file
92
mnemosyne/mnemosyne/templates/account/login.html
Normal file
@@ -0,0 +1,92 @@
|
||||
{% extends "themis/base.html" %}
|
||||
{% load socialaccount %}
|
||||
|
||||
{% block title %}Log In — {{ themis_app_name }}{% endblock %}
|
||||
|
||||
{% block content %}
|
||||
<div class="flex justify-center items-center min-h-[60vh]">
|
||||
<div class="card bg-base-200 shadow-xl w-full max-w-md">
|
||||
<div class="card-body">
|
||||
<h2 class="card-title text-2xl justify-center mb-4">Log In</h2>
|
||||
|
||||
{% if messages %}
|
||||
{% for message in messages %}
|
||||
<div class="alert alert-{{ message.tags|default:'info' }} mb-4">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 shrink-0" fill="none"
|
||||
viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
|
||||
d="M12 9v2m0 4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z" />
|
||||
</svg>
|
||||
<span>{{ message }}</span>
|
||||
</div>
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
|
||||
{% if CASDOOR_ENABLED %}
|
||||
<form method="post" action="{% provider_login_url 'casdoor' %}">
|
||||
{% csrf_token %}
|
||||
<div class="form-control mt-2">
|
||||
<button type="submit" class="btn btn-primary w-full">Sign in with SSO</button>
|
||||
</div>
|
||||
</form>
|
||||
{% endif %}
|
||||
|
||||
{% if ALLOW_LOCAL_LOGIN or user.is_superuser %}
|
||||
|
||||
{% if CASDOOR_ENABLED %}
|
||||
<div class="divider">or</div>
|
||||
{% endif %}
|
||||
|
||||
{% if form.errors %}
|
||||
<div class="alert alert-error mb-4">
|
||||
<svg xmlns="http://www.w3.org/2000/svg" class="h-5 w-5 shrink-0" fill="none"
|
||||
viewBox="0 0 24 24" stroke="currentColor">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
|
||||
d="M12 9v2m0 4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z" />
|
||||
</svg>
|
||||
<span>Invalid email or password. Please try again.</span>
|
||||
</div>
|
||||
{% endif %}
|
||||
|
||||
<form method="post" action="{% url 'account_login' %}">
|
||||
{% csrf_token %}
|
||||
|
||||
<div class="form-control mb-4">
|
||||
<label class="label" for="id_login">
|
||||
<span class="label-text">Email</span>
|
||||
</label>
|
||||
<input type="email" name="login" id="id_login"
|
||||
class="input input-bordered w-full{% if form.errors %} input-error{% endif %}"
|
||||
value="{{ form.login.value|default:'' }}"
|
||||
autofocus required>
|
||||
</div>
|
||||
|
||||
<div class="form-control mb-6">
|
||||
<label class="label" for="id_password">
|
||||
<span class="label-text">Password</span>
|
||||
</label>
|
||||
<input type="password" name="password" id="id_password"
|
||||
class="input input-bordered w-full{% if form.errors %} input-error{% endif %}"
|
||||
required>
|
||||
<label class="label">
|
||||
<a href="{% url 'account_reset_password' %}" class="label-text-alt link link-hover">
|
||||
Forgot password?
|
||||
</a>
|
||||
</label>
|
||||
</div>
|
||||
|
||||
{% if next %}
|
||||
<input type="hidden" name="next" value="{{ next }}">
|
||||
{% endif %}
|
||||
|
||||
<div class="form-control mt-2">
|
||||
<button type="submit" class="btn btn-outline w-full">Sign In Locally</button>
|
||||
</div>
|
||||
</form>
|
||||
|
||||
{% endif %}
|
||||
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
@@ -14,8 +14,8 @@ urlpatterns = [
|
||||
# Landing / Dashboard
|
||||
path("", views.landing, name="landing"),
|
||||
path("dashboard/", views.dashboard, name="dashboard"),
|
||||
# Django auth (login, logout, password reset)
|
||||
path("", include("django.contrib.auth.urls")),
|
||||
# Allauth (SSO + local login/logout/password reset)
|
||||
path("accounts/", include("allauth.urls")),
|
||||
# Admin
|
||||
path("admin/", admin.site.urls),
|
||||
# Prometheus metrics
|
||||
|
||||
117
mnemosyne/themis/adapters.py
Normal file
117
mnemosyne/themis/adapters.py
Normal file
@@ -0,0 +1,117 @@
|
||||
import logging
|
||||
|
||||
from allauth.account.adapter import DefaultAccountAdapter
|
||||
from allauth.exceptions import ImmediateHttpResponse
|
||||
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
|
||||
from django.contrib import messages
|
||||
from django.contrib.auth.models import Group
|
||||
from django.shortcuts import redirect
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CasdoorAccountAdapter(DefaultSocialAccountAdapter):
|
||||
|
||||
def is_open_for_signup(self, request, sociallogin):
|
||||
return True
|
||||
|
||||
def pre_social_login(self, request, sociallogin):
|
||||
"""
|
||||
Runs on every SSO login (new and returning users).
|
||||
|
||||
Blocks superusers (they must use local auth) and re-syncs group
|
||||
and is_staff claims for returning users so IdP changes take effect
|
||||
immediately without requiring a re-registration.
|
||||
"""
|
||||
if sociallogin.user.id:
|
||||
user = sociallogin.user
|
||||
|
||||
if user.is_superuser:
|
||||
logger.warning(
|
||||
f"SSO login blocked for superuser {user.username}. "
|
||||
"Superusers must use local authentication."
|
||||
)
|
||||
messages.error(
|
||||
request,
|
||||
"Superuser accounts must use local authentication.",
|
||||
)
|
||||
raise ImmediateHttpResponse(redirect("account_login"))
|
||||
|
||||
extra_data = sociallogin.account.extra_data
|
||||
|
||||
org_identifier = extra_data.get("organization", "")
|
||||
if org_identifier:
|
||||
self._assign_organization(user, org_identifier)
|
||||
|
||||
groups = extra_data.get("groups", [])
|
||||
self._assign_groups(user, groups)
|
||||
|
||||
user.is_staff = any(g in ["staff", "sme", "admin"] for g in groups)
|
||||
user.save(update_fields=["is_staff"])
|
||||
|
||||
def populate_user(self, request, sociallogin, data):
|
||||
user = super().populate_user(request, sociallogin, data)
|
||||
|
||||
email = data.get("email", "")
|
||||
user.username = email
|
||||
user.email = email
|
||||
|
||||
user.first_name = data.get("given_name", "")
|
||||
user.last_name = data.get("family_name", "")
|
||||
|
||||
if not user.first_name and not user.last_name:
|
||||
full_name = data.get("name", "")
|
||||
if full_name:
|
||||
parts = full_name.split(" ", 1)
|
||||
user.first_name = parts[0]
|
||||
user.last_name = parts[1] if len(parts) > 1 else ""
|
||||
|
||||
user.is_superuser = False
|
||||
|
||||
groups = data.get("groups", [])
|
||||
user.is_staff = any(g in ["staff", "sme", "admin"] for g in groups)
|
||||
|
||||
return user
|
||||
|
||||
def save_user(self, request, sociallogin, form=None):
|
||||
user = super().save_user(request, sociallogin, form)
|
||||
extra_data = sociallogin.account.extra_data
|
||||
|
||||
org_identifier = extra_data.get("organization", "")
|
||||
if org_identifier:
|
||||
self._assign_organization(user, org_identifier)
|
||||
|
||||
groups = extra_data.get("groups", [])
|
||||
self._assign_groups(user, groups)
|
||||
return user
|
||||
|
||||
def _assign_organization(self, user, org_identifier):
|
||||
# Mnemosyne has no Organization model — no-op, preserved for future use.
|
||||
pass
|
||||
|
||||
def _assign_groups(self, user, group_names):
|
||||
group_mapping = {
|
||||
"view_only": "View Only",
|
||||
"staff": "Staff",
|
||||
"sme": "SME",
|
||||
"admin": "Admin",
|
||||
}
|
||||
user.groups.clear()
|
||||
for casdoor_group in group_names:
|
||||
django_group_name = group_mapping.get(casdoor_group.lower())
|
||||
if django_group_name:
|
||||
group, _ = Group.objects.get_or_create(name=django_group_name)
|
||||
user.groups.add(group)
|
||||
logger.info(f"Added {user.username} to group {django_group_name}")
|
||||
|
||||
|
||||
class LocalAccountAdapter(DefaultAccountAdapter):
|
||||
|
||||
def is_open_for_signup(self, request):
|
||||
return False
|
||||
|
||||
def authentication_failed(self, request, **kwargs):
|
||||
logger.warning(
|
||||
f"Local authentication failed from {request.META.get('REMOTE_ADDR')}"
|
||||
)
|
||||
super().authentication_failed(request, **kwargs)
|
||||
@@ -17,12 +17,16 @@ def themis_settings(request):
|
||||
Context variables:
|
||||
themis_app_name: Application display name
|
||||
themis_notification_poll_interval: Polling interval in seconds (0=disabled)
|
||||
CASDOOR_ENABLED: Whether SSO is active
|
||||
ALLOW_LOCAL_LOGIN: Whether the local login form is shown to non-superusers
|
||||
"""
|
||||
return {
|
||||
"themis_app_name": getattr(settings, "THEMIS_APP_NAME", "Application"),
|
||||
"themis_notification_poll_interval": getattr(
|
||||
settings, "THEMIS_NOTIFICATION_POLL_INTERVAL", 60
|
||||
),
|
||||
"CASDOOR_ENABLED": getattr(settings, "CASDOOR_ENABLED", False),
|
||||
"ALLOW_LOCAL_LOGIN": getattr(settings, "ALLOW_LOCAL_LOGIN", False),
|
||||
}
|
||||
|
||||
|
||||
|
||||
44
mnemosyne/themis/management/commands/create_sso_groups.py
Normal file
44
mnemosyne/themis/management/commands/create_sso_groups.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from django.contrib.auth.models import Group, Permission
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = "Create Django groups for Casdoor SSO integration"
|
||||
|
||||
def handle(self, *args, **options):
|
||||
groups_config = {
|
||||
"View Only": {"permissions": ["view"]},
|
||||
"Staff": {"permissions": ["view", "add", "change"]},
|
||||
"SME": {"permissions": ["view", "add", "change"]},
|
||||
"Admin": {"permissions": ["view", "add", "change", "delete"]},
|
||||
}
|
||||
|
||||
models_to_permission = [
|
||||
# themis
|
||||
"userprofile",
|
||||
"userapikey",
|
||||
"usernotification",
|
||||
# library
|
||||
"ingestjob",
|
||||
# llm_manager
|
||||
"llmapi",
|
||||
"llmmodel",
|
||||
"llmusage",
|
||||
]
|
||||
|
||||
for group_name, config in groups_config.items():
|
||||
group, created = Group.objects.get_or_create(name=group_name)
|
||||
status = "Created" if created else "Exists"
|
||||
self.stdout.write(f"{status}: {group_name}")
|
||||
|
||||
for perm_prefix in config["permissions"]:
|
||||
for model in models_to_permission:
|
||||
try:
|
||||
perm = Permission.objects.get(
|
||||
codename=f"{perm_prefix}_{model}"
|
||||
)
|
||||
group.permissions.add(perm)
|
||||
except Permission.DoesNotExist:
|
||||
pass
|
||||
|
||||
self.stdout.write(self.style.SUCCESS("SSO groups created successfully"))
|
||||
29
mnemosyne/themis/ssl_patch.py
Normal file
29
mnemosyne/themis/ssl_patch.py
Normal file
@@ -0,0 +1,29 @@
|
||||
import os
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def apply_ssl_bypass():
|
||||
ssl_verify = os.environ.get('CASDOOR_SSL_VERIFY', 'true').lower()
|
||||
if ssl_verify != 'false':
|
||||
return
|
||||
|
||||
logger.warning("SSL verification DISABLED — sandbox only")
|
||||
|
||||
import urllib3
|
||||
from requests.adapters import HTTPAdapter
|
||||
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
||||
|
||||
_original_send = HTTPAdapter.send
|
||||
|
||||
def _patched_send(self, request, stream=False, timeout=None,
|
||||
verify=True, cert=None, proxies=None):
|
||||
return _original_send(self, request, stream=stream,
|
||||
timeout=timeout, verify=False,
|
||||
cert=cert, proxies=proxies)
|
||||
|
||||
HTTPAdapter.send = _patched_send
|
||||
|
||||
|
||||
apply_ssl_bypass()
|
||||
130
mnemosyne/themis/tests/test_adapters.py
Normal file
130
mnemosyne/themis/tests/test_adapters.py
Normal file
@@ -0,0 +1,130 @@
|
||||
from unittest.mock import MagicMock
|
||||
|
||||
from django.contrib.auth.models import Group, User
|
||||
from django.test import TestCase
|
||||
|
||||
from themis.adapters import CasdoorAccountAdapter, LocalAccountAdapter
|
||||
|
||||
|
||||
class CasdoorAdapterTest(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.adapter = CasdoorAccountAdapter()
|
||||
|
||||
def test_signup_always_open(self):
|
||||
self.assertTrue(self.adapter.is_open_for_signup(MagicMock(), MagicMock()))
|
||||
|
||||
def test_superuser_never_set_via_sso(self):
|
||||
sociallogin = MagicMock()
|
||||
data = {"email": "admin@example.com", "groups": ["admin"]}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertFalse(user.is_superuser)
|
||||
|
||||
def test_email_used_as_username(self):
|
||||
sociallogin = MagicMock()
|
||||
data = {"email": "jane@example.com"}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertEqual(user.username, "jane@example.com")
|
||||
|
||||
def test_staff_flag_from_groups(self):
|
||||
sociallogin = MagicMock()
|
||||
for group in ["staff", "sme", "admin"]:
|
||||
data = {"email": "user@example.com", "groups": [group]}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertTrue(user.is_staff, f"is_staff should be True for group '{group}'")
|
||||
|
||||
def test_view_only_not_staff(self):
|
||||
sociallogin = MagicMock()
|
||||
data = {"email": "viewer@example.com", "groups": ["view_only"]}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertFalse(user.is_staff)
|
||||
|
||||
def test_name_fallback_parsing(self):
|
||||
sociallogin = MagicMock()
|
||||
data = {"email": "user@example.com", "name": "Jane Doe"}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertEqual(user.first_name, "Jane")
|
||||
self.assertEqual(user.last_name, "Doe")
|
||||
|
||||
def test_name_fallback_single_word(self):
|
||||
sociallogin = MagicMock()
|
||||
data = {"email": "user@example.com", "name": "Cher"}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertEqual(user.first_name, "Cher")
|
||||
self.assertEqual(user.last_name, "")
|
||||
|
||||
def test_given_family_name_takes_precedence_over_name(self):
|
||||
sociallogin = MagicMock()
|
||||
data = {
|
||||
"email": "user@example.com",
|
||||
"given_name": "Alice",
|
||||
"family_name": "Smith",
|
||||
"name": "Should Not Use This",
|
||||
}
|
||||
user = self.adapter.populate_user(MagicMock(), sociallogin, data)
|
||||
self.assertEqual(user.first_name, "Alice")
|
||||
self.assertEqual(user.last_name, "Smith")
|
||||
|
||||
def test_group_mapping(self):
|
||||
Group.objects.create(name="View Only")
|
||||
Group.objects.create(name="Staff")
|
||||
user = User.objects.create_user("test@example.com", "test@example.com")
|
||||
self.adapter._assign_groups(user, ["view_only", "staff"])
|
||||
group_names = set(user.groups.values_list("name", flat=True))
|
||||
self.assertEqual(group_names, {"View Only", "Staff"})
|
||||
|
||||
def test_group_mapping_clears_previous_groups(self):
|
||||
Group.objects.create(name="Admin")
|
||||
Group.objects.create(name="View Only")
|
||||
user = User.objects.create_user("test@example.com", "test@example.com")
|
||||
user.groups.add(Group.objects.get(name="Admin"))
|
||||
self.adapter._assign_groups(user, ["view_only"])
|
||||
group_names = set(user.groups.values_list("name", flat=True))
|
||||
self.assertEqual(group_names, {"View Only"})
|
||||
|
||||
def test_unknown_casdoor_group_ignored(self):
|
||||
user = User.objects.create_user("test@example.com", "test@example.com")
|
||||
self.adapter._assign_groups(user, ["unknown_role"])
|
||||
self.assertEqual(user.groups.count(), 0)
|
||||
|
||||
def test_superuser_sso_login_blocked(self):
|
||||
from allauth.exceptions import ImmediateHttpResponse
|
||||
|
||||
user = User.objects.create_superuser(
|
||||
"admin@example.com", "admin@example.com", "pass"
|
||||
)
|
||||
sociallogin = MagicMock()
|
||||
sociallogin.user = user
|
||||
sociallogin.user.id = user.id
|
||||
with self.assertRaises(ImmediateHttpResponse):
|
||||
self.adapter.pre_social_login(MagicMock(), sociallogin)
|
||||
|
||||
def test_groups_resync_on_returning_login(self):
|
||||
Group.objects.create(name="Admin")
|
||||
Group.objects.create(name="Staff")
|
||||
user = User.objects.create_user("user@example.com", "user@example.com")
|
||||
user.groups.add(Group.objects.get(name="Staff"))
|
||||
|
||||
sociallogin = MagicMock()
|
||||
sociallogin.user = user
|
||||
sociallogin.user.id = user.id
|
||||
sociallogin.account.extra_data = {
|
||||
"groups": ["admin"],
|
||||
"organization": "",
|
||||
}
|
||||
self.adapter.pre_social_login(MagicMock(), sociallogin)
|
||||
group_names = set(user.groups.values_list("name", flat=True))
|
||||
self.assertEqual(group_names, {"Admin"})
|
||||
|
||||
def test_new_user_skips_pre_social_login_sync(self):
|
||||
sociallogin = MagicMock()
|
||||
sociallogin.user.id = None
|
||||
# Should not raise or attempt any DB operations
|
||||
self.adapter.pre_social_login(MagicMock(), sociallogin)
|
||||
|
||||
|
||||
class LocalAdapterTest(TestCase):
|
||||
|
||||
def test_local_signup_disabled(self):
|
||||
adapter = LocalAccountAdapter()
|
||||
self.assertFalse(adapter.is_open_for_signup(MagicMock()))
|
||||
Reference in New Issue
Block a user