Processamento em Stream

2022-01-17

Para falarmos de Stream Processing precisamos primeiro falar rapidamente de Batch Processing ou processamento em lote.

##Processamento em lote

Processamento em lote é uma técnica onde você lê um conjunto de dados de entrada e produz um conjunto de dados de saída. A saída é conhecida como informação derivada ou dado derivado -- o resultado do processamento.

Uma das propriedades desse tipo de processamento de dados é a reproducibilidade. Dado o mesmo input, o processador de dados deve retornar a mesma saída. É como uma função pura, mas a nível de sistema, como se um programa fosse resumido a uma função pura.

Um bom exemplo de processamento em lote são problemas de leet coding, como os problemas do URI ou HackerRank.

Nestas plataformas você recebe um conjunto de entrada e o seu objetivo é construir o processador, que vai acabar gerando informação derivada -- o resultado do processamento.

É especialmente esperado que a saída do processador seja possível de ser reconstruída.

Outro exemplo de batch processing é o seu Cronjob maroto que executa de hora em hora varrendo o banco de dados e produzindo alguma coisa.

Pooorém, a realidade é um pouco diferente. A informação produzida por pessoas ou máquinas é constante. A informação flui ao longo do tempo e espaço através de diversos computadores conectados em uma rede.

É totalmente possível processar essa quantidade de informação de forma particionada. Fatie pedaços de informação e processe ela sobre demanda. Funciona!

O problema com essa abordagem é que você perde resolução. Você só obtém resultados após o processamento de cada fatia. Caso alta resolução de informação for uma necessidade, você pode diminuir cada vez mais o pedaço de dados que você precisa processar; por exemplo, processar uma hora de dados, um minuto ou um segundo. Quanto maior a resolução, ou seja, menor é o tamanho do pedaço de dados que você precisa processar, mais próximo você chega do processamento em stream de dados.

##Processamento em Stream

Conhecemos hoje como processamento em stream como o processamento de informação que representa um evento. Um evento é um pedaço de informação que representa algo único que possa ter acontecido.

Por exemplo, um evento de produto adicionado no carrinho pode ser emitido por algum serviço (conhecido como produtor -- producer, publisher, sender, etc) que cuida do carrinho de compras de um usuário e pode ser consumido por N outros serviços (conhecidos como consumidores -- consumer, listener, subscriber, etc), como um serviço que calcula dados analíticos de intensão de compra.

Digamos que esse evento que representa um produto adicionado no carrinho tenha todas as informações necessárias para identificar quem adicionou o produto no carrinho, qual produto foi adicionado e quando. Geralmente um cookie com um identificador único pode ser usado para saber quem é aquele usuário e o que mais ele colocou no carrinho durante aquela sessão, mesmo que a pessoa nem tenha uma conta ou faça a compra na loja online.

Estes eventos que representam produtos sendo adicionados ou removidos de um carrinho de compras podem ser categorizados e agrupados usando alguma abstração. Chamamos essas abstrações de tópicos. São estruturas que agrupam eventos produzidos por algum programa e servem os mecanismos de difusão que entregam estes eventos para todos os programas que consomem eventos desse tópico.

##Exemplo com Node.js e EventEmitter

É possível simular esse esquema de produtor/consumidores facilmente usando a biblioteca event, nativa do Nede.

Vamos começar importando a classe EventEmitter do módulo event:

public/code/streams.js
const EventEmiter = require('events')

O próximo passo é criar um classe que vai emular um tópico. Essa classe vai servir como abstração para armazenar eventos que possam ser agrupados.

Vamos criar a classe ShoppingCartProductAddedProducer. Essa classe vai ser responsável por conectar eventos emitidos por algum local (vamos forçar a emissão de eventos) a "serviços" que vão processar estes eventos.

public/code/streams.js
const EventEmiter = require('events')

class ShoppingCartProductAddedProducer extends EventEmiter {}

