Procesamiento paralelo en Python

P

Introducción

Cuando inicia un programa en su máquina, se ejecuta en su propia “burbuja” que está completamente separada de otros programas que están activos al mismo tiempo. Esta “burbuja” se denomina proceso y comprende todo lo que se necesita para gestionar esta llamada de programa.

Por ejemplo, este llamado entorno de proceso incluye las páginas de memoria que el proceso tiene en uso, los manejadores de archivo que este proceso ha abierto, tanto los derechos de acceso de usuario como de grupo, y su llamada de línea de comando completa, incluidos los parámetros dados.

Esta información se guarda en el sistema de archivos de proceso de su sistema UNIX / Linux, que es un sistema de archivos virtual, y se puede acceder a él a través del directorio / proc . Las entradas están ordenadas por el ID de proceso, que es único para cada proceso. El ejemplo 1 muestra esto para un proceso seleccionado arbitrariamente que tiene el ID de proceso # 177.

Ejemplo 1: información que está disponible para un proceso

[email protected]:/proc/177# ls
attr         cpuset   limits      net            projid_map   statm
autogroup    cwd      loginuid    ns             root         status
auxv         environ  map_files   numa_maps      sched        syscall
cgroup       exe      maps        oom_adj        sessionid    task
clear_refs   fd       mem         oom_score      setgroups    timers
cmdline      fdinfo   mountinfo   oom_score_adj  smaps        uid_map
comm         gid_map  mounts      pagemap        stack        wchan
coredump_filter       io          mountstats     personality  stat

Estructuración de código y datos del programa

Cuanto más complejo se vuelve un programa, más a menudo resulta útil dividirlo en partes más pequeñas. Esto no se refiere únicamente al código fuente, sino también al código que se ejecuta en su máquina. Una solución para esto es el uso de subprocesos en combinación con la ejecución en paralelo. Los pensamientos detrás de esto son:

  • Un solo proceso cubre un fragmento de código que se puede ejecutar por separado
  • Ciertas secciones de código se pueden ejecutar simultáneamente y permiten la paralelización en principio
  • Usando las características de los procesadores y sistemas operativos modernos, por ejemplo, cada núcleo de un procesador que tenemos disponible para reducir el tiempo total de ejecución de un programa.
  • Para reducir la complejidad de su programa / código y subcontratar piezas de trabajo a agentes especializados que actúan como subprocesos.

El uso de subprocesos requiere que reconsidere la forma en que se ejecuta su programa, de lineal a paralelo. Es similar a cambiar su perspectiva de trabajo en una empresa de un trabajador común a un gerente: tendrá que vigilar quién está haciendo qué, cuánto tiempo toma un solo paso y cuáles son las dependencias entre los resultados intermedios.

Esto le ayuda a dividir su código en fragmentos más pequeños que pueden ser ejecutados por un agente especializado solo para esta tarea. Si aún no lo ha hecho, piense en cómo está estructurado su conjunto de datos para que los agentes individuales puedan procesarlo de manera efectiva. Esto lleva a estas preguntas:

  • ¿Por qué quiere paralelizar el código? En tu caso concreto y en cuanto a esfuerzo, ¿tiene sentido pensarlo?
  • ¿Su programa está diseñado para ejecutarse solo una vez o se ejecutará regularmente en un conjunto de datos similar?
  • ¿Puedes dividir tu algoritmo en varios pasos de ejecución?
  • ¿Sus datos permiten la paralelización? Si aún no lo ha hecho, ¿de qué manera debe adaptarse la organización de sus datos?
  • ¿Qué resultados intermedios de su cálculo dependen unos de otros?
  • ¿Qué cambio de hardware se necesita para eso?
  • ¿Existe un cuello de botella en el hardware o en el algoritmo, y cómo se puede evitar o minimizar la influencia de estos factores?
  • ¿Qué otros efectos secundarios de la paralelización pueden ocurrir?

