Catalog (3) e informa o UUID que deve ser utilizado pelo recurso. O recurso passa a enviar dados ao Resource Adaptor, que (4) publica a chegada dos novos dados no Shock através do Kafka, promovendo extensão ao InterSCity. Aplicações (5) interagem com o Shock via Kafka, construindo o canal de fluxo de dados, definindo as operações a serem executadas.
Por fim, os resultados do Shock e do Data Collector (6) serão disponibilizados, podendo ser consumidos por aplicações como o microsserviço Resource Viewer, que (7) apresenta dados ao usuário final.
Assim como seguido pela equipe do InterSCity, utilizamos o Docker na gerência de configuração do novo serviço. A configuração do Spark que havia sido feita pelo time do InterSCity na criação do DataProcessor foi reutilizada, e configuramos um contêiner com o Kafka. Por fim, ligamos os contêineres configurados com o microsserviço Resource Adaptor, permitindo assim a interação entre o InterSCity e as ferramentas definidas para uso.
Iniciamos a ligação entre os projetos, etapa 1, com uma adaptação no microsserviço Resource Adaptor, que com a mudança, passou a publicar em um tópico específico no Kafka a chegada de novos dados. Essa adaptação não trouxe mudanças significativas no InterSCity, não afetando o fluxo usual da plataforma. Após, solucionamos as etapas 2 e 3 através do desenvolvimento do Shock, responsável por receber mensagens em tópicos específicos do Kafka e passá-los ao Spark Streaming. O Shock gerencia a execução de fluxo de dados do Spark, permitindo que usuários terceiros configurem fluxos de dados no Spark sem ter conhecimento técnico da ferramenta.
4.2 SHOCK
O Shock encontra-se disponível em um repositório no Gitlab1e o novo microsserviço de processamento de dados pode ser encontrado em umfork do serviço original2. O Shock abstrai o uso das diferentes ferramentas e pode ser customizado por serviços externos que definem e configuram fluxos para serem executados. A arquitetura do Shock apresenta pontos de estensão e possui um handler para a Arquitetura Kappa desenvolvido3, mas caso seja desejado o uso de outra estratégia, basta implementar um novo handler.
1 <https://gitlab.com/DGuedes/shock>
2 <https://gitlab.com/DGuedes/data-processor>
3 <https://gitlab.com/DGuedes/shock/blob/master/shock/handlers.py>
36 Capítulo 4. PROJETO E IMPLEMENTAÇÃO DO SERVIÇO DE PROCESSAMENTO
Figura 8 – Ciclo de vida do Shock dentro do InterSCity.
A Figura 8 ilustra o uso do Shock, utilizando como exemplo uma aplicação de cidades inteligentes desenvolvida pelo time do InterSCity4, que disponibiliza dados como as coordenadas, o tempo, o índice e a linha de alguns ônibus de São Paulo. Imaginando que seja desejado a criação de uma nova informação a partir dos dados dos ônibus, como a velocidade, o fluxo de uso com o Shock poderia ser: (1) um cliente que deseje utilizar o serviço de processamentos do InterSCity analisa as funções que o Shock disponibiliza para criação do fluxo, e (2) constrói o fluxo desejado através dessas funções, via Kafka.
Os recursos IoT (3) publicam no Resource Adaptor a chegada de novos dados de ônibus, para que esses dados sejam (4) publicados em um tópico específico do Kafka pelo Resource Adaptor. Após serem disponibilizados no Kafka, os dados ficam disponíveis para serem utilizados pelo Shock, que os (5) recebe nos fluxos de dados configurados para ingerir dados, e os processa através do canal construído no passo 2. Por fim, após o processamento dos dados, o Shock pode (6) disponibilizar os resultados do processamento, que pode ser reaproveitado por aplicações terceiras.
Figura 9 – Padrãoingestão, preparo, análise e publicação.
4 <https://gitlab.com/smart-city-software-platform/external-resources/tree/master/bus>
4.2. SHOCK 37
Do ponto de vista da implementação, utilizamos no Shock a arquitetura de ca-madas ingestão, tratamento, análise e publicação, cujas traduções em inglês são respecti-vamente ingest, store, analyze epublish. Essa arquitetura é apresentada na Figura 9, e a criamos para o Shock utilizando como base arquiteturas mais difundidas, como a ingest, store, analyzeevisualize, utilizada na plataforma Google Cloud Platform5, e a arquitetura collecton tier, message queuing tier, analysis tier, in-memory data store edata access tier, explicada por Psaltis (2017). No Shock, cada uma dessas camadas são classificadas como estágios de um fluxo de dados, podendo um fluxo de dados ter no máximo 4 estágios.
Diferente dos outros serviços da plataforma, as aplicações clientes podem interagir com o Shock através de uma API fornecida via o Kafka. Assim, as aplicações enviam men-sagens ao Kafka utilizando um formato semelhante ao JSON, com o padrão “arg1;{arg2}”, onde o primeirotoken, “arg1”, representa o nome da ação desejada, e o segundo, “arg2”, os argumentos da ação. Umhandler no Shock receberá a ação requisitada no métodohandle, e deve tratar a requisição recebida. Por exemplo, uma mensagem ingestion;{"stream":
"mystream", "shock_action": "socketIngestion"}, deve ser enviada caso queira-se o regis-tro de um estágio de ingestão no fluxo de dadosmystreamutilizando a estratégiasocketIngestion.
Figura 10 – Diagrama de classes do Shock.
A Figura10apresenta o diagrama das principais classes do Shock. O núcleo do Shock é representado pela classe Shock, que sustenta-se no uso de umhandler que implemente o método handle. Atualmente, um handler para o InterSCity encontra-se implementado, contudo, caso seja desejado o uso de um handler que utilize a Arquitetura Lambda, por exemplo, basta a criação de uma nova classe que herde da classeHandlere que implemente o métodohandle para
5 <https://cloud.google.com/solutions/data-lifecycle-cloud-platform>
38 Capítulo 4. PROJETO E IMPLEMENTAÇÃO DO SERVIÇO DE PROCESSAMENTO
as diferentes ações requisitadas. Por fim, uma classe Stream utiliza a arquitetura de camadas citada anteriormente, onde cabe ao handler gerenciar e interagir com os diferentes fluxos de dados criados.
1 # s o u r c e : h t t p s : / / g i t l a b . com/DGuedes/ s h o c k / b l o b / m a s t e r / s h o c k / i n g e s t i o n . py d e f s o c k e t I n g e s t i o n ( a r g s : d i c t) −> SparkDataFrame :
3 s p a r k = a r g s . g e t (" s p a r k ") h o s t = a r g s . g e t (" h o s t ")
5 p o r t = a r g s . g e t (" p o r t ")
r e t u r n s p a r k . r e a d S t r e a m .f o r m a t(" s o c k e t ") . o p t i o n (" h o s t ", h o s t ) \
7 . o p t i o n (" p o r t ", p o r t ) . l o a d ( )
9
d e f k a f k a I n g e s t i o n ( a r g s : d i c t) −> SparkDataFrame :
11 s p a r k = a r g s . g e t (" s p a r k ") t o p i c = a r g s . g e t (" t o p i c ")
13 b r o k e r s = a r g s . g e t (" b r o k e r s ")
r e t u r n s p a r k . r e a d S t r e a m .f o r m a t(" k a f k a ") \
15 . o p t i o n (" k a f k a . b o o t s t r a p . s e r v e r s ", b r o k e r s ) \ . o p t i o n (" s u b s c r i b e ", t o p i c ) . l o a d ( ) \
17 . s e l e c t E x p r ("CAST( key AS STRING) ", "CAST( v a l u e AS STRING) ")
Listagem 4.1 – Ingestão de dados no Shock via socket e Kafka.
O primeiro estágio de um fluxo de dados, a ingestão, trata-se da ingestão dos dados a partir de alguma fonte, como algum tópico do Kafka, ou algum arquivo novo. Atualmente o Shock fornece três tipos diferentes de ingestão: ingestão via tópico do Kafka, ingestão via arquivo (Parquet e JSON) e ingestão via socket. A depender da estratégia de ingestão, alguns parâmetros são necessários na configuração - numa ingestão via Kafka, por exemplo, é necessário configurar o endereço do broker e os tópicos que serão utilizados. A Listagem 4.1 apresenta o código-fonte de duas estratégias de ingestão disponíveis no Shock.
1 # s o u r c e : h t t p s : / / g i t l a b . com/DGuedes/ s h o c k / b l o b / m a s t e r / s h o c k / p r o c e s s i n g . py d e f c a s t e n t i t y ( s t r e a m : DataStreamReader , a r g s : d i c t) −> DataStreamReader :
3 j s o n _ o b j e c t s = [ ]
f o r u i n [" u u i d ", " c a p a b i l i t y ", " timestamp ", " v a l u e "] :
5 j s o n _ o b j e c t s . append ( g e t _ j s o n _ o b j e c t ( s t r e a m . v a l u e , ’ $ . ’+u ) . a l i a s ( u ) ) r e t u r n s t r e a m . s e l e c t ( j s o n _ o b j e c t s )
Listagem 4.2 – Tratamento de dados no Shock viacast.
O segundo estágio de um fluxo de dados é chamadopreparo, e tem o papel de ajuste e limpeza dos dados para que sejam utilizados pelos fluxos sem maiores complicações. Um exemplo típico é o uso de um estágio de preparo que façacast (mudanças nos tipos de dados) de dados, quando valores estão como string quando são necessários valores double. Atualmente, o Shock apresenta somente preparo de dados via cast, que são essenciais no uso do InterSCity e de consumo do Kafka, apresentados na Listagem 4.2.
4.2. SHOCK 39
# s o u r c e : h t t p s : / / g i t l a b . com/DGuedes/ s h o c k / b l o b / m a s t e r / s h o c k / p r o c e s s i n g . py
2 d e f s t r e a m F i l t e r ( s t r e a m : SparkDataFrame , a r g s : d i c t) −> SparkDataFrame : q u e r y = a r g s . g e t (" q u e r y ")
4 i f ( q u e r y ) :
r e t u r n s t r e a m . where ( q u e r y )
6 e l s e:
r a i s e(’ M i s s i n g r e q u i r e d param " q u e r y " ’)
8
10 d e f mean ( s t r e a m : SparkDataFrame , a r g s : d i c t) −> SparkDataFrame : d f 1 = s t r e a m . s e l e c t E x p r (’ c a s t ( v a l u e a s d o u b l e ) v a l u e ’,
12 ’ c a p a b i l i t y ’, ’ u u i d ’, ’ timestamp ’)
d f 2 = d f 1 . s e l e c t ( avg (" v a l u e ") )
14 r e t u r n d f 2
Listagem 4.3 – Operações de análise de dados no Shock.
O terceiro estágio de um fluxo, a análise, é o estágio principal do processamento, e permite filtros, agregações e cálculos. No Shock, a etapa de análise dos dados recebe como entrada um um fluxo de dados de dados e retorna um fluxo de dados transformado. Uma aplicação que deseje utilizar a operação de filtro no fluxo mystream com a finalidade de filtrar os valores de air_quality iguais a 31, deve fazer uma requisição no Kafka com o conteúdo processing;{"stream": "mystream", "shock_action": "streamFilter", "query": "select
* from air_quality where ’value’ == 31"}. Algumas possibilidades de estágios de análise estão contidos na Listagem 4.3.
# s o u r c e : h t t p s : / / g i t l a b . com/DGuedes/ s h o c k / b l o b / m a s t e r / s h o c k / s i n k s . py
2 d e f c o n s o l e S i n k ( s t r e a m : S t r u c t u r e d S t r e a m , a r g s : d i c t) −> OutputStream : streamName = a r g s . g e t (" s t r e a m ")
4 r e t u r n s t r e a m . w r i t e S t r e a m . outputMode (’ append ’) .f o r m a t(’ c o n s o l e ’) \ . s t a r t ( )
6 d e f p a r q u e t C o m p l e t e S i n k ( s t r e a m : S t r u c t u r e d S t r e a m , a r g s : d i c t) −>
OutputStream :
streamName = a r g s . g e t (" s t r e a m ")
8 path = a r g s . g e t (" path ") o r " / a n a l y s i s "
r e t u r n s t r e a m . w r i t e S t r e a m . outputMode (’ c o m p l e t e ’) .f o r m a t(’ memory ’) \
10 . queryName (’ a n a l y s i s ’) . s t a r t ( )
d e f memorySink ( s t r e a m : S t r u c t u r e d S t r e a m , a r g s : d i c t) −> OutputStream :
12 t a b l e = a r g s . g e t (’ t a b l e ’) i f n o t t a b l e :
14 t a b l e = ’ a n a l y s i s ’
s t r e a m . w r i t e S t r e a m . outputMode (’ c o m p l e t e ’) .f o r m a t(’ memory ’) \
16 . queryName ( t a b l e ) . s t a r t ( )
Listagem 4.4 – Estratégias de publicação de resultados presentes no Shock.
40 Capítulo 4. PROJETO E IMPLEMENTAÇÃO DO SERVIÇO DE PROCESSAMENTO
O último estágio de um fluxo de dados, conhecido como estágio de publicação, é o estágio de apuração do processamento, retornando os dados necessários para o cliente. O Shock atualmente permite a publicação via arquivo, memória econsole, e após o lançamento da versão 2.2 do Spark, permitirá a publicação via Kafka. A publicação via arquivo é limitada, pois não permite a publicação em um servidor externo. A publicação via memória também não permite, mas é interessante pois o conteúdo passa a estar disponível para ser requisitado via SparkSession, permitindo consultas com sintaxe SQL. Por fim, a publicação viaconsole está presente somente para fins de desenvolvimento, mas é possível que ocorra uma combinação com outras ferramentas que leiam da saída padrão do Sistema Operacional. Categorizamos no Shock o nomesink para as diferentes estratégias de publicação, que é o nome utilizado pelo Spark. A Listagem4.4apresenta as estratégias de publicação emconsole, Parquet e memória.
Figura 11 – Comunicação entre o Shock e aplicações.
Como alternativa às limitações de publicação dos dados, o Shock disponibiliza na API osflushes, que atuam comojobsadicionais que ingerem dados de alguma fonte (que não seja um fluxo) e publicam em outra. Essa alternativa não precisaria existir caso a publicação via Kafka já estivesse disponível no Spark, mas enquanto o lançamento da versão 2.2 não ocorre, acaba sendo uma opção válida. Nesse sentido, uma aplicação cliente do InterSCity poderia usar o Shock para realizar processamento de seus dados seguindo a interação apresentada na Figura11. Nessa interação, a aplicação faz definições no Shock via Kafka, e o Shock retorna os resultados pelos estágios de publicação ou por WebSocket.
41
5 EXEMPLO DE USO
Como forma de ilustração do uso do serviço que desenvolvemos, elaboramos uma apli-cação chamada Forensic1, que interage com o Shock via Kafka, respeitando os padrões utilizados pela API. A aplicação foi feita em Elixir, uma linguagem funcional que utiliza a máquina virtual do Erlang, utilizando oframework Phoenix, que facilita no desenvolvimento de aplicações Web.
Figura 12 – Página de configuração de um fluxo. Os parâmetros obrigatórios (topic e brokers) foram configurados.
Figura 13 – Página de visualização de resultados.
O Forensic tem como principais objetivos abstrair o Shock do usuário final, e conse-quentemente, as ferramentas deBig Data(como o Spark), ao passo em que fornece um conjunto de funcionalidades que permitem ao usuário final configurar a atuação dos fluxos, semelhante a outros serviços existentes, como o Amazon AWS2. Um usuário que deseje interagir com o Shock e o InterSCity deve (1) configurar um fluxo novo com os parâmetros desejados; (2) criar esse fluxo no Shock; (3) injetar esse fluxo no Shock; (4) e iniciar o processamento do fluxo. O Forensic abstrai esses 4 passos, facilitando o uso das ferramentas e da plataforma, aproximando usuários finais que não tenham grande conhecimento do código-fonte do Shock. A Figura12 apresenta a página de visualização e edição de um fluxo, e a Figura13a página de visualização de resultados
1 <https://gitlab.com/DGuedes/forensic>
2 <https://aws.amazon.com/kinesis/analytics/>
42 Capítulo 5. EXEMPLO DE USO
processados. Os resultados de cada dado são mostrados separadamente, onde o título representa a capacidade do recurso, e o texto centralizado o valor da respectiva capacidade.
Com o propósito de aproximar as soluções que desenvolvemos aos cenários mais reais, separamos dois casos de uso para serem resolvidos com uso do Forensic e do Shock. Em cada um desses casos ocorre uma interação entre o InterSCity, Shock e Forensic, utilizando como produtor de dados alguns coletores de São Paulo3. De maneira geral, os casos de uso abrangem os conceitos citados anteriormente, como ocast de dados, filtros, dentre outros.