• Nenhum resultado encontrado

Processamento de dados em "tempo real"

N/A
N/A
Protected

Academic year: 2021

Share "Processamento de dados em "tempo real""

Copied!
47
0
0

Texto

(1)

Processamento

de dados

em "tempo real"

Eiti Kimura

QConSP19

(2)

IT Coordinator and Software Architect at Movile

Msc. in Electrical Engineering

Apache Cassandra MVP (2014/2015 e 2015/2016)

Cassandra Summit Speaker (2014 e 2015)

Strata Hadoop World Singapore Speaker (2016)

Spark Summit Speaker (2017)

RedisConf Speaker (2018)

Eiti Kimura

(3)
(4)

+1 Bilhão

De mensagens por mês

(5)
(6)

SUMÁRIO

Introdução ao Apache Spark

Nova API Structured Streaming

Caso de uso de uma aplicação de

processamento em "tempo real"

Lições aprendidas

(7)

Introdução ao Apache Spark

Apache Spark™ is a fast and general engine

for large-scale data processing.

(8)

Em que o Apache Spark é usado?

Processos de ETL

Consultas Interativas (SQL)

Análise de dados avançada

Machine Learning

(9)
(10)

Structured Stream do Spark

API de alto nível para desenvolvimento

de aplicações contínuas de

processamento de stream de dados,

integrando com storages de forma

consistente e tolerante a falhas.

(11)

Apache Spark Structured Stream

Nova

API de alto nível

• Junção de dados contínua com conteúdo estático

• Integrações com diversas fontes de dados

• Tolerante a falhas (

checkpoints

)

(12)

Processamento Batch com DataFrames

input = spark.read

.format("csv")

.load("source-path")

result = input

.select("device", "signal")

.where("signal > 15")

result.write

.format("parquet")

.save("dest-path")

Leitura de um arquivo CSV

Aplicação de filtros e seleção

(13)

Processamento Streaming com DataFrames

input = spark.

readStream

.format("csv")

.load("source-path")

result = input

.select("device", "signal")

.where("signal > 15")

result.

writeStream

.format("parquet")

.

start

("dest-path")

Leitura de um arquivo CSV Stream

Aplicação de filtros e seleção

Escrita em formato parquet Stream

Substitui

read

por

readStream

Código

não

muda!

Substitui

write

por

writeStream

(14)
(15)
(16)
(17)
(18)

Tipos de Entrada de Dados (

Input Sources

)

Distributed File

System, Dir

Apache Kafka

Stream

Socket

Connection

(19)

Modos de

Output,

saída de dados

Complete: todas as linhas resultantes do processamento

são direcionadas para saída de dados

Update: somente as linhas que sofreram alterações ao

longo da última execução

Append: somente as novas linhas geradas no

processamento

(20)

Tipos de Saída de Dados (

Output Sinks

)

Distributed

(21)

Tolerância a Falhas com Checkpoints

Checkpointing: gravação de

metadados (ex: offsets) em write

ahead logs em disco (S3/HDFS)

para recuperação em caso de

falhas.

(22)

Tratamento de dados desordenados

(23)

USE CASE

Processador de dados em tempo

quase real

(24)

Subscription &

Billing System a.k.a

(25)

+110 milhões

De transações por dia

4

Grandes

operadoras no

(26)
(27)
(28)
(29)
(30)
(31)
(32)

Amostra de Dados CSV (Gzipped)

838,2,5500000000,100015,"{""authCode"":""3215"",""transactionIdAuth"":""101170622042837 4938""}",SUBSCRIPTION_050,0.5,11,0,1,14,Subscription renew.,2017-07-18

13:22:59.518,,,19,PRE,false,Charge Fail. CTN[31984771092][PRE][GSM]: Without Credit.,,,,0,458,,engine2dc2,23,3,5,2017-07-18

13:22:59.544,,FE1952B0-571D-11E7-8A17-CA2EE9B22EAB,NT0359

838,2,5500000000,100008,"{""authCode"":""9496"",""transactionIdAuth"":""117170703192540 9718""}",SUBSCRIPTION_099,0.99,11,0,1,14,Subscription renew.,2017-07-18

13:22:58.893,,,19,PRE,false,Charge Fail. CTN[21976504467][PRE][GSM]: Without Credit.,,,,0,1074,,engine2dc2,24,3,5,2017-07-18

