• Nenhum resultado encontrado

Mecanismos para escalonamento de aplicações MapReduce de diferentes prioridades

N/A
N/A
Protected

Academic year: 2021

Share "Mecanismos para escalonamento de aplicações MapReduce de diferentes prioridades"

Copied!
76
0
0

Texto

(1)

Universidade Estadual de Campinas Instituto de Computação

INSTITUTO DE COMPUTAÇÃO

Augusto Rodrigues de Souza

Mecanismos para Escalonamento de Aplicações

MapReduce de Diferentes Prioridades

CAMPINAS

2017

(2)

Augusto Rodrigues de Souza

Mecanismos para Escalonamento de Aplicações MapReduce de

Diferentes Prioridades

Dissertação apresentada ao Instituto de Computação da Universidade Estadual de Campinas como parte dos requisitos para a obtenção do título de Mestre em Ciência da Computação.

Orientadora: Profa. Dra. Islene Calciolari Garcia

Este exemplar corresponde à versão final da Dissertação defendida por Augusto Rodrigues de Souza e orientada pela Profa. Dra. Islene Calciolari Garcia.

CAMPINAS

2017

(3)

Agência(s) de fomento e nº(s) de processo(s): Não se aplica. ORCID: http://orcid.org/0000-0001-9296-7769

Ficha catalográfica

Universidade Estadual de Campinas

Biblioteca do Instituto de Matemática, Estatística e Computação Científica Ana Regina Machado - CRB 8/5467

Souza, Augusto Rodrigues de,

So89m SouMecanismos para escalonamento de aplicações MapReduce de diferentes prioridades / Augusto Rodrigues de Souza. – Campinas, SP : [s.n.], 2017.

SouOrientador: Islene Calciolari Garcia.

SouDissertação (mestrado) – Universidade Estadual de Campinas, Instituto de Computação.

Sou1. Sistemas distribuídos. 2. MapReduce (Programação paralela distribuída). 3. Processamento eletrônico de dados - Processamento distribuído. I. Garcia, Islene Calciolari,1971-. II. Universidade Estadual de Campinas. Instituto de Computação. III. Título.

Informações para Biblioteca Digital

Título em outro idioma: Scheduling mechanisms for MapReduce applications with distinct

priorities

Palavras-chave em inglês:

Distributed systems

MapReduce (Distributed parallel programming) Electronic data processing - Distributed processing

Área de concentração: Ciência da Computação Titulação: Mestre em Ciência da Computação Banca examinadora:

Islene Calciolari Garcia [Orientador] Luiz Fernando Bittencourt

Gustavo Maciel Dias Vieira

Data de defesa: 28-04-2017

Programa de Pós-Graduação: Ciência da Computação

(4)

Universidade Estadual de Campinas Instituto de Computação

INSTITUTO DE COMPUTAÇÃO

Augusto Rodrigues de Souza

Mecanismos para Escalonamento de Aplicações MapReduce de

Diferentes Prioridades

Banca Examinadora:

• Profa. Dra. Islene Calciolari Garcia Instituto de Computação — UNICAMP • Prof. Dr. Gustavo Maciel Dias Vieira

Departamento de Computação — UFSCar-Sorocaba • Prof. Dr. Luiz Fernando Bittencourt

Instituto de Computação — UNICAMP

A ata da defesa com as respectivas assinaturas dos membros da banca encontra-se no processo de vida acadêmica do aluno.

(5)

Agradecimentos

À minha esposa Carolina e minha filha Julieta pelo apoio e compreensão principalmente quando o escasso tempo que deveria ser gasto com a família foi dedicado a este trabalho. Ao meu pai Nelson, minha mãe Vera Ruth e minhas irmãs Katia e Camila pela minha formação como cidadão e estudante, por me fazerem me interessar por estudar.

Às comunidades do Hadoop e do Disco que nos ajudaram fornecendo os projetos que foram utilizados nesse trabalho de mestrado. Em especial, aos desenvolvedores da Apache Carlo Curino e Chris Douglas que foram mentores no início do nosso trabalho no Hadoop. Ao serviço de computação na nuvem Digital Ocean que nos forneceu créditos para executarmos nossos testes.

Por último, mas não menos importante, à minha orientadora Professora Doutora Islene Calciolari Garcia pela orientação, paciência, ensinamentos e compreensão. Sem dúvidas, este foi um dos trabalhos mais complicados da minha vida e pude contar com uma orien-tação que o simplificou muito nos momentos mais necessários.

(6)

Resumo

Em 2004, o Google surpreendeu a comunidade de sistemas distribuídos ao divulgar como funcionava seu framework de MapReduce e seu sistema de arquivos distribuído. Tal inova-ção rapidamente chamou a ateninova-ção das comunidades de software livre e diversos sistemas de MapReduce de código aberto foram criados implementando as ideias divulgadas pelo Google. Neste trabalho, focamos nossa atenção em dois deles, chamados Hadoop e Disco. O primeiro é desenvolvido principalmente em Java e o segundo em Erlang e discutimos um pouco sobre como essas linguagens influenciaram a arquitetura e o funcionamento destes sistemas.

Focamos em especial em como esses sistemas trabalham com cargas heterogêneas, ou seja, compostas por aplicações de pesquisa (baixa prioridade e longa duração) e de produção (alta prioridade e baixa duração) em um mesmo cluster. Analisamos a fundo como esses sistemas funcionam, principalmente, quais são os mecanismos que auxiliam na tomada de decisão sobre agendamento de tarefas aos escalonadores. Por fim, propusemos o uso de uma política de escalonamento justa e baseada em preempção de tarefas para auxiliar o escalonador do Disco a dar ênfase à aplicação de produção sem deixar de ser justo, ou seja, faz com que rapidamente a aplicação de produção atinja a quantidade de recursos do cluster que por justiça pertencem à ela em uma divisão justa dos mesmos. Para isso tarefas da aplicação menos prioritária (de pesquisa) sofrem preempção.

Contribuímos para as comunidades do Hadoop e do Disco ao longo desse mestrado: colaboramos na codificação de uma política de preempção e checkpointing ao Hadoop e desenvolvemos uma política justa e que se utiliza de preempção no Disco para priorizar as aplicações de produção com resultados significativos no tempo de execução da nossa carga experimental. Outra contribuição deste trabalho é uma interface Web auxiliar ao Disco para reprodução dos nossos experimentos.

(7)

Abstract

In 2004, Google released a paper in which they described how their MapReduce framework worked and how they structured their distributed file system. This innovation made the open software communities to organize themselves in order to develop open source alternatives for the Google’s MapReduce framework. In this work, we focused in two of these systems: Hadoop and Disco. The former was developed mainly in Java and the later in Erlang, and we also discuss how these languages influenced the architecture and the behavior of these systems.

Especially we looked to how these systems address the problem related to hetero-geneous workloads which are composed of research applications (low priority and long duration) and production (high priority and short duration) in the same cluster. We an-alyzed deeply how these systems work and which are the mechanisms they use to help in the scheduling decision making. Lastly, we proposed the use of a Fair scheduling policy based in using preemption of tasks to guide the scheduler of Disco to give emphasis to the production application without giving up of being fair. As a consequence the produc-tion applicaproduc-tion quickly gets the resources designated to it in a Fair division. For this to happen, tasks with the lower priority application (research) are affected by preemption.

We also contributed to the communities of Hadoop and Disco: we collaborated in the development of a preemption policy and checkpointing service for Hadoop and developed a Fair scheduling policy based on preemption for Disco to help it prioritize production applications with good results in the execution time for our experimental workload. An-other contribution of this work was a Web interface for Disco to help in the reproduction of our experiments.

(8)

Lista de Figuras

2.1 Execução do contador de palavras em um ambiente MapReduce . . . 19

3.1 Execução de MapReduce (traduzido de [19]) . . . 24

3.2 Arquitetura clássica do Hadoop (traduzido de [50]) . . . 25

3.3 Arquitetura do YARN (traduzido de [17]) . . . 26

3.4 Divisão de recursos utilizando-se o Hadoop Fair Scheduler . . . 27

3.5 Divisão de recursos com o Hadoop Capacity Scheduler . . . 28

4.1 Visão geral da arquitetura do Disco (traduzido de [13]) . . . 32

4.2 Troca de mensagens do protocolo Worker para execução de uma tarefa ([13]) 33 4.3 Pipeline do Disco: abstração baseada em uma sequência de estágios . . . . 34

4.4 Política de agrupamento: split ([54]) . . . 35

4.5 Política de agrupamento: group_all ([54]) . . . 35

4.6 Política de agrupamento: group_label ([54]) . . . 36

4.7 Política de agrupamento: group_label_node ([54]) . . . 36

4.8 Política de agrupamento: group_node ([54]) . . . 37

4.9 MapReduce segundo as regras do pipeline . . . 38

