Tareas asincrónicas con Flask, Redis y Apio

    Introducción

    A medida que las aplicaciones web evolucionan y aumenta su uso, los casos de uso también se diversifican. Ahora estamos creando y usando sitios web para tareas más complejas que nunca. Algunas de estas tareas se pueden procesar y transmitir la retroalimentación a los usuarios al instante, mientras que otras requieren un procesamiento adicional y la transmisión de los resultados más adelante. La mayor adopción de acceso a Internet y dispositivos con capacidad para Internet ha llevado a un mayor tráfico de usuarios finales.

    En un intento por manejar un mayor tráfico o una mayor complejidad de la funcionalidad, a veces podemos optar por aplazar el trabajo y transmitir los resultados en un momento posterior. De esta manera, no podemos hacer que el usuario espere una hora desconocida en nuestra aplicación web y, en cambio, enviamos los resultados en un momento posterior. Podemos lograr esto utilizando tareas en segundo plano para procesar el trabajo cuando hay poco tráfico o procesar el trabajo en lotes.

    Una de las soluciones que podemos utilizar para lograrlo es Apio. Nos ayuda a descomponer piezas complejas de trabajo y hacer que las realicen diferentes máquinas para facilitar la carga en una máquina o reducir el tiempo necesario para completarlas.

    En esta publicación, exploraremos el uso de Celery para programar tareas en segundo plano en una aplicación Flask para descargar tareas que requieren muchos recursos y priorizar la respuesta a los usuarios finales.

    ¿Qué es una cola de tareas?

    Una cola de tareas es un mecanismo para distribuir pequeñas unidades de trabajo o tareas que se pueden ejecutar sin interferir con el ciclo de solicitud-respuesta de la mayoría de las aplicaciones basadas en web.

    Las colas de tareas son útiles para delegar trabajo que, de otro modo, ralentizaría las aplicaciones mientras se esperan respuestas. También se pueden utilizar para manejar tareas que consumen muchos recursos mientras la máquina o el proceso principal interactúa con el usuario.

    De esta manera, la interacción con el usuario es consistente, oportuna y no se ve afectada por la carga de trabajo.

    ¿Qué es el apio?

    El apio es una cola de tareas asincrónica basada en el paso de mensajes distribuidos para distribuir la carga de trabajo entre máquinas o subprocesos. Un sistema de apio consta de un cliente, un corredor y varios trabajadores.

    Estos trabajadores son responsables de la ejecución de las tareas o piezas de trabajo que se colocan en la cola y transmiten los resultados. Con Celery, puede tener trabajadores locales y remotos, lo que significa que el trabajo se puede delegar a máquinas diferentes y más capaces a través de Internet y los resultados se transmiten al cliente.

    De esta manera, se alivia la carga en la máquina principal y hay más recursos disponibles para manejar las solicitudes de los usuarios a medida que ingresan.

    El cliente en una configuración de Apio es responsable de enviar trabajos a los trabajadores y también de comunicarse con ellos mediante un intermediario de mensajes. El broker facilita la comunicación entre el cliente y los trabajadores en una instalación de Celery a través de una cola de mensajes, donde se agrega un mensaje a la cola y el broker lo entrega al cliente.

    Ejemplos de tales agentes de mensajes incluyen Redis y RabbitMQ.

    ¿Por qué utilizar apio?

    Hay varias razones por las que deberíamos usar apio para nuestras tareas de fondo. Primero, es bastante escalable, lo que permite agregar más trabajadores a pedido para atender el aumento de la carga o el tráfico. El apio también se encuentra todavía en desarrollo activo, lo que significa que es un proyecto respaldado junto con su documentación concisa y una comunidad activa de usuarios.

    Otra ventaja es que Celery es fácil de integrar en múltiples marcos web, y la mayoría tiene bibliotecas para facilitar la integración.

    También proporciona la funcionalidad para interactuar con otras aplicaciones web a través de webhooks donde no hay una biblioteca para soportar la interacción.

    El apio también puede utilizar una variedad de agentes de mensajes que nos ofrecen flexibilidad. Se recomienda RabbitMQ, pero también es compatible con Redis y Beanstalk.

    Aplicación de demostración

    Crearemos una aplicación Flask que permite a los usuarios configurar recordatorios que se enviarán a sus correos electrónicos a una hora determinada.

    También proporcionaremos la funcionalidad para personalizar la cantidad de tiempo antes de que se invoque el mensaje o recordatorio y se envíe el mensaje al usuario.

    Preparar

    Como cualquier otro proyecto, nuestro trabajo se desarrollará en un entorno virtual que crearemos y gestionaremos utilizando el Pipenv herramienta:

    $ pipenv install --three
    $ pipenv shell
    

    Para este proyecto, necesitaremos instalar los paquetes Flask y Celery para comenzar:

    $ pipenv install flask celery
    

    Así es como se verá la estructura de archivos de nuestra aplicación Flask:

    .
    ├── Pipfile                    # manage our environment
    ├── Pipfile.lock
    ├── README.md
    ├── __init__.py
    ├── app.py                     # main Flask application implementation
    ├── config.py                  # to host the configuration
    ├── requirements.txt           # store our requirements
    └── templates
        └── index.html             # the landing page
    
    1 directory, 8 files
    

    Para nuestro proyecto basado en apio, usaremos Redis como el agente de mensajes y podemos encontrar las instrucciones para configurarlo. en su página de inicio.

    Implementación

    Comencemos por crear la aplicación Flask que generará un formulario que permite a los usuarios ingresar los detalles del mensaje que se enviará en el futuro.

    Agregaremos lo siguiente a nuestro app.py archivo:

    from flask import Flask, flash, render_template, request, redirect, url_for
    
    app = Flask(__name__)
    app.config.from_object("config")
    app.secret_key = app.config['SECRET_KEY']
    
    @app.route("https://Pharos.sh.com/", methods=['GET', 'POST'])
    def index():
        if request.method == 'GET':
            return render_template('index.html')
    
        elif request.method == 'POST':
            email = request.form['email']
            first_name = request.form['first_name']
            last_name = request.form['last_name']
            message = request.form['message']
            duration = request.form['duration']
            duration_unit = request.form['duration_unit']
    
            flash(“Message scheduled”)
            return redirect(url_for('index'))
    
    
    if __name__ == '__main__':
        app.run(debug=True)
    

    Esta es una aplicación realmente simple con una sola ruta para manejar un GET y POST solicitud del formulario. Una vez que se envían los detalles, podemos entregar los datos a una función que programará el trabajo.

    Para ordenar nuestro archivo de aplicación principal, colocaremos las variables de configuración en un config.py archivo y cargue la configuración desde el archivo:

    app.config.from_object("config")
    

    Nuestra config.py El archivo estará en la misma carpeta que el app.py archivo y contiene algunas configuraciones básicas:

    SECRET_KEY = 'very_very_secure_and_secret'
    # more config
    

    Por ahora, implementemos la página de destino como index.html:

    {% for message in get_flashed_messages() %}
      <p style="color: red;">{{ message }}</p>
    {% endfor %}
    
    <form method="POST">
        First Name: <input id="first_name" name="first_name" type="text">
        Last Name: <input id="last_name" name="last_name" type="text">
        Email: <input id="email" name="email" type="email">
        Message: <textarea id="textarea" name="message"></textarea>
        Duration: <input id="duration" name="duration" placeholder="Enter duration as a number. for example: 3" type="text">
    
       <select name="duration_unit">
          <option value="" disabled selected>Choose the duration</option>
          <option value="1">Minutes</option>
          <option value="2">Hours</option>
          <option value="3">Days</option>
       </select>
    
       <button type="submit" name="action">Submit </button>
    </form>
    

    El estilo y el formato se han truncado por brevedad, siéntase libre de formatear / diseñar su HTML como desee.

    Ahora podemos iniciar nuestra aplicación:

    Envío de correos electrónicos con Flask-Mail

    Para enviar correos electrónicos desde nuestra aplicación Flask, usaremos el Frasco-correo biblioteca, que agregamos a nuestro proyecto de la siguiente manera:

    $ pipenv install flask-mail
    

    Con nuestra aplicación Flask y el formulario en su lugar, ahora podemos integrar Flask-Mail en nuestro app.py:

    from flask_mail import Mail, Message
    
    app = Flask(__name__)
    app.config.from_object("config")
    app.secret_key = app.config['SECRET_KEY']
    
    # set up Flask-Mail Integration
    mail = Mail(app)
    
    def send_mail(data):
        """ Function to send emails.
        """
        with app.app_context():
            msg = Message("Ping!",
                        sender="admin.ping",
                        recipients=[data['email']])
            msg.body = data['message']
            mail.send(msg)
    

    La función send_main(data) recibirá el mensaje a enviar y el destinatario del correo electrónico y luego se invocará una vez transcurrido el tiempo especificado para enviar el correo electrónico al usuario.

    También necesitaremos agregar las siguientes variables a nuestro config.py para que Flask-Mail funcione:

    # Flask-Mail
    MAIL_SERVER = 'smtp.googlemail.com'
    MAIL_PORT = 587
    MAIL_USE_TLS = True
    MAIL_USERNAME = 'mail-username'
    MAIL_PASSWORD = 'mail-password'
    

    Integración de apio

    Con nuestra aplicación Flask lista y equipada con la funcionalidad de envío de correo electrónico, ahora podemos integrar Celery para programar el envío de correos electrónicos en una fecha posterior.

    Nuestra app.py será modificado nuevamente:

    # Existing imports are maintained
    from celery import Celery
    
    # Flask app and flask-mail configuration truncated
    
    # Set up celery client
    client = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
    client.conf.update(app.config)
    
    # Add this decorator to our send_mail function
    @client.task
    def send_mail(data):
        # Function remains the same
    
    @app.route("https://Pharos.sh.com/", methods=['GET', 'POST'])
    def index():
        if request.method == 'GET':
            return render_template('index.html')
    
        elif request.method == 'POST':
            data = {}
            data['email'] = request.form['email']
            data['first_name'] = request.form['first_name']
            data['last_name'] = request.form['last_name']
            data['message'] = request.form['message']
            duration = int(request.form['duration'])
            duration_unit = request.form['duration_unit']
    
            if duration_unit == 'minutes':
                duration *= 60
            elif duration_unit == 'hours':
                duration *= 3600
            elif duration_unit == 'days':
                duration *= 86400
    
            send_mail.apply_async(args=[data], countdown=duration)
            flash(f"Email will be sent to {data['email']} in {request.form['duration']} {duration_unit}")
    
            return redirect(url_for('index'))
    

    Importamos celery y utilícelo para inicializar el cliente Celery en nuestra aplicación Flask adjuntando la URL del corredor de mensajería. En nuestro caso, usaremos Redis como corredor, por lo que agregamos lo siguiente a nuestro config.py:

    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    

    Para tener nuestro send_mail() función ejecutada como una tarea en segundo plano, agregaremos la @client.task decorador para que nuestro cliente Apio sea consciente de ello.

    Después de configurar el cliente Celery, se modifica la función principal que también maneja la entrada de formularios.

    Primero, empaquetamos los datos de entrada para el send_mail() función en un diccionario. Luego, invocamos nuestra función de correo a través de la API de llamada de tareas de apio usando la función apply_async, que toma los argumentos requeridos por nuestra función.

    Un opcional countdown se establece el parámetro, que define un retraso entre la ejecución del código y la realización de la tarea.

    Esta duración es en segundos, razón por la cual convertimos la duración pasada por el usuario en segundos en función de la unidad de tiempo que elija.

    Una vez que el usuario haya enviado el formulario, reconoceremos la recepción y le notificaremos a través de un mensaje de banner cuando se enviará el mensaje.

    Uniendo todo

    Para ejecutar nuestro proyecto, necesitaremos dos terminales, uno para iniciar nuestra aplicación Flask y el otro para iniciar el trabajador de Celery que enviará mensajes en segundo plano.

    Inicie la aplicación Flask en la primera terminal:

    $ python app.py
    

    En la segunda terminal, inicie el entorno virtual y luego inicie el trabajador de Apio:

    # start the virtualenv
    $ pipenv shell
    $ celery worker -A app.client --loglevel=info
    

    Si todo va bien, obtendremos los siguientes comentarios en la terminal que ejecuta el cliente de Celery:

    Ahora naveguemos hacia http://localhost:5000 y complete los detalles para programar el correo electrónico para que llegue después de 2 minutos de envío.

    Sobre el formulario, aparecerá un mensaje indicando la dirección que recibirá el correo electrónico y el tiempo después del cual se enviará el correo electrónico. En nuestra terminal de Apio, también podremos ver una entrada de registro que significa que nuestro correo electrónico ha sido programado:

    [2019-10-23 16:27:25,399: INFO/MainProcess] Received task: app.send_mail[d65025c8-a291-40d0-aea2-e816cb40cd78]  ETA:[2019-10-23 13:29:25.170622+00:00]
    

    los ETA sección de la entrada muestra cuando nuestro send_email() se llamará a la función y, por tanto, cuándo se enviará el correo electrónico.

    Hasta aquí todo bien. Nuestros correos electrónicos se programan y se envían en el tiempo especificado, sin embargo, falta una cosa. No tenemos visibilidad de las tareas antes o después de que se ejecuten y no tenemos forma de saber si el correo electrónico se envió realmente o no.

    Por esta razón, implementemos una solución de monitoreo para nuestras tareas en segundo plano para que podamos ver las tareas y también ser conscientes en caso de que algo salga mal y las tareas no se ejecuten según lo planeado.

    Monitoreo de nuestro racimo de apio usando flor

    Flor es una herramienta basada en la web que proporcionará visibilidad de nuestra configuración de Apio y proporciona la funcionalidad para ver el progreso de la tarea, el historial, los detalles y las estadísticas, incluidas las tasas de éxito o fracaso. También podemos monitorear a todos los trabajadores de nuestro clúster y las tareas que están manejando actualmente.

    Instalando Flower es tan fácil como:

    $ pipenv install flower
    

    Anteriormente, especificamos los detalles de nuestro cliente de Apio en nuestro app.py archivo. Necesitaremos pasar ese cliente a Flower para poder monitorearlo.

    Para lograr esto, necesitamos abrir una tercera ventana de terminal, saltar a nuestro entorno virtual e iniciar nuestra herramienta de monitoreo:

    $ pipenv shell
    $ flower -A app.client --port=5555
    

    Al iniciar Flower, especificamos el cliente Celery pasándolo por la aplicación (-A) argumento, y también especificando el puerto que se utilizará a través del --port argumento.

    Con nuestro monitoreo en su lugar, permítanos programar otro correo electrónico para que se envíe en el tablero y luego navegue hasta http://localhost:5555, donde somos recibidos por los siguientes:

    En esta página, podemos ver la lista de trabajadores en nuestro grupo de apio, que actualmente solo está compuesto por nuestra máquina.

    Para ver el correo electrónico que acabamos de programar, haga clic en el botón Tareas en la parte superior izquierda del tablero y esto nos llevará a la página donde podemos ver las tareas que se han programado:

    En esta sección, podemos ver que habíamos programado dos correos electrónicos y uno se envió correctamente a la hora programada. Se programó el envío de los correos electrónicos después de 1 minuto y 5 minutos respectivamente con fines de prueba.

    También podemos ver la hora a la que se recibió el texto y cuando se ejecutó desde esta sección.

    En la sección del monitor, hay gráficos que muestran las tasas de éxito y fracaso de las tareas en segundo plano.

    Podemos programar mensajes durante el tiempo que deseemos, pero eso también significa que nuestro trabajador debe estar en línea y funcional en el momento en que se supone que se debe ejecutar la tarea.

    Conclusión

    Hemos configurado con éxito un clúster de apio y lo hemos integrado en nuestra aplicación Flask que permite a los usuarios programar el envío de correos electrónicos después de un cierto tiempo en el futuro.

    La funcionalidad de envío de correo electrónico se ha delegado a una tarea en segundo plano y se ha colocado en una cola donde será seleccionada y ejecutada por un trabajador en nuestro clúster local de Apio.

    El código fuente de este proyecto es, como siempre, disponible en Github.

     

    Etiquetas:

    Deja una respuesta

    Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *