Python Flask – Autenticación de APIs con Tokens

Implementaremos un ejemplo básico de una API con autenticación basada en tokens utilizando el módulo Flask-JWT-Extended. Flask-JWT-Extended es una extensión de Flask que facilita la implementación de autenticación basada en tokens JWT (JSON Web Tokens).

Nos aseguraremos de tener Flask-JWT-Extended instalado antes de implementar este ejemplo:

$ pip install flask_jwt_extended

El método es muy similar al realizado anteriormente en el ejemplo: Python Flask – Creación de APIs RESTful con Flask

Utilizaremos el mismo código, solo agregaremos las bibliotecas JWT y los decoradores @jwt_required() necesarios:

api.py

from flask import Flask, jsonify, request
from app import app, db  # Importa la instancia de la aplicación y la base de datos desde 'app.py'
from app import Usuario  # Importa el modelo de Usuario desde 'app.py'
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity

@app.route('/api/v1/usuarios', methods=['GET'])
@jwt_required()
def obtener_usuarios():
    usuarios = Usuario.query.all()
    usuarios_json = [{'id': usuario.id, 'nombre': usuario.nombre, 'email': usuario.email} for usuario in usuarios]
    return jsonify({'usuarios': usuarios_json})

@app.route('/api/v1/usuarios/<int:id>', methods=['GET'])
@jwt_required()
def obtener_usuario(id):
    usuario = Usuario.query.get(id)
    if usuario:
        return jsonify({'id': usuario.id, 'nombre': usuario.nombre, 'email': usuario.email})
    else:
        return jsonify({'message': 'Usuario no encontrado'}), 404

@app.route('/api/v1/usuarios', methods=['POST'])
@jwt_required()
def crear_usuario():
    datos_usuario = request.json
    nuevo_usuario = Usuario(nombre=datos_usuario['nombre'], email=datos_usuario['email'], password=datos_usuario['password'])
    db.session.add(nuevo_usuario)
    db.session.commit()
    return jsonify({'message': 'Usuario creado correctamente'}), 201

@app.route('/api/v1/usuarios/<int:id>', methods=['PUT'])
@jwt_required()
def actualizar_usuario(id):
    usuario = Usuario.query.get(id)
    if not usuario:
        return jsonify({'message': 'Usuario no encontrado'}), 404

    datos_actualizados = request.json
    usuario.nombre = datos_actualizados['nombre']
    usuario.email = datos_actualizados['email']
    usuario.password = datos_actualizados['password']
    db.session.commit()
    return jsonify({'message': 'Usuario actualizado correctamente'}), 200

@app.route('/api/v1/usuarios/<int:id>', methods=['DELETE'])
@jwt_required()
def eliminar_usuario(id):
    usuario = Usuario.query.get(id)
    if not usuario:
        return jsonify({'message': 'Usuario no encontrado'}), 404

    db.session.delete(usuario)
    db.session.commit()
    return jsonify({'message': 'Usuario eliminado correctamente'}), 200

app.py

from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_principal import Principal, Permission, RoleNeed
from flask_principal import Identity, identity_changed
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity

app = Flask(__name__)
app.config['SECRET_KEY'] = 'tu_clave_secreta'  # Clave secreta para sesiones de usuario
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:123456@192.168.1.5/demo'

# Configuración de Flask-JWT-Extended
app.config['JWT_SECRET_KEY'] = 'supersecretkey'  # Clave secreta para firmar tokens
jwt = JWTManager(app)

db = SQLAlchemy(app)
migrate = Migrate(app, db)

login_manager = LoginManager(app)
login_manager.login_view = 'login'

principal = Principal(app)

# Define roles
admin_role = RoleNeed('admin')  # Define un rol llamado 'admin'
user_role = RoleNeed('usuario')  # Define un rol llamado 'usuario'

# Define permisos
admin_permission = Permission(admin_role)  # Define un permiso para el rol de administrador
user_permission = Permission(user_role)  # Define un permiso para el rol de usuario