4.10 Comparação da distribuição dos recursos do cluster para as políticas FIFO e Fair do Disco . . . 39

5.1 Replicação de dados em um sistema de arquivos distribuído e sua relação com o mecanismo de localicade dos dados . . . 42

5.2 Exemplo de execução de tarefas especulativas para o Hadoop . . . 43

5.3 Armazenamento e transporte dos dados intermediários sem a presença de I-File no sistema de arquivos distribuídos . . . 46

5.4 Armazenamento e transporte dos dados intermediários com a presença de I-File no sistema de arquivos distribuídos . . . 47

5.5 Preempção sem checkpointing . . . 48

5.6 Preempção com checkpointing . . . 48

6.1 Melhor utilização dos recursos do cluster com o auxílio de preempção e checkpointing . . . 55

6.2 Preempção das tarefas de uma aplicação menos prioritária pela política de escalonamento “Preemptive Fair”. . . 58

6.3 Concessão dos recursos à aplicação mais injustiçada pela política de esca-lonamento “Preemptive Fair”. . . 58

6.4 Tempos de execução das aplicações de produção . . . 59

6.5 Tempos de execução das aplicações de pesquisa. . . 59

6.6 Utilização do cluster para a execução do experimento com a versão original da política Fair. . . 60

(9)

6.7 Utilização do cluster para a execução do experimento com a política Preemptive Fair que implementamos . . . 61 6.8 Interface do Jupyter Notebooks, famosa ferramenta da comunidade Python 62 6.9 Link para a ferramenta Notebooks exibido no canto superior direito da tela 63 6.10 Tela do Notebooks para a submissão de novas aplicações . . . 64 6.11 Tela principal do Disco Web enquanto aplicações estão em execução . . . . 65 6.12 Tela do Notebooks com a listagem dos relatórios disponíveis . . . 65 6.13 Tela do Notebooks com os resultados dos experimentos de um dado

(10)

Lista de Tabelas

(11)

Sumário

1 Introdução 13 1.1 Contribuições . . . 15 1.2 Organização da dissertação . . . 16 2 MapReduce 17 2.1 Modelo de programação . . . 18 2.2 Ambiente de execução . . . 19 2.3 Exemplos de aplicações . . . 21 3 Hadoop 23 3.1 Arquitetura . . . 24 3.2 Escalonadores . . . 26 3.2.1 Fair Scheduler . . . 27 3.2.2 Capacity Scheduler . . . 28 3.3 Preempção de tarefas . . . 29 4 Disco 30 4.1 Arquitetura . . . 31 4.2 Protocolo do worker . . . 32 4.3 Pipeline . . . 33 4.3.1 Split . . . 34 4.3.2 Group_all . . . 35 4.3.3 Group_label . . . 36 4.3.4 Group_label_node . . . 36 4.3.5 Group_node . . . 37

4.3.6 MapReduce como um pipeline . . . 37

4.4 Políticas de escalonamento . . . 37

5 Mecanismos Auxiliares aos Escalonadores 40 5.1 Localidade de dados . . . 40

5.2 Tarefas especulativas . . . 41

5.3 I-Files . . . 44

5.4 Preempção e checkpointing . . . 45

5.5 Outros trabalhos relacionados . . . 49

6 Contribuições e Resultados 51 6.1 Trabalho realizado no Hadoop . . . 52

6.2 Trabalho realizado no Disco . . . 56

(12)

6.2.2 Notebooks . . . 60

7 Conclusão 67

Referências Bibliográficas 69

A Comparação entre Java e Erlang 74

A.1 Gerenciamento de memória . . . 75 A.2 Concorrência e paralelismo . . . 76

(13)

Capítulo 1

Introdução

Atualmente, convivemos com coleções de dados tão grandes e complexas que ferramentas computacionais tradicionais, como por exemplo bancos de dados relacionais, já não são suficientes para armazenar e analisar esses dados. Um dos principais gargalos é a veloci-dade de acesso ao disco. Enquanto a capaciveloci-dade dos discos cresceu muito com o passar dos anos — um HD típico de 1990 armazenava 1370 MB e atualmente é comum termos HDs de 1 TB — a velocidade de acesso ao disco não acompanhou tal crescimento (4,4 MB/s em 1990 e 100 MB/s em 2012). Com essa capacidade e velocidade de acesso, em média um disco inteiro era lido em 5 minutos em 1990, enquanto no início desta década mais do que duas horas e meia eram gastas para realizar tal atividade [19, Capítulo 1].

Uma solução simples para o problema de se armazenar e processar grande quantidade de dados é distribuí-los em vários discos, formando um cluster de máquinas; assim o conteúdo é lido de vários HDs ao mesmo tempo e a velocidade de acesso global é maior. Porém, isso cria uma série de dificuldades, como por exemplo:

• de que forma desenvolver aplicações que leem seus dados desse sistema de arquivos distribuído;

• como aproveitar o poder de processamento do cluster, pois não seria eficiente utilizar apenas o poder de armazenamento;

• como identificar e se recuperar em casos de falhas de hardware e rede;

• como estruturar uma arquitetura com máquinas convencionais (commodity machi-nes) para diminuir os custos.

O modelo computacional MapReduce conforme o Google propôs [6] tinha como ob-jetivo principal facilitar o tratamento dessas questões comuns a diversas aplicações dis-tribuídas em um framework. Dessa maneira, os programadores podem fazer uso desse framework ao invés de reimplementar as soluções para esses problemas em cada aplicação individualmente. MapReduce se baseia em delegar ao programador a responsabilidade por implementar as funções Map e Reduce. A primeira é responsável por dividir a entrada em vários agrupamentos pequenos e independentes, que são processados em paralelo nas diversas máquinas do cluster. As saídas dos mappers (processos que executam a função

(14)

CAPÍTULO 1. INTRODUÇÃO 14

Map) são entradas para os reducers (processos que executam paralelamente a função Re-duce). Os reducers, por sua vez, combinam os dados para chegar à resposta final. Para que MapReduce funcione adequadamente, é necessário o apoio de um sistema de arquivos distribuído e escalável que forneça os dados de forma confiável e a altas taxas. No caso do Google, o GFS (Google File System) [9] se encarrega disto.

Baseando-se no MapReduce e GFS do Google, alguns projetos de código aberto sur-giram com o objetivo de resolver os mesmos problemas. Tratam-se de maneiras mais acessíveis, já que o sistema utilizado pelo Google é proprietário e somente seus engenhei-ros têm acesso a ele. Neste trabalho vamos analisar dois desses projetos: Hadoop [24] e Disco [33]. O primeiro é mantido pela Apache e tem como objetivo “Desenvolver soft-ware de código aberto para computação confiável, escalável e distribuída”. Além disso, o Hadoop é altamente conhecido e utilizado na indústria inclusive por grandes empresas como Facebook e Yahoo e usa principalmente a linguagem de programação Java em seu núcleo. O segundo, por sua vez, foi desenvolvido pela Nokia em 2008 é mais enxuto em termos de quantidade de linhas de código e complexidade tanto para administração como para adição de novas funcionalidades. O Disco é escrito em Erlang — uma linguagem de programação desenvolvida pela Ericsson especialmente para tratar problemas relacionados a sistemas distribuídos como concorrência, comunicação entre as máquinas de um cluster e tolerância a falhas [2]. Para escrever aplicações MapReduce, o Hadoop possibilita o uso de Java, Python, Ruby e C++, sendo Java a mais popularmente adotada. Aplicações que rodem no Disco podem ser escritas em qualquer linguagem que possua uma biblioteca compatível com o seu núcleo (worker), mas suporta e adota principalmente o Python para essa finalidade.

A popularização das soluções de MapReduce fez com que diversos tipos de aplicações fossem desenvolvidas seguindo seu modelo. Muitas decisões de negócios dependem hoje em dia de aplicações dessa natureza. Algumas dessas decisões são urgentes, como o pro-cessamento dos logs de cliques para medir a efetividade de anúncios em grandes sites; já outras nem tanto, como analisar os dados de uso para decidir onde posicionar anúncios em uma página. É comum que clusters recebam aplicações com diferentes prioridades [5]. Dividimo-nas em duas principais categorias: pesquisa e produção. Normalmente, as apli-cações de pesquisa possuem baixa prioridade e tarefas com tempo de duração longo, enquanto as de produção são mais prioritárias e rápidas no que diz respeito à duração.

O requisito que faz com que seja necessário respeitar as prioridades e rodar aplicações em paralelo para melhor aproveitar um cluster gera a necessidade de mecanismos para que os escalonadores dos frameworks tomem boas decisões sobre qual aplicação deve ter sua tarefa atribuída a um dado recurso que esteja disponível. O principal desses mecanismos é a preempção, que possibilita a interrupção de parte das tarefas de uma aplicação de pesquisa quando uma de produção precisa ser executada. O escalonador principal do Hadoop é o Capacity Scheduler, onde existem mecanismos de preempção e também de recuperação do estado de uma tarefa que sofreu preempção para que a computação não seja perdida quando a execução é retomada.

