DEV Community

PEMPEME MOHAMED CHAMSOUDINE
PEMPEME MOHAMED CHAMSOUDINE

Posted on

AUTHENTIFICATION JWT AVEC DJANGO

Image descriptionDéveloppeurs, découvrez notre avancée en sécurité et gestion d'utilisateurs !

J'ai récemment implémenté un système de gestion d'utilisateurs et de sécurité robuste avec Django et Django REST Framework. Voici ce que notre solution apporte :

Inscription et Authentification Sécurisée:

Utilisation de APIView et TokenObtainPairView pour une gestion JWT avancée avec RefreshToken.
Protection contre les attaques par force brute grâce à throttle_scope pour les vues RegisterView et CustomTokenObtainPairView.
Enregistrement de l'historique des connexions pour chaque utilisateur avec LoginHistory pour une traçabilité complète.

Gestion des Mots de Passe:

Vue ChangePasswordView avec IsAuthenticated pour garantir que seul l'utilisateur peut changer son mot de passe, incluant la mise à jour de password_changed_at et l'invalidation des anciens tokens.

Profil Utilisateur et Historique de Connexion:

UserProfileView pour une mise à jour sécurisée du profil.
LoginHistoryView pour permettre aux utilisateurs de consulter leur historique de connexion, optimisé avec select_related pour minimiser les requêtes SQL.

Déconnexion Sécurisée:

LogoutView avec gestion de RefreshToken pour blacklister le token lors de la déconnexion.

Notre approche combine :

Sécurité: Utilisation de AllowAny pour l'inscription mais IsAuthenticated pour les actions sensibles.

Performances: Gestion des tentatives de connexion pour éviter les blocages par DoS, et optimisation des requêtes avec Django ORM.

Expérience Utilisateur: Notifications claires des succès ou des erreurs grâce à des messages traduits avec gettext_lazy.

Trêve de bavardage passons a l'implémentation.

manage.py

from django.contrib.auth.models import BaseUserManager
from django.utils.translation import gettext_lazy as _


class CustomUserManager(BaseUserManager):
    def create_user(self,email,password=None,**extra_fields):
        if not email:
            raise ValueError(_("L'email est obligatoire"))

        email = self.normalize_email(email)
        user = self.model(email=email, **extra_fields)
        user.set_password(password)
        user.save
        return user

    def create_superuser(self,email,password,**extra_fields):
        extra_fields.setdefault('is_staff',True)
        extra_fields.setdefault('is_superuser',True)
        extra_fields.setdefault('is_active',True)

        return self.create_user(email,password,**extra_fields)



Enter fullscreen mode Exit fullscreen mode

models.py

from django.db import models
from django.contrib.auth.models import AbstractUser
from django.core.validators import RegexValidator
from django.utils.translation import gettext_lazy as _
import uuid
from users.managers import CustomUserManager

class User(AbstractUser):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    username = None
    email = models.EmailField(
        _('adresse email'), 
        unique=True,
        error_messages={
            'unique': _("Un utilisateur avec cette adresse email existe déjà."),
        }
    )
    first_name = models.CharField(_('prénom'), max_length=150)
    last_name = models.CharField(_('nom'), max_length=150)
    phone_regex = RegexValidator(
        regex=r'^\+?1?\d{9,15}$',
        message=_("Le numéro de téléphone doit être au format: '+999999999'. 15 chiffres maximum.")
    )
    phone = models.CharField(
        _('téléphone'), 
        validators=[phone_regex], 
        max_length=15, 
        blank=True
    )
    date_of_birth = models.DateField(_('date de naissance'), null=True, blank=True)
    is_verified = models.BooleanField(_('vérifié'), default=False)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    last_login_ip = models.GenericIPAddressField(null=True, blank=True)
    failed_login_attempts = models.IntegerField(default=0)
    is_locked = models.BooleanField(default=False)
    lock_timestamp = models.DateTimeField(null=True, blank=True)
    password_changed_at = models.DateTimeField(null=True, blank=True)

    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ['first_name', 'last_name']

    objects = CustomUserManager()

    class Meta:
        verbose_name = _('user')
        verbose_name_plural = _('users')
        ordering = ['-created_at']

    def __str__(self):
        return self.email

    def lock_account(self):
        from django.utils import timezone
        self.is_locked = True
        self.lock_timestamp = timezone.now()
        self.save()

    def reset_login_attempts(self):
        self.failed_login_attempts = 0
        self.is_locked = False
        self.lock_timestamp = None
        self.save()

