• Nenhum resultado encontrado

5.2 MapReduce Sobre a HPC Shelf

5.2.5 Orquestração em Sistemas MapReduce

A fim de explicar o fluxo de orquestração de sistemas de computação paralela Ma- pReduce, usaremos o exemplo de arquitetura da Figura 30, o qual foi empregado para a contagem de ocorrências de palavras em arquivos de texto contidos em um diretório acessível por um

componente fonte de dados de entrada. Vamos lembrar, inicialmente, que os componentes de computação e conectores envolvidos contém ações que sincronizam o fluxo de execução e troca de dados, suportados pelas definições da linguagem SAFeSWL, usando os bindings de tarefa. O componente binding, genérico, é do tipo TASKBINDINGBASE, possuindo um parâmetro de

contexto denominado task_port_type, do tipo TASKPORTTYPE, que delimita o tipo do binding,

ou seja, o conjunto de nomes de ações que suporta. Os argumentos usados nos conectores (SPLITTERe SHUFFLER) de sistemas MapReduce são TASKPORTTYPEADVANCEREADCHUNK,

TASKPORTTYPEADVANCEPERFORMe TASKPORTTYPEADVANCECHUNKREADY, respec-

tivamente contendo uma única ação: READ_CHUNK, PERFORM e CHUNK_READY. Há ainda o tipo TASKPORTTYPEADVANCE, usado em MAPPERe REDUCER, que inclui todas as três ações.

No caso do sistema de contagem de palavras, que não é iterativo, somente não é explorado o paralelismo entre múltiplos agentes de mapeamento e redução, ou seja, um único agente de mapeamento é responsável pela fase de mapeamento e um único agente de redução é responsável pela fase de redução. O paralelismo ocorre entre cada agente de mapeamento e de redução, bem como no pipeline formado entre esse dois.

A seguir, descrevemos os passos da execução do sistema MapReduce de contagem de palavras, de acordo com a nomenclatura apresentada na Figura 30:

• A fonte de dados de entrada (text files) oferece o acesso a um conjunto de arquivos de texto ao conector spitter_input, através de um binding cujo tipo da porta provedora, ligada a data_source, é PORTTYPETEXTFILESREADER e cujo tipo da porta usuária, ligada a

spitter_input, é PORTTYPEITERATOR. Portanto, esse binding é capaz de ler o conteúdo

dos arquivos de texto e emitir uma sequência de “chunks”, que são sequências de pares chave/valor de entrada, representando o conteúdo das linhas desses arquivos, onde a chave representa um inteiro sequencial e o valor representa o conteúdo da linha em si. O conector spitter_input lê os “chunks” incrementalmente, operação completada cada vez que se completa uma ativação da ação READ_CHUNK, na sua porta task_port_read_chunk. • Cada vez que se completa uma ativação da ação PERFORM na porta task_port_perform,

spitter_input usa a função de particionamento default para distribuir os pares de entrada do “chunk” atual, mais recentemente obtido, entre as facetas fornecedoras, com base na chave

inteira. A cada “chunk” distribuído, a ação CHUNK_READY da porta task_port_chunk_ready é ativada, a partir de quando uma ativação à ação READ_CHUNK da porta task_map pode ser ativada, para que mapper consuma o “chunk” produzido.

• O agente de mapeamento mapper, após a leitura de um chunk, aplica a função de mape- amento a cada par de entrada quando uma ativação da ação PERFORM é completada em sua porta task_map. Essa função de mapeamento emite, para cada linha, um conjunto de pares chave/valor intermediários, associando a uma palavra contida na linha o número de ocorrências nessa linha. Quando uma certa quantidade de pares é emitida, um “chunk” é formado e a ação CHUNK_READY é ativada. Quando essa ação se completa, uma ativação da ação READ_CHUNK de shuffler pode ser ativada, para consumir o “chunk” de pares intermediários.

• O conector shuffler, a cada vez que uma ativação da ação PERFORM é completada em sua porta task_shuffle_perform, distribui os pares intermediários do “chunk” atual entre suas facetas fornecedoras, de acordo com a função de particionamento default, onde são agrupadas em pares chave/multi-valor que serão consumidas de forma incremental pelo agente de redução reducer. Nesse caso, a ação CHUNK_READY é ativada cada vez que um “chunk” é completamente lido. Uma vez completada essa ativação, a ativação da ação

READ_CHUNK do reducer pode ser completada.

Nesse ponto, valem algumas observações importantes. Os pares chave/multi-valor emitidos pelo shuffler só estão completamente definidos após o processamento de todos os “chunks” intermediários, quando todos os valores associados a uma chave são recebidos. No caso do contador de palavras, é preciso processar todas as linhas de entrada a fim de que sejam agrupadas todas as ocorrências de palavras em todas essas linhas. Logo, a fim de manter o paralelismo pipeline entre os agentes de mapeamento e redução, é necessário um mecanismo segundo o qual reducer possa processar os valores de cada par chave/multi- valor emitido pelo shuffler de forma incremental, explicado adiante.

• O agente de redução reducer, a cada vez que uma ativação da ação PERFORM é completada em sua porta task_reduce, processa os valores emitidos pelo shuffler e consumidos após a última ativação da ação READ_CHUNK ter se completado. Além disso, novos pares chave/multi-valor podem surgir. Para que isso seja possível, a definição da função de redução inclui como parâmetro um acumulador que é alimentado com a última saída produzida pela função de redução para uma certa chave de um par chave/multi-valor. Além disso, o agente de redução mantém um dicionário armazenando pares chave/valor de saída parciais, o qual é atualizado cada vez que a função de redução é aplicada a um novo valor emitido associado à chave. Note que os pares chave/valor de saída parcial somente