Ao mecanismo de recuperação do estado das tarefas damos o nome de Checkpointing. As políticas de escalonamento do Disco — FIFO (First In First Out) e Fair (justiça na distribuição dos recursos disponíveis) — possuem limitações no que diz respeito a

(15)

CAPÍTULO 1. INTRODUÇÃO 15

mecanismos para execução de aplicações com diferentes prioridades. FIFO escalona todas as tarefas da primeira aplicação submetida antes de começar a escalonar as da segunda independente da prioridade. Fair demora muito para fornecer à aplicação de produção seus recursos já que a aplicação de pesquisa possui tarefas longas que acabam por retê-lo até o término de sua execução na ausência de um mecanismo de preempção.

Para contribuir com tal discussão descrevemos nas páginas que se seguem o resultado de uma pesquisa sobre mecanimos que auxiliam os escalonadores do Hadoop e do Disco a trabalharem melhor com cargas heterogêneas de aplicações. Chamamos de carga he-terogênea esse uso dos clusters MapReduce com aplicações de prioridades e tempos de execução diferentes em paralelo, ou seja, execução concorrentes de aplicações de pesquisa e produção.

1.1

Contribuições

Além desta dissertação, nossa pesquisa produziu as seguintes contribuições:

• Apresentação do trabalho no IX WTD e consequente publicação do resumo expan-dido:

– Augusto Souza e Islene Garcia. Preempção de Tarefas MapReduce via Check-pointing. Em Anais do IX Workshop de Teses, Dissertações e Trabalhos de Iniciação Científica em Andamento do IC-UNICAMP, páginas 86–91. Insti-tuto de Computação, Universidade Estadual de Campinas, agosto de 2014 [7]. • Artigo aceito no WPerformance 2016 em Porto Alegre e publicado nos Anais do

XXXVI CSBC:

– Augusto Souza e Islene Garcia. A preemptive fair scheduler policy for disco MapReduce framework. Em Anais do XXXVI Congresso da Sociedade Brasi-leira de Computação, páginas 2758–2769. Pontifícia Universidade Católica do Rio Grande do Sul (PUCRS), julho de 2016 [8].

• Trabalho junto à comunidade do projeto Hadoop evidenciado pelas seguintes ativi-dades:

– Atualizações em patch para adequar uma solução de checkpointing desenvol-vida por outros pesquisadores ao código corrente na época deste trabalho:

∗ Preemption of Reducer (and Shuffle) via checkpointing

https://issues.apache.org/jira/browse/MAPREDUCE-5269

– Proposta de divisão de patch em componentes menores, mais facilmente atua-lizáveis e testáveis:

∗ Add support for PartialFileOutputCommiter when checkpointing is an op-tion during preempop-tion

(16)

CAPÍTULO 1. INTRODUÇÃO 16

∗ Add a checkpointable version of shuffle and reduce context supported by a checkpoint manager which uses the HDFS

https://issues.apache.org/jira/browse/MAPREDUCE-6444

• Pesquisa e desenvolvimento de funcionalidades e ferramentas para o projeto Disco: – Disponibilização do código da política de escalonamento Fair e com preempção

à comunidade:

∗ Repositório no Github com a mesma licença do Disco e com o código do nova política de escalonamento

https://github.com/augustorsouza/disco/tree/fair_policy_with_ preemption

– Arquitetura e desenvolvimento de solução para execução facilitada de aplica-ções no Disco por meio de uma interface Web já pré-configurada para facilitar a reprodução dos experimentos descritos nesta dissertação

∗ Repositório no Github com a mesma licença do Disco e com o código da interface Web Notebooks

https://github.com/augustorsouza/disco/tree/notebooks

1.2

Organização da dissertação

O restante dessa dissertação se encontra organizado da seguinte forma:

• Capítulo 2: descreve o modelo de programação e o ambiente de execução para aplicações MapReduce conforme proposto pelo Google [6];

• Capítulo 3: detalha como MapReduce foi implementado no Hadoop; • Capítulo 4: detalha como MapReduce foi implementado no Disco;

• Capítulo 5: descreve alguns mecanismos auxiliares aos escalonadores de sistemas MapReduce;

• Capítulo 6: aqui apresentamos os resultados atingidos pela nossa pesquisa deta-lhando as contribuições para o Hadoop e para o Disco;

• Capítulo 7: explicita conclusões e trabalhos futuros;

• Apêndice A: auxilia no entendimento das diferenças entre Hadoop e Disco por meio do entendimento das diferenças entre as principais linguagens de programação uti-lizadas por esses dois sistemas.

(17)

Capítulo 2

MapReduce

Em 2004, pesquisadores do Google divulgaram uma pesquisa muito importante para a área de sistemas distribuídos por auxiliar a comunidade científica a entender como essa em-presa conseguia produzir e computar quantidades tão massivas de dados [6]. De maneira empírica, os pesquisadores do Google notaram que as aplicações distribuídas executadas nos clusters da empresa continham diversas similaridades no que diz respeito às operações sobre os dados e ao ambiente escalável propício para sua execução.

O formato dessas aplicações podia ser abstraído com o auxílio de duas operações co-muns em linguagens funcionais (como Lisp, por exemplo). A primeira operação chamada de Map, seria responsável por mapear os dados iniciais em conjuntos de valores que com-partilhavam a mesma chave. Na sequência, a operação de Reduce seria responsável por aplicar uma computação sobre o conjunto gerado por Map, produzindo uma saída fi-nal que seria uma resposta para a computação pretendida. Após notar essa organizaçao comum às aplicações distribuídas, os pesquisadores do Google propuseram um modelo de programação aos desenvolvedores que criavam aplicações distribuídas baseadas nessas duas operações. Dessa maneira, todos os detalhes referentes à execução distribuída não precisariam mais serem implementados por cada aplicação, mas sim por um ambiente de execução comum. Na Seção 2.1 mostramos esse modelo de programação com mais deta-lhes e um exemplo de aplicação expressa segundo essa abstração. Diversos problemas do mundo real podem ser expressos seguindo esse formato.

O ambiente de execução desejado para as aplicações precisaria atender a uma série de requisitos. O primeiro deles, executar de maneira paralelizada sobre máquinas con-vencionais. A quantidade de computadores e CPUs que cada um possui pode variar de alguns poucos a milhares. Máquinas podem ser adicionadas ou removidas do cluster sem interrupções de sua utilização, o que também facilita manutenções. Quando se trabalha com milhares de máquinas, detalhes referentes à tolerância a falhas são essenciais, já que uma falha completa ou parcial de um dado computador é mais provável nessa situação e isso não deve ser motivo para que o ambiente deixe de estar disponível ou que uma dada execução seja interrompida. Por fim, deve-se escalonar os pedaços dessas aplicações (tarefas) de maneira eficiente, por meio da gerência da comunicação entre as máquinas. Na Seção 2.2 veremos como o Google atende esses requisitos.

Para finalizar o capítulo, na Seção 2.3, explicamos em altíssimo nível como alguns problemas comuns podem ser implementados utilizando-se da lógica proposta por

(18)

CAPÍTULO 2. MAPREDUCE 18

Reduce.

2.1

Modelo de programação

O usuário da biblioteca de MapReduce em sua aplicação expressa sua computação em termos de duas funções: Map e Reduce. Map processa um arquivo de entrada e gera pares chave-valor. Esses constituem um conjunto intermediário de dados, que serão agrupados pelo ambiente de execução de acordo com a chave, e repassados para a função de Reduce. Reduce, por sua vez, recebe a chave intermediária anterior e o conjunto de valores mapeado a essa chave e mescla-os tipicamente em zero ou um único valor de saída. É importante notar que para serem computáveis, os dados intermediários são lidos em lotes para que não possam ocupar mais memória do que há disponível.

Para entender melhor o modelo de programação MapReduce um exemplo pode auxi-liar. Considere o problema de contar a quantidade de ocorrências de cada palavra contida em uma coleção de documentos. Nesse casso, o desenvolvedor poderia escrever o seguinte pseudo-código:

Algoritmo 1 Exemplo de função Map para o problema do Contador de Palavras . Chave: nome do documento

. Valor: conteúdo textual do documento function map(Chave, Valor)

for each palavra ∈ V alor do

EmitaIntermediario(palavra, 1) end for

end function

Algoritmo 2 Exemplo de função Reduce para o problema do Contador de Palavras . Chave: uma palavra

. Valores: uma lista de inteiros com valor 1 function reduce(Chave, Valores)

resultado ← 0

for each valor ∈ V alores do resultado ← resultado + 1 end for

EmitaResultado(Chave, resultado) end function

