• Nenhum resultado encontrado

Spark, Hadoop e Microservices: extraindo informações de milhões de eventos. Abril 2017

N/A
N/A
Protected

Academic year: 2021

Share "Spark, Hadoop e Microservices: extraindo informações de milhões de eventos. Abril 2017"

Copied!
64
0
0

Texto

(1)

Spark, Hadoop e Microservices:

extraindo informações de milhões

de eventos

(2)

Abril 2017

(3)

Um pouco sobre o negócio da

SoundCloud

(4)

CREATORS querem um público.

▶ O aumento de listeners gera a oportunidade para que creators sejam ouvidos.

LISTENERS querem conteúdo original

▶ SoundCloud oferece diferente modelos de subscriptions - incluindo um modelo gratuito.

Crescimento dos CREATORS impulsiona o crescimento de LISTENING TIME.

(5)

Identidade Audiência Ganhar dinheiro

(6)
(7)

Uma plataforma, muitos requisitos

Creators decidem como distribuir seu conteúdo:

Monetização por território.

● Disponível somente para subscribers.

Listeners utilizam SoundCloud app em diferentes

plataformas: IOs, Android, Web, ChromeCast.

Em background, SoundCloud precisa entregar o

conteúdo para os usuários e extrair/armazenar métricas (de forma durável) das requisições.

(8)

Nosso foco hoje será a pipeline de

monetização da SoundCloud!

(9)

A monetização é derivada do

número de plays

(10)

Número de plays é importante:

zoom na track timeline

Como armazenar 37 segundos ouvidos de uma track?

Listener pode ou não continuar escutando a track.

(11)