class UserProfile(models.Model):
    GENDER_CHOICES = [
        ('M', _('Masculin')),
        ('F', _('Féminin')),
        ('O', _('Autre')),
    ]

    user = models.OneToOneField(
        User, 
        on_delete=models.CASCADE, 
        related_name='profile'
    )
    avatar = models.ImageField(
        upload_to='avatars/%Y/%m/', 
        null=True, 
        blank=True
    )
    bio = models.TextField(max_length=500, blank=True)
    location = models.CharField(max_length=100, blank=True)
    preferences = models.JSONField(
        default=dict,
        help_text=_("Préférences utilisateur en format JSON")
    )
    gender = models.CharField(
        max_length=1,
        choices=GENDER_CHOICES,
        blank=True
    )
    website = models.URLField(blank=True)

    class Meta:
        verbose_name = _('profil utilisateur')
        verbose_name_plural = _('profils utilisateurs')

    def __str__(self):
        return f"Profil de {self.user.email}"

class LoginHistory(models.Model):
    LOGIN_STATUS_CHOICES = [
        ('success', _('Succès')),
        ('failed', _('Échec')),
        ('locked', _('Compte verrouillé')),
    ]

    user = models.ForeignKey(
        User, 
        on_delete=models.CASCADE,
        related_name='login_history'
    )
    login_datetime = models.DateTimeField(auto_now_add=True)
    ip_address = models.GenericIPAddressField()
    user_agent = models.CharField(max_length=255)
    status = models.CharField(
        max_length=20,
        choices=LOGIN_STATUS_CHOICES,
        default='failed'
    )
    location_city = models.CharField(max_length=100, blank=True)
    location_country = models.CharField(max_length=100, blank=True)

    class Meta:
        verbose_name = _('historique de connexion')
        verbose_name_plural = _('historiques de connexion')
        ordering = ['-login_datetime']
        indexes = [
            models.Index(fields=['user', '-login_datetime']),
        ]

    def __str__(self):
        return f"{self.user.email} - {self.login_datetime} - {self.status}"
Enter fullscreen mode Exit fullscreen mode

serializers.py

from rest_framework import serializers
from django.contrib.auth import get_user_model, password_validation
from rest_framework_simplejwt.serializers import TokenObtainPairSerializer
from users.models import UserProfile, LoginHistory
from django.utils.translation import gettext_lazy as _
from django.core.exceptions import ValidationError
from django.utils import timezone
import re

User = get_user_model()

class UserProfileSerializer(serializers.ModelSerializer):
    avatar_url = serializers.SerializerMethodField()

    class Meta:
        model = UserProfile
        fields = ('avatar', 'avatar_url', 'bio', 'location', 'preferences', 'gender', 'website')
        extra_kwargs = {
            'avatar': {'write_only': True}
        }

    def get_avatar_url(self, obj):
        if obj.avatar:
            return self.context['request'].build_absolute_uri(obj.avatar.url)
        return None

    def validate_website(self, value):
        if value and not value.startswith(('http://', 'https://')):
            value = 'https://' + value
        return value