A função Map emite para cada palavra de um dado documento essa mesma palavra e o valor 1. Reduce, por sua vez, recebe para cada palavra um conjunto com a lista dos inteiros emitidos pelo Map, anteriormente. Lembrando que o ambiente de execução tratou de agrupar as saídas do Map nesse formato de chave-valores. Para ser executável, nosso exemplo ainda precisa que o cliente que irá utilizar o sistema de MapReduce forneça informações a respeito dos dados de entrada e de saída, como os nomes dos arquivos, por exemplo.

(19)

CAPÍTULO 2. MAPREDUCE 19

2.2

Ambiente de execução

O modelo de programação descrito na Seção 2.1 precisa de um ambiente de execução escalável para tirar melhor proveito dos recursos de computação disponíveis. Apesar de nesta dissertação focarmos em execuções de aplicações que geram ou consomem uma grande quantidade de dados em clusters de máquinas convencionais, existem adaptações do modelo de programação MapReduce para outros ambientes como o de memória com-partilhada, por exemplo [14]. Além disso, o tipo de sistema de MapReduce base para esta dissertação é aquele que compartilha os recursos de computação, armazenamento e comunicação entre as máquinas de maneira transparente aos desenvolvedores. Todos os computadores do cluster estão conectados e possuem informações sobre a localização de seus pares (se estão no mesmo rack, por exemplo). Cada uma dessas máquinas possui um disco rígido tradicional que é utilizado pelo sistema de arquivos distribuído que serve como base para o MapReduce, replicação de dados é o principal mecanismo para disponi-bilidade e integridade dos dados. Linux é o sistema operacional mais usualmente utilizado em tais máquinas e elas podem ou não ter múltiplos núcleos.

Uma aplicação MapReduce é constituída de tarefas que são unidades responsáveis principalmente por executar uma das duas funções primordiais do modelo de programação (Map ou Reduce). Cabe ao escalonador do ambiente de execução receber essas aplicações e decidir em que momento cada recurso será alocado para cada tarefa. Além disso, esse escalonador deve ser capaz de trabalhar com múltiplas aplicações em paralelo, decidindo quais de suas tarefas devem ser direcionadas a cada máquina sempre que existirem recursos disponíveis no cluster.

Figura 2.1: Execução do contador de palavras em um ambiente MapReduce A Figura 2.1 ilustra a execução de uma aplicação de contar palavras descrita na Se-ção 2.1 em um ambiente de execuSe-ção MapReduce conforme descrito pelo Google. Primei-ramente, note que a arquitetura é mestre-escravo, ou seja, a uma das máquinas (mestre) é atribuída a responsabilidade de coordenar o trabalho das demais (escravos). Note

(20)

tam-CAPÍTULO 2. MAPREDUCE 20

bém que cada um dos escravos deve executar a função de Map ou a de Reduce. Além disso, existe um canal de comunicação sempre ativo entre o mestre e os escravos, repre-sentado pelas setas com linhas tracejadas. Dito isso, a aplicação segue o seguinte roteiro de execução:

1. A quantidade de máquinas responsáveis por executar funções de Map e de Reduce é um dado previamente configurado no cluster. Vamos supor que tenhamos M máquinas designadas à Map e R à Reduce. Então, o cliente submete a aplicação e as informações necessárias para a inicialização, como a localização dos arquivos de entrada, por exemplo. Múltiplas cópias desse programa são inicializadas pelas diferentes máquinas do cluster. A entrada é dividida em M partes.

2. À uma das cópias do programa é dada a função de mestre (o restante são consi-deradas escravos e recebem comandos dele). M dos escravos que estão disponíveis recebem a designação de executar Map e R a de executar Reduce.

3. Aqueles que receberam a designação de executar Map leem uma parte da entrada e chamam a função correspondente com os parâmetros correspondentes. Os resultados intermediários produzidos como saída dessa chamada são mantidos em memória. 4. Para não arriscar perder os dados intermediários, periodicamente eles são gravados

em arquivo e particionados em R pedaços. A localização desses arquivos é enviada do escravo correspondente ao mestre para que esse último possa depois repassar aos escravos que executarão Reduce.

5. O mestre notifica os escravos que irão executar Reduce que prontamente via cha-mada remota de funções pedem a parte do resultado da execução de Map que lhes interessa aos escravos correspondentes. Com todos os dados lidos, o escravo respon-sável por Reduce ordena-os de acordo com a chave. Por fim, a função Reduce do usuário é chamada recebendo por parâmetro a chave e o conjunto de valores corres-pondente. A saída é então concatenada a um arquivo de saída que estará gravado no sistema de arquivos distribuído.

Uma importante restrição desse ambiente reside no mestre, pois ele precisa manter uma estrutura de dados que indique a localização para cada máquina que executou a fun-ção de Map das suas R saídas. Sendo assim, os valores de M e R precisam ser muito bem escolhidos para que a memória do mestre possa comportar tamanha estrutura de dados. Além disso, na arquitetura proposta pelo Google, o mestre é um ponto único de falha, ou seja, caso ele falhe não é possível que o sistema se recupere e que a computação até o momento da falha não seja perdida. Apesar disso, a falha de mestre é bastante impro-vável estatisticamente. Por outro lado, os escravos tendem a falhar com mais frequência. Por esse motivo, quando falham o mestre consegue detectar isso por meio do canal de comunicação entre eles e imediatamente pode demandar a reexecução da atividade que o escravo estivesse executando. No caso da mesma atividade ter sido executada por dois escravos diferentes (por uma lentidão de resposta no canal de comunicação, por exemplo), a primeira a ser enviada como resposta ao mestre é aquela que é considerada. Essa úl-tima otimização é denominada Backup Task e auxilia no caso de atrasos pela presença no

(21)

CAPÍTULO 2. MAPREDUCE 21

cluster de máquinas que estejam lentas, porém não tenham falhado ainda (por um disco defeituoso, por exemplo).

Tanto no início, para a leitura dos dados iniciais, como no término, para a gravação dos dados finais em um ambiente confiável, a aplicação MapReduce faz uso de um sistema de arquivos distribuído. Esse tipo de sistema costuma distribuir um mesmo arquivo em diversas máquinas para garantir a alta disponibilidade das informações. Esse detalhe de implementação possibilita uma outra otimização garantida pelo ambiente de execução descrito nesta seção denominada: localidade dos dados. Essa otimização busca diminuir a quantidade de dados trafegados na rede do cluster através da alocação de tarefas próximas aos seus dados, ou seja, suponha que uma dada máquina irá executar a função de Map e precise de um determinado arquivo para a computação a ser realizada, o ambiente sabendo disso pode garantir que uma máquina onde o arquivo esteja gravado seja também onde a função será executada. Esse detalhe garante que o arquivo não precise ser lido e transportado pela rede.

2.3

Exemplos de aplicações

Nesta seção vamos exemplificar mais algumas aplicações sobre a ótica do modelo de pro-gramação MapReduce [6]:

Grep Distribuído: caso um padrão seja reconhecido em uma dada linha de texto a função Map emite essa linha. Reduce apenas reemite o mesmo valor, servindo apenas para copiar o resultado intermediário para a saída final.

Contagem de frequência de acessos a URL: muito similar ao contador de palavras, nesse caso, Map se encarrega de emitir um par com as URLs e um valor 1, a partir do processamento de um arquivo de log de requisições de páginas da Web. Reduce deve computar a soma das aparições das URLs.

Grafo reverso de links da Web: dada uma página da Web (fonte) com diversos links em seu conteúdo textual, Map deve varrê-la para emitir pares compostos por link e fonte. Reduce concatena as fontes que apontam para um determinado link gerando um mapeamento entre link e a lista de fontes que apontam para ele.

Frequência de palavras em um host: um host pode conter muitos documentos e deseja-se saber as palavras mais frequentes dentro deles. Map gera para cada documento uma estrutura de dados contendo a palavra e sua frequência, no final possuirá uma lista dessas estruturas chamada de vetor de termos. A saída de Map será um par o host e esse vetor de termos. Reduce mescla os vetores de termos de cada documento por meio da soma das frequências e descarta as palavras menos frequentes. No fi-nal, produz o vetor de termos final para um dado host que contêm a informação compilada.

Índice inverso: esse programa é útil para saber em quais documentos algumas palavras aparecem. Map emite para cada palavra que aparece no documento um identificador

(22)

CAPÍTULO 2. MAPREDUCE 22

único. Reduce se encarrega de ordenar os identificadores dos documentos e mesclá-los gerando uma lista de documentos para cada palavra. Facilmente, poderíamos também monitorar o posicionamento das palavras.

Ordenação distribuída: a função de Map extrai a chave de cada registro e emite essas informações. Como a computação intermédiaria entre as fase de Map e Reduce se encarregam de ordenar os dados intermediários, Reduce pode apenas emitir as chaves e valores sem alterações.