class Usuario(db.Model, UserMixin):
    id = db.Column(db.Integer, primary_key=True)
    nombre = db.Column(db.String(80), nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password = db.Column(db.String(128), nullable=False)

@login_manager.user_loader
def load_user(user_id):
    return Usuario.query.get(int(user_id))

@app.route('/registro', methods=['GET', 'POST'])
def registro():
    if request.method == 'POST':
        nombre = request.form['nombre']
        email = request.form['email']
        password = request.form['password']
        nuevo_usuario = Usuario(nombre=nombre, email=email, password=password)
        db.session.add(nuevo_usuario)
        db.session.commit()
        flash('¡Registro exitoso! Ahora puedes iniciar sesión.')
        return redirect(url_for('login'))
    return render_template('registro.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        email = request.form['email']
        password = request.form['password']
        usuario = Usuario.query.filter_by(email=email).first()  
        if usuario and usuario.password == password:
            login_user(usuario)
            flash('Inicio de sesión exitoso.')
            return redirect(url_for('perfil'))
        else:
            flash('Credenciales incorrectas. Por favor, inténtalo de nuevo.')
    return render_template('login.html')

@app.route('/login_auth', methods=['POST'])
def login_auth():
    datos_login = request.json
    nombre = datos_login['nombre']
    password = datos_login['password']

    # Verifica las credenciales del usuario (en la vida real, esto debe hacerse de manera segura)
    usuario = Usuario.query.filter_by(nombre=nombre, password=password).first()

    if usuario:
        # Si las credenciales son válidas, emite un token JWT
        access_token = create_access_token(identity=usuario.nombre)
        return jsonify(access_token=access_token), 200
    else:
        return jsonify({"message": "Autenticación fallida"}), 401
        
@app.route('/logout')
@login_required
def logout():
    logout_user()
    flash('Has cerrado sesión.')
    return redirect(url_for('login'))

@app.route('/perfil')
@login_required
def perfil():
    return render_template('perfil.html')

@app.route('/admin_panel', methods=['GET', 'POST'])
@login_required
#@admin_permission.require(http_exception=403)
def admin_panel():
    if request.method == 'POST':
        usuario_id = request.form.get('usuario_id')  # Supongamos que hay un campo de selección de usuarios en el formulario
        usuario = Usuario.query.get(usuario_id)
        if usuario:
            # Asignar el rol de administrador al usuario seleccionado
           admin_role = RoleNeed('admin')                       
           identity = Identity(usuario.id)
           identity_changed.send(app, identity=identity)
                        
           flash(f'Se ha asignado el rol de administrador a {usuario.nombre}.')
        else:
            flash('Usuario no encontrado.')
    
    # Obtener la lista de usuarios (esto depende de tu modelo de datos)
    usuarios = Usuario.query.all()
    return render_template('admin_panel.html', usuarios=usuarios)
    
@app.route('/asignar_rol_admin')
@login_required
def asignar_rol_admin():
    email_usuario_admin = 'admin@mail.com'
    usuario = Usuario.query.filter_by(email=email_usuario_admin).first()

    if usuario:
        # Crea un objeto Identity para el usuario
        identity = Identity(usuario.id)

        # Crea el objeto RoleNeed para el rol de administrador
        admin_role = RoleNeed('admin')

        # Asigna el rol al usuario
        identity.provides.add(admin_role)

        # Cambia la identidad del usuario actual
        identity_changed.send(app, identity=identity)

        return 'Rol de administrador asignado correctamente.'
    else:
        return 'Usuario no encontrado.'

from api import *

if __name__ == '__main__':
    with app.app_context():
        db.create_all()
    app.run(debug=True)

Para mayor claridad y fácil identificación de los cambios al código realizado en Python Flask – Creación de APIs RESTful con Flask, les muestro las modificaciones hechas a api.py:

...
...
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
...
...
# Configuración de Flask-JWT-Extended
app.config['JWT_SECRET_KEY'] = 'supersecretkey'  # Clave secreta para firmar tokens
jwt = JWTManager(app)
...
...

@app.route('/login_auth', methods=['POST'])
def login_auth():
    datos_login = request.json
    nombre = datos_login['nombre']
    password = datos_login['password']

    # Verifica las credenciales del usuario (en la vida real, esto debe hacerse de manera segura)
    usuario = Usuario.query.filter_by(nombre=nombre, password=password).first()

    if usuario:
        # Si las credenciales son válidas, emite un token JWT
        access_token = create_access_token(identity=usuario.nombre)
        return jsonify(access_token=access_token), 200
    else:
        return jsonify({"message": "Autenticación fallida"}), 401

...
...

Para poder acceder a las API con cURL, en adelante necesitaremos brindar el token, esto otorga la capa de seguridad al momento de acceder a las consultas CRUD desde las API, a diferencia que anteriormente se accedía directamente.

Para probar la API con cURL y tokens, primero debes asegurarte de tener un token válido. Puedes obtener un token válido haciendo una solicitud POST a la ruta /login_auth que hemos creado en la aplicación Flask, proporcionando las credenciales de inicio de sesión correctas.

Obtener un token JWT válido:

curl -X POST -H "Content-Type: application/json" -d '{"nombre": "tu_nombre_usuario", "password": "tu_password"}' http://localhost:5000/login_auth

Retornará el token.

Usar el token para acceder a una ruta protegida:

# Reemplaza <tu_token> con el token JWT válido que obtuviste en el paso anterior

curl -X GET -H "Authorization: Bearer <tu_token>" http://localhost:5000/api/v1/usuarios

Este comando cURL incluye el token JWT válido en el encabezado de autorización ("Authorization: Bearer <tu_token>") y realiza una solicitud a la ruta /api/v1/usuarios.

Si el token es válido y corresponde a un usuario autenticado, recibirás la respuesta de la API.

Deja un comentario