Procesamiento paralelo en Python

    Introducción

    Cuando inicia un programa en su máquina, se ejecuta en su propia “burbuja” que está completamente separada de otros programas que están activos al mismo tiempo. Esta “burbuja” se denomina proceso y comprende todo lo que se necesita para gestionar esta llamada de programa.

    Por ejemplo, este llamado entorno de proceso incluye las páginas de memoria que el proceso tiene en uso, los manejadores de archivo que este proceso ha abierto, tanto los derechos de acceso de usuario como de grupo, y su llamada de línea de comando completa, incluidos los parámetros dados.

    Esta información se guarda en el sistema de archivos de proceso de su sistema UNIX / Linux, que es un sistema de archivos virtual, y se puede acceder a él a través del directorio / proc . Las entradas están ordenadas por el ID de proceso, que es único para cada proceso. El ejemplo 1 muestra esto para un proceso seleccionado arbitrariamente que tiene el ID de proceso # 177.

    Ejemplo 1: información que está disponible para un proceso

    [email protected]:/proc/177# ls
    attr         cpuset   limits      net            projid_map   statm
    autogroup    cwd      loginuid    ns             root         status
    auxv         environ  map_files   numa_maps      sched        syscall
    cgroup       exe      maps        oom_adj        sessionid    task
    clear_refs   fd       mem         oom_score      setgroups    timers
    cmdline      fdinfo   mountinfo   oom_score_adj  smaps        uid_map
    comm         gid_map  mounts      pagemap        stack        wchan
    coredump_filter       io          mountstats     personality  stat
    

    Estructuración de código y datos del programa

    Cuanto más complejo se vuelve un programa, más a menudo resulta útil dividirlo en partes más pequeñas. Esto no se refiere únicamente al código fuente, sino también al código que se ejecuta en su máquina. Una solución para esto es el uso de subprocesos en combinación con la ejecución en paralelo. Los pensamientos detrás de esto son:

    • Un solo proceso cubre un fragmento de código que se puede ejecutar por separado
    • Ciertas secciones de código se pueden ejecutar simultáneamente y permiten la paralelización en principio
    • Usando las características de los procesadores y sistemas operativos modernos, por ejemplo, cada núcleo de un procesador que tenemos disponible para reducir el tiempo total de ejecución de un programa.
    • Para reducir la complejidad de su programa / código y subcontratar piezas de trabajo a agentes especializados que actúan como subprocesos.

    El uso de subprocesos requiere que reconsidere la forma en que se ejecuta su programa, de lineal a paralelo. Es similar a cambiar su perspectiva de trabajo en una empresa de un trabajador común a un gerente: tendrá que vigilar quién está haciendo qué, cuánto tiempo toma un solo paso y cuáles son las dependencias entre los resultados intermedios.

    Esto le ayuda a dividir su código en fragmentos más pequeños que pueden ser ejecutados por un agente especializado solo para esta tarea. Si aún no lo ha hecho, piense en cómo está estructurado su conjunto de datos para que los agentes individuales puedan procesarlo de manera efectiva. Esto lleva a estas preguntas:

    • ¿Por qué quiere paralelizar el código? En tu caso concreto y en cuanto a esfuerzo, ¿tiene sentido pensarlo?
    • ¿Su programa está diseñado para ejecutarse solo una vez o se ejecutará regularmente en un conjunto de datos similar?
    • ¿Puedes dividir tu algoritmo en varios pasos de ejecución?
    • ¿Sus datos permiten la paralelización? Si aún no lo ha hecho, ¿de qué manera debe adaptarse la organización de sus datos?
    • ¿Qué resultados intermedios de su cálculo dependen unos de otros?
    • ¿Qué cambio de hardware se necesita para eso?
    • ¿Existe un cuello de botella en el hardware o en el algoritmo, y cómo se puede evitar o minimizar la influencia de estos factores?
    • ¿Qué otros efectos secundarios de la paralelización pueden ocurrir?

    Un posible caso de uso es un proceso principal y un demonio que se ejecuta en segundo plano (maestro / esclavo) esperando ser activado. Además, este puede ser un proceso principal que inicia los procesos de trabajo que se ejecutan bajo demanda. En la práctica, el proceso principal es un proceso alimentador que controla dos o más agentes que reciben partes de los datos y realizan cálculos sobre la parte dada.

    Tenga en cuenta que la paralelización es costosa y requiere mucho tiempo debido a la sobrecarga de los subprocesos que necesita su sistema operativo. En comparación con la ejecución de dos o más tareas de forma lineal, hacer esto en paralelo puede ahorrar entre un 25 y un 30 por ciento de tiempo por subproceso, según su caso de uso. Por ejemplo, dos tareas que consumen 5 segundos cada una necesitan 10 segundos en total si se ejecutan en serie, y pueden necesitar alrededor de 8 segundos en promedio en una máquina de múltiples núcleos cuando están en paralelo. 3 de esos 8 segundos se pueden perder por sobrecarga, lo que limita las mejoras de velocidad.

    Ejecutar una función en paralelo con Python

    Python ofrece cuatro formas posibles de manejar eso. Primero, puede ejecutar funciones en paralelo utilizando el módulo de multiprocesamiento . En segundo lugar, una alternativa a los procesos son los hilos. Técnicamente, estos son procesos ligeros y están fuera del alcance de este artículo. Para leer más, puede echar un vistazo al módulo de subprocesos de Python . En tercer lugar, puede llamar a programas externos utilizando el system()método del osmódulo, o los métodos proporcionados por el subprocessmódulo, y recopilar los resultados posteriormente.

    El multiprocessingmódulo cubre una buena selección de métodos para manejar la ejecución paralela de rutinas. Esto incluye procesos, grupos de agentes, colas y canalizaciones.

    El Listado 1 funciona con un grupo de cinco agentes que procesan una parte de tres valores al mismo tiempo. Los valores para el número de agentes y para el chunksizese eligen arbitrariamente con fines de demostración. Ajuste estos valores de acuerdo con la cantidad de núcleos de su procesador.

    El método Pool.map()requiere tres parámetros: una función para ser llamada en cada elemento del conjunto de datos, el conjunto de datos en sí y el chunksize. En el Listado 1 usamos una función que se nombra squarey calcula el cuadrado del valor entero dado. Además, chunksizese puede omitir. Si no se establece explícitamente, el valor predeterminado chunksizees 1.

    Tenga en cuenta que el orden de ejecución de los agentes no está garantizado, pero el conjunto de resultados está en el orden correcto. Contiene los valores cuadrados según el orden de los elementos del conjunto de datos original.

    Listado 1: Ejecución de funciones en paralelo

    from multiprocessing import Pool
    
    def square(x):
        # calculate the square of the value of x
        return x*x
    
    if __name__ == '__main__':
    
        # Define the dataset
        dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    
        # Output the dataset
        print ('Dataset: ' + str(dataset))
    
        # Run this with a pool of 5 agents having a chunksize of 3 until finished
        agents = 5
        chunksize = 3
        with Pool(processes=agents) as pool:
            result = pool.map(square, dataset, chunksize)
    
        # Output the result
        print ('Result:  ' + str(result))
    

    La ejecución de este código debería producir el siguiente resultado:

    $ python3 pool_multiprocessing.py 
    Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    Result:  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]
    

    Nota : Usaremos Python 3 para estos ejemplos.

    Ejecución de varias funciones mediante una cola

    Como estructura de datos, una cola es muy común y existe de varias maneras. Está organizado como primero en entrar, primero en salir (FIFO) o último en entrar, primero en salir (LIFO) / pila , así como con y sin prioridades (cola de prioridad). La estructura de datos se implementa como una matriz con un número fijo de entradas o como una lista que contiene un número variable de elementos individuales.

    En Listings 2.1-2.7 usamos una cola FIFO. Se implementa como una lista que ya es proporcionada por la clase correspondiente del multiprocessingmódulo. Además, el timemódulo se carga y se utiliza para imitar la carga de trabajo.

    Listado 2.1: Módulos que se utilizarán

    import multiprocessing
    from time import sleep
    

    A continuación, se define una función de trabajador (Listado 2.2). Esta función representa al agente, en realidad, y requiere tres argumentos. El nombre del proceso indica qué proceso es, y tanto el taskscomo hacen resultsreferencia a la cola correspondiente.

    Dentro de la función de trabajador hay un whilebucle infinito . Ambos tasksy resultsson colas que se definen en el programa principal. tasks.get()devuelve la tarea actual de la cola de tareas para ser procesada. Un valor de tarea menor que 0 sale del whileciclo y devuelve un valor de -1. Cualquier otro valor de tarea realizará un cálculo (cuadrado) y devolverá este valor. La devolución de un valor al programa principal se implementa como results.put(). Esto agrega el valor calculado al final de la resultscola.

    Listado 2.2: La función de trabajador

    # define worker function
    def calculate(process_name, tasks, results):
        print('[%s] evaluation routine starts' % process_name)
    
        while True:
            new_value = tasks.get()
            if new_value < 0:
                print('[%s] evaluation routine quits' % process_name)
    
                # Indicate finished
                results.put(-1)
                break
            else:
                # Compute result and mimic a long-running task
                compute = new_value * new_value
                sleep(0.02*new_value)
    
                # Output which process received the value
                # and the calculation result
                print('[%s] received value: %i' % (process_name, new_value))
                print('[%s] calculated value: %i' % (process_name, compute))
    
                # Add result to the queue
                results.put(compute)
    
        return
    

    El siguiente paso es el ciclo principal (vea el Listado 2.3). Primero, se define un administrador para la comunicación entre procesos (IPC). A continuación, se agregan dos colas: una que mantiene las tareas y la otra para los resultados.

    Listado 2.3: IPC y colas

    if __name__ == "__main__":
        # Define IPC manager
        manager = multiprocessing.Manager()
    
        # Define a list (queue) for tasks and computation results
        tasks = manager.Queue()
        results = manager.Queue()
    

    Una vez realizada esta configuración, definimos un grupo de procesos con cuatro procesos de trabajo (agentes). Hacemos uso de la clase multiprocessing.Pool()y creamos una instancia de ella. A continuación, definimos una lista vacía de procesos (consulte el Listado 2.4).

    Listado 2.4: Definición de un grupo de procesos

    # Create process pool with four processes
    num_processes = 4
    pool = multiprocessing.Pool(processes=num_processes)
    processes = []
    

    Como paso siguiente iniciamos los cuatro procesos de trabajo (agentes). Por simplicidad, se denominan “P0” a “P3”. La creación de los cuatro procesos de trabajo se realiza mediante multiprocessing.Process(). Esto conecta a cada uno de ellos con la función de trabajador, así como con la tarea y la cola de resultados. Finalmente, agregamos el proceso recién inicializado al final de la lista de procesos e iniciamos el nuevo proceso usando new_process.start()(ver Listado 2.5).

    Listado 2.5: Prepare los procesos de trabajo

    # Initiate the worker processes
    for i in range(num_processes):
    
        # Set process name
        process_name="P%i" % i
    
        # Create the process, and connect it to the worker function
        new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))
    
        # Add new process to the list of processes
        processes.append(new_process)
    
        # Start the process
        new_process.start()
    

    Nuestros procesos laborales están esperando trabajo. Definimos una lista de tareas, que en nuestro caso son enteros seleccionados arbitrariamente. Estos valores se agregan a la lista de tareas mediante tasks.put(). Cada proceso de trabajo espera las tareas y elige la siguiente tarea disponible de la lista de tareas. Esto es manejado por la cola misma (vea el Listado 2.6).

    Listado 2.6: Prepare la cola de tareas

    # Fill task queue
    task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
    for single_task in task_list:
        tasks.put(single_task)
    
    # Wait while the workers process
    sleep(5)
    

    Después de un tiempo nos gustaría que nuestros agentes terminaran. Cada proceso de trabajo reacciona en una tarea con el valor -1. Interpreta este valor como una señal de terminación y luego muere. Es por eso que colocamos tantos -1 en la cola de tareas como procesos tengamos en ejecución. Antes de morir, un proceso que finaliza coloca un -1 en la cola de resultados. Esto está destinado a ser una señal de confirmación al bucle principal de que el agente está terminando.

    En el ciclo principal leemos de esa cola y contamos el número de -1. El ciclo principal se cierra tan pronto como hemos contado tantas confirmaciones de terminación como procesos tenemos. De lo contrario, sacamos el resultado del cálculo de la cola.

    Listado 2.7: Terminación y salida de resultados

    # Quit the worker processes by sending them -1
    for i in range(num_processes):
        tasks.put(-1)
    
    # Read calculation results
    num_finished_processes = 0
    while True:
        # Read result
        new_result = results.get()
    
        # Have a look at the results
        if new_result == -1:
            # Process has finished
            num_finished_processes += 1
    
            if num_finished_processes == num_processes:
                break
        else:
            # Output result
            print('Result:' + str(new_result))
    

    El ejemplo 2 muestra la salida del programa Python. Al ejecutar el programa más de una vez, puede notar que el orden en el que se inician los procesos de trabajo es tan impredecible como el proceso en sí que selecciona una tarea de la cola. Sin embargo, una vez terminado, el orden de los elementos de la cola de resultados coincide con el orden de los elementos de la cola de tareas.

    Ejemplo 2

    $ python3 queue_multiprocessing.py 
    [P0] evaluation routine starts
    [P1] evaluation routine starts
    [P2] evaluation routine starts
    [P3] evaluation routine starts
    [P1] received value: 1
    [P1] calculated value: 1
    [P0] received value: 43
    [P0] calculated value: 1849
    [P0] received value: 68
    [P0] calculated value: 4624
    [P1] received value: 142
    [P1] calculated value: 20164
    result: 1
    result: 1849
    result: 4624
    result: 20164
    [P3] received value: 256
    [P3] calculated value: 65536
    result: 65536
    [P0] received value: 183
    [P0] calculated value: 33489
    result: 33489
    [P0] received value: 3
    [P0] calculated value: 9
    result: 9
    [P0] evaluation routine quits
    [P1] received value: 334
    [P1] calculated value: 111556
    result: 111556
    [P1] evaluation routine quits
    [P3] received value: 325
    [P3] calculated value: 105625
    result: 105625
    [P3] evaluation routine quits
    [P2] received value: 780
    [P2] calculated value: 608400
    result: 608400
    [P2] evaluation routine quits
    

    Nota : Como se mencionó anteriormente, es posible que su salida no coincida exactamente con la que se muestra arriba, ya que el orden de ejecución es impredecible.

    Usando el método os.system ()

    El system()método es parte del módulo os , que permite ejecutar programas de línea de comandos externos en un proceso separado de su programa Python. El system()método es una llamada de bloqueo y debe esperar hasta que finalice la llamada y regrese. Como fetichista de UNIX / Linux, sabe que se puede ejecutar un comando en segundo plano y escribir el resultado calculado en el flujo de salida que se redirige a un archivo como este (consulte el Ejemplo 3):

    Ejemplo 3: comando con redirección de salida

    $ ./program >> outputfile &
    

    En un programa de Python, simplemente encapsula esta llamada como se muestra a continuación:

    Listado 3: Llamada simple al sistema usando el módulo os

    import os
    
    os.system("./program >> outputfile &")
    

    Esta llamada al sistema crea un proceso que se ejecuta en paralelo a su programa Python actual. Obtener el resultado puede volverse un poco complicado porque esta llamada puede terminar después del final de su programa Python, nunca se sabe.

    Usar este método es mucho más caro que los métodos anteriores que describí. Primero, la sobrecarga es mucho mayor (cambio de proceso), y segundo, escribe datos en la memoria física, como un disco, lo que lleva más tiempo. Sin embargo, esta es una mejor opción si tiene memoria limitada (como con RAM) y en su lugar puede tener datos de salida masivos escritos en un disco de estado sólido.

    Usando el módulo de subproceso

    Este módulo está destinado a reemplazar os.system()y os.spawn()llamadas. La idea del subproceso es simplificar los procesos de generación, comunicarse con ellos a través de conductos y señales y recopilar la salida que producen, incluidos los mensajes de error.

    A partir de Python 3.5, el subproceso contiene el método subprocess.run()para iniciar un comando externo, que es un contenedor para la subprocess.Popen()clase subyacente . Como ejemplo, lanzamos el comando UNIX / Linux df -hpara averiguar cuánto espacio en disco queda todavía disponible en la /homepartición de su máquina. En un programa Python, realiza esta llamada como se muestra a continuación (Listado 4).

    Listado 4: Ejemplo básico para ejecutar un comando externo

    import subprocess
    
    ret = subprocess.run(["df", "-h", "/home"])
    print(ret)
    

    Esta es la llamada básica y muy similar al comando que df -h /homese ejecuta en un terminal. Tenga en cuenta que los parámetros están separados como una lista en lugar de una sola cadena. La salida será similar al Ejemplo 4. En comparación con la documentación oficial de Python para este módulo, genera el resultado de la llamada a stdout, además del valor de retorno de la llamada.

    El ejemplo 4 muestra el resultado de nuestra llamada. La última línea de la salida muestra la ejecución exitosa del comando. La llamada subprocess.run()devuelve una instancia de la clase CompletedProcessque tiene los dos atributos nombrados args(argumentos de línea de comando) y returncode(valor de retorno del comando).

    Ejemplo 4: Ejecución de la secuencia de comandos Python del Listado 4

    $ python3 diskfree.py
    Filesystem   Size   Used  Avail Capacity  iused   ifree %iused  Mounted on
    /dev/sda3  233Gi  203Gi   30Gi    88% 53160407 7818407   87%   /home
    CompletedProcess(args=['df', '-h', '/home'], returncode=0)
    

    Para suprimir la salida stdouty capturar tanto la salida como el valor de retorno para una evaluación adicional, la llamada de subprocess.run()debe modificarse ligeramente. Sin más modificaciones, subprocess.run()envía la salida del comando ejecutado al stdoutque es el canal de salida del proceso subyacente de Python. Para tomar la salida, tenemos que cambiar esto y establecer el canal de salida en el valor predefinido subprocess.PIPE. El Listado 5 muestra cómo hacer eso.

    Listado 5: Tomando la salida en una tubería

    import subprocess
    
    # Call the command
    output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)
    
    # Read the return code and the output data
    print ("Return code: %i" % output.returncode)
    print ("Output data: %s" % output.stdout)
    

    Como se explicó antes, subprocess.run()devuelve una instancia de la clase CompletedProcess. En el Listado 5, esta instancia es una variable simplemente nombrada output. El código de retorno del comando se mantiene en el atributo output.returncodey la salida impresa stdoutse puede encontrar en el atributo output.stdout. Tenga en cuenta que esto no cubre el manejo de mensajes de error porque no cambiamos el canal de salida para eso.

    Conclusión

    El procesamiento en paralelo es una gran oportunidad para utilizar el poder del hardware actual. Python le da acceso a estos métodos a un nivel muy sofisticado. Como se ha visto antes, tanto el multiprocessingy la subprocesslet módulo es sumergirse en ese tema fácilmente.

    Etiquetas:

    Deja una respuesta

    Tu dirección de correo electrónico no será publicada. Los campos obligatorios están marcados con *