Importante notar que neste trabalho de mestrado não focamos em uma aplicação espe-cífica mas em uma taxonomia de aplicações que as divide entre produção e pesquisa [5]. As aplicações de produção tendem a ser mais urgentes, seus resultados precisam ser calcula-dos rapidamente, por isso possuem uma prioridade alta, porém sabe-se que são executadas rapidamente. As aplicações de pesquisa, por sua vez, são menos prioritárias e possuem uma execução mais demorada. Costuma-se exigir que os clusters consigam trabalhar com cargas heterogêneas, ou seja, constituídas por esses dois tipos de aplicações e é um desafio aos escalonadores se comportarem bem nessa situação.

(23)

Capítulo 3

Hadoop

Hadoop contém uma implementação de MapReduce disponível livremente através da li-cença Apache [25]. Seu código base é primariamente escrito em Java assim como as aplicações MapReduce que rodam sobre sua plataforma. Utilizando a biblioteca Hadoop Streaming [40] que faz uma interface com outras linguagens de programação, o desenvol-vedor pode optar também por Python, Ruby e C++.

Hadoop foi promovido pela Apache a um projeto de nível significativo em 2008, mas sua história começa muito antes. Em 2002, Doug Cutting, criador do projeto Apache Nutch, o fez com o objetivo de dar uma solução de busca na Web mais democrática e aberta. O Nutch pode ser considerado o pai do Hadoop, pois esse projeto utilizava sistemas distribuídos como base para um rastreador e indexador das páginas da Web. Porém, em 2003 já era notável que a solução de Cutting não seria escalável. Nesse mesmo ano, o Google publicou o artigo sobre seu sistema de arquivos — o Google File System (GFS) [9]. Uma solução similar a desse sistema de arquivos ajudaria o projeto Nutch a armazenar a imensa quantidade de dados gerados pelo seu algoritmo de rastreamento da internet. Em 2004, eles finalizaram a implementação do NDFS (Nutch Distributed File System) que foi a base para o HDFS (Hadoop Distributed File System) do Hadoop [16]. Em 2005, depois de se inspirar novamente em um trabalho do Google, dessa vez a respeito de MapReduce [17], os desenvolvedores do Nutch criaram uma versão similar de computação dentro do projeto e portaram diversos de seus algoritmos para esse novo sistema. Novamente, o ganho de escalabilidade e desempenho foi impressionante e decidiu-se por modulizar os componentes de computação MapReduce e sistemas de arquivos do Nutch, criando-se as bases para o projeto Hadoop. Graças ao investimento do Yahoo no Hadoop, que veio a contratar Doug Cutting e a prover um time e recursos para incrementar e acelerar o desenvolvimento da solução, foi que o projeto ganhou ainda mais notoriedade. Prova disso é que além do Yahoo, em janeiro de 2008, empresas como Last.fm, Facebook e New York Times já haviam divulgado casos de sucesso com o uso da plataforma. Destaque especial para o projeto do New York Times junto à Amazon EC2 para digitalizar o seu acervo [51].

Atualmente, Hadoop pode ser considerado o projeto de MapReduce com maior adoção e de maior sucesso dentro da comunidade de sistemas distribuídos, sendo utilizado por dezenas de empresas como a sua solução padrão para armazenamento e computação de grandes quantidades de dados. Isso pode ser notado pela quantidade de produtos que

(24)

CAPÍTULO 3. HADOOP 24

utilizam Hadoop de alguma maneira, existem distribuições feitas pela EMC, IBM, Mi-crosoft e Oracle, além de grandes empresas especializadas na plataforma como Cloudera, Hortonworks e MapR [19].

Neste capítulo falamos sobre a arquitetura do Hadoop (Seção 3.1), sobre seus princi-pais escalonadores (Capacity e Fair) (Seção 3.2) e sobre como esse sistema trabalha com preempção de tarefas (Seção 3.3).

3.1

Arquitetura

Figura 3.1: Execução de MapReduce (traduzido de [19])

Um arquivo de entrada previamente gravado no HDFS é utilizado em conjunto com funções de Map e Reduce para formar o que chamamos de aplicação no Hadoop. Conforme ilustrado na Figura 3.1, primeiramente, os mappers leem os dados de entrada do HDFS e produzem sua saída (chave-valor). Então, cada reducer computa um intervalo de chaves e seus respectivos valores, para isso é necessário que os reducers copiem a saída dos mappers diretamente do disco local de cada máquina que as processou. Essa fase de cópia é denominada Shuffle. Após a fase de Reduce, múltiplas saídas são concatenadas em um arquivo que é gravado no HDFS.

A primeira proposta do time do Hadoop para possibilitar a execução de tarefas Map-Reduce foi dividir as máquinas do cluster dentre as seguintes responsabilidades: Job-Tracker, TaskJob-Tracker, NameNode e DataNode — os dois primeiros pertencem ao Map-Reduce e os dois últimos ao HDFS. Destacamos que uma arquitetura mestre-escravo foi proposta, tanto para o ambiente de execução (MapReduce), como para o sistema de ar-quivos distribuído (HDFS) — sendo os mestres JobTracker e NameNode responsáveis por coordenar os escravos TaskTracker e DataNode, respectivamente. O principal problema

(25)

CAPÍTULO 3. HADOOP 25

Figura 3.2: Arquitetura clássica do Hadoop (traduzido de [50])

desse tipo de arquitetura é o ponto único de falha: o mestre. A presença de tal vulne-rabilidade é algo muito ruim, pois é difícil para o sistema se recuperar em caso de falha desse mestre.

A arquitetura clássica do Hadoop é ilustrada na Figura 3.2. Nesse modelo, após o JobTracker dividir a aplicação em tarefas para distribuir seu processamento, ele cria um mapper por pedaço do arquivo de entrada e um número pré-configurado de tarefas de Reduce. Tarefas são designadas para os nós baseados no tamanho de suas filas internas. Além disso, quando uma tarefa de Map é designada para um nó, preferencialmente são escolhidos aqueles que contenham os blocos dos arquivos de entrada, em seguida nós no mesmo rack que os dados de entrada, por fim no mesmo cluster e mesmo datacenter. Chama-se a esta otimização de localidade dos dados [6]. Os TaskTrackers além de realizar a computação em si, enviam mensagens de heartbeat para o JobTracker regularmente. Essas mensagens transportam informações sobre a porcentagem de pedaços da entrada que já foram processados pelas tarefas locais (se alguma estiver sendo executada). Além disso, sinalizam quando uma tarefa finaliza e alertam o JobTracker sobre condições de erros. Respostas a heartbeats podem carregar novas tarefas para rodar no nó.

Uma mudança arquitetural mostrou-se necessária à medida que o Hadoop passou a ser amplamente adotado em grandes clusters. Com mais de 4000 nós, o MapReduce mostrou-se não escalável [17]. O principal gargalo era o JobTracker e foi por isso que um grupo do Yahoo trabalhou em um novo modelo arquitetural denominado YARN (ou MapReduce 2), que significa “Yet Another Resource Negotiator”. A maior mudança no YARN foi a divisão das responsabilidade do JobTracker entre duas novas entidades: o Resource-Manager e o ApplicationMaster. O primeiro gerencia os recursos do cluster, enquanto o segundo gerencia o ciclo de vida da aplicação. Além disso, introduziu-se o conceito de

(26)

CAPÍTULO 3. HADOOP 26

containers, divisão lógica de memória e CPU dos nós, utilizada pelo ResourceManager na alocação de recursos. A responsabilidade de gerenciar os containers é de uma nova entidade denominada NodeManager.

Figura 3.3: Arquitetura do YARN (traduzido de [17])

A arquitetura do YARN é ilustrada na Figura 3.3, onde duas aplicações (representadas pelas cores preto e cinza) estão sendo executadas em um mesmo cluster. No YARN, a execução de uma aplicação de MapReduce se inicia quando um cliente do cluster sub-mete uma aplicação ao ResourceManager, em seguida, o escalonador aloca um container para executar o ApplicationMaster que obtém a entrada necessária para sua execução. Ele então negocia com o ResourceManager por mais containers repassando informações sobre a localidade dos dados. Além disso, o ResourceManager também recebe dados a respeito da quantidade de memória e CPU necessários para a execução das tarefas para serem utilizadas no escalonamento. Depois que o escalonador mapeia tarefas a recursos, o ApplicationMaster inicia os containers através de uma requisição ao NodeManager que busca os dados necessários para realizar a execução propriamente dita da tarefa.

3.2

Escalonadores

Hadoop começou com uma abordagem bastante simplificada para seu escalonador, o cha-mado escalonador “First In First Out” (FIFO) fez parte da primeira solução que o sistema adotou. Normalmente, utilizando-se da política FIFO, cada aplicação tem 100% do clus-ter dedicado a ela durante sua execução. Além disso, enquanto uma aplicação está sendo executada as outras devem aguardar em uma fila ordenada pela sua ordem de submissão. Claramente, esse escalonador não é ideal para casos com múltiplos usuários tendo acesso ao mesmo tempo aos recursos computacionais do cluster. É muito comum que os clusters com Hadoop tenham a necessidade de suportar configurações multiusuários.