13:22:58.928,,3ADF36D0-6040-11E7-9619-A2D6E78E4511,NT0360

703,2,5500000000,100004,"{""authCode"":""6838"",""transactionIdAuth"":""118170706120694 8526""}",SUBSCRIPTION_299,2.99,11,0,1,14,Subscription renew.,2017-07-18

13:22:59.246,,,19,PRE,false,Charge Fail. CTN[84994640470][PRE][GSM]: Without Credit.,,,,0,748,,engine2dc2,24,3,5,2017-07-18 13:22:59.254, NT0299

(33)

Fonte de Entrada de Dados

val streamReader = spark.readStream .format("csv")

.option("header", false)

.option("mode", "DROPMALFORMED")

.schema(ReadSchemas.csvTransactionSchema) .load("hdfs://YOUR_PATH/20*/*/*/*.gz")

val conf = new SparkConf().setAppName("Structured Streaming")

val spark = SparkSession.builder()

.config(conf).getOrCreate()

fragmento de código Scala

(34)

Estrutura na definição de um

Schema

StructField(

"origin_id"

, IntegerType,

true

)

Nome do campo

Tipo do campo

(35)

Definição de Schema de Leitura de Dados

// the csv data schema

def csvTransactionLogSchema = StructType { StructType(Array(

StructField("id", StringType, true),

StructField("application_id", IntegerType, true),

StructField("carrier_id", IntegerType, true),

StructField("phone", StringType, true),

StructField("price", DoubleType, true),

StructField("origin_id", IntegerType, true), . . .

)) }

(36)

Processamento (Spark SQL API) v1

val query = streamReader

.withColumn("date", $"creation_date".cast("date"))

.withColumn("successful_charges", when($"transaction_status_id" === 2, 1)) .withColumn("no_credit", when($"transaction_status_id" === 0, 1).otherwise(0)) .withColumn("error", when($"transaction_status_id" === 3).otherwise(0))

.filter("carrier_id IN (1,2,4,5)")

.filter("transaction_status_id NOT IN (5, 6)") .filter("transaction_action_id IN (0, 1)") .withWatermark("creation_date", "3 hour")

.groupBy($"carrier_id", window($"creation_date", "5 minutes").as("window")) .agg($"carrier_id",

avg($"response_time").as("avg_response_time"),

sum($"successful_charges").as("successful_charges"), sum($"no_credit").as("no_credit"),

sum($"error").as("error"),

count($"carrier_id").as("total_attempts"))

select case when

filtering

(37)

Processamento (Spark SQL API) v2

streamReader

.withWatermark(

"creation_date"

,

"3 hour"

)

.createOrReplaceTempView(

"

transaction_temp_table

"

)

val streamReader = spark.readStream

.format("csv") .schema(ReadSchemas.csvTransactionSchema)

(38)

Processamento (Spark SQL) v2

val query : DataFrame = spark.sql( """

SELECT carrier_id, TO_DATE(creation_date) as record_date, HOUR(creation_date) as hour_of_day, WINDOW(creation_date, "5 minutes").start as start_date,

AVG(response_time) as avg_response_time , SUM(CASE

WHEN transaction_status_id = 2 THEN 1 ELSE 0

END) as successful_charges, SUM(CASE

WHEN transaction_status_id = 0 THEN 1 ELSE 0 END) as no_credit, count(carrier_id) as total_attempts FROM transaction_raw WHERE carrier_id IN (1,2,4,5) AND transaction_action_id IN (0, 1) AND transaction_status_id NOT IN (5, 6)

GROUP BY carrier_id, TO_DATE(creation_date), HOUR(creation_date), WINDOW(creation_date, "5 minutes").start

""")

SELECT carrier_id, TO_DATE(creation_date) as record_date, HOUR(creation_date) as hour_of_day, WINDOW(creation_date, "5 minutes").start as start_date,

AVG(response_time) as avg_response_time , SUM(CASE

WHEN transaction_status_id = 2 THEN 1 ELSE 0

END) as successful_charges, SUM(CASE

WHEN transaction_status_id = 0 THEN 1 ELSE 0 END) as no_credit, count(carrier_id) as total_attempts FROM transaction_temp_table WHERE carrier_id IN (1,2,4,5) AND transaction_action_id IN (0, 1) AND transaction_status_id NOT IN (5, 6)

