Spark, Hadoop e Microservices:
extraindo informações de milhões
de eventos
Abril 2017
Um pouco sobre o negócio da
SoundCloud
CREATORS querem um público.
▶ O aumento de listeners gera a oportunidade para que creators sejam ouvidos.
LISTENERS querem conteúdo original
▶ SoundCloud oferece diferente modelos de subscriptions - incluindo um modelo gratuito.
Crescimento dos CREATORS impulsiona o crescimento de LISTENING TIME.
Identidade Audiência Ganhar dinheiro
Uma plataforma, muitos requisitos
▶ Creators decidem como distribuir seu conteúdo:
● Monetização por território.
● Disponível somente para subscribers.
▶ Listeners utilizam SoundCloud app em diferentes
plataformas: IOs, Android, Web, ChromeCast.
▶ Em background, SoundCloud precisa entregar o
conteúdo para os usuários e extrair/armazenar métricas (de forma durável) das requisições.
Nosso foco hoje será a pipeline de
monetização da SoundCloud!
A monetização é derivada do
número de plays
Número de plays é importante:
zoom na track timeline
Como armazenar 37 segundos ouvidos de uma track?
Listener pode ou não continuar escutando a track.
`
Ao iniciar uma track um evento PlayEvent é emitido. A cada X segundos a track timeline emite um evento. message PlayEvent { required uint64 at = 1; optional string identifier = 2; }
message ListeningEvent {
required uint64 at = 1; optional string play_ref = 2; }
Conforme o progresso da track, ListeningEvent são emitidos. //////////////////////////////////////////////// //////////////////////////////////////////////// //////////////////////////////////////////////// //////////////////////////////////////////////// //////////////////////////////////// //////////////////////////////////// //////////////////////////////////// //////////////////////////////////// //////////////////////////////////// ////////////////////////////////////
Vamos abrir um parênteses e explorar
um pouco mais a estrutura de um
Um evento pode ser:
Um evento não é:
▶ Uma ação realizada por um usuário (sound_played). ▶ Uma ação realizada por um sistema
(user_assigned_to_experiment).
▶ Uma alteração de estado de um objeto persistente (user_updated).
▶ Logs operacionais que são examinados em caso de erros.
▶ Um objeto persistente (como uma track ou user).
Que nos permite:
▶ Emitir eventos.
▶ Consumir eventos em tempo real.
▶ Ler eventos de long-term storage.
▶ Desacoplar triggers de ações.
▶ Analisar comportamento de usuários e sistemas.
▶ Implementar funcionalidades orientada a dados.
O que podemos fazer com
eventos?
▶ Event type ▶ UUID ▶ Timestamp ▶ Encoding ▶ Protobuf-encoded de acordo com um schema Metadata Payload
Anatomia de um evento na
SoundCloud
message Event { required uint64 at = 1; optional string identifier = 2; }Fechando parênteses!
Vamos voltar para a timeline
emissora de eventos.
Ao iniciar uma track um evento
PlayEvent é emitido.
A cada X segundos a track timeline emite um evento:
ListeningEvent.
message PlayEvent {
required uint64 at = 1; optional string identifier = 2; }
message ListeningEvent {
required uint64 at = 1; optional string play_ref = 2; }
Conforme o progresso da track,
A duração de uma play é uma informação importante. Não
é preciso criar um evento para indicar que uma track não
está mais em execução.
Algo como StopEvent é desnecessário Essa variável é importante para
identificação de spam e para a regra de monetização
Esse comportamento da track
timeline permite:
Menos dependência de ações do listener: ▶ Não precisa apertar o botão .
Menos corner cases para serem tratados:
▶ Pause & Play em outra track.
▶ Aplicativo foi fechado ou parou de funcionar.
1
Em contrapartida:
Surge a necessidade de
agregar
ListeningEvent
por
play_ref. Também deve ser
feito um join do resultado da
message SoundPlayedEvent {
required uint64 at = 1; optional Context context = 2; optional User actor = 3; optional Playable sound = 4; optional uint32 duration = 5; } message PlayEvent { required uint64 at = 1; optional string id = 2; } message ListeningEvent { required uint64 at = 1; optional string play_ref = 2; }
Entender a estrutura dos dados é
essencial em Big Data. Vamos
continuar nessa linha e explorar um
pouco mais como esses dados são
Track DB User DB Upload de
novas tracks novos usuáriosRegistro de
Publisher DB Registro de novos publishers
?
Arquitetura com isolamento de
responsabilidades: Microservices
Track DB User DB Upload de
novas tracks novos usuáriosRegistro de
Publisher DB Registro de novos publishers
?
Funcionalidade com dependência
transversal de Microservices
Agora o relatório de uso pode ser construído sem dependência de microservices. A dependência está nos dados.
Desafio: não existe padronização no armazenamento. Exemplo: Relatório de uso direcionado para o Parceiro X
Track DB User DB Upload de
novas tracks novos usuáriosRegistro de
Publisher DB Registro de novos publishers
?
Dependência transversal dos
dados dos Microservices
Exemplo: Relatório de uso direcionado para o Partner X
precisa dos metadados da track, do usuário e do publisher Agora o relatório de uso pode ser construído sem dependência
da API dos microservices. A dependência está nos dados.
Track DB User DB Upload de novas tracks Registro de novos usuários Publisher DB Registro de novos publishers ?
Flush dos dados para o HDFS
Track DB User DB Upload de novas tracks Registro de novos usuários Publisher DB Registro de novos publishers ? ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// ///////////////////////// /////////////////////////
Track metadata User metadata Publisher metadata
Agora o relatório de uso pode ser construído sem dependência de microservices. A dependência é de dados e o acesso a
Consistência eventual dos dados
Agora imagina inconsistência
em $$$$
▶ Não significa que às vezes é consistente e às vezes não. Significa que mais cedo ou mais tarde será consistente - você não tem controle sobre isso.
▶ Algumas funcionalidades toleram uma fração de inconsistência - algoritmo de recomendação.
▶ O cálculo de royalties é baseado em plays. Se
os dados são inconsistentes o dinheiro também se torna inconsistente. Não existe meio dinheiro. Hello auditor!!!
Adotamos um processo de curadoria dos dados.
É importante identificar onde consistência é um requisito
para aumentar a confiabilidade e não afetar o desempenho
de serviços que toleram uma fração de inconsistência.
Validação de dados, preenchimento de dados obrigatórios que estão faltando -
consistência
Entenda como seus dados são gerados e como eles são consumidos. Transação distribuída (XA) não é
sua única opção para garantir consistência
Se o seu dado se transforma em dinheiro, sua pipeline precisa ser
Vocês se lembram dessa fórmula?
SoundPlayedEvent = PlayEvent +
HDFS
Agregar
ListeningEvent e fazer join com
PlayEvent Build SoundPlayedEvent (protobuf schema) message SoundPlayedEvent { required uint64 at = 1; optional Context context = 2; optional User actor = 3; optional Playable sound = 4; optional uint32 duration = 5; }
> hdfs dfs -ls /semantic-events/playduration/2017 Found 2 items
/semantic-events/playduration/2017/01-01 /semantic-events/playduration/2017/01-02
> hdfs dfs -ls /semantic-events/playduration/2017/01-01 Found 3 items /semantic-events/playduration/2017/01-01/part-r-00000 /semantic-events/playduration/2017/01-01/part-r-00001 /semantic-events/playduration/2017/01-01/part-r-00002 > hdfs dfs -cat /semantic-events/playduration/2017/01-01/part-r-00000 SEQ^F!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^A^ A)org.apache.hadoop.io.compress.SnappyCodec^@^@^@^@ϑ'^^2<B7><CA>Ⱥ^N< E5><91>...
Sequence File
Binary File
allPlays.map(play => Writer[EnrichedPlay].write(play) match {case Good(bytesWritable) => play.at -> bytesWritable
case Bad(errors) => sys.error(errors.toString)
})
Pipeline de monetização funciona
baseado em plays e metadados:
▶ Conteúdo monetizável: métricas das plays
▶ Relatórios de uso para parceiros em diferentes períodos: diários, semanais, mensais, etc.
▶ Relatório de músicas mais tocadas para agências de ranking.
Legacy pipeline
Usage reports
metrics loader
playsplays
plays Spam Filter audited-playsplaysplays
Enricher playsplays subscriptions playsplays track-rules-set playsplays enriched-plays Aggregator playsplays aggregated-plays Usage reports Repoting DB monetizable-usage Royalties Calculator $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ $$$ User snapshot Publisher snapshot
Gerar CSV Filtrar / Agrupar / Agregar plays de um mês HDFS Filtrar / Agrupar / Agregar plays de um mês Filtrar / Agrupar / Agregar plays de um mês
Gerar CSV
Filtrar / Agrupar / Agregar plays de
um mês HDFS
Gerar DDEX Gerar XML
val plays = PlayAggregator .group(monetizablePlays)
Gerar CSV
HDFS
Gerar DDEX Gerar XML Job relatório de uso Publishers Ler dados e agrupar por quadrimestre Gerar CSV message PlayAggregation { required uint64 id = 1; optional string name = 2; } Filtrar / Agrupar / Agregar plays de um mês Job relatório de uso Partner X Job relatório de uso Label Y
Agregar plays não é suficiente.
É preciso enriquecer as plays.
Um evento de play é um dado muito
simples. Ele não contém informação
suficiente para ser utilizado como input da pipeline de monetização. A play é um simples “agregador de ids”. Mas é através de uma play que montamos
Renovação de assinatura mensal Alteração de copyright de uma track HDFS HDFS HDFS Alteração de policy de uma track Diferentes microservices geram diferentes tipos de dados em um ambiente persistente.
Renovação de assinatura mensal Alteração de copyright de uma track HDFS HDFS HDFS Alteração de policy de uma track /events/2017/01-01/01_00/plays_audited /events/2017/01-01/01_00/track_rightsholders_changed /events/2017/01-01/01_00/subscription_activated /events/2017/01-01/01_00/track_ruleset_changed
Track DB Play Enricher Job HDFS User DB Publisher service HDFS message RichPlay {
required Play play = 1; optional Subscription subscription = 2; optional TrackMetadata track = 3; optional UserMetadata user = 4; optional PublihserMetadata publisher = 5; }
User DB Track DB
A tarefa de enriquecimento poderia
ser bem mais complicada
Play Enricher Job Audited Plays HDFS Publisher Service HTTP API
Track Metadata
User Metadata
Mas ela se torna mais fácil quando
os dados estão padronizados
Play Enricher Job Audited Plays HDFS Publisher Metadata
Relatório Label B
Relatório Label A Relatório Label N
???
Diferentes data sources mas um único
source of truth
Filtro e enriquecimento
dos dados para preparar dados - abastecer - para
diversos jobs
Possibilidade de utilizar o framework adequado
Relatório Label B
Relatório Label A Relatório Label N
???
Pipeline possui um ponto único de falha. Enricher
Play é o gargalo. Se ele
falha então o abastecimento dos dados falhará. Consequentemente todos os jobs dependentes também falharão.
X
Relatório Label B
Relatório Label A Relatório Label N
Cuidado com dados específicos na camada de dados comum. Dados
específicos podem impactar jobs que não dependem desse dado.
Quanto mais data
sources mais custoso é a operação de join.
Dado grande, mais custo para fazer shuffle
Projeções dos dados
Projeção está diretamente ligado com
seleção de dados. Por exemplo, dado uma
fonte de dados com muitos atributos (fat
data), selecione apenas o que é
estritamente necessário para realizar a
tarefa.
Job 2
Play
Múltiplas visões de play
Job 1 Play Play Enricher Job Play Enricher Job Play Enricher Job Enriched Play Job N ??? Play
...
select * from enriched_plays select play, subscription, userfrom enriched_plays
select play, subscription, track, user from enriched_plays
Job 1 Play Play Enricher Job Play Enricher Job Play Enricher Job Enriched Play Job N Play
...
Múltiplas visões de play
case class Play(
subs: Subscription, user: User)
case class Play(
subs: Subscription, user: User, track: Track)
case class Play(...)
???
Job 2
▶ Coesão: menos dados para ler então é mais fácil
para entender. Seja minimalista, selecione apenas o necessário.
▶ Performance: menos dados na memória - somente
os dados necessários são projetados. Portanto menos tráfego na rede quando realizar operações que exigem shuffle.
Por quê devo projetar meus dos
dados?
Vamos agora explorar um pouco
mais dos detalhes técnicos:
escolher o momento certo para
fazer shuffle.
Ainda dá tempo para falar sobre o
que é Spark?
▶ Engine para processamento de dados em larga escala - big data.
▶ Promessa: executar programas 100x mais rápido que o Hadoop MapReduce em memória ou 10x mais rápido no disco.
▶ Spark roda em cima do Hadoop - yarn, standalone ou em cloud services.
Conceito de Map / Reduce
D1 map() <key, value>
reduce() result
<key, value>
▶ map() lê dados e gera <key, value> - usuário define
a função de transformação.
▶ reduce() lê <key, value> e gera result - usuário
D1 map() D2 map() D3 map() Dn map() reduce() reduce() O 1 O n Fase de agrupamento (resultados intermediários)
Operações: Narrow x Wide
▶ Narrow operations: operações que não implicam
em transferência de dados entre nodes através da rede.
▶ Wide operations: operações que transferem
map, filter, union, join com dados co-partionados
groupByKey, join com dados não partionados
Partição Partição Partição Partição Partição Partição Partição Partição Partição Partição Partição Partição Executor Executor Executor Executor
Melhorando o desempenho do
seus Spark jobs
▶ Existem diversas variáveis que podem ser ajustadas em um spark job: número de executors, número de cores, memória, etc.
▶ Uma das variáveis mais importantes é o número de
partições. Controlar esse fator gera impacto direto
no gerenciamento de troca de dados através da rede - shuffle.
Redistribuição de dados:
Collapse vs Repartition
▶ Collapse: é uma operação narrow. Se você reduzir
de 1000 partições para 100 partições não haverá shuffle. Cada uma das novas 100 partições
reivindicarão 10 das partições atuais.
▶ Repartition: é uma operação wide. Redistribuí os
dados utilizando shuffle. Se você está reduzindo o número de partições, considere utilizar collapse.
Quando redistribuir meus dados?
val monetizablePlays = allPlays .filter(isMusic)
.filter(isLongPlay)
.filter(isPolicyMonetize)
.coalesce(???) // ou .repartition(???)
val tracks = allPlays
.map(keyByTrackId(_) -> 1L) .reduceByKey(_ + _)
val publishers = allPublishers .map(keyByTrackId(_))
.alignPartitioningTo(tracks) tracks.leftOuterJoin(publishers)
▶ Após realizar filtro: ▶ Antes de realizar join:
alinhar o número de partições dos datasets antes de aplicar o join
criar partições com tamanho entre 64 e 100 MB, conforme recomendado pela
Constante vigilância no crescimento
dos dados e parâmetros dos jobs
Os dados são dinâmicos. Os
parâmetros selecionados para
execução eficiente de um job no
período X podem ser ineficazes
Potencial problema:
Para construir o estado atual de uma entidade,
basta aplicar todos os eventos - em ordem
cronológica.
O tempo não para e os eventos também não. Quanto maior a volatilidade dos dados, mais cedo um job precisará ser ajustado. Previsibilidade da quantidade dos dados evitará maiores dores de cabeça.
Manter o comportamento previsível
da pipeline é uma batalha
Redução do efeito nocivo do tempo na pipeline. Geração de snapshots adequam-se a velocidade
de alteração dos dados.
Track Ruleset Snapshots
Jan Fev Mar Abr Mai Jun Jul Ago Set Out Nov Dez
Janela (deslizante) no tempo Quantidade máxima de eventos necessários Checkpoint do estado