(27)

CAPÍTULO 3. HADOOP 27

A primeira ideia para auxiliar a execução das aplicações em ambientes compartilhados foi a inserção de prioridades às aplicações do Hadoop. Assim, cada aplicação poderia rece-ber um parâmetro complementar no momento de sua submissão indicando qual seu nível de prioridade. As possibilidades indo da mais prioritária até a menos prioritária eram: VERY_HIGH, HIGH, NORMAL, LOW e VERY_LOW. Mesmo assim, com o escalona-dor FIFO do Hadoop, preempção não é algo suportado. A falta desse mecanismo acarreta na possibilidade de uma aplicação de baixíssima prioridade tomar para si todos os recur-sos do cluster fazendo com que uma aplicação de altíssima prioridade necessite aguardar seu término. Basta que a mais prioritária seja submetida após a menos prioritária.

Dois escalonadores surgiram no Hadoop para tratar clusters multiusuários, são eles o Fair Scheduler e o Capacity Scheduler. Nas próximas seções vamos apresentar ambos com maiores detalhes.

3.2.1

Fair Scheduler

O principal objetivo do Fair Scheduler é fornecer a cada usuário uma fatia justa dos re-cursos do cluster, para isso se utiliza do conceito de pools [39]. Sendo assim, se em uma pool que utilize uma distribuição Fair dos recursos, em dado momento uma aplicação estiver sendo executada sozinha, ela deve estar consumindo 100% dos recursos disponí-veis, porém se duas estiverem sendo executadas, 50% dos recursos devem ser dedicados a cada uma delas, e assim por diante. Suponha que 100% dos recursos estão destinados à aplicação A e então a aplicação B é submetida na mesma pool, a passagem dos recursos antes destinados a A para B é feita de maneira incremental, ou seja, à medida que tarefas de A vão terminando, o escalonador garante que recursos agora vagos sejam atribuídos às tarefas de B. Esse processo continua até que a quantidade justa de recursos tenha sido destinada a cada aplicação, ou seja, dentro de uma mesma pool não há preempção. Com essa distribuição, ambas estarão progredindo com o passar do tempo sem que uma neces-site aguardar o término da outra. Esse é o principal diferencial que o Fair Scheduler traz em relação ao FIFO Scheduler e é o que o torna uma opção para clusters multiusuários.

Figura 3.4: Divisão de recursos utilizando-se o Hadoop Fair Scheduler

A estrutura que auxilia a coordenação das aplicações pelo Fair Scheduler são suas pools e entre elas, inclusive suporta-se preempção. Cada usuário possui uma pool e os recursos são divididos igualmente dentre as pools que possuem aplicações em execução. Cada pool, por sua vez, pode dividir seus recursos dentre as aplicações que contêm utilizando-se de uma política Fair ou FIFO. Além disso, esutilizando-se escalonador suporta preempção entre diferentes pools, então se uma pool estiver com menos recursos do que o justo para ela,

(28)

CAPÍTULO 3. HADOOP 28

tarefas de aplicações em outras pools serão mortas para garantir que a primeira receba seus recursos.

A Figura 3.4 ilustra uma divisão de recursos do cluster onde dois usuários submete-ram 5 aplicações para execução. O primeiro usuário submeteu duas aplicações (A e B), enquanto o segundo submeteu três (C, D e E). Perceba que cada usuário ganha 50% dos recursos do cluster que por sua vez são divididos com suas respectivas aplicações. No final, teremos as aplicações A e B com 25% do total de recursos cada e C, D e E com 16.6666% aproximadamente, caso Fair também esteja sendo utilizado para a distribuição dos recursos internos das pools.

3.2.2

Capacity Scheduler

Com o Capacity Scheduler a abordagem de escalonamento é um pouco diferente quando comparada a do Fair Scheduler. Também é muito apropriado para ambientes multiusuá-rios e nele o cluster é dividido em filas [38]. Cada fila pode possuir filas filhas, formando uma árvore com configurações e características compartilhadas. Cada uma dessas filas tem uma capacidade destinada a ela, ou seja, uma porcentagem dos recursos do cluster, podendo ser um intervalo de minímo e máximo percentual.

Até o momento o Capacity e o Fair Scheduler têm estruturas análogas e muito simila-res, no primeiro temos as filas enquanto no último as pools. Porém, nas filas do Capacity Scheduler uma lógica FIFO com prioridades é utilizada para selecionar qual a próxima aplicação a ser executada. A Figura 3.5 ilustra esse comportamento.

Figura 3.5: Divisão de recursos com o Hadoop Capacity Scheduler

Esse escalonador é muito utilizado e é o padrão na versão estudada do Hadoop (2.4.1). É muito útil quando os usuários querem enxergar a sua fila de forma a simular um cluster completo onde FIFO é a política de escalonamento. Esse é um cenário comum às orga-nizações que utilizam Hadoop [19]. Diferentemente do Fair Scheduler, não ocorre nesse caso o compartilhamento dos recursos internos à pool, cada fila é independente.

(29)

CAPÍTULO 3. HADOOP 29

3.3

Preempção de tarefas

Tipicamente se trabalha com dois tipos de aplicações no Hadoop, as de produção e as de pesquisa. Essa heterogeneidade de prioridades criou a necessidade do suporte à preempção nos escalonadores do Hadoop.

Na versão estudada do Hadoop (2.4.1), a preempção de uma tarefa corresponde a finalizar a sua execução imediatamente para liberar seus recursos para serem ocupados por uma nova tarefa (de uma aplicação de maior pioridade). Nenhum tipo de estado é salvo, por isso, todo o trabalho já realizado por aquela tarefa é perdido e quando ela voltar a ser executada precisará ser refeito.

Em condições convencionais, o retrabalho realizado quando as tarefas de uma aplicação de menor prioridade retomarem suas execuções pode não significar grande parte do tempo de processamento. Porém, em casos com tarefas de longa duração isso pode representar um problema grave. Para exemplicar isso, imagine um cluster onde uma aplicação de prioridade baixa está sendo executada e que ela seja composta de tarefas de longa duração. Agora, imagine que nesse mesmo cluster, aplicações de maior prioridade passem a ser submetidas com frequência. Uma perda de recursos significativa poderá ser observada a cada chegada de uma aplicação de maior prioridade, pois toda a computação realizada pelas tarefas da aplicação de menor prioridade que forem suspensas não será reaproveitada. Para piorar, como essas tarefas são longas, isso consistirá em um desperdício grande de tempo gasto em computar dados que anteriomente já haviam sido processados.

(30)

Capítulo 4

Disco

Enquanto o Yahoo olhava para a implementação de MapReduce proposta pelo Google e in-vestia no Hadoop, outras empresas também resolveram investir no paradigma MapReduce e criaram outros projetos de código aberto. Esse foi o caso da Nokia, que desenvolveu em 2008 em seu centro de pesquisas em Palo Alto um projeto chamado Disco [13]. Trata-se de uma implementação simples e leve que também suporta a computação paralela sobre grandes quantidades de dados em clusters de máquinas convencionais. Quando dizemos que o Disco é simples isso na verdade é uma qualidade do sistema, pois é muito mais flexível e ao invés de tentar resolver todos os problemas relacionados a computação distri-buída internamente, ele delega muito disso à máquina virtual do Erlang. Veja a Tabela 4.1 comparando o tamanho dos projetos Disco e Hadoop em termos de arquivos e linhas de código para entender melhor o quão mais fácil é implementar algo novo no Disco.

A ferramenta é muito utilizada em atividades como análise de logs, modelagem proba-bilística, mineração de dados e indexação de textos. O núcleo principal do Disco é escrito em Erlang, uma linguagem de programação funcional famosa pelas suas funcionalida-des associadas a sistemas distribuídos como imutabilidade, um modelo de concorrência simplificado e a facilidade na comunicação entre as máquinas do cluster que estiverem executando sua máquina virtual. Sobre essa sólida fundação, o Disco pode fornecer ao programador um ambiente que cuide de entraves técnicos da execução de aplicações Map-Reduce, como protocolos de comunicação, balanceamento de carga, escalonamento das tarefas e tolerância a falhas.

Os programadores, usualmente, utilizam Python para escrever suas aplicações. Lin-guagem simples e poderosa, com um ambiente rico de bibliotecas para computação ci-entífica, estatística e de análise de dados, muito apropriada para aplicações MapReduce. Além disso, o projeto almeja permitir que o programador que não se sinta confortável com Python também possa escrever aplicações na linguagem de sua preferência, para isso

Tabela 4.1: Comparação de tamanhos dos projetos Hadoop e Disco

Hadoop Disco

As duas linguagens mais utilizadas Java e XML Erlang e Python Quantidade de arquivos nessas duas linguagens 6.474 190