Un posible caso de uso es un proceso principal y un demonio que se ejecuta en segundo plano (maestro / esclavo) esperando ser activado. Además, este puede ser un proceso principal que inicia los procesos de trabajo que se ejecutan bajo demanda. En la práctica, el proceso principal es un proceso alimentador que controla dos o más agentes que reciben partes de los datos y realizan cálculos sobre la parte dada.

Tenga en cuenta que la paralelización es costosa y requiere mucho tiempo debido a la sobrecarga de los subprocesos que necesita su sistema operativo. En comparación con la ejecución de dos o más tareas de forma lineal, hacer esto en paralelo puede ahorrar entre un 25 y un 30 por ciento de tiempo por subproceso, según su caso de uso. Por ejemplo, dos tareas que consumen 5 segundos cada una necesitan 10 segundos en total si se ejecutan en serie, y pueden necesitar alrededor de 8 segundos en promedio en una máquina de múltiples núcleos cuando están en paralelo. 3 de esos 8 segundos se pueden perder por sobrecarga, lo que limita las mejoras de velocidad.

Ejecutar una función en paralelo con Python

Python ofrece cuatro formas posibles de manejar eso. Primero, puede ejecutar funciones en paralelo utilizando el módulo de multiprocesamiento . En segundo lugar, una alternativa a los procesos son los hilos. Técnicamente, estos son procesos ligeros y están fuera del alcance de este artículo. Para leer más, puede echar un vistazo al módulo de subprocesos de Python . En tercer lugar, puede llamar a programas externos utilizando el system()método del osmódulo, o los métodos proporcionados por el subprocessmódulo, y recopilar los resultados posteriormente.

El multiprocessingmódulo cubre una buena selección de métodos para manejar la ejecución paralela de rutinas. Esto incluye procesos, grupos de agentes, colas y canalizaciones.

El Listado 1 funciona con un grupo de cinco agentes que procesan una parte de tres valores al mismo tiempo. Los valores para el número de agentes y para el chunksizese eligen arbitrariamente con fines de demostración. Ajuste estos valores de acuerdo con la cantidad de núcleos de su procesador.

El método Pool.map()requiere tres parámetros: una función para ser llamada en cada elemento del conjunto de datos, el conjunto de datos en sí y el chunksize. En el Listado 1 usamos una función que se nombra squarey calcula el cuadrado del valor entero dado. Además, chunksizese puede omitir. Si no se establece explícitamente, el valor predeterminado chunksizees 1.

Tenga en cuenta que el orden de ejecución de los agentes no está garantizado, pero el conjunto de resultados está en el orden correcto. Contiene los valores cuadrados según el orden de los elementos del conjunto de datos original.

Listado 1: Ejecución de funciones en paralelo

from multiprocessing import Pool

def square(x):
    # calculate the square of the value of x
    return x*x

if __name__ == '__main__':

    # Define the dataset
    dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]

    # Output the dataset
    print ('Dataset: ' + str(dataset))

    # Run this with a pool of 5 agents having a chunksize of 3 until finished
    agents = 5
    chunksize = 3
    with Pool(processes=agents) as pool:
        result = pool.map(square, dataset, chunksize)

    # Output the result
    print ('Result:  ' + str(result))

La ejecución de este código debería producir el siguiente resultado:

$ python3 pool_multiprocessing.py 
Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
Result:  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]

Nota : Usaremos Python 3 para estos ejemplos.

Ejecución de varias funciones mediante una cola

Como estructura de datos, una cola es muy común y existe de varias maneras. Está organizado como primero en entrar, primero en salir (FIFO) o último en entrar, primero en salir (LIFO) / pila , así como con y sin prioridades (cola de prioridad). La estructura de datos se implementa como una matriz con un número fijo de entradas o como una lista que contiene un número variable de elementos individuales.

En Listings 2.1-2.7 usamos una cola FIFO. Se implementa como una lista que ya es proporcionada por la clase correspondiente del multiprocessingmódulo. Además, el timemódulo se carga y se utiliza para imitar la carga de trabajo.