`

Ao iniciar uma track um evento PlayEvent é emitido. A cada X segundos a track timeline emite um evento. message PlayEvent { required uint64 at = 1; optional string identifier = 2; }

message ListeningEvent {

required uint64 at = 1; optional string play_ref = 2; }

Conforme o progresso da track, ListeningEvent são emitidos. //////////////////////////////////////////////// //////////////////////////////////////////////// //////////////////////////////////////////////// //////////////////////////////////////////////// //////////////////////////////////// //////////////////////////////////// //////////////////////////////////// //////////////////////////////////// //////////////////////////////////// ////////////////////////////////////

(12)

Vamos abrir um parênteses e explorar

um pouco mais a estrutura de um

(13)

Um evento pode ser:

Um evento não é:

▶ Uma ação realizada por um usuário (sound_played). ▶ Uma ação realizada por um sistema

(user_assigned_to_experiment).

▶ Uma alteração de estado de um objeto persistente (user_updated).

▶ Logs operacionais que são examinados em caso de erros.

▶ Um objeto persistente (como uma track ou user).

(14)

Que nos permite:

▶ Emitir eventos.

▶ Consumir eventos em tempo real.

▶ Ler eventos de long-term storage.

▶ Desacoplar triggers de ações.

▶ Analisar comportamento de usuários e sistemas.

▶ Implementar funcionalidades orientada a dados.

O que podemos fazer com

eventos?

(15)

Event type UUID Timestamp Encoding Protobuf-encoded de acordo com um schema Metadata Payload

Anatomia de um evento na

SoundCloud

message Event { required uint64 at = 1; optional string identifier = 2; }

(16)

Fechando parênteses!

Vamos voltar para a timeline

emissora de eventos.

(17)

Ao iniciar uma track um evento

PlayEvent é emitido.

A cada X segundos a track timeline emite um evento:

ListeningEvent.

message PlayEvent {

required uint64 at = 1; optional string identifier = 2; }

message ListeningEvent {

required uint64 at = 1; optional string play_ref = 2; }

Conforme o progresso da track,

(18)

A duração de uma play é uma informação importante. Não

é preciso criar um evento para indicar que uma track não

está mais em execução.

Algo como StopEvent é desnecessário Essa variável é importante para

identificação de spam e para a regra de monetização

(19)

Esse comportamento da track

timeline permite:

Menos dependência de ações do listener: ▶ Não precisa apertar o botão .

Menos corner cases para serem tratados:

▶ Pause & Play em outra track.

▶ Aplicativo foi fechado ou parou de funcionar.

1

(20)

Em contrapartida:

Surge a necessidade de

agregar

ListeningEvent

por

play_ref. Também deve ser

feito um join do resultado da

(21)

message SoundPlayedEvent {

required uint64 at = 1; optional Context context = 2; optional User actor = 3; optional Playable sound = 4; optional uint32 duration = 5; } message PlayEvent { required uint64 at = 1; optional string id = 2; } message ListeningEvent { required uint64 at = 1; optional string play_ref = 2; }

(22)

Entender a estrutura dos dados é

essencial em Big Data. Vamos

continuar nessa linha e explorar um

pouco mais como esses dados são

(23)

Track DB User DB Upload de

novas tracks novos usuáriosRegistro de

Publisher DB Registro de novos publishers

?

Arquitetura com isolamento de

responsabilidades: Microservices

(24)

Track DB User DB Upload de

novas tracks novos usuáriosRegistro de

Publisher DB Registro de novos publishers

?

Funcionalidade com dependência

transversal de Microservices

Agora o relatório de uso pode ser construído sem dependência de microservices. A dependência está nos dados.

Desafio: não existe padronização no armazenamento. Exemplo: Relatório de uso direcionado para o Parceiro X

(25)

Track DB User DB Upload de

novas tracks novos usuáriosRegistro de

Publisher DB Registro de novos publishers

?

Dependência transversal dos

dados dos Microservices

Exemplo: Relatório de uso direcionado para o Partner X

precisa dos metadados da track, do usuário e do publisher Agora o relatório de uso pode ser construído sem dependência

da API dos microservices. A dependência está nos dados.

(26)

Track DB User DB Upload de novas tracks Registro de novos usuários Publisher DB Registro de novos publishers ?

Flush dos dados para o HDFS

(27)

Track DB User DB Upload de novas tracks Registro de novos usuários Publisher DB Registro de novos publishers ? ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// /////////////////////////

Track metadata User metadata Publisher metadata

Agora o relatório de uso pode ser construído sem dependência de microservices. A dependência é de dados e o acesso a

(28)

Consistência eventual dos dados

Agora imagina inconsistência

em $$$$

▶ Não significa que às vezes é consistente e às vezes não. Significa que mais cedo ou mais tarde será consistente - você não tem controle sobre isso.

▶ Algumas funcionalidades toleram uma fração de inconsistência - algoritmo de recomendação.

▶ O cálculo de royalties é baseado em plays. Se

os dados são inconsistentes o dinheiro também se torna inconsistente. Não existe meio dinheiro. Hello auditor!!!

(29)

Adotamos um processo de curadoria dos dados.

É importante identificar onde consistência é um requisito

para aumentar a confiabilidade e não afetar o desempenho

de serviços que toleram uma fração de inconsistência.

Validação de dados, preenchimento de dados obrigatórios que estão faltando -

consistência

Entenda como seus dados são gerados e como eles são consumidos. Transação distribuída (XA) não é

sua única opção para garantir consistência

Se o seu dado se transforma em dinheiro, sua pipeline precisa ser

(30)

Vocês se lembram dessa fórmula?

SoundPlayedEvent = PlayEvent +

(31)

HDFS

Agregar

ListeningEvent e fazer join com

PlayEvent Build SoundPlayedEvent (protobuf schema) message SoundPlayedEvent { required uint64 at = 1; optional Context context = 2; optional User actor = 3; optional Playable sound = 4; optional uint32 duration = 5; }

> hdfs dfs -ls /semantic-events/playduration/2017 Found 2 items

/semantic-events/playduration/2017/01-01 /semantic-events/playduration/2017/01-02

(32)

> hdfs dfs -ls /semantic-events/playduration/2017/01-01 Found 3 items /semantic-events/playduration/2017/01-01/part-r-00000 /semantic-events/playduration/2017/01-01/part-r-00001 /semantic-events/playduration/2017/01-01/part-r-00002 > hdfs dfs -cat /semantic-events/playduration/2017/01-01/part-r-00000 SEQ^F!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^A^ A)org.apache.hadoop.io.compress.SnappyCodec^@^@^@^@ϑ'^^2<B7><CA>Ⱥ^N< E5><91>...

Sequence File

Binary File

allPlays.map(play => Writer[EnrichedPlay].write(play) match {

case Good(bytesWritable) => play.at -> bytesWritable

case Bad(errors) => sys.error(errors.toString)

})

(33)

Pipeline de monetização funciona

baseado em plays e metadados:

▶ Conteúdo monetizável: métricas das plays

▶ Relatórios de uso para parceiros em diferentes períodos: diários, semanais, mensais, etc.

▶ Relatório de músicas mais tocadas para agências de ranking.

(34)

Legacy pipeline

Usage reports

metrics loader

playsplays

plays Spam Filter audited-playsplaysplays

Enricher playsplays subscriptions playsplays track-rules-set playsplays enriched-plays Aggregator playsplays aggregated-plays Usage reports Repoting DB monetizable-usage Royalties Calculator $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ User snapshot Publisher snapshot

(35)

Gerar CSV Filtrar / Agrupar / Agregar plays de um mês HDFS Filtrar / Agrupar / Agregar plays de um mês Filtrar / Agrupar / Agregar plays de um mês

(36)

Gerar CSV

Filtrar / Agrupar / Agregar plays de

um mês HDFS

Gerar DDEX Gerar XML

val plays = PlayAggregator .group(monetizablePlays)

(37)

Gerar CSV

HDFS

Gerar DDEX Gerar XML Job relatório de uso Publishers Ler dados e agrupar por quadrimestre Gerar CSV message PlayAggregation { required uint64 id = 1; optional string name = 2; } Filtrar / Agrupar / Agregar plays de um mês Job relatório de uso Partner X Job relatório de uso Label Y

(38)

Agregar plays não é suficiente.

É preciso enriquecer as plays.

Um evento de play é um dado muito

simples. Ele não contém informação

suficiente para ser utilizado como input da pipeline de monetização. A play é um simples “agregador de ids”. Mas é através de uma play que montamos

(39)

Renovação de assinatura mensal Alteração de copyright de uma track HDFS HDFS HDFS Alteração de policy de uma track Diferentes microservices geram diferentes tipos de dados em um ambiente persistente.

(40)

Renovação de assinatura mensal Alteração de copyright de uma track HDFS HDFS HDFS Alteração de policy de uma track /events/2017/01-01/01_00/plays_audited /events/2017/01-01/01_00/track_rightsholders_changed /events/2017/01-01/01_00/subscription_activated /events/2017/01-01/01_00/track_ruleset_changed

(41)

Track DB Play Enricher Job HDFS User DB Publisher service HDFS message RichPlay {

required Play play = 1; optional Subscription subscription = 2; optional TrackMetadata track = 3; optional UserMetadata user = 4; optional PublihserMetadata publisher = 5; }

(42)

User DB Track DB

A tarefa de enriquecimento poderia

ser bem mais complicada

Play Enricher Job Audited Plays HDFS Publisher Service HTTP API

(43)

Track Metadata

User Metadata

Mas ela se torna mais fácil quando

os dados estão padronizados

Play Enricher Job Audited Plays HDFS Publisher Metadata

(44)

Relatório Label B

Relatório Label A Relatório Label N

???

Diferentes data sources mas um único

source of truth

Filtro e enriquecimento

dos dados para preparar dados - abastecer - para

diversos jobs

Possibilidade de utilizar o framework adequado

(45)

Relatório Label B

Relatório Label A Relatório Label N

???

Pipeline possui um ponto único de falha. Enricher

Play é o gargalo. Se ele

falha então o abastecimento dos dados falhará. Consequentemente todos os jobs dependentes também falharão.

X

(46)

Relatório Label B

Relatório Label A Relatório Label N

Cuidado com dados específicos na camada de dados comum. Dados

específicos podem impactar jobs que não dependem desse dado.

Quanto mais data

sources mais custoso é a operação de join.

Dado grande, mais custo para fazer shuffle

(47)

Projeções dos dados

Projeção está diretamente ligado com

seleção de dados. Por exemplo, dado uma

fonte de dados com muitos atributos (fat

data), selecione apenas o que é

estritamente necessário para realizar a

tarefa.

(48)

Job 2

Play

Múltiplas visões de play

Job 1 Play Play Enricher Job Play Enricher Job Play Enricher Job Enriched Play Job N ??? Play

...

select * from enriched_plays select play, subscription, user

from enriched_plays

select play, subscription, track, user from enriched_plays

(49)

Job 1 Play Play Enricher Job Play Enricher Job Play Enricher Job Enriched Play Job N Play

...

Múltiplas visões de play

case class Play(

subs: Subscription, user: User)

case class Play(

subs: Subscription, user: User, track: Track)

case class Play(...)

???

Job 2

(50)

Coesão: menos dados para ler então é mais fácil

para entender. Seja minimalista, selecione apenas o necessário.

Performance: menos dados na memória - somente

os dados necessários são projetados. Portanto menos tráfego na rede quando realizar operações que exigem shuffle.

Por quê devo projetar meus dos

dados?

(51)

Vamos agora explorar um pouco

mais dos detalhes técnicos:

escolher o momento certo para

fazer shuffle.

(52)

Ainda dá tempo para falar sobre o

que é Spark?

▶ Engine para processamento de dados em larga escala - big data.

▶ Promessa: executar programas 100x mais rápido que o Hadoop MapReduce em memória ou 10x mais rápido no disco.

▶ Spark roda em cima do Hadoop - yarn, standalone ou em cloud services.

(53)

Conceito de Map / Reduce

D1 map() <key, value>

reduce() result

<key, value>

map() lê dados e gera <key, value> - usuário define

a função de transformação.

reduce() lê <key, value> e gera result - usuário

(54)

D1 map() D2 map() D3 map() Dn map() reduce() reduce() O 1 O n Fase de agrupamento (resultados intermediários)

(55)

Operações: Narrow x Wide

Narrow operations: operações que não implicam

em transferência de dados entre nodes através da rede.

Wide operations: operações que transferem

(56)

map, filter, union, join com dados co-partionados

groupByKey, join com dados não partionados

Partição Partição Partição Partição Partição Partição Partição Partição Partição Partição Partição Partição Executor Executor Executor Executor

(57)

Melhorando o desempenho do

seus Spark jobs

▶ Existem diversas variáveis que podem ser ajustadas em um spark job: número de executors, número de cores, memória, etc.

Uma das variáveis mais importantes é o número de

partições. Controlar esse fator gera impacto direto

no gerenciamento de troca de dados através da rede - shuffle.

(58)

Redistribuição de dados:

Collapse vs Repartition

Collapse: é uma operação narrow. Se você reduzir

de 1000 partições para 100 partições não haverá shuffle. Cada uma das novas 100 partições

reivindicarão 10 das partições atuais.

Repartition: é uma operação wide. Redistribuí os

dados utilizando shuffle. Se você está reduzindo o número de partições, considere utilizar collapse.

(59)

Quando redistribuir meus dados?

val monetizablePlays = allPlays .filter(isMusic)

.filter(isLongPlay)

.filter(isPolicyMonetize)

.coalesce(???) // ou .repartition(???)

val tracks = allPlays

.map(keyByTrackId(_) -> 1L) .reduceByKey(_ + _)

val publishers = allPublishers .map(keyByTrackId(_))

.alignPartitioningTo(tracks) tracks.leftOuterJoin(publishers)

Após realizar filtro: Antes de realizar join:

alinhar o número de partições dos datasets antes de aplicar o join

criar partições com tamanho entre 64 e 100 MB, conforme recomendado pela

(60)

Constante vigilância no crescimento

dos dados e parâmetros dos jobs

Os dados são dinâmicos. Os

parâmetros selecionados para

execução eficiente de um job no

período X podem ser ineficazes

(61)

Potencial problema:

Para construir o estado atual de uma entidade,

basta aplicar todos os eventos - em ordem

cronológica.

O tempo não para e os eventos também não. Quanto maior a volatilidade dos dados, mais cedo um job precisará ser ajustado. Previsibilidade da quantidade dos dados evitará maiores dores de cabeça.

Manter o comportamento previsível

da pipeline é uma batalha

(62)

Redução do efeito nocivo do tempo na pipeline. Geração de snapshots adequam-se a velocidade

de alteração dos dados.

Track Ruleset Snapshots

Jan Fev Mar Abr Mai Jun Jul Ago Set Out Nov Dez

Janela (deslizante) no tempo Quantidade máxima de eventos necessários Checkpoint do estado

(63)
(64)

Estamos contratando:

Referências

Documentos relacionados

A glabela é quâsi quadrada, com- pletamente lisa e não apresenta a característica lobação, com expressão de Caiymene &#34; os lobos oculares ainda que mami- formes são um pouco

L'interprétation non-compositionnelle du diminutif peut être vérifiée dans les cas suivants: (1) lorsque l'interprétation non- compositionnelle du diminutif en

A área externa é composta por um pátio coberto e uma área descoberta com: quadra, horta, labirinto, playground, acesso a diferentes texturas no piso (folhagem,

Para compreendê-la, faz-se necessário revisitar os inúmeros autores críticos às operações urbanas ou à lógica dos grandes projetos urbanos, centrados na

Política Internacional – Sessão 1: “Teoria Política e Relações Internacionais” Local: Sala de Defesa AS-01 (Prédio de Humanidades II). Ciência política e

Agora, para completar, aca- ba de passar por uma reforma para garantir um ambiente con- fortável e aconchegante, seja para as compras cotidianas, seja para quem busca a padaria

A critério da empresa autorizada, poderá ser requerido aos participantes comprovar, por mais de um meio, a realização da compra de produtos ou serviços em lojas do

A tendência predominante, dada a polémica surgida acerca do aumento de risco de cancro mamário atribuível à terapêutica hormonal de substituição, é a de recomendar que esta