Quantidade de linhas de código nessas duas linguagens 1.688.407 19.134

(31)

CAPÍTULO 4. DISCO 31

fornece uma abstração chamada worker para lidar com a comunicação de seu núcleo com a aplicação MapReduce.

Assim como o HDFS atua no Hadoop como seu sistema de arquivos distribuídos para auxiliar o ambiente de execução com a persistência dos dados, o Disco possui seu equi-valente, o DDFS (Disco Distributed Filesystem). Esse sistema de arquivos se utiliza do conceito de tags ao invés de diretórios para mapear a localização de seus arquivos e também fornece alta disponibilidade e confiabilidade ao cluster.

Nas próximas seções vamos no aprofundar um pouco mais no Disco. Vamos analisar sua arquitetura (Seção 4.1), o protocolo do worker (Seção 4.2), o pipeline (Seção 4.3) e as políticas de escalonamento (Seção 4.4).

4.1

Arquitetura

A arquitetura do Disco estudada neste trabalho é a da versão 0.5.4. Nessa versão, exa-tamente da mesma maneira que o Hadoop clássico (antes do YARN), um padrão de mestre-escravo é utilizado para organizar as máquinas do cluster. Nessa arquitetura o mestre é responsável por duas atividades principais:

• inicialização de todos os componentes do sistema como o mecanismo de log, a in-terface Web e os workers que são executados nos nós escravos;

• escalonamento, monitoramento e alocação dos recursos para as tarefas de novas aplicações submetidas pelos clientes.

O mestre precisa de poucas configurações para inicializar seus escravos. O administra-dor do cluster apenas deve informar via interface adminstrativa quantos workers cada nó irá executar e quais os hostnames desses nós. O mais usual é configurar cada host para utilizar um worker por core de CPU que possua. Após essa configuração, o mestre usa os hostnames para inicializar um escravo único em cada nó escravo através da máquina virtual do Erlang. Os nós escravos serão responsáveis por executar e monitorar a execução das tarefas especificadas pelas aplicações submetidas pelo cliente. Além disso, os escra-vos são as unidades de armazenamento utilizadas pelo DDFS [30]. Os administradores do cluster podem adicionar e remover nós escravos em tempo real via a interface Web administrativa do Disco.

A decisão de utilizar a máquina virtual do Erlang se mostra uma ótima escolha quando se observa a maneira como os nós interagem uns com os outros. As conexões entre os mes-tres e os escravos são monitoradas e mantidas pela infraestrutura de monitoramento do Erlang. Como a linguagem e sua máquina virtual foram desenhadas especialmente para trabalhar com sistemas distribuídos, existem funcionalidades muito úteis que proveem de maneira transparente o gerenciamento de comunicação, tolerância a falhas, alta disponi-bilidade e escaladisponi-bilidade. Dessa maneira, quando um nó falha, o mestre é notificado e pode rescalonar as tarefas deste nós em outro.

Um resumo da arquitetura mestre-escravo do Disco e a forma como a comunicação entre os nós é feita são ilustrados pela Figura 4.1, em que quadrados cinzas denotam os componentes principais (mestre, escravos e workers).

(32)

CAPÍTULO 4. DISCO 32

Figura 4.1: Visão geral da arquitetura do Disco (traduzido de [13])

4.2

Protocolo do worker

Um dos objetivos da arquitetura do Disco é permitir que aplicações escritas em qualquer linguagem sejam executáveis em seu ambiente MapReduce. Para atingir esse objetivo de design, existe um protocolo de comunicação entre os nós escravos do Disco e seus wor-kers. Hoje em dia, além de Python existem implementações do protocolo do worker em OCaml [52], Golang [36], LFE [48], e Haskell [41]. Isso dá aos programadores a possi-bilidade de utilizar a linguagem de programação com a qual se sintam mais confortáveis na hora de programar sua aplicação MapReduce. O protocolo se utiliza do descritor de arquivos standard error (stderr) para mensagem do worker para o nó escravo (caracteri-zado por estar executando a máquina virtual Erlang com Disco) e o descritor de arquivos standard input (stdin) para mensagem do nó escravo ao worker. O fluxo de mensagens é sempre inicializado pelo worker e as mensagens principais do protocolo são:

WORKER: anuncia o início da execução;

TASK: requisita informações a respeito da tarefa; INPUT: requisita as entradas para a tarefa;

INPUT_ERR: avisa que ocorreram falhas enquanto as entradas estavam sendo requi-sitadas;

MSG: utilizada para mensagens gerais que serão apresentadas na interface Web, por exemplo;

(33)

CAPÍTULO 4. DISCO 33

OUTPUT: indica que a saída será gravada; DONE: o worker finalizou sua execução;

ERROR: falha com a entrada ou erro transitório; FATAL: erro fatal;

PING: mensagem heartbeat (utilizada para garantir que o mestre está funcional e exe-cutando);

O seguinte diagrama, ilustrado pela Figura 4.2 mostra algumas dessas trocas de men-sagens para o início da execução de uma tarefa.

Figura 4.2: Troca de mensagens do protocolo Worker para execução de uma tarefa ([13])

4.3

Pipeline

O pipeline no Disco é uma abstração adicionada para permitir que o Disco execute apli-cações além de MapReduce, ou seja, que respeite outros paradigmas. Trata-se de uma divisão lógica dos estágios computacionais que definem uma aplicação em duas etapas:

(34)

CAPÍTULO 4. DISCO 34

política de agrupamento seguida de função de tranformação dos dados. Um pipeline é uma sequência desses estágios sendo a saída de um estágio a entrada do próximo como ilustra a Figura 4.3.

Figura 4.3: Pipeline do Disco: abstração baseada em uma sequência de estágios Um estágio consiste de um conjunto de tarefas que executam a mesma computação, porém em diferentes entradas. Por exemplo, um estágio de Map consiste de um conjunto de tarefas Map, cada uma recebe uma entrada única, enquanto um estágio de Reduce tipicamente consiste de uma única tarefa Reduce que processa todas as saídas de Map no estágio de Reduce.

Uma política de agrupamento é uma operação que especifica como as entradas para um estágio devem ser divididas e agrupadas. O agrupamento de um conjunto de entradas é executado utilizando uma informação que o Disco permite ser adicionada a cada entrada chamada de label.

As saídas geradas pelas tarefas em um estágio são as entradas para as tarefas do próximo estágio na sequência. Sempre que uma tarefa gera uma saída ela adiciona um label a ela. Utilizando-se dessa informação a política de agrupamento sabe como operar sobre as saídas para torná-las a entrada do próximo estágio.

Em outras palavras, um pipeline é uma sequência de estágios e cada estágio executa um agrupamento de suas entradas demarcadas com um label gerando um ou mais grupos. Esses grupos, por sua vez, são as entradas a serem processadas por uma tarefa cuja definição também compõe o próximo estágio no pipeline.

Existem cinco diferentes políticas de agrupamento: split, group_node, group_label, group_node_label e group_all. Cada uma dessas políticas recebe um conjunto de entra-das e as agrupa utilizando-se de duas informações importantes: os labels e a localidade do nó do cluster onde a entrada está armazenada. Vamos detalhar cada uma dessa políticas iniciando nossa explicação por aquelas que não dependem da informação referente ao nó do cluster onde a entrada está armazenada.

4.3.1

Split

A operação de split simplesmente recebe a entrada e independentemente do label a repassa a tarefa para processá-la. A Figura 4.4 ilustra tal comportamento, onde os retângulos

(35)

CAPÍTULO 4. DISCO 35

maiores com bordas pretas representam os nós do cluster onde a entrada está armazenada, enquanto as cores das entradas representam seus labels (cinza claro, cinza escuro e preto, por exemplo) e os círculos representam as tarefas que irão utilizar as entradas.

Figura 4.4: Política de agrupamento: split ([54])

4.3.2

Group_all

A política group_all coloca todas as entradas em um único grupo, independentemente de seus labels ou nós onde estão armazenadas. A Figura 4.5 ilustra essa técnica.

(36)

CAPÍTULO 4. DISCO 36

4.3.3

Group_label

A política group_label, por sua vez, leva em consideração os labels das entradas e não os nós onde estão armazenadas para agrupá-las. Cada label diferente se torna um grupo que conterá as saídas correspondentes como mostra a Figura 4.6

Figura 4.6: Política de agrupamento: group_label ([54])

4.3.4

Group_label_node

A política group_label_node ilustrada na Figura 4.7 tem basicamente o comportamento da política group_label apresentada anteriormente, mas levando em consideração a loca-lidade dos dados. Nesse caso, o agrupamento ocorre por label porém internamente em cada nó. Perceba que um estágio utilizando-se de tal política pode ser útil em uma fase in-termediária entre as fases de Map e Reduce para condensar os dados antes de transferi-los pela rede.

(37)

CAPÍTULO 4. DISCO 37