class UserSerializer(serializers.ModelSerializer):
    profile = UserProfileSerializer(required=False)
    password = serializers.CharField(write_only=True, required=True)
    password_confirm = serializers.CharField(write_only=True, required=True)
    full_name = serializers.SerializerMethodField()

    class Meta:
        model = User
        fields = ('id', 'email', 'password', 'password_confirm', 'first_name', 
                 'last_name', 'phone', 'date_of_birth', 'profile', 'full_name',
                 'created_at', 'is_verified')
        extra_kwargs = {
            'first_name': {'required': True},
            'last_name': {'required': True},
            'email': {'required': True},
            'created_at': {'read_only': True},
            'is_verified': {'read_only': True}
        }

    def get_full_name(self, obj):
        return f"{obj.first_name} {obj.last_name}"

    def validate_email(self, value):
        if User.objects.filter(email__iexact=value).exists():
            raise serializers.ValidationError(_("Un utilisateur avec cette adresse email existe déjà."))
        return value.lower()

    def validate_phone(self, value):
        if value:
            pattern = r'^\+?1?\d{9,15}$'
            if not re.match(pattern, value):
                raise serializers.ValidationError(
                    _("Format de téléphone invalide. Utilisez le format: '+999999999'")
                )
        return value

    def validate_password(self, value):
        try:
            password_validation.validate_password(value)
        except ValidationError as e:
            raise serializers.ValidationError(list(e.messages))
        return value

    def validate(self, attrs):
        if attrs['password'] != attrs.pop('password_confirm'):
            raise serializers.ValidationError({"password": _("Les mots de passe ne correspondent pas")})

        if attrs.get('date_of_birth'):
            if attrs['date_of_birth'] > timezone.now().date():
                raise serializers.ValidationError(
                    {"date_of_birth": _("La date de naissance ne peut pas être dans le futur")}
                )
        return attrs

    def create(self, validated_data):
        profile_data = validated_data.pop('profile', None)
        password = validated_data.pop('password')

        user = User.objects.create(**validated_data)
        user.set_password(password)
        user.password_changed_at = timezone.now()
        user.save()

        if profile_data:
            UserProfile.objects.create(user=user, **profile_data)
        return user

    def update(self, instance, validated_data):
        profile_data = validated_data.pop('profile', None)
        password = validated_data.pop('password', None)

        for attr, value in validated_data.items():
            setattr(instance, attr, value)

        if password:
            instance.set_password(password)
            instance.password_changed_at = timezone.now()

        instance.save()

        if profile_data and hasattr(instance, 'profile'):
            for attr, value in profile_data.items():
                setattr(instance.profile, attr, value)
            instance.profile.save()
        elif profile_data:
            UserProfile.objects.create(user=instance, **profile_data)

        return instance

class CustomTokenObtainPairSerializer(TokenObtainPairSerializer):
    def validate(self, attrs):
        try:
            data = super().validate(attrs)
        except Exception as e:
            user = User.objects.filter(email=attrs.get('email')).first()
            if user:
                user.failed_login_attempts += 1
                if user.failed_login_attempts >= 5:  # Configurable
                    user.lock_account()
                user.save()
            raise

        user = self.user
        if user.is_locked:
            raise serializers.ValidationError({
                "error": _("Compte verrouillé. Veuillez contacter le support."),
                "locked_since": user.lock_timestamp
            })

        data.update({
            'user': {
                'id': str(user.id),
                'email': user.email,
                'full_name': f"{user.first_name} {user.last_name}",
                'is_verified': user.is_verified,
                'profile': UserProfileSerializer(
                    user.profile,
                    context=self.context
                ).data if hasattr(user, 'profile') else None
            }
        })

        user.reset_login_attempts()
        return data

class ChangePasswordSerializer(serializers.Serializer):
    old_password = serializers.CharField(required=True)
    new_password = serializers.CharField(required=True)
    new_password_confirm = serializers.CharField(required=True)

    def validate_old_password(self, value):
        user = self.context['request'].user
        if not user.check_password(value):
            raise serializers.ValidationError(_("Ancien mot de passe incorrect"))
        return value

    def validate_new_password(self, value):
        try:
            password_validation.validate_password(value, self.context['request'].user)
        except ValidationError as e:
            raise serializers.ValidationError(list(e.messages))
        return value

    def validate(self, attrs):
        if attrs['new_password'] != attrs['new_password_confirm']:
            raise serializers.ValidationError({
                "new_password": _("Les nouveaux mots de passe ne correspondent pas")
            })
        if attrs['old_password'] == attrs['new_password']:
            raise serializers.ValidationError({
                "new_password": _("Le nouveau mot de passe doit être différent de l'ancien")
            })
        return attrs

