Implementaremos un ejemplo básico de una API con autenticación basada en tokens utilizando el módulo Flask-JWT-Extended. Flask-JWT-Extended es una extensión de Flask que facilita la implementación de autenticación basada en tokens JWT (JSON Web Tokens).
Nos aseguraremos de tener Flask-JWT-Extended instalado antes de implementar este ejemplo:
$ pip install flask_jwt_extended
El método es muy similar al realizado anteriormente en el ejemplo: Python Flask – Creación de APIs RESTful con Flask
Utilizaremos el mismo código, solo agregaremos las bibliotecas JWT y los decoradores @jwt_required() necesarios:
api.py
from flask import Flask, jsonify, request
from app import app, db # Importa la instancia de la aplicación y la base de datos desde 'app.py'
from app import Usuario # Importa el modelo de Usuario desde 'app.py'
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
@app.route('/api/v1/usuarios', methods=['GET'])
@jwt_required()
def obtener_usuarios():
usuarios = Usuario.query.all()
usuarios_json = [{'id': usuario.id, 'nombre': usuario.nombre, 'email': usuario.email} for usuario in usuarios]
return jsonify({'usuarios': usuarios_json})
@app.route('/api/v1/usuarios/<int:id>', methods=['GET'])
@jwt_required()
def obtener_usuario(id):
usuario = Usuario.query.get(id)
if usuario:
return jsonify({'id': usuario.id, 'nombre': usuario.nombre, 'email': usuario.email})
else:
return jsonify({'message': 'Usuario no encontrado'}), 404
@app.route('/api/v1/usuarios', methods=['POST'])
@jwt_required()
def crear_usuario():
datos_usuario = request.json
nuevo_usuario = Usuario(nombre=datos_usuario['nombre'], email=datos_usuario['email'], password=datos_usuario['password'])
db.session.add(nuevo_usuario)
db.session.commit()
return jsonify({'message': 'Usuario creado correctamente'}), 201
@app.route('/api/v1/usuarios/<int:id>', methods=['PUT'])
@jwt_required()
def actualizar_usuario(id):
usuario = Usuario.query.get(id)
if not usuario:
return jsonify({'message': 'Usuario no encontrado'}), 404
datos_actualizados = request.json
usuario.nombre = datos_actualizados['nombre']
usuario.email = datos_actualizados['email']
usuario.password = datos_actualizados['password']
db.session.commit()
return jsonify({'message': 'Usuario actualizado correctamente'}), 200
@app.route('/api/v1/usuarios/<int:id>', methods=['DELETE'])
@jwt_required()
def eliminar_usuario(id):
usuario = Usuario.query.get(id)
if not usuario:
return jsonify({'message': 'Usuario no encontrado'}), 404
db.session.delete(usuario)
db.session.commit()
return jsonify({'message': 'Usuario eliminado correctamente'}), 200
app.py
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from flask_principal import Principal, Permission, RoleNeed
from flask_principal import Identity, identity_changed
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
app = Flask(__name__)
app.config['SECRET_KEY'] = 'tu_clave_secreta' # Clave secreta para sesiones de usuario
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://postgres:123456@192.168.1.5/demo'
# Configuración de Flask-JWT-Extended
app.config['JWT_SECRET_KEY'] = 'supersecretkey' # Clave secreta para firmar tokens
jwt = JWTManager(app)
db = SQLAlchemy(app)
migrate = Migrate(app, db)
login_manager = LoginManager(app)
login_manager.login_view = 'login'
principal = Principal(app)
# Define roles
admin_role = RoleNeed('admin') # Define un rol llamado 'admin'
user_role = RoleNeed('usuario') # Define un rol llamado 'usuario'
# Define permisos
admin_permission = Permission(admin_role) # Define un permiso para el rol de administrador
user_permission = Permission(user_role) # Define un permiso para el rol de usuario
class Usuario(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
nombre = db.Column(db.String(80), nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password = db.Column(db.String(128), nullable=False)
@login_manager.user_loader
def load_user(user_id):
return Usuario.query.get(int(user_id))
@app.route('/registro', methods=['GET', 'POST'])
def registro():
if request.method == 'POST':
nombre = request.form['nombre']
email = request.form['email']
password = request.form['password']
nuevo_usuario = Usuario(nombre=nombre, email=email, password=password)
db.session.add(nuevo_usuario)
db.session.commit()
flash('¡Registro exitoso! Ahora puedes iniciar sesión.')
return redirect(url_for('login'))
return render_template('registro.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
email = request.form['email']
password = request.form['password']
usuario = Usuario.query.filter_by(email=email).first()
if usuario and usuario.password == password:
login_user(usuario)
flash('Inicio de sesión exitoso.')
return redirect(url_for('perfil'))
else:
flash('Credenciales incorrectas. Por favor, inténtalo de nuevo.')
return render_template('login.html')
@app.route('/login_auth', methods=['POST'])
def login_auth():
datos_login = request.json
nombre = datos_login['nombre']
password = datos_login['password']
# Verifica las credenciales del usuario (en la vida real, esto debe hacerse de manera segura)
usuario = Usuario.query.filter_by(nombre=nombre, password=password).first()
if usuario:
# Si las credenciales son válidas, emite un token JWT
access_token = create_access_token(identity=usuario.nombre)
return jsonify(access_token=access_token), 200
else:
return jsonify({"message": "Autenticación fallida"}), 401
@app.route('/logout')
@login_required
def logout():
logout_user()
flash('Has cerrado sesión.')
return redirect(url_for('login'))
@app.route('/perfil')
@login_required
def perfil():
return render_template('perfil.html')
@app.route('/admin_panel', methods=['GET', 'POST'])
@login_required
#@admin_permission.require(http_exception=403)
def admin_panel():
if request.method == 'POST':
usuario_id = request.form.get('usuario_id') # Supongamos que hay un campo de selección de usuarios en el formulario
usuario = Usuario.query.get(usuario_id)
if usuario:
# Asignar el rol de administrador al usuario seleccionado
admin_role = RoleNeed('admin')
identity = Identity(usuario.id)
identity_changed.send(app, identity=identity)
flash(f'Se ha asignado el rol de administrador a {usuario.nombre}.')
else:
flash('Usuario no encontrado.')
# Obtener la lista de usuarios (esto depende de tu modelo de datos)
usuarios = Usuario.query.all()
return render_template('admin_panel.html', usuarios=usuarios)
@app.route('/asignar_rol_admin')
@login_required
def asignar_rol_admin():
email_usuario_admin = 'admin@mail.com'
usuario = Usuario.query.filter_by(email=email_usuario_admin).first()
if usuario:
# Crea un objeto Identity para el usuario
identity = Identity(usuario.id)
# Crea el objeto RoleNeed para el rol de administrador
admin_role = RoleNeed('admin')
# Asigna el rol al usuario
identity.provides.add(admin_role)
# Cambia la identidad del usuario actual
identity_changed.send(app, identity=identity)
return 'Rol de administrador asignado correctamente.'
else:
return 'Usuario no encontrado.'
from api import *
if __name__ == '__main__':
with app.app_context():
db.create_all()
app.run(debug=True)
Para mayor claridad y fácil identificación de los cambios al código realizado en Python Flask – Creación de APIs RESTful con Flask, les muestro las modificaciones hechas a api.py:
...
...
from flask_jwt_extended import JWTManager, jwt_required, create_access_token, get_jwt_identity
...
...
# Configuración de Flask-JWT-Extended
app.config['JWT_SECRET_KEY'] = 'supersecretkey' # Clave secreta para firmar tokens
jwt = JWTManager(app)
...
...
@app.route('/login_auth', methods=['POST'])
def login_auth():
datos_login = request.json
nombre = datos_login['nombre']
password = datos_login['password']
# Verifica las credenciales del usuario (en la vida real, esto debe hacerse de manera segura)
usuario = Usuario.query.filter_by(nombre=nombre, password=password).first()
if usuario:
# Si las credenciales son válidas, emite un token JWT
access_token = create_access_token(identity=usuario.nombre)
return jsonify(access_token=access_token), 200
else:
return jsonify({"message": "Autenticación fallida"}), 401
...
...
Para poder acceder a las API con cURL, en adelante necesitaremos brindar el token, esto otorga la capa de seguridad al momento de acceder a las consultas CRUD desde las API, a diferencia que anteriormente se accedía directamente.
Para probar la API con cURL y tokens, primero debes asegurarte de tener un token válido. Puedes obtener un token válido haciendo una solicitud POST a la ruta /login
_auth que hemos creado en la aplicación Flask, proporcionando las credenciales de inicio de sesión correctas.
Obtener un token JWT válido:
curl -X POST -H "Content-Type: application/json" -d '{"nombre": "tu_nombre_usuario", "password": "tu_password"}' http://localhost:5000/login_auth
Retornará el token.
Usar el token para acceder a una ruta protegida:
# Reemplaza <tu_token> con el token JWT válido que obtuviste en el paso anterior
curl -X GET -H "Authorization: Bearer <tu_token>" http://localhost:5000/api/v1/usuarios
Este comando cURL incluye el token JWT válido en el encabezado de autorización ("Authorization: Bearer <tu_token>"
) y realiza una solicitud a la ruta /api/v1/usuarios
.
Si el token es válido y corresponde a un usuario autenticado, recibirás la respuesta de la API.