4.3.5

Group_node

Essa política equivale a group_all, porém realizada internamente em cada nó também, assim como a anterior. A Figura 4.8 a ilustra.

Figura 4.8: Política de agrupamento: group_node ([54])

4.3.6

MapReduce como um pipeline

Por fim, vamos ver como expressar uma computação MapReduce segundo essa lógica de pipeline do Disco.

O estágio de Map pode ser representado pelo agrupamento do tipo split, onde cada entrada é repassada diretamente para a função (no caso, à função Map). O label dado pela função da Map as suas saída deve ser calculado de maneira a distribuir os grupos de chaves entre os reducers pois o estágio de Reduce, por sua vez, pode ser representado como um agrupamento do tipo group_label. Nesse tipo de agrupamento, a informação referente ao label é utilizada para agrupar as entradas antes de repassá-las à função (no caso a função Reduce).

Note que a quantidade de labels nesse caso será igual à quantidade de reducers. Note também que a operação de agrupamento group_label é o que muitas vezes chamamos de Shuffle no MapReduce. A quantidade de dados trafegados pode ser otimizada utilizando-se de mais estágios intermediários entre o estágio de Map e de Reduce, como o group_-label_node, por exemplo.

A Figura 4.9 ilustra essa computação de MapReduce descrito dentro das regras do pipeline.

4.4

Políticas de escalonamento

O escalonador do Disco possui um mecanismo bem definido e facilmente adaptável de políticas de escalonamento. Duas políticas de escalonamento são compiladas junto com o código do Disco por padrão: FIFO e Fair.

A política FIFO trabalha com uma fila simples onde a primeira aplicação será prova-velmente a única em execução em um dado momento. Ela se apossa de todos os recursos

(38)

CAPÍTULO 4. DISCO 38

Figura 4.9: MapReduce segundo as regras do pipeline

disponíveis do cluster e executa até seu término enquanto as outras devem aguardar. Com a política FIFO, quando existem recursos disponíveis a primeira aplicação da fila é sempre priorizada para recebê-los. Caso ela não possua mais tarefas prontas para serem executadas, a próxima aplicação da fila é selecionada pelo escalonador para ocupar os recursos com suas tarefas.

Por sua vez, a política Fair tenta dividir o cluster em fatias iguais, fornecendo às apli-cações que estiverem sendo executadas a mesma quantidade de recursos em média com o passar do tempo. Para implementar tal comportamento, o escalonador Fair circula dentre as aplicações em execução quando existirem recursos disponíveis no cluster. Ele também mantêm um cálculo de déficit, ou seja, de quanto tempo uma aplicação foi injustiçada com menos recursos do que deveriam ter sido destinados a ela porque outras estavam com uma fatia maior do que deveriam. O objetivo de manter tal cálculo é compensar as apli-cações que ficarem desfavorecidas com o passar do tempo. A política Fair padrão sempre tenta não possuir recursos inutilizados enquanto existirem tarefas a serem escalonadas. Um demonstrativo dessa divisão pode ser visto na Figura 4.10.

Ambas as políticas possuem problemas quando tentam trabalhar com um carga de trabalho composta de aplicações de pesquisa e de produção. FIFO não prioriza a divisão dos recursos do cluster entre as aplicações em execução, dessa forma, se a aplicação de produção for submetida após a de pesquisa teremos problemas. A aplicação de produção só poderá executar depois de um longo período de espera, porque a aplicação de pesquisa se apossa de todos os recursos do cluster pelo tempo que precisar, porém a aplicação de produção possui uma prioridade maior, por isso essa distribuição dos recursos não é ideal. Por outro lado, Fair também não se sai muito melhor com tal carga de trabalho. Isso porque, Fair não possui o conceito de preempção das tarefas em execução, dessa forma, se a aplicação de pesquisa se apossa de todos os recursos do cluster e possui tarefas longas, somente após o término dessas tarefas é que a aplicação de produção poderá iniciar sua execução.

Outro problema pode ocorrer no escalonador Fair do Disco entre transições de fases na aplicação de produção. Nesse contexto, chamamos de transição de fases o estágio entre o fim da fase de Map e início da fase de Shuffle ou entre o fim da fase de Shuffle e o começo da

(39)

CAPÍTULO 4. DISCO 39

Figura 4.10: Comparação da distribuição dos recursos do cluster para as políticas FIFO e Fair do Disco

fase de Reduce. Esse período é especial durante a execução da aplicação porque o mestre está calculando a quantidade de trabalho remanescente para a aplicação de produção. Caso não seja monitorado que a aplicação se encontra em uma transição de fases, durante tal período pode ocorrer de o escalonador se perder durante a distribuição dos recursos e fornecê-los novamente à aplicação de pesquisa, já que ele ainda não terminou de calcular a quantidade de trabalho remanescente para a aplicação de produção e por isso considera que o melhor é priorizar aquela que ele já sabe que possui trabalho remanescente (no caso a de pesquisa). Como a aplicação de pesquisa pode possuir tarefas muito longas, ela pode acabar por travar os recursos por um longo tempo para si, novamente impedindo que a aplicação de produção execute e finalize rapidamente sua execução prioritária.

Neste trabalho propomos uma nova política de escalonamento que utiliza preempção e atraso no escalonamento durante transições de fases para possibilitar a rápida execução de aplicações de produção nesse cenário de carga heterogênea do cluster.

(40)

Capítulo 5

Mecanismos Auxiliares aos

Escalonadores

Os escalonadores de MapReduce se valem de diversos mecanismos auxiliares para ajudar na tomada da decisão de escalonamento.

Nas próximas seções, vamos descrever em detalhes alguns mecanismos já relatados por outros pesquisadores que auxiliam os escalonadores de MapReduce a tomarem melhores decisões de distribuição das tarefas junto aos recursos do cluster. Nesse caso, melhores decisões podem ser aquelas que acelerem a execução de todas as aplicações ou que le-vem em conta a prioridade entre as aplicações para acelarar preferencialmente aquelas mais prioritárias. Além disso, para cada mecanismo descrito neste capítulo vamos comen-tar brevemente em qual dos frameworks MapReduce estudados nesta dissertação têm-se relatos de que o mecanismo foi implementado.

Neste capítulo, começaremos com o mecanismo de localidade de dados (Seção 5.1) e de tarefas especulativas (Seção 5.2), os dois mais populares e já descritos pelo Google no trabalho inicial sobre MapReduce [6]. Depois detalhamos os I-Files (Seção 5.3) e o mecanismo de preempção e checkpointing (Seção 5.4), assuntos co-relacionados, já que esses arquivos são estruturas de dados que auxiliam os serviços de checkpointing. Outros trabalhos, não menos importantes, mas para os quais não nos aprofundamos tanto fecham o capítulo (Seção 5.5).

5.1

Localidade de dados

Os três frameworks MapReduce que estudamos neste trabalho utilizam um mecanismo relacionado a localidade dos dados no cluster onde o ambiente de execução MapReduce está configurado para auxiliar no escalonamento das tarefas. A proposta desse mecanismo vem do artigo original do MapReduce [6] com o objetivo de reduzir a utilização de banda de rede, que é o recurso mais crítico para um cluster.

O conceito de localidade de dados está relacionado à proximidade dos dados. Não se trata de uma proximidade física essencialmente, mas sim de uma proximidade lógica. Por exemplo, um arquivo consegue ser mais rapidamente acessado por máquinas que compartilhem o mesmo switch de rede com o computador que o hospeda.

Referências

Documentos relacionados

Prova para o Ingresso no Curso de Mestrado – Primeiro semestre de 2016 No diagrama abaixo são esboçadas as sobreposições dos orbitais atômicos de valência de elementos do bloco

2.1.1 Este Edital é destinado a pessoas jurídicas, de direito público ou privado, com o mínimo de um ano em funcionamento e sem registro de inadimplência junto ao governo

Almanya'da olduğu gibi, burada da bu terimin hiçbir ayrım gütmeden, modern eğilimleri simgeleyen tüm sanatçılar için geçerli olduğu anlaşılıyor.. SSCB'de ilk halk

Na apresentação dos dados estatísticos, ficou demonstrada à todos os participantes a dimensão da pesquisa, abrangendo o setor produtivo como um todo, enfocando a produção

Como eles não são caracteres que possam ser impressos normalmente com a função print(), então utilizamos alguns comandos simples para utilizá-los em modo texto 2.. Outros

As pontas de contato retas e retificadas em paralelo ajustam o micrômetro mais rápida e precisamente do que as pontas de contato esféricas encontradas em micrômetros disponíveis

Código Descrição Atributo Saldo Anterior D/C Débito Crédito Saldo Final D/C. Este demonstrativo apresenta os dados consolidados da(s)

Importante também endurecer a fiscali- zação e punição, para que não haja venda de medicamentos que não são de acesso livre, sem prescrição médica.. O uso indevido