Listado 2.1: Módulos que se utilizarán

import multiprocessing
from time import sleep

A continuación, se define una función de trabajador (Listado 2.2). Esta función representa al agente, en realidad, y requiere tres argumentos. El nombre del proceso indica qué proceso es, y tanto el taskscomo hacen resultsreferencia a la cola correspondiente.

Dentro de la función de trabajador hay un whilebucle infinito . Ambos tasksy resultsson colas que se definen en el programa principal. tasks.get()devuelve la tarea actual de la cola de tareas para ser procesada. Un valor de tarea menor que 0 sale del whileciclo y devuelve un valor de -1. Cualquier otro valor de tarea realizará un cálculo (cuadrado) y devolverá este valor. La devolución de un valor al programa principal se implementa como results.put(). Esto agrega el valor calculado al final de la resultscola.

Listado 2.2: La función de trabajador

# define worker function
def calculate(process_name, tasks, results):
    print('[%s] evaluation routine starts' % process_name)

    while True:
        new_value = tasks.get()
        if new_value < 0:
            print('[%s] evaluation routine quits' % process_name)

            # Indicate finished
            results.put(-1)
            break
        else:
            # Compute result and mimic a long-running task
            compute = new_value * new_value
            sleep(0.02*new_value)

            # Output which process received the value
            # and the calculation result
            print('[%s] received value: %i' % (process_name, new_value))
            print('[%s] calculated value: %i' % (process_name, compute))

            # Add result to the queue
            results.put(compute)

    return

El siguiente paso es el ciclo principal (vea el Listado 2.3). Primero, se define un administrador para la comunicación entre procesos (IPC). A continuación, se agregan dos colas: una que mantiene las tareas y la otra para los resultados.

Listado 2.3: IPC y colas

if __name__ == "__main__":
    # Define IPC manager
    manager = multiprocessing.Manager()

    # Define a list (queue) for tasks and computation results
    tasks = manager.Queue()
    results = manager.Queue()

Una vez realizada esta configuración, definimos un grupo de procesos con cuatro procesos de trabajo (agentes). Hacemos uso de la clase multiprocessing.Pool()y creamos una instancia de ella. A continuación, definimos una lista vacía de procesos (consulte el Listado 2.4).

Listado 2.4: Definición de un grupo de procesos

# Create process pool with four processes
num_processes = 4
pool = multiprocessing.Pool(processes=num_processes)
processes = []

Como paso siguiente iniciamos los cuatro procesos de trabajo (agentes). Por simplicidad, se denominan “P0” a “P3”. La creación de los cuatro procesos de trabajo se realiza mediante multiprocessing.Process(). Esto conecta a cada uno de ellos con la función de trabajador, así como con la tarea y la cola de resultados. Finalmente, agregamos el proceso recién inicializado al final de la lista de procesos e iniciamos el nuevo proceso usando new_process.start()(ver Listado 2.5).

Listado 2.5: Prepare los procesos de trabajo

# Initiate the worker processes
for i in range(num_processes):

    # Set process name
    process_name="P%i" % i

    # Create the process, and connect it to the worker function
    new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))

    # Add new process to the list of processes
    processes.append(new_process)

    # Start the process
    new_process.start()

Nuestros procesos laborales están esperando trabajo. Definimos una lista de tareas, que en nuestro caso son enteros seleccionados arbitrariamente. Estos valores se agregan a la lista de tareas mediante tasks.put(). Cada proceso de trabajo espera las tareas y elige la siguiente tarea disponible de la lista de tareas. Esto es manejado por la cola misma (vea el Listado 2.6).

Listado 2.6: Prepare la cola de tareas

# Fill task queue
task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
for single_task in task_list:
    tasks.put(single_task)

# Wait while the workers process
sleep(5)