GROUP BY carrier_id, TO_DATE(creation_date), HOUR(creation_date), WINDOW(creation_date, "5 minutes").start

(39)

Fonte de Saída de Dados

val jdbcWriter = new JDBCSink(resource, username, password)

val foreachStream = query

.select($"carrier_id", $"date", $"hour_of_day", $"start_date", $"end_date") .writeStream

.foreach(jdbcWriter)

.outputMode(OutputMode.Update())

.trigger(Trigger.ProcessingTime("2 minute"))

.option("checkpointLocation", "hdfs://YOUR_PATH/checkpoint-complete/") .start

foreachStream.awaitTermination()

(40)

Result table

em consolidação

+--+---+---+---+---+---+---+---+---+---+

|id|record_date|hour |start |end |avg_resp_tm |success |no_credit|error|tot_att|

+--+---+---+---+---+---+-- ---+---+---+---+

|1 |2017-07-18 |13 |

13:20 |13:25

|618.8061297 | 4 |2607 |195 |2806 |

|2 |2017-07-18 |13 |13:20 |13:25 |1456.424283 | 13 |10912 |1503 |12428 |

|5 |2017-07-18 |13 |13:20 |13:25 |1161.730896 | 9 |2796 |532 |3337 |

|4 |2017-07-18 |13 |13:20 |13:25 |2950.642105 | 4 |1364 |54 |1425 |

+--+---+---+---+---+---+---+---+---+---+

Window (5 mins)

Consolidação

Watermark (ts)

Agregações

Operadoras

(41)

RESULTADOS OBTIDOS

60-90 mins

Processamento de Arquivos

via ETL persistência em

banco de analytics

3-5 mins

Processamento contínuo com

Apache Structured Stream

10-15 segundos

Consulta de consolidação

para exibição no Dashboard

< 1 segundo

Consulta para exibição no

Dashboard

ANTES

DEPOIS

mais rápido

~ 30x

(42)
(43)

de

Schema

EVOLUÇÃO

def csvSchema = StructType { StructType(Array(

StructField("id", StringType, true),

StructField("application_id", IntegerType, true),

StructField("carrier_id", IntegerType, true),

StructField("creation_date", TimestampType, true) ))}

(44)

Resiliência no processamento

com Streams

val

input = spark.readStream

.option(

"mode"

,

"DROPMALFORMED"

)

spark.

sqlContext

(45)

Performance Check

WARN ProcessingTimeExecutor:66 - Current batch

is falling behind. The trigger interval is 1000

milliseconds, but spent 19455 milliseconds

(46)

Apache Spark Releases

(2.3) Continuous Processing in Structured Streaming

Nova

engine de execução de queries sobre Streaming com

latência de sub-milissegundo de ponta-a-ponta

(2.4) Kafka Client upgrade

(47)

Obrigado

Perguntas?

github.com/eitikimura/structured-streaming

Referências

Documentos relacionados

A interação treinamento de natação aeróbico e dieta rica em carboidratos simples mostraram que só treinamento não é totalmente eficiente para manter abundância de

Estes resultados apontam para melhor capacidade de estabelecimento inicial do siratro, apresentando maior velocidade de emergência e percentual de cobertura do solo até os 60

Entendendo, então, como posto acima, propõe-se, com este trabalho, primeiramente estudar a Lei de Busca e Apreensão para dá-la a conhecer da melhor forma, fazendo o mesmo com o

A variação do pH da fase móvel, utilizando uma coluna C8 e o fluxo de 1,2 mL/min, permitiu o ajuste do tempo de retenção do lupeol em aproximadamente 6,2 minutos contribuindo para

Este presente artigo é o resultado de um estudo de caso que buscou apresentar o surgimento da atividade turística dentro da favela de Paraisópolis, uma

Changes in the gut microbiota appears to be a key element in the pathogenesis of hepatic and gastrointestinal disorders, including non-alcoholic fatty liver disease, alcoholic

Muitas vezes o agricultor quer tirar a soja o quanto antes da lavoura, pois segundo Holtz e Reis (2013), o maior tempo de permanência da soja na lavoura, traz um aumento das

Coletaram-se informações referentes à habilitação pro- fissional dos docentes; suas fontes de conhecimentos sobre a Guerra do Contestado; seu conhecimento referente aos redutos