Processamento
de dados
em "tempo real"
Eiti Kimura
QConSP19
●
IT Coordinator and Software Architect at Movile
●Msc. in Electrical Engineering
●
Apache Cassandra MVP (2014/2015 e 2015/2016)
●Cassandra Summit Speaker (2014 e 2015)
●
Strata Hadoop World Singapore Speaker (2016)
●Spark Summit Speaker (2017)
●
RedisConf Speaker (2018)
Eiti Kimura
+1 Bilhão
De mensagens por mês
SUMÁRIO
●
Introdução ao Apache Spark
●
Nova API Structured Streaming
●
Caso de uso de uma aplicação de
processamento em "tempo real"
●
Lições aprendidas
Introdução ao Apache Spark
Apache Spark™ is a fast and general engine
for large-scale data processing.
Em que o Apache Spark é usado?
●
Processos de ETL
●
Consultas Interativas (SQL)
●
Análise de dados avançada
●
Machine Learning
Structured Stream do Spark
API de alto nível para desenvolvimento
de aplicações contínuas de
processamento de stream de dados,
integrando com storages de forma
consistente e tolerante a falhas.
Apache Spark Structured Stream
•
Nova
API de alto nível
• Junção de dados contínua com conteúdo estático
• Integrações com diversas fontes de dados
• Tolerante a falhas (
checkpoints
)
Processamento Batch com DataFrames
input = spark.read
.format("csv")
.load("source-path")
result = input
.select("device", "signal")
.where("signal > 15")
result.write
.format("parquet")
.save("dest-path")
Leitura de um arquivo CSV
Aplicação de filtros e seleção
Processamento Streaming com DataFrames
input = spark.
readStream
.format("csv")
.load("source-path")
result = input
.select("device", "signal")
.where("signal > 15")
result.
writeStream
.format("parquet")
.
start
("dest-path")
Leitura de um arquivo CSV Stream
Aplicação de filtros e seleção
Escrita em formato parquet Stream
Substitui
read
por
readStream
Código
não
muda!
Substitui
write
por
writeStream
Tipos de Entrada de Dados (
Input Sources
)
Distributed File
System, Dir
Apache Kafka
Stream
Socket
Connection
Modos de
Output,
saída de dados
●
Complete: todas as linhas resultantes do processamento
são direcionadas para saída de dados
●
Update: somente as linhas que sofreram alterações ao
longo da última execução
●
Append: somente as novas linhas geradas no
processamento
Tipos de Saída de Dados (
Output Sinks
)
Distributed
Tolerância a Falhas com Checkpoints
Checkpointing: gravação de
metadados (ex: offsets) em write
ahead logs em disco (S3/HDFS)
para recuperação em caso de
falhas.
Tratamento de dados desordenados
USE CASE
Processador de dados em tempo
quase real
Subscription &
Billing System a.k.a
+110 milhões
De transações por dia
4
Grandes
operadoras no
Amostra de Dados CSV (Gzipped)
838,2,5500000000,100015,"{""authCode"":""3215"",""transactionIdAuth"":""101170622042837 4938""}",SUBSCRIPTION_050,0.5,11,0,1,14,Subscription renew.,2017-07-18
13:22:59.518,,,19,PRE,false,Charge Fail. CTN[31984771092][PRE][GSM]: Without Credit.,,,,0,458,,engine2dc2,23,3,5,2017-07-18
13:22:59.544,,FE1952B0-571D-11E7-8A17-CA2EE9B22EAB,NT0359
838,2,5500000000,100008,"{""authCode"":""9496"",""transactionIdAuth"":""117170703192540 9718""}",SUBSCRIPTION_099,0.99,11,0,1,14,Subscription renew.,2017-07-18
13:22:58.893,,,19,PRE,false,Charge Fail. CTN[21976504467][PRE][GSM]: Without Credit.,,,,0,1074,,engine2dc2,24,3,5,2017-07-18
13:22:58.928,,3ADF36D0-6040-11E7-9619-A2D6E78E4511,NT0360
703,2,5500000000,100004,"{""authCode"":""6838"",""transactionIdAuth"":""118170706120694 8526""}",SUBSCRIPTION_299,2.99,11,0,1,14,Subscription renew.,2017-07-18
13:22:59.246,,,19,PRE,false,Charge Fail. CTN[84994640470][PRE][GSM]: Without Credit.,,,,0,748,,engine2dc2,24,3,5,2017-07-18 13:22:59.254, NT0299
Fonte de Entrada de Dados
val streamReader = spark.readStream .format("csv")
.option("header", false)
.option("mode", "DROPMALFORMED")
.schema(ReadSchemas.csvTransactionSchema) .load("hdfs://YOUR_PATH/20*/*/*/*.gz")
val conf = new SparkConf().setAppName("Structured Streaming")
val spark = SparkSession.builder()
.config(conf).getOrCreate()
fragmento de código Scala
Estrutura na definição de um
Schema
StructField(
"origin_id"
, IntegerType,
true
)
Nome do campo
Tipo do campo
Definição de Schema de Leitura de Dados
// the csv data schema
def csvTransactionLogSchema = StructType { StructType(Array(
StructField("id", StringType, true),
StructField("application_id", IntegerType, true),
StructField("carrier_id", IntegerType, true),
StructField("phone", StringType, true),
StructField("price", DoubleType, true),
StructField("origin_id", IntegerType, true), . . .
)) }
Processamento (Spark SQL API) v1
val query = streamReader.withColumn("date", $"creation_date".cast("date"))
.withColumn("successful_charges", when($"transaction_status_id" === 2, 1)) .withColumn("no_credit", when($"transaction_status_id" === 0, 1).otherwise(0)) .withColumn("error", when($"transaction_status_id" === 3).otherwise(0))
.filter("carrier_id IN (1,2,4,5)")
.filter("transaction_status_id NOT IN (5, 6)") .filter("transaction_action_id IN (0, 1)") .withWatermark("creation_date", "3 hour")
.groupBy($"carrier_id", window($"creation_date", "5 minutes").as("window")) .agg($"carrier_id",
avg($"response_time").as("avg_response_time"),
sum($"successful_charges").as("successful_charges"), sum($"no_credit").as("no_credit"),
sum($"error").as("error"),
count($"carrier_id").as("total_attempts"))
select case when
filtering
Processamento (Spark SQL API) v2
streamReader
.withWatermark(
"creation_date"
,
"3 hour"
)
.createOrReplaceTempView(
"
transaction_temp_table
"
)
val streamReader = spark.readStream
.format("csv") .schema(ReadSchemas.csvTransactionSchema)
Processamento (Spark SQL) v2
val query : DataFrame = spark.sql( """
SELECT carrier_id, TO_DATE(creation_date) as record_date, HOUR(creation_date) as hour_of_day, WINDOW(creation_date, "5 minutes").start as start_date,
AVG(response_time) as avg_response_time , SUM(CASE
WHEN transaction_status_id = 2 THEN 1 ELSE 0
END) as successful_charges, SUM(CASE
WHEN transaction_status_id = 0 THEN 1 ELSE 0 END) as no_credit, count(carrier_id) as total_attempts FROM transaction_raw WHERE carrier_id IN (1,2,4,5) AND transaction_action_id IN (0, 1) AND transaction_status_id NOT IN (5, 6)
GROUP BY carrier_id, TO_DATE(creation_date), HOUR(creation_date), WINDOW(creation_date, "5 minutes").start
""")
SELECT carrier_id, TO_DATE(creation_date) as record_date, HOUR(creation_date) as hour_of_day, WINDOW(creation_date, "5 minutes").start as start_date,
AVG(response_time) as avg_response_time , SUM(CASE
WHEN transaction_status_id = 2 THEN 1 ELSE 0
END) as successful_charges, SUM(CASE
WHEN transaction_status_id = 0 THEN 1 ELSE 0 END) as no_credit, count(carrier_id) as total_attempts FROM transaction_temp_table WHERE carrier_id IN (1,2,4,5) AND transaction_action_id IN (0, 1) AND transaction_status_id NOT IN (5, 6)
GROUP BY carrier_id, TO_DATE(creation_date), HOUR(creation_date), WINDOW(creation_date, "5 minutes").start
Fonte de Saída de Dados
val jdbcWriter = new JDBCSink(resource, username, password)
val foreachStream = query
.select($"carrier_id", $"date", $"hour_of_day", $"start_date", $"end_date") .writeStream
.foreach(jdbcWriter)
.outputMode(OutputMode.Update())
.trigger(Trigger.ProcessingTime("2 minute"))
.option("checkpointLocation", "hdfs://YOUR_PATH/checkpoint-complete/") .start
foreachStream.awaitTermination()
Result table
em consolidação
+--+---+---+---+---+---+---+---+---+---+
|id|record_date|hour |start |end |avg_resp_tm |success |no_credit|error|tot_att|
+--+---+---+---+---+---+-- ---+---+---+---+
|1 |2017-07-18 |13 |
13:20 |13:25
|618.8061297 | 4 |2607 |195 |2806 |
|2 |2017-07-18 |13 |13:20 |13:25 |1456.424283 | 13 |10912 |1503 |12428 |
|5 |2017-07-18 |13 |13:20 |13:25 |1161.730896 | 9 |2796 |532 |3337 |
|4 |2017-07-18 |13 |13:20 |13:25 |2950.642105 | 4 |1364 |54 |1425 |
+--+---+---+---+---+---+---+---+---+---+
Window (5 mins)
Consolidação
Watermark (ts)
Agregações
Operadoras
RESULTADOS OBTIDOS
60-90 mins
Processamento de Arquivos
via ETL persistência em
banco de analytics
3-5 mins
Processamento contínuo com
Apache Structured Stream
10-15 segundos
Consulta de consolidação
para exibição no Dashboard
< 1 segundo
Consulta para exibição no
Dashboard
ANTES
DEPOIS
mais rápido
~ 30x
de
Schema
EVOLUÇÃO
def csvSchema = StructType { StructType(Array(
StructField("id", StringType, true),
StructField("application_id", IntegerType, true),
StructField("carrier_id", IntegerType, true),
StructField("creation_date", TimestampType, true) ))}