Procesamiento paralelo en Python

    Introducci贸n

    Cuando inicia un programa en su m谩quina, se ejecuta en su propia “burbuja” que est谩 completamente separada de otros programas que est谩n activos al mismo tiempo. Esta “burbuja” se denomina proceso y comprende todo lo que se necesita para gestionar esta llamada de programa.

    Por ejemplo, este llamado entorno de proceso incluye las p谩ginas de memoria que el proceso tiene en uso, los manejadores de archivo que este proceso ha abierto, tanto los derechos de acceso de usuario como de grupo, y su llamada de l铆nea de comando completa, incluidos los par谩metros dados.

    Esta informaci贸n se guarda en el sistema de archivos de proceso de su sistema UNIX / Linux, que es un sistema de archivos virtual, y se puede acceder a 茅l a trav茅s del directorio / proc . Las entradas est谩n ordenadas por el ID de proceso, que es 煤nico para cada proceso. El ejemplo 1 muestra esto para un proceso seleccionado arbitrariamente que tiene el ID de proceso # 177.

    Ejemplo 1: informaci贸n que est谩 disponible para un proceso

    [email聽protected]:/proc/177# ls
    attr         cpuset   limits      net            projid_map   statm
    autogroup    cwd      loginuid    ns             root         status
    auxv         environ  map_files   numa_maps      sched        syscall
    cgroup       exe      maps        oom_adj        sessionid    task
    clear_refs   fd       mem         oom_score      setgroups    timers
    cmdline      fdinfo   mountinfo   oom_score_adj  smaps        uid_map
    comm         gid_map  mounts      pagemap        stack        wchan
    coredump_filter       io          mountstats     personality  stat
    

    Estructuraci贸n de c贸digo y datos del programa

    Cuanto m谩s complejo se vuelve un programa, m谩s a menudo resulta 煤til dividirlo en partes m谩s peque帽as. Esto no se refiere 煤nicamente al c贸digo fuente, sino tambi茅n al c贸digo que se ejecuta en su m谩quina. Una soluci贸n para esto es el uso de subprocesos en combinaci贸n con la ejecuci贸n en paralelo. Los pensamientos detr谩s de esto son:

    • Un solo proceso cubre un fragmento de c贸digo que se puede ejecutar por separado
    • Ciertas secciones de c贸digo se pueden ejecutar simult谩neamente y permiten la paralelizaci贸n en principio
    • Usando las caracter铆sticas de los procesadores y sistemas operativos modernos, por ejemplo, cada n煤cleo de un procesador que tenemos disponible para reducir el tiempo total de ejecuci贸n de un programa.
    • Para reducir la complejidad de su programa / c贸digo y subcontratar piezas de trabajo a agentes especializados que act煤an como subprocesos.

    El uso de subprocesos requiere que reconsidere la forma en que se ejecuta su programa, de lineal a paralelo. Es similar a cambiar su perspectiva de trabajo en una empresa de un trabajador com煤n a un gerente: tendr谩 que vigilar qui茅n est谩 haciendo qu茅, cu谩nto tiempo toma un solo paso y cu谩les son las dependencias entre los resultados intermedios.

    Esto le ayuda a dividir su c贸digo en fragmentos m谩s peque帽os que pueden ser ejecutados por un agente especializado solo para esta tarea. Si a煤n no lo ha hecho, piense en c贸mo est谩 estructurado su conjunto de datos para que los agentes individuales puedan procesarlo de manera efectiva. Esto lleva a estas preguntas:

    • 驴Por qu茅 quiere paralelizar el c贸digo? En tu caso concreto y en cuanto a esfuerzo, 驴tiene sentido pensarlo?
    • 驴Su programa est谩 dise帽ado para ejecutarse solo una vez o se ejecutar谩 regularmente en un conjunto de datos similar?
    • 驴Puedes dividir tu algoritmo en varios pasos de ejecuci贸n?
    • 驴Sus datos permiten la paralelizaci贸n? Si a煤n no lo ha hecho, 驴de qu茅 manera debe adaptarse la organizaci贸n de sus datos?
    • 驴Qu茅 resultados intermedios de su c谩lculo dependen unos de otros?
    • 驴Qu茅 cambio de hardware se necesita para eso?
    • 驴Existe un cuello de botella en el hardware o en el algoritmo, y c贸mo se puede evitar o minimizar la influencia de estos factores?
    • 驴Qu茅 otros efectos secundarios de la paralelizaci贸n pueden ocurrir?

    Un posible caso de uso es un proceso principal y un demonio que se ejecuta en segundo plano (maestro / esclavo) esperando ser activado. Adem谩s, este puede ser un proceso principal que inicia los procesos de trabajo que se ejecutan bajo demanda. En la pr谩ctica, el proceso principal es un proceso alimentador que controla dos o m谩s agentes que reciben partes de los datos y realizan c谩lculos sobre la parte dada.

    Tenga en cuenta que la paralelizaci贸n es costosa y requiere mucho tiempo debido a la sobrecarga de los subprocesos que necesita su sistema operativo. En comparaci贸n con la ejecuci贸n de dos o m谩s tareas de forma lineal, hacer esto en paralelo puede ahorrar entre un 25 y un 30 por ciento de tiempo por subproceso, seg煤n su caso de uso. Por ejemplo, dos tareas que consumen 5 segundos cada una necesitan 10 segundos en total si se ejecutan en serie, y pueden necesitar alrededor de 8 segundos en promedio en una m谩quina de m煤ltiples n煤cleos cuando est谩n en paralelo. 3 de esos 8 segundos se pueden perder por sobrecarga, lo que limita las mejoras de velocidad.

    Ejecutar una funci贸n en paralelo con Python

    Python ofrece cuatro formas posibles de manejar eso. Primero, puede ejecutar funciones en paralelo utilizando el m贸dulo de multiprocesamiento . En segundo lugar, una alternativa a los procesos son los hilos. T茅cnicamente, estos son procesos ligeros y est谩n fuera del alcance de este art铆culo. Para leer m谩s, puede echar un vistazo al m贸dulo de subprocesos de Python . En tercer lugar, puede llamar a programas externos utilizando el system()m茅todo del osm贸dulo, o los m茅todos proporcionados por el subprocessm贸dulo, y recopilar los resultados posteriormente.

    El multiprocessingm贸dulo cubre una buena selecci贸n de m茅todos para manejar la ejecuci贸n paralela de rutinas. Esto incluye procesos, grupos de agentes, colas y canalizaciones.

    El Listado 1 funciona con un grupo de cinco agentes que procesan una parte de tres valores al mismo tiempo. Los valores para el n煤mero de agentes y para el chunksizese eligen arbitrariamente con fines de demostraci贸n. Ajuste estos valores de acuerdo con la cantidad de n煤cleos de su procesador.

    El m茅todo Pool.map()requiere tres par谩metros: una funci贸n para ser llamada en cada elemento del conjunto de datos, el conjunto de datos en s铆 y el chunksize. En el Listado 1 usamos una funci贸n que se nombra squarey calcula el cuadrado del valor entero dado. Adem谩s, chunksizese puede omitir. Si no se establece expl铆citamente, el valor predeterminado chunksizees 1.

    Tenga en cuenta que el orden de ejecuci贸n de los agentes no est谩 garantizado, pero el conjunto de resultados est谩 en el orden correcto. Contiene los valores cuadrados seg煤n el orden de los elementos del conjunto de datos original.

    Listado 1: Ejecuci贸n de funciones en paralelo

    from multiprocessing import Pool
    
    def square(x):
        # calculate the square of the value of x
        return x*x
    
    if __name__ == '__main__':
    
        # Define the dataset
        dataset = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    
        # Output the dataset
        print ('Dataset: ' + str(dataset))
    
        # Run this with a pool of 5 agents having a chunksize of 3 until finished
        agents = 5
        chunksize = 3
        with Pool(processes=agents) as pool:
            result = pool.map(square, dataset, chunksize)
    
        # Output the result
        print ('Result:  ' + str(result))
    

    La ejecuci贸n de este c贸digo deber铆a producir el siguiente resultado:

    $ python3 pool_multiprocessing.py 
    Dataset: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
    Result:  [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196]
    

    Nota : Usaremos Python 3 para estos ejemplos.

    Ejecuci贸n de varias funciones mediante una cola

    Como estructura de datos, una cola es muy com煤n y existe de varias maneras. Est谩 organizado como primero en entrar, primero en salir (FIFO) o 煤ltimo en entrar, primero en salir (LIFO) / pila , as铆 como con y sin prioridades (cola de prioridad). La estructura de datos se implementa como una matriz con un n煤mero fijo de entradas o como una lista que contiene un n煤mero variable de elementos individuales.

    En Listings 2.1-2.7 usamos una cola FIFO. Se implementa como una lista que ya es proporcionada por la clase correspondiente del multiprocessingm贸dulo. Adem谩s, el timem贸dulo se carga y se utiliza para imitar la carga de trabajo.

    Listado 2.1: M贸dulos que se utilizar谩n

    import multiprocessing
    from time import sleep
    

    A continuaci贸n, se define una funci贸n de trabajador (Listado 2.2). Esta funci贸n representa al agente, en realidad, y requiere tres argumentos. El nombre del proceso indica qu茅 proceso es, y tanto el taskscomo hacen resultsreferencia a la cola correspondiente.

    Dentro de la funci贸n de trabajador hay un whilebucle infinito . Ambos tasksy resultsson colas que se definen en el programa principal. tasks.get()devuelve la tarea actual de la cola de tareas para ser procesada. Un valor de tarea menor que 0 sale del whileciclo y devuelve un valor de -1. Cualquier otro valor de tarea realizar谩 un c谩lculo (cuadrado) y devolver谩 este valor. La devoluci贸n de un valor al programa principal se implementa como results.put(). Esto agrega el valor calculado al final de la resultscola.

    Listado 2.2: La funci贸n de trabajador

    # define worker function
    def calculate(process_name, tasks, results):
        print('[%s] evaluation routine starts' % process_name)
    
        while True:
            new_value = tasks.get()
            if new_value < 0:
                print('[%s] evaluation routine quits' % process_name)
    
                # Indicate finished
                results.put(-1)
                break
            else:
                # Compute result and mimic a long-running task
                compute = new_value * new_value
                sleep(0.02*new_value)
    
                # Output which process received the value
                # and the calculation result
                print('[%s] received value: %i' % (process_name, new_value))
                print('[%s] calculated value: %i' % (process_name, compute))
    
                # Add result to the queue
                results.put(compute)
    
        return
    

    El siguiente paso es el ciclo principal (vea el Listado 2.3). Primero, se define un administrador para la comunicaci贸n entre procesos (IPC). A continuaci贸n, se agregan dos colas: una que mantiene las tareas y la otra para los resultados.

    Listado 2.3: IPC y colas

    if __name__ == "__main__":
        # Define IPC manager
        manager = multiprocessing.Manager()
    
        # Define a list (queue) for tasks and computation results
        tasks = manager.Queue()
        results = manager.Queue()
    

    Una vez realizada esta configuraci贸n, definimos un grupo de procesos con cuatro procesos de trabajo (agentes). Hacemos uso de la clase multiprocessing.Pool()y creamos una instancia de ella. A continuaci贸n, definimos una lista vac铆a de procesos (consulte el Listado 2.4).

    Listado 2.4: Definici贸n de un grupo de procesos

    # Create process pool with four processes
    num_processes = 4
    pool = multiprocessing.Pool(processes=num_processes)
    processes = []
    

    Como paso siguiente iniciamos los cuatro procesos de trabajo (agentes). Por simplicidad, se denominan “P0” a “P3”. La creaci贸n de los cuatro procesos de trabajo se realiza mediante multiprocessing.Process(). Esto conecta a cada uno de ellos con la funci贸n de trabajador, as铆 como con la tarea y la cola de resultados. Finalmente, agregamos el proceso reci茅n inicializado al final de la lista de procesos e iniciamos el nuevo proceso usando new_process.start()(ver Listado 2.5).

    Listado 2.5: Prepare los procesos de trabajo

    # Initiate the worker processes
    for i in range(num_processes):
    
        # Set process name
        process_name="P%i" % i
    
        # Create the process, and connect it to the worker function
        new_process = multiprocessing.Process(target=calculate, args=(process_name,tasks,results))
    
        # Add new process to the list of processes
        processes.append(new_process)
    
        # Start the process
        new_process.start()
    

    Nuestros procesos laborales est谩n esperando trabajo. Definimos una lista de tareas, que en nuestro caso son enteros seleccionados arbitrariamente. Estos valores se agregan a la lista de tareas mediante tasks.put(). Cada proceso de trabajo espera las tareas y elige la siguiente tarea disponible de la lista de tareas. Esto es manejado por la cola misma (vea el Listado 2.6).

    Listado 2.6: Prepare la cola de tareas

    # Fill task queue
    task_list = [43, 1, 780, 256, 142, 68, 183, 334, 325, 3]
    for single_task in task_list:
        tasks.put(single_task)
    
    # Wait while the workers process
    sleep(5)
    

    Despu茅s de un tiempo nos gustar铆a que nuestros agentes terminaran. Cada proceso de trabajo reacciona en una tarea con el valor -1. Interpreta este valor como una se帽al de terminaci贸n y luego muere. Es por eso que colocamos tantos -1 en la cola de tareas como procesos tengamos en ejecuci贸n. Antes de morir, un proceso que finaliza coloca un -1 en la cola de resultados. Esto est谩 destinado a ser una se帽al de confirmaci贸n al bucle principal de que el agente est谩 terminando.

    En el ciclo principal leemos de esa cola y contamos el n煤mero de -1. El ciclo principal se cierra tan pronto como hemos contado tantas confirmaciones de terminaci贸n como procesos tenemos. De lo contrario, sacamos el resultado del c谩lculo de la cola.

    Listado 2.7: Terminaci贸n y salida de resultados

    # Quit the worker processes by sending them -1
    for i in range(num_processes):
        tasks.put(-1)
    
    # Read calculation results
    num_finished_processes = 0
    while True:
        # Read result
        new_result = results.get()
    
        # Have a look at the results
        if new_result == -1:
            # Process has finished
            num_finished_processes += 1
    
            if num_finished_processes == num_processes:
                break
        else:
            # Output result
            print('Result:' + str(new_result))
    

    El ejemplo 2 muestra la salida del programa Python. Al ejecutar el programa m谩s de una vez, puede notar que el orden en el que se inician los procesos de trabajo es tan impredecible como el proceso en s铆 que selecciona una tarea de la cola. Sin embargo, una vez terminado, el orden de los elementos de la cola de resultados coincide con el orden de los elementos de la cola de tareas.

    Ejemplo 2

    $ python3 queue_multiprocessing.py 
    [P0] evaluation routine starts
    [P1] evaluation routine starts
    [P2] evaluation routine starts
    [P3] evaluation routine starts
    [P1] received value: 1
    [P1] calculated value: 1
    [P0] received value: 43
    [P0] calculated value: 1849
    [P0] received value: 68
    [P0] calculated value: 4624
    [P1] received value: 142
    [P1] calculated value: 20164
    result: 1
    result: 1849
    result: 4624
    result: 20164
    [P3] received value: 256
    [P3] calculated value: 65536
    result: 65536
    [P0] received value: 183
    [P0] calculated value: 33489
    result: 33489
    [P0] received value: 3
    [P0] calculated value: 9
    result: 9
    [P0] evaluation routine quits
    [P1] received value: 334
    [P1] calculated value: 111556
    result: 111556
    [P1] evaluation routine quits
    [P3] received value: 325
    [P3] calculated value: 105625
    result: 105625
    [P3] evaluation routine quits
    [P2] received value: 780
    [P2] calculated value: 608400
    result: 608400
    [P2] evaluation routine quits
    

    Nota : Como se mencion贸 anteriormente, es posible que su salida no coincida exactamente con la que se muestra arriba, ya que el orden de ejecuci贸n es impredecible.

    Usando el m茅todo os.system ()

    El system()m茅todo es parte del m贸dulo os , que permite ejecutar programas de l铆nea de comandos externos en un proceso separado de su programa Python. El system()m茅todo es una llamada de bloqueo y debe esperar hasta que finalice la llamada y regrese. Como fetichista de UNIX / Linux, sabe que se puede ejecutar un comando en segundo plano y escribir el resultado calculado en el flujo de salida que se redirige a un archivo como este (consulte el Ejemplo 3):

    Ejemplo 3: comando con redirecci贸n de salida

    $ ./program >> outputfile &
    

    En un programa de Python, simplemente encapsula esta llamada como se muestra a continuaci贸n:

    Listado 3: Llamada simple al sistema usando el m贸dulo os

    import os
    
    os.system("./program >> outputfile &")
    

    Esta llamada al sistema crea un proceso que se ejecuta en paralelo a su programa Python actual. Obtener el resultado puede volverse un poco complicado porque esta llamada puede terminar despu茅s del final de su programa Python, nunca se sabe.

    Usar este m茅todo es mucho m谩s caro que los m茅todos anteriores que describ铆. Primero, la sobrecarga es mucho mayor (cambio de proceso), y segundo, escribe datos en la memoria f铆sica, como un disco, lo que lleva m谩s tiempo. Sin embargo, esta es una mejor opci贸n si tiene memoria limitada (como con RAM) y en su lugar puede tener datos de salida masivos escritos en un disco de estado s贸lido.

    Usando el m贸dulo de subproceso

    Este m贸dulo est谩 destinado a reemplazar os.system()y os.spawn()llamadas. La idea del subproceso es simplificar los procesos de generaci贸n, comunicarse con ellos a trav茅s de conductos y se帽ales y recopilar la salida que producen, incluidos los mensajes de error.

    A partir de Python 3.5, el subproceso contiene el m茅todo subprocess.run()para iniciar un comando externo, que es un contenedor para la subprocess.Popen()clase subyacente . Como ejemplo, lanzamos el comando UNIX / Linux df -hpara averiguar cu谩nto espacio en disco queda todav铆a disponible en la /homepartici贸n de su m谩quina. En un programa Python, realiza esta llamada como se muestra a continuaci贸n (Listado 4).

    Listado 4: Ejemplo b谩sico para ejecutar un comando externo

    import subprocess
    
    ret = subprocess.run(["df", "-h", "/home"])
    print(ret)
    

    Esta es la llamada b谩sica y muy similar al comando que df -h /homese ejecuta en un terminal. Tenga en cuenta que los par谩metros est谩n separados como una lista en lugar de una sola cadena. La salida ser谩 similar al Ejemplo 4. En comparaci贸n con la documentaci贸n oficial de Python para este m贸dulo, genera el resultado de la llamada a stdout, adem谩s del valor de retorno de la llamada.

    El ejemplo 4 muestra el resultado de nuestra llamada. La 煤ltima l铆nea de la salida muestra la ejecuci贸n exitosa del comando. La llamada subprocess.run()devuelve una instancia de la clase CompletedProcessque tiene los dos atributos nombrados args(argumentos de l铆nea de comando) y returncode(valor de retorno del comando).

    Ejemplo 4: Ejecuci贸n de la secuencia de comandos Python del Listado 4

    $ python3 diskfree.py
    Filesystem   Size   Used  Avail Capacity  iused   ifree %iused  Mounted on
    /dev/sda3  233Gi  203Gi   30Gi    88% 53160407 7818407   87%   /home
    CompletedProcess(args=['df', '-h', '/home'], returncode=0)
    

    Para suprimir la salida stdouty capturar tanto la salida como el valor de retorno para una evaluaci贸n adicional, la llamada de subprocess.run()debe modificarse ligeramente. Sin m谩s modificaciones, subprocess.run()env铆a la salida del comando ejecutado al stdoutque es el canal de salida del proceso subyacente de Python. Para tomar la salida, tenemos que cambiar esto y establecer el canal de salida en el valor predefinido subprocess.PIPE. El Listado 5 muestra c贸mo hacer eso.

    Listado 5: Tomando la salida en una tuber铆a

    import subprocess
    
    # Call the command
    output = subprocess.run(["df", "-h", "/home"], stdout=subprocess.PIPE)
    
    # Read the return code and the output data
    print ("Return code: %i" % output.returncode)
    print ("Output data: %s" % output.stdout)
    

    Como se explic贸 antes, subprocess.run()devuelve una instancia de la clase CompletedProcess. En el Listado 5, esta instancia es una variable simplemente nombrada output. El c贸digo de retorno del comando se mantiene en el atributo output.returncodey la salida impresa stdoutse puede encontrar en el atributo output.stdout. Tenga en cuenta que esto no cubre el manejo de mensajes de error porque no cambiamos el canal de salida para eso.

    Conclusi贸n

    El procesamiento en paralelo es una gran oportunidad para utilizar el poder del hardware actual. Python le da acceso a estos m茅todos a un nivel muy sofisticado. Como se ha visto antes, tanto el multiprocessingy la subprocesslet m贸dulo es sumergirse en ese tema f谩cilmente.

    Etiquetas:

    Deja una respuesta

    Tu direcci贸n de correo electr贸nico no ser谩 publicada. Los campos obligatorios est谩n marcados con *