class LoginHistorySerializer(serializers.ModelSerializer):
    user_email = serializers.CharField(source='user.email', read_only=True)

    class Meta:
        model = LoginHistory
        fields = ('id', 'user_email', 'login_datetime', 'ip_address', 
                 'user_agent', 'status', 'location_city', 'location_country')
        read_only_fields = ('id', 'login_datetime')
Enter fullscreen mode Exit fullscreen mode

views.py

from rest_framework import status, generics
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView
from rest_framework_simplejwt.views import TokenObtainPairView
from rest_framework_simplejwt.tokens import RefreshToken, TokenError
from django.contrib.auth import get_user_model
from django.utils.translation import gettext_lazy as _
from django.core.cache import cache
from django.utils import timezone
from users.serializers import (
    UserSerializer, 
    CustomTokenObtainPairSerializer,
    ChangePasswordSerializer,
    LoginHistorySerializer
)
from .models import LoginHistory
import logging

logger = logging.getLogger(__name__)
User = get_user_model()

class RegisterView(APIView):
    permission_classes = (AllowAny,)
    throttle_scope = 'registration'  # Protection contre les attaques par force brute

    def post(self, request):
        try:
            serializer = UserSerializer(data=request.data, context={'request': request})
            if serializer.is_valid():
                user = serializer.save()

                # Créer les tokens JWT
                refresh = RefreshToken.for_user(user)

                # Enregistrer l'historique
                LoginHistory.objects.create(
                    user=user,
                    ip_address=self._get_client_ip(request),
                    user_agent=request.META.get('HTTP_USER_AGENT', ''),
                    status='success'
                )

                return Response({
                    'user': UserSerializer(user, context={'request': request}).data,
                    'refresh': str(refresh),
                    'access': str(refresh.access_token),
                }, status=status.HTTP_201_CREATED)
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
        except Exception as e:
            logger.error(f"Error in registration: {str(e)}", exc_info=True)
            return Response(
                {'error': _("Une erreur est survenue lors de l'inscription")},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    def _get_client_ip(self, request):
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')



class CustomTokenObtainPairView(TokenObtainPairView):
    serializer_class = CustomTokenObtainPairSerializer
    throttle_scope = 'login'

    def post(self, request, *args, **kwargs):
        try:
            # Vérifier le nombre de tentatives par IP
            ip = self._get_client_ip(request)
            attempts_key = f"login_attempts_{ip}"
            attempts = cache.get(attempts_key, 0)

            if attempts >= 5:
                return Response(
                    {'error': _("Trop de tentatives. Réessayez plus tard.")},
                    status=status.HTTP_429_TOO_MANY_REQUESTS
                )

            serializer = self.get_serializer(data=request.data)

            try:
                serializer.is_valid(raise_exception=True)
            except ValidationError as e:
                # Incrémenter le compteur de tentatives
                cache.set(attempts_key, attempts + 1, timeout=300)
                return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)

            user = serializer.user
            cache.delete(attempts_key)  # Réinitialiser les tentatives en cas de succès

            # Enregistrer l'historique de connexion
            LoginHistory.objects.create(
                user=user,
                ip_address=ip,
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                status='success',
                location_city=self._get_location_city(ip),
                location_country=self._get_location_country(ip)
            )

            # Mettre à jour les informations de connexion
            user.last_login_ip = ip
            user.last_login = timezone.now()
            user.save(update_fields=['last_login_ip', 'last_login'])

            return Response(serializer.validated_data)

        except Exception as e:
            logger.error(f"Error in login: {str(e)}", exc_info=True)
            return Response(
                {'error': _("Une erreur est survenue lors de la connexion")},
                status=status.HTTP_400_BAD_REQUEST
            )


    def _get_client_ip(self, request):
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')

    def _get_location_city(self, ip):
        # Implémenter la géolocalisation IP ici
        return ""

    def _get_location_country(self, ip):
        # Implémenter la géolocalisation IP ici
        return ""