podem ser considerados pares de saída definitivos após o processamento de todos os pares chave/multi-valor intermediários. Quando isso, acontece, são emitidos pelo agente de redução, em “chunks”, a cada ativação de CHUNK_READY, quando esses pares de saída podem ser lidos pelo conector splitter_output quando a ação READ_CHUNK de sua porta task_port_read_chunk se completa. Em uma aplicação onde seria possível decidir pela finalização de um par de saída antes do final da computação, uma implementação específica do componente REDUCER poderia existir para garantir paralelismo pipeline

entre a fase de redução e fases posteriores, inclusive no caso de computações iterativas. • Finalmente, o conector splitter_output é responsável por alimentar, ao final da computa-

ção, a fonte de dados de saída (count result) com as chaves de saída emitidas pelo agente de redução, cada vez que se completam as respectivas ativações das ações PERFORM e CHUNK_READY. 1 < p a r a l l e l > 2 < i t e r a t e > 3 < s e q u e n c e > 4 < i n v o k e i d _ p o r t =" t a s k _ s p l i t _ i n p u t _ r e a d _ c h u n k " a c t i o n ="READ_CHUNK" / > 5 < i n v o k e i d _ p o r t =" t a s k _ s p l i t _ i n p u t _ p e r f o r m " a c t i o n ="PERFORM" / > 6 < / s e q u e n c e > 7 < / i t e r a t e > 8 < i t e r a t e > 9 < s e q u e n c e > 10 < i n v o k e i d _ p o r t =" t a s k _ s p l i t _ i n p u t _ c h u n k _ r e a d y " a c t i o n ="CHUNK_READY" / > 11 < i n v o k e i d _ p o r t =" task_map " a c t i o n ="READ_CHUNK" / > 12 < s t a r t i d _ p o r t =" task_map " a c t i o n ="PERFORM" h a n d l e _ i d =" h1 " / > 13 < / s e q u e n c e > 14 < / i t e r a t e > 15 < i t e r a t e > 16 < s e q u e n c e > 17 < i n v o k e i d _ p o r t =" task_map " a c t i o n ="CHUNK_READY" / > 18 < i n v o k e i d _ p o r t =" t a s k _ s h u f f l e _ r e a d _ c h u n k " a c t i o n ="READ_CHUNK" >< / i n v o k e > 19 < s t a r t i d _ p o r t =" t a s k _ s h u f f l e _ p e r f o r m " a c t i o n ="PERFORM" h a n d l e _ i d =" h2 " / > 20 < / s e q u e n c e > 21 < / i t e r a t e > 22 < i t e r a t e > 23 < s e q u e n c e > 24 < i n v o k e i d _ p o r t =" t a s k _ s h u f f l e _ c h u n k _ r e a d y " a c t i o n ="CHUNK_READY" / > 25 < i n v o k e i d _ p o r t =" t a s k _ r e d u c e " a c t i o n ="READ_CHUNK" / > 26 < s t a r t i d _ p o r t =" t a s k _ r e d u c e " a c t i o n ="PERFORM" h a n d l e _ i d =" h3 " / > 27 < / s e q u e n c e > 28 < / i t e r a t e > 29 < i t e r a t e > 30 < s e q u e n c e > 31 < i n v o k e i d _ p o r t =" t a s k _ r e d u c e " a c t i o n ="CHUNK_READY" / >

32 < i n v o k e i d _ p o r t =" t a s k _ s p l i t _ o u t p u t _ r e a d _ c h u n k " a c t i o n ="READ_CHUNK" / > 33 < s t a r t i d _ p o r t =" t a s k _ s p l i t _ o u t p u t _ p e r f o r m " a c t i o n ="PERFORM" h a n d l e _ i d =" h4 " / > 34 < / s e q u e n c e > 35 < / i t e r a t e > 36 < i t e r a t e > 37 < i n v o k e i d _ p o r t =" t a s k _ s p l i t _ o u t p u t _ c h u n k _ r e a d y " a c t i o n ="CHUNK_READY" / > 38 < / i t e r a t e > 39 < / p a r a l l e l >

Código 8 – SAFeSWL Fragmento de orquestração MapReduce.

Na Listagem 8, apresentamos um fragmento do fluxo de orquestração MapRe- duce para os componentes na arquitetura apresentada na Figura 30, ignorando a ativação das ações de controle do ciclo de vida dos componentes de computação e conectores envolvidos (splitter_input, mapper, shuffler, reducer e splitter_output).

A Tabela 10 apresenta os argumentos de contexto aplicados aos parâmetros de contexto dos componentes do sistema MapReduce para contagem de palavras.

Nome do Parâmetro Limite Contextual input_key_type INTEGER input_value_type STRING

map_function COUNTWORDS[... ] intermediary_key_type STRING intermediary_value_type INTEGER

reduce_function SUMVALUES[... ] output_key_type STRING output_value_type INTEGER

Tabela 10 – Argumentos de contexto dos componentes para contagem de palavras

Nas seções seguintes, onde apresentamos o arcabouço Gust, teremos oportunidade de mostrar diferentes formas, não-triviais, de combinar os componentes de sistemas MapRe- duce, incluindo algoritmos iterativos e com múltiplas rodadas de mapeamento e redução. Essa característica de composicionabilidade é uma das principais características do arcabouço para sistemas MapReduce proposto nesta Tese de Doutorado, que oferece oportunidades para con- tornar dificuldades expressivas relatadas na literatura em relação a outros arcabouços dentro do estado-da-arte.