3.4 Exemplo: Um Arcabouço para Processamento MapReduce
3.4.2 MapReduce na HPC Shelf
Dada sua importância no cenário de computação em nuvens, o MapReduce torna-se uma das aplicações para validação da HPC Shelf e por consequência, do Alite. A implementação na HPC Shelf, no entanto, consiste em criar, não só a as funções de mapeamento e redução, como também todo o ambiente de execução, permitindo que diferentes configurações do framework de execução sejam definidas.
Um sistema de computação paralela MapReduce é constituído por instâncias dos componentes de computação MAPPER e REDUCER, ligados através de instâncias dos com-
Figura 6 – Visão de Execução
Fonte: (DEAN; GHEMAWAT, 2004). Figura 7 – Componente MAPPER(Agentes de Mapeamento)
[(TK,TV)] feed-pairs collect-pairs [(IK,IV)] virtual platform Mapper map units input_key_type IK Data input_value_type IV Data function F Function output_key_type TK Data output_value_type TV Data
Fonte: Próprio Autor
ponentes conectores SPLITTERe SHUFFLER. Esses quatro componentes são respectivamente
ilustrados nas figuras 7, 8, 9 e 10. Nessas ilustrações, deve-se observar que cada componente possui uma porta de entrada chamada collect_pairs e uma porta de saída chamada feed_pairs. O serviço dessas portas é a transmissão de listas de pares chave/valor (e.g. [hK, Vi]) ou pares chave/multi-valor (e.g. [hK, [V]i]), onde K e V representam, respectivamente, tipos arbitrários de chaves e valores. Os componentes de computação possuem ainda uma porta de alocação, chamada virtual platform nas figuras 7 e 8. Finalmente, os parâmetros de contexto de aplicação são também apresentados nas figuras.
1. MAPPER: agente de mapeamento, que aplica uma função a cada par chave/valor recebido
através da porta collect_pairs, retornando, para cada um, uma lista de pares chave/valor através da porta feed_pairs.
2. REDUCER: agente de redução, que aplica uma função sobre par chave/multi-valor recebido através da porta collect_pairs, emitindo, para cada par, um único par chave/valor através da porta feed_pairs.
3. SPLITTER: encaminha pares chave/valor recebidos através suas portas collect_pairs, uma
para cada faceta coletora, entre suas portas feed_pairs, cada uma associada a uma de suas facetas alimentadoras. Portanto, suas facetas coletora e alimentadora são múltiplas, podendo serem usadas para conectar múltiplos agentes de mapeamento e/ou redução, seja para receber saídas desses agentes ou para enviar entradas para eles, como ilustrado na Figura 11.
4. SHUFFLER: Diferencia-se do SPLITTERpor agrupar os pares chave/valor recebidos de suas
portas collect_pairs que possuem uma mesma chave em um único par chave/multi-valor, antes de encaminhá-las através de suas portas feed_pairs.
As figuras 7, 8, 9 e 10 omitem as portas de ações oferecidas por esses componentes, utilizadas para orquestração do sistema MapReduce realizada pelo seu componente workflow. Antes de apresentá-las, é preciso explicar que os pares chaves/valor são transmitidos entre agentes de computação e conectores em listas de chunks, onde um chunk corresponde a uma lista de pares chave/valor ou chave/multivalor, apenas com o propósito de controle da granularidade do processamento e redução da sobrecarga de comunicação resultante da transmissão individual de cada par.
Uma porta de ações chamada task_collector_read_chunk encontra-se presente em Figura 8 – Componente REDUCER(Agentes de Redução)
feed-pairs collect-pairs [(TK,[TV])] [(OK,OV)] virtual platform Reducer reduce units input_key_type TK Data input_value_type TV Data function F Function output_key_type OK Data output_value_type OV Data
Figura 9 – Componente SPLITTER [(IK,IV)] feed-pairs collect-pairs [(IK,IV)] Splitter collector facet feeder facet collect
units feedunits
key_type IK Data
value_type IV Data
partition_function F Function
Fonte: Próprio Autor
facetas coletoras de conectores, com uma única ação contendo os seguintes nomes de ações alternativos: READ_CHUNK, FINISH_CHUNK. A primeira é ativada pelo conector sempre que um chunké lido através da porta collect_pairs, enquanto a última é ativada após o último chunk ser lido, sinalizando terminação da entrada associada àquela faceta. Para orquestração da leitura de um conjunto de pares, o componente workflow pode realizar a ativação alternativa dos dois nomes de ação como controle de uma iteração, onde a iteração é finalizada quando a ativação se completa com um FINISH_CHUNK, como no fragmento de código SAFeSWL a seguir:
<iterate id_port="task_collector_read_chunk" until="FINISH_CHUNK" loop="READ_CHUNK"> ...
</iterate>
Além disso, uma porta de ações chamada task_collector_active_status encontra- se presente em cada faceta coletora, cujas ações são identificadas pelos seguintes nomes: BEGIN_CHANGE_STATUS, ACTIVE, INACTIVE, END_CHANGE_STATUS. O objetivo dessa porta é a habilitação ou desabilitação da leitura de pares através da faceta coletora a ela associada. A primeira e a última servem para o componente workflow sinalizar que pretende modificar o status da faceta, por razões de segurança de execução, enquanto as duas outras servem para Figura 10 – Componente SHUFFLER
[(TK,[TV])] feed-pairs collect-pairs [(TK,TV)]
Shuffler
collector facet feederfacetcollect
units unitsfeed
key_type TK Data
value_type TV Data
partition_function F Function
Figura 11 – Múltiplas Facetas em SPLITTERe SHUFFLER feed-pairs collect-pairs Splitter/Shuffler feeder facet #0 collect
units feedunits
feeder facet #1 feeder facet #n collector facet #0 collector facet #1 collector facet #m collect units collect units feed units feed units collect-pairs collect-pairs feed-pairs feed-pairs Fonte: Próprio Autor
Figura 12 – Sistema MapReduce Simples
Fonte: Próprio Autor
sinalizar que a faceta encontra-se no estado habilitado ou desabilitado, respectivamente. É importante enfatizar que a ativação de ACTIVE e INACTIVE só pode ser realizada entre chamadas de BEGIN_CHANGE_STATUS e END_CHANGE_STATUS.
Por sua vez, cada faceta alimentadora de um conector possui duas portas de ações: task_feeder_read_chunk e task_feeder_chunk_ready. A primeira possui semântica análoga à porta task_collector_read_chunk das facetas coletoras, enquanto a última possui uma ação com o nome CHUNK_READY, utilizada para sinalizar ao componente workflow que um chunk está pronto para ser lido pelo agente de computação conectado na porta feed_pairs.
Finalmente, cada agente de computação (MAPPER ou REDUCER) possui uma
única porta de ações (task_map ou task_reduce) que possui ações com os seguintes nomes: READ_CHUNK, PERFORM e CHUNK_READY. A primeira e a última possuem semântica análoga às ações de mesmo nome descritas para os conectores, enquanto PERFORM deve ser ativada após READ_CHUNK completar-se, a fim de que o agente de computação aplique sua função (mapea- mento ou redução) sobre os pares contidos no chunk.
A Figura 12 apresenta a arquitetura de um sistema MapReduce simples, composto de uma única etapa de mapeamento/redução.Tanto a etapa de mapeamento quanto a etapa de redução são realizadas por um par de agentes. Todavia, enquanto os dois agentes de mapeamento estão executando sobre a mesma plataforma virtual (platform map), os dois agentes de redução estão executando sobre plataformas distintas (platform 1st reducer e platform 2nd reducer). Essa arquitetura pode, por exemplo, ser usadas para uma contagem de ocorrências de palavras em um conjunto de arquivos de texto, lidos a partir de um repositório de dados representado por um componente chamado text files, da espécie fonte de dados, sendo o resultado do processamento lido pelo componente application. Para conectar os agentes de saída, respositório de entrada e aplicação foram necessários dois conectores SPLITTER(splitter input e splitter output) e um conector SHUFFLER(shuffler).
Por sua vez, a Figura 13 apresenta um exemplo de sistema MapReduce iterativo composto de duas etapas de mapeamento e redução, onde, na segunda etapa, pares de agentes de mapeamento e de redução são empregados.
Em sistemas iterativos, condições que governam a terminação no código de orques- tração podem ser sinalizadas através das funções de particionamento ou de computação (mapea- mento ou redução), que podem ser customizadas de acordo com o sistema MapReduce. Para isso, cada componente PARTITIONFUNCTION, MAPFUNCTIONe REDUCEFUNCTIONexpõe uma porta de ações cujo tipo pode ser customizado através dos contratos contextuais dos conectores SHUFFLERe SPLITTER(parâmetro partition_function_action_port_type), bem como dos agen- tes MAPPER(map_function_action_port_type) e REDUCER(reduce_function_action_port_type),
de acordo com a semântica do sistema de computação paralela. Figura 13 – Sistema MapReduce Iterativo
Figura 14 – Sistema MapReduce para Multiplicação de Matrizes
Fonte: Próprio Autor