class ChangePasswordView(generics.UpdateAPIView):
    permission_classes = (IsAuthenticated,)
    serializer_class = ChangePasswordSerializer
    throttle_scope = 'password_change'

    def update(self, request, *args, **kwargs):
        try:
            serializer = self.get_serializer(data=request.data)
            if serializer.is_valid():
                user = request.user

                # Changer le mot de passe
                user.set_password(serializer.validated_data['new_password'])
                user.password_changed_at = timezone.now()
                user.save()

                # Blacklister tous les tokens existants
                RefreshToken.for_user(user)

                # Créer de nouveaux tokens
                refresh = RefreshToken.for_user(user)

                return Response({
                    'message': _("Mot de passe modifié avec succès"),
                    'refresh': str(refresh),
                    'access': str(refresh.access_token)
                }, status=status.HTTP_200_OK)

            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        except Exception as e:
            logger.error(f"Error in password change: {str(e)}", exc_info=True)
            return Response(
                {'error': _("Une erreur est survenue lors du changement de mot de passe")},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class UserProfileView(generics.RetrieveUpdateAPIView):
    permission_classes = (IsAuthenticated,)
    serializer_class = UserSerializer

    def get_object(self):
        return self.request.user

    def update(self, request, *args, **kwargs):
        try:
            partial = kwargs.pop('partial', False)
            instance = self.get_object()
            serializer = self.get_serializer(
                instance, 
                data=request.data, 
                partial=partial,
                context={'request': request}
            )

            if serializer.is_valid():
                self.perform_update(serializer)
                return Response(serializer.data)

            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        except Exception as e:
            logger.error(f"Error updating profile: {str(e)}", exc_info=True)
            return Response(
                {'error': _("Une erreur est survenue lors de la mise à jour du profil")},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class LoginHistoryView(generics.ListAPIView):
    permission_classes = (IsAuthenticated,)
    serializer_class = LoginHistorySerializer
    pagination_class = None  # Ou définir une pagination personnalisée

    def get_queryset(self):
        return (LoginHistory.objects
                .filter(user=self.request.user)
                .select_related('user')
                .order_by('-login_datetime'))

class LogoutView(APIView):
    permission_classes = (IsAuthenticated,)

    def post(self, request):
        try:
            refresh_token = request.data.get("refresh_token")
            if not refresh_token:
                return Response(
                    {'error': _("Le token de rafraîchissement est requis")},
                    status=status.HTTP_400_BAD_REQUEST
                )

            token = RefreshToken(refresh_token)
            token.blacklist()

            # Enregistrer la déconnexion
            LoginHistory.objects.create(
                user=request.user,
                ip_address=self._get_client_ip(request),
                user_agent=request.META.get('HTTP_USER_AGENT', ''),
                status='logout'
            )

            return Response(status=status.HTTP_204_NO_CONTENT)

        except TokenError:
            return Response(
                {'error': _("Token invalide ou expiré")},
                status=status.HTTP_400_BAD_REQUEST
            )
        except Exception as e:
            logger.error(f"Error in logout: {str(e)}", exc_info=True)
            return Response(
                {'error': _("Une erreur est survenue lors de la déconnexion")},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

    def _get_client_ip(self, request):
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            return x_forwarded_for.split(',')[0].strip()
        return request.META.get('REMOTE_ADDR')
Enter fullscreen mode Exit fullscreen mode

urls.py


from django.urls import path
from rest_framework_simplejwt.views import TokenRefreshView
from users.views import (
    RegisterView,
    CustomTokenObtainPairView,
    ChangePasswordView,
    UserProfileView,
    LoginHistoryView,
    LogoutView
)

urlpatterns = [
    path('register/', RegisterView.as_view(), name='register'),
    path('login/', CustomTokenObtainPairView.as_view(), name='token_obtain_pair'),
    path('token/refresh/', TokenRefreshView.as_view(), name='token_refresh'),
    path('change-password/', ChangePasswordView.as_view(), name='change_password'),
    path('profile/', UserProfileView.as_view(), name='user_profile'),
    path('login-history/', LoginHistoryView.as_view(), name='login_history'),
    path('logout/', LogoutView.as_view(), name='logout'),
]



Enter fullscreen mode Exit fullscreen mode

Top comments (0)