Después de un tiempo nos gustaría que nuestros agentes terminaran. Cada proceso de trabajo reacciona en una tarea con el valor -1. Interpreta este valor como una señal de terminación y luego muere. Es por eso que colocamos tantos -1 en la cola de tareas como procesos tengamos en ejecución. Antes de morir, un proceso que finaliza coloca un -1 en la cola de resultados. Esto está destinado a ser una señal de confirmación al bucle principal de que el agente está terminando.

En el ciclo principal leemos de esa cola y contamos el número de -1. El ciclo principal se cierra tan pronto como hemos contado tantas confirmaciones de terminación como procesos tenemos. De lo contrario, sacamos el resultado del cálculo de la cola.

Listado 2.7: Terminación y salida de resultados

# Quit the worker processes by sending them -1
for i in range(num_processes):
    tasks.put(-1)

# Read calculation results
num_finished_processes = 0
while True:
    # Read result
    new_result = results.get()

    # Have a look at the results
    if new_result == -1:
        # Process has finished
        num_finished_processes += 1

        if num_finished_processes == num_processes:
            break
    else:
        # Output result
        print('Result:' + str(new_result))

El ejemplo 2 muestra la salida del programa Python. Al ejecutar el programa más de una vez, puede notar que el orden en el que se inician los procesos de trabajo es tan impredecible como el proceso en sí que selecciona una tarea de la cola. Sin embargo, una vez terminado, el orden de los elementos de la cola de resultados coincide con el orden de los elementos de la cola de tareas.

Ejemplo 2

$ python3 queue_multiprocessing.py 
[P0] evaluation routine starts
[P1] evaluation routine starts
[P2] evaluation routine starts
[P3] evaluation routine starts
[P1] received value: 1
[P1] calculated value: 1
[P0] received value: 43
[P0] calculated value: 1849
[P0] received value: 68
[P0] calculated value: 4624
[P1] received value: 142
[P1] calculated value: 20164
result: 1
result: 1849
result: 4624
result: 20164
[P3] received value: 256
[P3] calculated value: 65536
result: 65536
[P0] received value: 183
[P0] calculated value: 33489
result: 33489
[P0] received value: 3
[P0] calculated value: 9
result: 9
[P0] evaluation routine quits
[P1] received value: 334
[P1] calculated value: 111556
result: 111556
[P1] evaluation routine quits
[P3] received value: 325
[P3] calculated value: 105625
result: 105625
[P3] evaluation routine quits
[P2] received value: 780
[P2] calculated value: 608400
result: 608400
[P2] evaluation routine quits

Nota : Como se mencionó anteriormente, es posible que su salida no coincida exactamente con la que se muestra arriba, ya que el orden de ejecución es impredecible.

Usando el método os.system ()

El system()método es parte del módulo os , que permite ejecutar programas de línea de comandos externos en un proceso separado de su programa Python. El system()método es una llamada de bloqueo y debe esperar hasta que finalice la llamada y regrese. Como fetichista de UNIX / Linux, sabe que se puede ejecutar un comando en segundo plano y escribir el resultado calculado en el flujo de salida que se redirige a un archivo como este (consulte el Ejemplo 3):

Ejemplo 3: comando con redirección de salida

$ ./program >> outputfile &

En un programa de Python, simplemente encapsula esta llamada como se muestra a continuación:

Listado 3: Llamada simple al sistema usando el módulo os

import os

os.system("./program >> outputfile &")

Esta llamada al sistema crea un proceso que se ejecuta en paralelo a su programa Python actual. Obtener el resultado puede volverse un poco complicado porque esta llamada puede terminar después del final de su programa Python, nunca se sabe.

Usar este método es mucho más caro que los métodos anteriores que describí. Primero, la sobrecarga es mucho mayor (cambio de proceso), y segundo, escribe datos en la memoria física, como un disco, lo que lleva más tiempo. Sin embargo, esta es una mejor opción si tiene memoria limitada (como con RAM) y en su lugar puede tener datos de salida masivos escritos en un disco de estado sólido.

