Développeurs, découvrez notre avancée en sécurité et gestion d'utilisateurs !
J'ai récemment implémenté un système de gestion d'utilisateurs et de sécurité robuste avec Django et Django REST Framework. Voici ce que notre solution apporte :
Inscription et Authentification Sécurisée:
Utilisation de APIView et TokenObtainPairView pour une gestion JWT avancée avec RefreshToken.
Protection contre les attaques par force brute grâce à throttle_scope pour les vues RegisterView et CustomTokenObtainPairView.
Enregistrement de l'historique des connexions pour chaque utilisateur avec LoginHistory pour une traçabilité complète.
Gestion des Mots de Passe:
Vue ChangePasswordView avec IsAuthenticated pour garantir que seul l'utilisateur peut changer son mot de passe, incluant la mise à jour de password_changed_at et l'invalidation des anciens tokens.
Profil Utilisateur et Historique de Connexion:
UserProfileView pour une mise à jour sécurisée du profil.
LoginHistoryView pour permettre aux utilisateurs de consulter leur historique de connexion, optimisé avec select_related pour minimiser les requêtes SQL.
Déconnexion Sécurisée:
LogoutView avec gestion de RefreshToken pour blacklister le token lors de la déconnexion.
Notre approche combine :
Sécurité: Utilisation de AllowAny pour l'inscription mais IsAuthenticated pour les actions sensibles.
Performances: Gestion des tentatives de connexion pour éviter les blocages par DoS, et optimisation des requêtes avec Django ORM.
Expérience Utilisateur: Notifications claires des succès ou des erreurs grâce à des messages traduits avec gettext_lazy.
Trêve de bavardage passons a l'implémentation.
manage.py
from django.contrib.auth.models import BaseUserManager
from django.utils.translation import gettext_lazy as _
class CustomUserManager(BaseUserManager):
def create_user(self,email,password=None,**extra_fields):
if not email:
raise ValueError(_("L'email est obligatoire"))
email = self.normalize_email(email)
user = self.model(email=email, **extra_fields)
user.set_password(password)
user.save
return user
def create_superuser(self,email,password,**extra_fields):
extra_fields.setdefault('is_staff',True)
extra_fields.setdefault('is_superuser',True)
extra_fields.setdefault('is_active',True)
return self.create_user(email,password,**extra_fields)
models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.core.validators import RegexValidator
from django.utils.translation import gettext_lazy as _
import uuid
from users.managers import CustomUserManager
class User(AbstractUser):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
username = None
email = models.EmailField(
_('adresse email'),
unique=True,
error_messages={
'unique': _("Un utilisateur avec cette adresse email existe déjà."),
}
)
first_name = models.CharField(_('prénom'), max_length=150)
last_name = models.CharField(_('nom'), max_length=150)
phone_regex = RegexValidator(
regex=r'^\+?1?\d{9,15}$',
message=_("Le numéro de téléphone doit être au format: '+999999999'. 15 chiffres maximum.")
)
phone = models.CharField(
_('téléphone'),
validators=[phone_regex],
max_length=15,
blank=True
)
date_of_birth = models.DateField(_('date de naissance'), null=True, blank=True)
is_verified = models.BooleanField(_('vérifié'), default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
last_login_ip = models.GenericIPAddressField(null=True, blank=True)
failed_login_attempts = models.IntegerField(default=0)
is_locked = models.BooleanField(default=False)
lock_timestamp = models.DateTimeField(null=True, blank=True)
password_changed_at = models.DateTimeField(null=True, blank=True)
USERNAME_FIELD = 'email'
REQUIRED_FIELDS = ['first_name', 'last_name']
objects = CustomUserManager()
class Meta:
verbose_name = _('user')
verbose_name_plural = _('users')
ordering = ['-created_at']
def __str__(self):
return self.email
def lock_account(self):
from django.utils import timezone
self.is_locked = True
self.lock_timestamp = timezone.now()
self.save()
def reset_login_attempts(self):
self.failed_login_attempts = 0
self.is_locked = False
self.lock_timestamp = None
self.save()
class UserProfile(models.Model):
GENDER_CHOICES = [
('M', _('Masculin')),
('F', _('Féminin')),
('O', _('Autre')),
]
user = models.OneToOneField(
User,
on_delete=models.CASCADE,
related_name='profile'
)
avatar = models.ImageField(
upload_to='avatars/%Y/%m/',
null=True,
blank=True
)
bio = models.TextField(max_length=500, blank=True)
location = models.CharField(max_length=100, blank=True)
preferences = models.JSONField(
default=dict,
help_text=_("Préférences utilisateur en format JSON")
)
gender = models.CharField(
max_length=1,
choices=GENDER_CHOICES,
blank=True
)
website = models.URLField(blank=True)
class Meta:
verbose_name = _('profil utilisateur')
verbose_name_plural = _('profils utilisateurs')
def __str__(self):
return f"Profil de {self.user.email}"
class LoginHistory(models.Model):
LOGIN_STATUS_CHOICES = [
('success', _('Succès')),
('failed', _('Échec')),
('locked', _('Compte verrouillé')),
]
user = models.ForeignKey(
User,
on_delete=models.CASCADE,
related_name='login_history'
)
login_datetime = models.DateTimeField(auto_now_add=True)
ip_address = models.GenericIPAddressField()
user_agent = models.CharField(max_length=255)
status = models.CharField(
max_length=20,
choices=LOGIN_STATUS_CHOICES,
default='failed'
)
location_city = models.CharField(max_length=100, blank=True)
location_country = models.CharField(max_length=100, blank=True)
class Meta:
verbose_name = _('historique de connexion')
verbose_name_plural = _('historiques de connexion')
ordering = ['-login_datetime']
indexes = [
models.Index(fields=['user', '-login_datetime']),
]
def __str__(self):
return f"{self.user.email} - {self.login_datetime} - {self.status}"
serializers.py
from rest_framework import serializers
from django.contrib.auth import get_user_model, password_validation
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from users.models import UserProfile, LoginHistory
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError
from django.utils import timezone
import re
User = get_user_model()
class UserProfileSerializer(serializers.ModelSerializer):
avatar_url = serializers.SerializerMethodField()
class Meta:
model = UserProfile
fields = ('avatar', 'avatar_url', 'bio', 'location', 'preferences', 'gender', 'website')
extra_kwargs = {
'avatar': {'write_only': True}
}
def get_avatar_url(self, obj):
if obj.avatar:
return self.context['request'].build_absolute_uri(obj.avatar.url)
return None
def validate_website(self, value):
if value and not value.startswith(('http://', 'https://')):
value = 'https://' + value
return value
class UserSerializer(serializers.ModelSerializer):
profile = UserProfileSerializer(required=False)
password = serializers.CharField(write_only=True, required=True)
password_confirm = serializers.CharField(write_only=True, required=True)
full_name = serializers.SerializerMethodField()
class Meta:
model = User
fields = ('id', 'email', 'password', 'password_confirm', 'first_name',
'last_name', 'phone', 'date_of_birth', 'profile', 'full_name',
'created_at', 'is_verified')
extra_kwargs = {
'first_name': {'required': True},
'last_name': {'required': True},
'email': {'required': True},
'created_at': {'read_only': True},
'is_verified': {'read_only': True}
}
def get_full_name(self, obj):
return f"{obj.first_name} {obj.last_name}"
def validate_email(self, value):
if User.objects.filter(email__iexact=value).exists():
raise serializers.ValidationError(_("Un utilisateur avec cette adresse email existe déjà."))
return value.lower()
def validate_phone(self, value):
if value:
pattern = r'^\+?1?\d{9,15}$'
if not re.match(pattern, value):
raise serializers.ValidationError(
_("Format de téléphone invalide. Utilisez le format: '+999999999'")
)
return value
def validate_password(self, value):
try:
password_validation.validate_password(value)
except ValidationError as e:
raise serializers.ValidationError(list(e.messages))
return value
def validate(self, attrs):
if attrs['password'] != attrs.pop('password_confirm'):
raise serializers.ValidationError({"password": _("Les mots de passe ne correspondent pas")})
if attrs.get('date_of_birth'):
if attrs['date_of_birth'] > timezone.now().date():
raise serializers.ValidationError(
{"date_of_birth": _("La date de naissance ne peut pas être dans le futur")}
)
return attrs
def create(self, validated_data):
profile_data = validated_data.pop('profile', None)
password = validated_data.pop('password')
user = User.objects.create(**validated_data)
user.set_password(password)
user.password_changed_at = timezone.now()
user.save()
if profile_data:
UserProfile.objects.create(user=user, **profile_data)
return user
def update(self, instance, validated_data):
profile_data = validated_data.pop('profile', None)
password = validated_data.pop('password', None)
for attr, value in validated_data.items():
setattr(instance, attr, value)
if password:
instance.set_password(password)
instance.password_changed_at = timezone.now()
instance.save()
if profile_data and hasattr(instance, 'profile'):
for attr, value in profile_data.items():
setattr(instance.profile, attr, value)
instance.profile.save()
elif profile_data:
UserProfile.objects.create(user=instance, **profile_data)
return instance
class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
def validate(self, attrs):
try:
data = super().validate(attrs)
except Exception as e:
user = User.objects.filter(email=attrs.get('email')).first()
if user:
user.failed_login_attempts += 1
if user.failed_login_attempts >= 5: # Configurable
user.lock_account()
user.save()
raise
user = self.user
if user.is_locked:
raise serializers.ValidationError({
"error": _("Compte verrouillé. Veuillez contacter le support."),
"locked_since": user.lock_timestamp
})
data.update({
'user': {
'id': str(user.id),
'email': user.email,
'full_name': f"{user.first_name} {user.last_name}",
'is_verified': user.is_verified,
'profile': UserProfileSerializer(
user.profile,
context=self.context
).data if hasattr(user, 'profile') else None
}
})
user.reset_login_attempts()
return data
class ChangePasswordSerializer(serializers.Serializer):
old_password = serializers.CharField(required=True)
new_password = serializers.CharField(required=True)
new_password_confirm = serializers.CharField(required=True)
def validate_old_password(self, value):
user = self.context['request'].user
if not user.check_password(value):
raise serializers.ValidationError(_("Ancien mot de passe incorrect"))
return value
def validate_new_password(self, value):
try:
password_validation.validate_password(value, self.context['request'].user)
except ValidationError as e:
raise serializers.ValidationError(list(e.messages))
return value
def validate(self, attrs):
if attrs['new_password'] != attrs['new_password_confirm']:
raise serializers.ValidationError({
"new_password": _("Les nouveaux mots de passe ne correspondent pas")
})
if attrs['old_password'] == attrs['new_password']:
raise serializers.ValidationError({
"new_password": _("Le nouveau mot de passe doit être différent de l'ancien")
})
return attrs
class LoginHistorySerializer(serializers.ModelSerializer):
user_email = serializers.CharField(source='user.email', read_only=True)
class Meta:
model = LoginHistory
fields = ('id', 'user_email', 'login_datetime', 'ip_address',
'user_agent', 'status', 'location_city', 'location_country')
read_only_fields = ('id', 'login_datetime')
views.py
from rest_framework import status, generics
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.views import TokenObtainPairView
from rest_framework_simplejwt.tokens import RefreshToken, TokenError
from django.contrib.auth import get_user_model
from django.utils.translation import gettext_lazy as _
from django.core.cache import cache
from django.utils import timezone
from users.serializers import (
UserSerializer,
CustomTokenObtainPairSerializer,
ChangePasswordSerializer,
LoginHistorySerializer
)
from .models import LoginHistory
import logging
logger = logging.getLogger(__name__)
User = get_user_model()
class RegisterView(APIView):
permission_classes = (AllowAny,)
throttle_scope = 'registration' # Protection contre les attaques par force brute
def post(self, request):
try:
serializer = UserSerializer(data=request.data, context={'request': request})
if serializer.is_valid():
user = serializer.save()
# Créer les tokens JWT
refresh = RefreshToken.for_user(user)
# Enregistrer l'historique
LoginHistory.objects.create(
user=user,
ip_address=self._get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
status='success'
)
return Response({
'user': UserSerializer(user, context={'request': request}).data,
'refresh': str(refresh),
'access': str(refresh.access_token),
}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"Error in registration: {str(e)}", exc_info=True)
return Response(
{'error': _("Une erreur est survenue lors de l'inscription")},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def _get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0].strip()
return request.META.get('REMOTE_ADDR')
class CustomTokenObtainPairView(TokenObtainPairView):
serializer_class = CustomTokenObtainPairSerializer
throttle_scope = 'login'
def post(self, request, *args, **kwargs):
try:
# Vérifier le nombre de tentatives par IP
ip = self._get_client_ip(request)
attempts_key = f"login_attempts_{ip}"
attempts = cache.get(attempts_key, 0)
if attempts >= 5:
return Response(
{'error': _("Trop de tentatives. Réessayez plus tard.")},
status=status.HTTP_429_TOO_MANY_REQUESTS
)
serializer = self.get_serializer(data=request.data)
try:
serializer.is_valid(raise_exception=True)
except ValidationError as e:
# Incrémenter le compteur de tentatives
cache.set(attempts_key, attempts + 1, timeout=300)
return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
user = serializer.user
cache.delete(attempts_key) # Réinitialiser les tentatives en cas de succès
# Enregistrer l'historique de connexion
LoginHistory.objects.create(
user=user,
ip_address=ip,
user_agent=request.META.get('HTTP_USER_AGENT', ''),
status='success',
location_city=self._get_location_city(ip),
location_country=self._get_location_country(ip)
)
# Mettre à jour les informations de connexion
user.last_login_ip = ip
user.last_login = timezone.now()
user.save(update_fields=['last_login_ip', 'last_login'])
return Response(serializer.validated_data)
except Exception as e:
logger.error(f"Error in login: {str(e)}", exc_info=True)
return Response(
{'error': _("Une erreur est survenue lors de la connexion")},
status=status.HTTP_400_BAD_REQUEST
)
def _get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0].strip()
return request.META.get('REMOTE_ADDR')
def _get_location_city(self, ip):
# Implémenter la géolocalisation IP ici
return ""
def _get_location_country(self, ip):
# Implémenter la géolocalisation IP ici
return ""
class ChangePasswordView(generics.UpdateAPIView):
permission_classes = (IsAuthenticated,)
serializer_class = ChangePasswordSerializer
throttle_scope = 'password_change'
def update(self, request, *args, **kwargs):
try:
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
user = request.user
# Changer le mot de passe
user.set_password(serializer.validated_data['new_password'])
user.password_changed_at = timezone.now()
user.save()
# Blacklister tous les tokens existants
RefreshToken.for_user(user)
# Créer de nouveaux tokens
refresh = RefreshToken.for_user(user)
return Response({
'message': _("Mot de passe modifié avec succès"),
'refresh': str(refresh),
'access': str(refresh.access_token)
}, status=status.HTTP_200_OK)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"Error in password change: {str(e)}", exc_info=True)
return Response(
{'error': _("Une erreur est survenue lors du changement de mot de passe")},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class UserProfileView(generics.RetrieveUpdateAPIView):
permission_classes = (IsAuthenticated,)
serializer_class = UserSerializer
def get_object(self):
return self.request.user
def update(self, request, *args, **kwargs):
try:
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_serializer(
instance,
data=request.data,
partial=partial,
context={'request': request}
)
if serializer.is_valid():
self.perform_update(serializer)
return Response(serializer.data)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"Error updating profile: {str(e)}", exc_info=True)
return Response(
{'error': _("Une erreur est survenue lors de la mise à jour du profil")},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class LoginHistoryView(generics.ListAPIView):
permission_classes = (IsAuthenticated,)
serializer_class = LoginHistorySerializer
pagination_class = None # Ou définir une pagination personnalisée
def get_queryset(self):
return (LoginHistory.objects
.filter(user=self.request.user)
.select_related('user')
.order_by('-login_datetime'))
class LogoutView(APIView):
permission_classes = (IsAuthenticated,)
def post(self, request):
try:
refresh_token = request.data.get("refresh_token")
if not refresh_token:
return Response(
{'error': _("Le token de rafraîchissement est requis")},
status=status.HTTP_400_BAD_REQUEST
)
token = RefreshToken(refresh_token)
token.blacklist()
# Enregistrer la déconnexion
LoginHistory.objects.create(
user=request.user,
ip_address=self._get_client_ip(request),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
status='logout'
)
return Response(status=status.HTTP_204_NO_CONTENT)
except TokenError:
return Response(
{'error': _("Token invalide ou expiré")},
status=status.HTTP_400_BAD_REQUEST
)
except Exception as e:
logger.error(f"Error in logout: {str(e)}", exc_info=True)
return Response(
{'error': _("Une erreur est survenue lors de la déconnexion")},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
def _get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
return x_forwarded_for.split(',')[0].strip()
return request.META.get('REMOTE_ADDR')
urls.py
from django.urls import path
from rest_framework_simplejwt.views import TokenRefreshView
from users.views import (
RegisterView,
CustomTokenObtainPairView,
ChangePasswordView,
UserProfileView,
LoginHistoryView,
LogoutView
)
urlpatterns = [
path('register/', RegisterView.as_view(), name='register'),
path('login/', CustomTokenObtainPairView.as_view(), name='token_obtain_pair'),
path('token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
path('change-password/', ChangePasswordView.as_view(), name='change_password'),
path('profile/', UserProfileView.as_view(), name='user_profile'),
path('login-history/', LoginHistoryView.as_view(), name='login_history'),
path('logout/', LogoutView.as_view(), name='logout'),
]
Top comments (0)