Agora vamos criar dois processadores de eventos. Um deles será o responsável por calcular dados analíticos baseado nos eventos processados, chamado de AnalyticsService e o outro... bom, o outro vai servir só para ilustrar que podemos fazer a difusão de um evento para ser processado por N processadores de eventos, chamado de XService.

public/code/streams.js
const EventEmiter = require('events')

class ShoppingCartProductAddedProducer extends EventEmiter {}

class AnalyticsService {
  handle(event) {
    console.log(`[Analytics] Processing event ${JSON.stringify(event)}`)
  }
}

class XService {
  handle(event) {
    console.log(`[X] Processing event ${JSON.stringify(event)}`)
  }
}

Note que são classes comuns. A única coisa que essas classes fazem é expor um método em comum chamado handle que recebe um evento. Dica: se você estiver implementando algo profissionalmente, use TypeScript e faça seus serviços equivalentes a AnalyticsService e XService de tal forma que implementam uma interface.

Podemos agora criar um produtor -- algo que vai emitir eventos; e conectar os nossos consumidores a ele.

public/code/streams.js
const EventEmiter = require('events')

class ShoppingCartProductAddedProducer extends EventEmiter {}

class AnalyticsService {
  handle(event) {
    console.log(`[Analytics] Processing event ${JSON.stringify(event)}`)
  }
}

class XService {
  handle(event) {
    console.log(`[X] Processing event ${JSON.stringify(event)}`)
  }
}

const producer = new ShoppingCartProductAddedProducer()
producer.on('product-added-to-cart', new AnalyticsService().handle)
producer.on('product-added-to-cart', new XService().handle)

A base está pronta, agora precisamos emitir eventos. Podemos fazer isso usando o método emit herdado de EventEmitter:

public/code/streams.js
const EventEmiter = require('events')

class ShoppingCartProductAddedProducer extends EventEmiter {}

class AnalyticsService {
  handle(event) {
    console.log(`[Analytics] Processing event ${JSON.stringify(event)}`)
  }
}

class XService {
  handle(event) {
    console.log(`[X] Processing event ${JSON.stringify(event)}`)
  }
}

const producer = new ShoppingCartProductAddedProducer()
producer.on('product-added-to-cart', new AnalyticsService().handle)
producer.on('product-added-to-cart', new XService().handle)

producer.emit('product-added-to-cart', { sku: 'abc123', visitorId: 'xxxxyyyy' })
producer.emit('product-added-to-cart', { sku: '999sby', visitorId: 'xxxxyyyy' })

Ao executar o arquivo acima, obtemos a seguinte saída:

$ node public/code/streams.js
[Analytics] Processing event {"sku":"abc123","visitorId":"xxxxyyyy"}
[X] Processing event {"sku":"abc123","visitorId":"xxxxyyyy"}
[Analytics] Processing event {"sku":"999sby","visitorId":"xxxxyyyy"}
[X] Processing event {"sku":"999sby","visitorId":"xxxxyyyy"}

Veja que tanto AnalyticsService#handler quanto XService#handler foram invocados. Isso exemplifica a possibilidade de realizar a difusão de eventos para N consumidores onde cada consumidor vai fazer o precisa ser feito e bola pra frente.

Você pode acessar o código de exemplo aqui.

Na vida real, provavelmente AnalyticsService e XService seriam microsserviços conectados a algum sistema de mensageria, como Apache Kafka, RabbitMQ, Azure Service Bus, Google Pub/Sub, etc; product-added-to-cart seria o nome de algum tópico e ShoppingCartProductAddedProducer seria algum nó na rede, também conectado ao mesmo sistema de mensageria, mas atuando como produtor de eventos.

##Aprenda mais

Se você trabalha com sistemas distribuídos e é responsável por tomar decisões. Recomendo demais a leitura do livro Designing Data-Intensive Applications. Deixei um link logo abaixo.

Também vale a pena dar uma olhada na documentação do Node caso você tenha ficado intrigado com o módulo event.

##Referências

#arquitetura

📝 Edite esta página