Usando el módulo de subproceso

Este módulo está destinado a reemplazar os.system()y os.spawn()llamadas. La idea del subproceso es simplificar los procesos de generación, comunicarse con ellos a través de conductos y señales y recopilar la salida que producen, incluidos los mensajes de error.

A partir de Python 3.5, el subproceso contiene el método subprocess.run()para iniciar un comando externo, que es un contenedor para la subprocess.Popen()clase subyacente . Como ejemplo, lanzamos el comando UNIX / Linux df -hpara averiguar cuánto espacio en disco queda todavía disponible en la /homepartición de su máquina. En un programa Python, realiza esta llamada como se muestra a continuación (Listado 4).

Listado 4: Ejemplo básico para ejecutar un comando externo

import subprocess

ret = subprocess.run(["df", "-h", "/home"])
print(ret)

Esta es la llamada básica y muy similar al comando que df -h /homese ejecuta en un terminal. Tenga en cuenta que los parámetros están separados como una lista en lugar de una sola cadena. La salida será similar al Ejemplo 4. En comparación con la documentación oficial de Python para este módulo, genera el resultado de la llamada a stdout, además del valor de retorno de la llamada.

El ejemplo 4 muestra el resultado de nuestra llamada. La última línea de la salida muestra la ejecución exitosa del comando. La llamada subprocess.run()devuelve una instancia de la clase CompletedProcessque tiene los dos atributos nombrados args(argumentos de línea de comando) y returncode(valor de retorno del comando).

Ejemplo 4: Ejecución de la secuencia de comandos Python del Listado 4

$ python3 diskfree.py
Filesystem   Size   Used  Avail Capacity  iused   ifree %iused  Mounted on
/dev/sda3  233Gi  203Gi   30Gi    88% 53160407 7818407   87%   /home
CompletedProcess(args=['df', '-h', '/home'], returncode=0)

Para suprimir la salida stdouty capturar tanto la salida como el valor de retorno para una evaluación adicional, la llamada de subprocess.run()debe modificarse ligeramente. Sin más modificaciones, subprocess.run()envía la salida del comando ejecutado al stdoutque es el canal de salida del proceso subyacente de Python. Para tomar la salida, tenemos que cambiar esto y establecer el canal de salida en el valor predefinido subprocess.PIPE. El Listado 5 muestra cómo hacer eso.

Listado 5: Tomando la salida en una tubería

import subprocess

# Call the command
output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)

# Read the return code and the output data
print ("Return code: %i" % output.returncode)
print ("Output data: %s" % output.stdout)

Como se explicó antes, subprocess.run()devuelve una instancia de la clase CompletedProcess. En el Listado 5, esta instancia es una variable simplemente nombrada output. El código de retorno del comando se mantiene en el atributo output.returncodey la salida impresa stdoutse puede encontrar en el atributo output.stdout. Tenga en cuenta que esto no cubre el manejo de mensajes de error porque no cambiamos el canal de salida para eso.

Conclusión

El procesamiento en paralelo es una gran oportunidad para utilizar el poder del hardware actual. Python le da acceso a estos métodos a un nivel muy sofisticado. Como se ha visto antes, tanto el multiprocessingy la subprocesslet módulo es sumergirse en ese tema fácilmente.

About the author

Ramiro de la Vega

Bienvenido a Pharos.sh

Soy Ramiro de la Vega, Estadounidense con raíces Españolas. Empecé a programar hace casi 20 años cuando era muy jovencito.

Espero que en mi web encuentres la inspiración y ayuda que necesitas para adentrarte en el fantástico mundo de la programación y conseguir tus objetivos por difíciles que sean.

Add comment

Sobre mi

Últimos Post

Etiquetas

Esta web utiliza cookies propias para su correcto funcionamiento. Al hacer clic en el botón Aceptar, aceptas el uso de estas tecnologías y el procesamiento de tus datos para estos propósitos. Más información
Privacidad