Programação em sistemas de memória distribuída
(
Message-Passing Computing)
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.2
Programação distribuída requer bibliotecas com rotinas para comunicação por mensagens.
São necessários dois mecanismos base:
1. Um método para criar processos para execução em computadores/processadores diferentes
2. Um método para enviar e receber mensagens.
Multiple program, multiple data (MPMD) model
Source fi le
Executable
Compile to suit processor
Source fi le
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.4
Single Program Multiple Data (SPMD) model
.
Source fi le
Ex ecutab les
Processor 0 Processor p - 1 Compile to suit
processor
Basic MPI w a y
Diferentes processos construídos num único programa.
Instruções de controlo selecionam diferentes partes do código a serem executadas por cada processo. Todos os executáveis começam ao mesmo tempo (criação estática).
Multiple Program Multiple Data (MPMD) Model
Process 1
Process 2 spawn();
Time
Star t e x ecution of process 2
Programas separados para cada processador. Um
processador executa o processo master. Os outros processos são criados dinamicamente pelo master.
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.6
Rotinas para comunicação ponto a ponto:
Send and Receive
Process 1 Process 2
send(&x, 2);
recv(&y, 1);
x y
Mo v ement of data
Sintaxe genérica)
Envio de uma mensagem entre processos:
Comunicação por mensagens síncrona
Rotinas que só retornam quando a transferência da mensagem está completa.
Rotina de envio (send) síncrona
• Antes de enviar a mensagem, espera até que a mesma possa ser aceite pelo processo receptor.
Rotina de receção (receive) síncrona
• Espera até que a mensagem esperada chegue.
Rotinas síncronas executam duas ações: transferem dados
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.8
Synchronous send() and recv() using 3-way protocol
Process 1 Process 2
send();
recv();
Suspend Time
process Ac kno wledgment
Message
Both processes contin ue
(a) When send() occurs bef ore recv()
Process 1 Process 2
recv();
send();
Suspend Time
process
Ac kno wledgment Message
Both processes contin ue
(b) When recv() occurs bef ore send()
Request to send
Request to send
Comunicação por mensagens assíncrona
• Rotinas não esperam que a comunicação termine antes de retornarem. É necessário um sistema de
armazenamento de mensagens.
• Mais do que uma versão, dependendo da semântica de retorno.
• Geralmente não sincronizam processos mas permitem aumentar o paralelismo. Devem ser usadas com
cuidado.
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.10
Blocking versus Non-Blocking em MPI
• Blocking – retornam após as operações locais serem completadas, a transferência da mensagem pode não estar completa.
• Non-blocking – retornam imediatamente.
Assume que os dados a transferir não são alterados por outras instruções antes de serem transferidos. Assegurar isso é da responsabilidade do programador.
Os termos blocking / non-blocking podem ter diferentes interpretações, dependendo da implementação.
Rotinas que retornam antes da mensagem ser transferida:
Process 1 Process 2
send();
recv();
Message b uff er
Read
message b uff er Contin ue
process Time
Necessário um Buffer de mensagens entre a origem e o destino:
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.12
Rotinas assíncronas (blocking) que passam a rotinas síncronas
• Quando as operações locais estão completas e a mensagem está armazenada em segurança, o emissor prossegue o seu trabalho.
• O buffer tem tamanho finito, pode acontecer que a rotina de envio bloqueie porque o buffer está cheio.
• Nesse caso, a rotina de send é suspensa até que o buffer esteja disponível, i.e., a rotina comporta-se como se fosse síncrona.
Etiquetas (tags) em mensagens
• Usadas para diferenciar mensagens de diferentes tipos.
• A etiqueta é enviada com a mensagem.
• Se a mensagem pode ser qualquer, poderá ser usado um “wild card” na mensagem.
Assim um receive vai corresponder a
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.14
Exemplo de uma “Message Tag”
Process 1 Process 2
send(&x,2, 5 );
recv(&y,1, 5 );
x y
Mo v ement of data
Espera mensagem do processo 1 com a tag 5
Envio da mensagem, x, com a tag 5 pelo
processo 1 para o processo 2 sendo atribuída a y:
Rotinas de “Grupo”
Existem rotinas que enviam mensagens para um grupo de processos ou que recebem
mensagens de um grupo de processos.
Mais eficiente do que usar rotinas de
comunicação ponto a ponto separadamente.
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.16
Broadcast
Envio de mensagens a todos os processos relacionados com o problema a resolver.
Multicast – enviar a mesma mensagem a um grupo pré- definido de processos.
bcast();
buf
bcast();
data
bcast();
data data
Process 0 Process 1 Process p - 1
Action
Code
MPI f or m
Scatter
scatter();
buf
scatter();
data
scatter();
data data
Process 0 Process 1 Process p - 1
Action
Code
Enviar cada elemento de um array no processo root para outro processo. Conteúdo da posição i do array é enviado para o processo i.
Slides for Parallel Programming Techniques & Applications Using Networked Workstations & Parallel Computers 2nd Edition, by B. Wilkinson & M. Allen, © 2004 Pearson Education Inc. All rights reserved.
2.18
Gather
gather();
buf
gather();
data
gather();
data data
Process 0 Process 1 Process p - 1
Action
Code
MPI f or m
Um processo recolhe valores individuais de um conjunto de processos.
Reduce
reduce();
buf
reduce();
data
reduce();
data data
Process 0 Process 1 Process p - 1
+
Action
Code
Operação de gather combinada com uma operação aritmética ou lógica.
Exemplo: valores são recolhido e adicionados no processo root:
2.20
Modelos híbridos de programação
CPU CPU
memory
CPU CPU threading
CPU CPU
memory
CPU CPU threading
CPU CPU
memory
CPU CPU threading
CPU CPU
memory
CPU CPU threading
network
MPI
MPI MPI
MPI
Modelos híbridos de programação
CPU CPU
memory
CPU CPU threading
CPU CPU
memory
CPU CPU threading
MPI MPI
GPU C U D A
GPU O P E N C L
2.22
Programação distribuída em python módulo multiprocessing
#Criar um processo:
import multiprocessing
def function(i):
print ('called function in process: %s' %i) return
if __name__ == '__main__':
Process_jobs = []
for i in range(5):
p = multiprocessing.Process(target=function, args=(i,)) Process_jobs.append(p)
p.start() # iniciar execução p.join() # esperar que termine
Programação distribuída em python módulo multiprocessing
#output
called function in process: 0 called function in process: 1 called function in process: 2 called function in process: 3 called function in process: 4
2.24
Programação distribuída em python módulo multiprocessing
#Classe Process:
class multiprocessing.Process(group=None, target=None, name
=None, args=(), kwargs={}, *, daemon=None)
Permite criar um fluxo de atividade que executará num processo separado.
group - só existe por compatibilidade com threading.Thread target - ”callable object” invocado pelo método run.
name - nome do processo.
Programação distribuída em python módulo multiprocessing
#Classe Process:
args - tuplo com os argumentos para a invocação do “target”.
invocation.
daemon - True ou False, por omissão, o processo herda o valor do processo pai.
2.26
Programação distribuída em python módulo multiprocessing
#Dar nome a um processo import multiprocessing
import time
def foo():
name = multiprocessing.current_process().name print ("Starting %s \n" %name)
time.sleep(3)
print ("Exiting %s \n" %name)
Programação distribuída em python módulo multiprocessing
#Dar nome a um processo
if __name__ == '__main__':
process_with_name = multiprocessing.Process\
(name='foo_process', target=foo)
process_with_default_name = multiprocessing.Process \ (target=foo)
process_with_name.start()
process_with_default_name.start()
2.28
Programação distribuída em python módulo multiprocessing
#Output
Starting Process-2
Exiting Process-2
Starting foo_process
Exiting foo_process
Programação distribuída em python módulo multiprocessing
#Terminar um processo
import multiprocessing import time
def foo():
print ('Starting function') time.sleep(0.1)
print ('Finished function')
2.30
Programação distribuída em python módulo multiprocessing
if __name__ == '__main__':
p = multiprocessing.Process(target=foo)
print ('Process before execution:', p, p.is_alive())
p.start()
print ('Process running:', p, p.is_alive())
p.terminate()
print ('Process terminated:', p, p.is_alive())
p.join()
print ('Process joined:', p, p.is_alive()) print ('Process exit code:', p.exitcode)
Programação distribuída em python módulo multiprocessing
#output
Process before execution: <Process(Process-1, initial)> False Process running: <Process(Process-1, started)> True
Process terminated: <Process(Process-1, started)> True
Process joined: <Process(Process-1, stopped[SIGTERM])> False Process exit code: -15
2.32
Programação distribuída em python módulo multiprocessing
if __name__ == '__main__':
p = multiprocessing.Process(target=foo)
print ('Process before execution:', p, p.is_alive())
p.start()
print ('Process running:', p, p.is_alive())
time.sleep(5) p.terminate()
print ('Process terminated:', p, p.is_alive())
p.join()
print ('Process joined:', p, p.is_alive()) print ('Process exit code:', p.exitcode)
Programação distribuída em python módulo multiprocessing
#output
Process before execution: <Process(Process-1, initial)> False Process running: <Process(Process-1, started)> True
Starting function Finished function
Process terminated: <Process(Process-1, stopped)> False Process joined: <Process(Process-1, stopped)> False
Process exit code: 0
O que aconteceu de diferente?
2.34
Programação distribuída em python módulo multiprocessing
# Criar um processo como subclasse de Process import multiprocessing
class MyProcess(multiprocessing.Process):
#sobrepor métod run
def run(self):
print ('called run method in %s' %self.name) return
Programação distribuída em python módulo multiprocessing
# Criar um processo como subclasse de Process
if __name__ == '__main__':
jobs = []
for i in range(5):
p = MyProcess() jobs.append(p) p.start()
p.join()
2.36
Programação distribuída em python módulo multiprocessing
# output:
called run method in MyProcess-1 called run method in MyProcess-2 called run method in MyProcess-3 called run method in MyProcess-4 called run method in MyProcess-5
Exemplo: Somar os inteiros de uma lista, sequencialmente, com threads, com processos:
import threading
import multiprocessing import random
import time
SIZE = 10000000
def somaParcial ( lista, p, u ):
soma = 0
for i in range(p, u):
soma += lista[i]
print (soma)
2.38
Exemplo: Somar os inteiros de uma lista, sequencialmente, com threads, com processos:
if __name__ =="__main__":
start= time.time()
lista = [random.randint(1,10) for i in range(SIZE)]
print ("random values %s" % (time.time() - start) )
print ("starting") start= time.time()
somaParcial(lista, 0, SIZE)
print ("Sequential time = %s" %(time.time() - start))
Exemplo: Somar os inteiros de uma lista, sequencialmente, com threads, com processos:
start= time.time()
t1 = threading.Thread (target = somaParcial, args =(lista, 0, \ int(SIZE/2)) ) t2 = threading.Thread (target = somaParcial, args =(lista, \
int(SIZE/2), SIZE)) t1.start()
t2.start() t1.join() t2.join()
print ("Multithreaded time = %s" % (time.time() - start))
2.40
Exemplo: Somar os inteiros de uma lista, sequencialmente, com threads, com processos:
print ("Multithreaded time = %s" % (time.time() - start)) start= time.time()
t1 = multiprocessing.Process (target = somaParcial, args = \ (lista, 0, int(SIZE/2)) ) t2 = multiprocessing.Process (target = somaParcial, args = \
(lista, int(SIZE/2), SIZE)) t1.start()
t2.start() t1.join() t2.join()
print ("Multiprocessing time = %s" % (time.time() - start))
Exercício:
- Testar o programa anterior, analisando os tempos de execução.
- Exercício para férias: Como obter o valor parcial calculado por cada thread e cada processo?
- Quais os tempos de execução, se na versão
multiprocessador, só passarmos para cada Processo a parte da lista que vai ser somada?
2.42
Programação distribuída em python módulo multiprocessing
Modifique o código da aula teórica T03 (pp. 31-34) para
Estudar os tempos de execução das 3 funções analisadas, lançadas em processos independentes. Teste para 1, 2, 4 e 8 processos.
Python multiprocessing
Comunicação de dados entre processos
Queues – permite vários produtores e vários recetores.
Pipes – comunicar entre dois processos
2.44
Python multiprocessing
Comunicação de dados entre processos
#Queue - exemplo
import multiprocessing import random
import time
class producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self) self.queue = queue
Python multiprocessing
Comunicação de dados entre processos
def run(self) :
for i in range(10):
item = random.randint(0, 256) self.queue.put(item)
print ("Process Producer : item %d appended to queue %s“ % (item,self.name)) time.sleep(1)
print ("The size of queue is %s“ % self.queue.qsize())
2.46
Python multiprocessing
Comunicação de dados entre processos
class consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self) self.queue = queue
Python multiprocessing
Comunicação de dados entre processos
def run(self):
while True:
if (self.queue.empty()):
print("the queue is empty") break;
else :
time.sleep(2)
item = self.queue.get()
print ('Process Consumer : item %d popped from by %s \n‘ % (item, self.name))
Método task_done() não existe
2.48
Python multiprocessing
Comunicação de dados entre processos
if __name__ == '__main__':
queue = multiprocessing.Queue()
process_producer = producer(queue) process_consumer = consumer(queue) process_producer.start()
process_consumer.start() process_producer.join() process_consumer.join()
Python multiprocessing
Comunicação de dados entre processos
#Pipe – exemplo
import multiprocessing
class producer(multiprocessing.Process):
def __init__(self, pipe):
multiprocessing.Process.__init__(self) self.pipe = pipe
2.50
Python multiprocessing
Comunicação de dados entre processos
def run(self) :
output_pipe, input_pipe = self.pipe for item in range(10):
output_pipe.send(item) output_pipe.send(99)
output_pipe.close()
#Pipe() devolve um par de “connection objects” ligados por um pipe bidirecional.
Python multiprocessing
Comunicação de dados entre processos
class consumer(multiprocessing.Process):
def __init__(self, pipe):
multiprocessing.Process.__init__(self) self.pipe = pipe
def run(self):
close, input_pipe = self.pipe close.close()
while (True):
item = input_pipe.recv() if (item == 99):
break
input_pipe.close()
2.52
Python multiprocessing
Comunicação de dados entre processos
if __name__ == '__main__':
pipe = multiprocessing.Pipe()
process_producer = producer(pipe) process_consumer = consumer(pipe) process_producer.start()
process_consumer.start() process_producer.join() process_consumer.join()
Python multiprocessing
Sincronização de processos
Lock, Rlock, Semaphore, Condition, Event
Barrier – ponto de sincronização entre um conjunto de processos
Todos os processos têm de atingir um determinado ponto antes de prosseguirem a execução em paralelo
2.54
Python multiprocessing
Sincronização de processos
import multiprocessing
from multiprocessing import Barrier, Lock, Process from time import time
from datetime import datetime
if __name__ == '__main__':
synchronizer = Barrier(2) “”” número de processo na barreira: 2 “””
serializer = Lock()
Process(name='p1 - test_with_barrier‘ , target=test_with_barrier,\
args=(synchronizer, serializer)).start() Process(name='p2 - test_with_barrier', target=test_with_barrier,\
args=(synchronizer,serializer)).start()
Process(name='p3 - test_without_barrier‘, target=test_without_barrier).start() Process(name='p4 - test_without_barrier‘ ,target=test_without_barrier).start()
Python multiprocessing
Sincronização de processos
def test_with_barrier(synchronizer, serializer):
name = multiprocessing.current_process().name synchronizer.wait() # barrier
now = time()
with serializer:
print("process %s ----> %s" %(name,datetime.fromtimestamp(now)))
def test_without_barrier():
name = multiprocessing.current_process().name now = time()
print("process %s ----> %s" %(name ,datetime.fromtimestamp(now)))
2.56
Python multiprocessing
Sincronização de processos
Output:
process p3 - test_without_barrier ----> 2018-04-01 19:27:49.878603 process p2 - test_with_barrier ----> 2018-04-01 19:27:49.878603 process p1 - test_with_barrier ----> 2018-04-01 19:27:49.878603 process p4 - test_without_barrier ----> 2018-04-01 19:27:49.931988
Instrução with
import threading import logging
logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s)
%(message)s',) if __name__ == '__main__':
lock = threading.Lock()
t1 = threading.Thread(target=threading_with, args=(lock,))
t2 = threading.Thread(target=threading_not_with, args=(lock,)) t1.start()
2.58
#Instrução with
def threading_with(statement):
with statement:
logging.debug('%s acquired via with' %statement)
def threading_not_with(statement):
statement.acquire() try:
logging.debug('%s acquired directly' %statement ) finally:
statement.release()
#Instrução with
Output:
•(Thread-5 ) <_thread.lock object at 0x000000000689ADC8> acquired via with
•(Thread-6 ) <_thread.lock object at 0x000000000689ADC8> acquired directly
Com with o lock é adquirido, só existe dentro do âmbito do with e é libertado quando termina o bloco do with
2.60
Python multiprocessing
Process Pool
- Permite criar um conjunto de processos
Algumas funções:
- map () – divide um conjunto iterável de dados em pedaços que submete aos processos da pool como tarefas individuais; bloqueia até à obtenção do resultado;
- map_async() – map não bloqueante; devolve um objeto
Python multiprocessing
Process Pool - exemplo
import multiprocessing
def function_square(data):
result = data*data return result
2.62
Python multiprocessing
Process Pool - exemplo
if __name__ == '__main__':
inputs = list(range(0,100))
pool = multiprocessing.Pool(processes=4)
pool_outputs = pool.map(function_square, inputs) print ('Pool :', pool_outputs)
inputs2 = list(range(100,200))
pool2 = multiprocessing.Pool(processes=4)
x = pool2.map_async(function_square, inputs2) #some work ....
x.wait()
print ('Pool :', x.get())