Meu nome é Elton Minetto

Usando CloudEvents em Go

Em ambientes complexos é relativamente comum a adoção de uma arquitetura orientada a eventos (Event-driven architecture, ou EDA) para aumentar a escalabilidade e reduzir o acoplamento entre os componentes/serviços.

Mas ao mesmo tempo que esta abordagem resolve uma série de problemas, um dos desafios enfrentados pelos times é a padronização dos eventos a fim de garantir compatibilidade entre todos os componentes. Para mitigar este desafio podemos fazer uso do projeto CloudEvents.

O projeto tem como objetivo ser uma especificação para a padronização e descrição de eventos trazendo consistência, acessibilidade e portabilidade. Outra vantagem é que além de ser uma especificação o projeto fornece uma série de SDKs para acelerar a adoção entre os times.

Neste post quero demonstrar o uso do SDK de Go (com uma participação especial do SDK de Python) para ilustrar o uso em um projeto fictício.

Vamos considerar um ambiente composto por dois microsserviços, o user, que faz a gestão de usuários (CRUD) e um serviço de auditoria, que armazena acontecimentos importantes no ambiente para futura análise.

O código do serviço user ficou da seguinte forma:

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/protocol"
	"github.com/go-chi/chi/v5"
	"github.com/go-chi/httplog"
	"github.com/google/uuid"
)

const auditService = "http://localhost:8080/"

func main() {
	logger := httplog.NewLogger("user", httplog.Options{
		JSON: true,
	})
	ctx := context.Background()
	ceClient, err := cloudevents.NewClientHTTP()
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	r := chi.NewRouter()
	r.Use(httplog.RequestLogger(logger))
	r.Post("/v1/user", storeUser(ctx, ceClient))

	http.Handle("/", r)
	srv := &http.Server{
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		Addr:         ":3000",
		Handler:      http.DefaultServeMux,
	}
	err = srv.ListenAndServe()
	if err != nil {
		logger.Panic().Msg(err.Error())
	}
}

type userRequest struct {
	ID       uuid.UUID
	Name     string `json:"name"`
	Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		oplog := httplog.LogEntry(r.Context())

		var ur userRequest
		err := json.NewDecoder(r.Body).Decode(&ur)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			oplog.Error().Msg(err.Error())
			return
		}
		ur.ID = uuid.New()
		//TODO: store user in a database

		// Create an Event.
		event := cloudevents.NewEvent()
		event.SetSource("github.com/eminetto/post-cloudevents")
		event.SetType("user.storeUser")
		event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

		// Set a target.
		ctx := cloudevents.ContextWithTarget(context.Background(), auditService)

		// Send that Event.
		var result protocol.Result
		if result = ceClient.Send(ctx, event); cloudevents.IsUndelivered(result) {
			oplog.Error().Msgf("failed to send, %v", result)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		return
	}
}

No código é possível ver a criação de um evento e o envio para o serviço de auditoria, que ficou da seguinte forma:

package main

import (
	"context"
	"fmt"
	"log"

	cloudevents "github.com/cloudevents/sdk-go/v2"
)

func receive(event cloudevents.Event) {
	// do something with event.
	fmt.Printf("%s", event)
}

func main() {
	// The default client is HTTP.
	c, err := cloudevents.NewClientHTTP()
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}
	if err = c.StartReceiver(context.Background(), receive); err != nil {
		log.Fatalf("failed to start receiver: %v", err)
	}
}

Executando os dois serviços é possível ver o funcionamento, ao enviar uma request para o user:

curl -X "POST" "http://localhost:3000/v1/user" \
     -H 'Accept: application/json' \
     -H 'Content-Type: application/json' \
     -d $'{
  "name": "Ozzy Osbourne",
  "password": "12345"
}'

O output do user é:

{"level":"info","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpRequest":{"header":{"accept":"application/json","content-length":"52","content-type":"application/json","user-agent":"curl/8.7.1"},"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user","scheme":"http"},"timestamp":"2024-11-28T15:52:27.947355-03:00","message":"Request: POST /v1/user"}
{"level":"warn","service":"user","httpRequest":{"proto":"HTTP/1.1","remoteIP":"[::1]:50894","requestID":"Macbook-Air-de-Elton.local/3YUAnzEbis-000001","requestMethod":"POST","requestPath":"/v1/user","requestURL":"http://localhost:3000/v1/user"},"httpResponse":{"bytes":0,"elapsed":2.33225,"status":0},"timestamp":"2024-11-28T15:52:27.949877-03:00","message":"Response: 0 Unknown"}

E o output do serviço de auditoria, demonstrando o recebimento do evento.

❯ go run main.go
Context Attributes,
  specversion: 1.0
  type: user.storeUser
  source: github.com/eminetto/post-cloudevents
  id: 5190bc29-a3d5-4fca-9a88-85fccffc16b6
  time: 2024-11-28T18:53:17.474154Z
  datacontenttype: application/json
Data,
  {
    "id": "8aadf8c5-9c4e-4c11-af24-beac2fb9a4b7"
  }

Para validar o objetivo da portabilidade, usei o SDK de Python para implementar uma versão do serviço de auditoria:

from flask import Flask, request

from cloudevents.http import from_http

app = Flask(__name__)


# create an endpoint at http://localhost:/3000/
@app.route("/", methods=["POST"])
def home():
    # create a CloudEvent
    event = from_http(request.headers, request.get_data())

    # you can access cloudevent fields as seen below
    print(
        f"Found {event['id']} from {event['source']} with type "
        f"{event['type']} and specversion {event['specversion']}"
    )

    return "", 204


if __name__ == "__main__":
    app.run(port=8080)

E o output da aplicação mostra o recebimento do evento, sem necessidade de alteração no serviço user:

(.venv) eminetto@Macbook-Air-de-Elton audit-python % python3 main.py
 * Serving Flask app 'main'
 * Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
 * Running on http://127.0.0.1:8080
Press CTRL+C to quit
Found ce1abe22-dce5-40f0-8c82-12093b707ed7 from github.com/eminetto/post-cloudevents with type user.storeUser and specversion 1.0
127.0.0.1 - - [28/Nov/2024 15:59:31] "POST / HTTP/1.1" 204 -

O exemplo anterior serve ao propósito de apresentar os SDKs do CloudEvents, mas ele fere um princípio das arquiteturas baseadas em eventos que é diminuir o acoplamento. A aplicação user conhece e está vinculada à aplicação de auditoria, o que não é uma prática recomendável. Podemos melhorar esta situação usando outros recursos do CloudEvents, como pub/sub ou adicionando algo como o Kafka. O exemplo a seguir usa o Kafka para desacoplar as duas aplicações.

O primeiro passo foi criar um docker-compose.yaml para usarmos o Kafka:

services:
  kafka:
    image: bitnami/kafka:latest
    restart: on-failure
    ports:
      - 9092:9092
    environment:
      - KAFKA_CFG_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_CFG_NUM_PARTITIONS=3
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper

  zookeeper:
    image: bitnami/zookeeper:latest
    ports:
      - 2181:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes

A próxima alteração foi no serviço user:

package main

import (
	"context"
	"encoding/json"
	"log"
	"net/http"
	"time"

	"github.com/IBM/sarama"
	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/go-chi/chi/v5"
	"github.com/go-chi/httplog"
	"github.com/google/uuid"
)

const (
	auditService = "127.0.0.1:9092"
	auditTopic   = "audit"
)

func main() {
	logger := httplog.NewLogger("user", httplog.Options{
		JSON: true,
	})
	ctx := context.Background()

	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	sender, err := kafka_sarama.NewSender([]string{auditService}, saramaConfig, auditTopic)
	if err != nil {
		log.Fatalf("failed to create protocol: %s", err.Error())
	}

	defer sender.Close(context.Background())

	ceClient, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	r := chi.NewRouter()
	r.Use(httplog.RequestLogger(logger))
	r.Post("/v1/user", storeUser(ctx, ceClient))

	http.Handle("/", r)
	srv := &http.Server{
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		Addr:         ":3000",
		Handler:      http.DefaultServeMux,
	}
	err = srv.ListenAndServe()
	if err != nil {
		logger.Panic().Msg(err.Error())
	}
}

type userRequest struct {
	ID       uuid.UUID
	Name     string `json:"name"`
	Password string `json:"password"`
}

func storeUser(ctx context.Context, ceClient cloudevents.Client) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		oplog := httplog.LogEntry(r.Context())

		var ur userRequest
		err := json.NewDecoder(r.Body).Decode(&ur)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			oplog.Error().Msg(err.Error())
			return
		}
		ur.ID = uuid.New()
		//TODO: store user in a database

		// Create an Event.
		event := cloudevents.NewEvent()
		event.SetID(uuid.New().String())
		event.SetSource("github.com/eminetto/post-cloudevents")
		event.SetType("user.storeUser")
		event.SetData(cloudevents.ApplicationJSON, map[string]string{"id": ur.ID.String()})

		// Send that Event.
		if result := ceClient.Send(
			// Set the producer message key
			kafka_sarama.WithMessageKey(context.Background(), sarama.StringEncoder(event.ID())),
			event,
		); cloudevents.IsUndelivered(result) {
			oplog.Error().Msgf("failed to send, %v", result)
			w.WriteHeader(http.StatusInternalServerError)
			return
		}

		return
	}
}

Foram necessárias poucas alterações, a maioria para fazermos a conexão com o Kafka, sendo que o evento em si não mudou.

Alteração similar foi feita no serviço de auditoria:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/IBM/sarama"

	"github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
)

const (
	auditService = "127.0.0.1:9092"
	auditTopic   = "audit"
	auditGroupID = "audit-group-id"
)

func receive(event cloudevents.Event) {
	// do something with event.
	fmt.Printf("%s", event)
}

func main() {
	saramaConfig := sarama.NewConfig()
	saramaConfig.Version = sarama.V2_0_0_0

	receiver, err := kafka_sarama.NewConsumer([]string{auditService}, saramaConfig, auditGroupID, auditTopic)
	if err != nil {
		log.Fatalf("failed to create protocol: %s", err.Error())
	}

	defer receiver.Close(context.Background())

	c, err := cloudevents.NewClient(receiver)
	if err != nil {
		log.Fatalf("failed to create client, %v", err)
	}

	if err = c.StartReceiver(context.Background(), receive); err != nil {
		log.Fatalf("failed to start receiver: %v", err)
	}
}

O output das aplicações não teve alteração.

Com a inclusão do Kafka agora as aplicações deixam de ser acopladas e não ferimos mais os princípios de uma EDA, enquanto que mantemos as vantagens providas pelos CloudEvents.

O objetivo deste post era servir como uma introdução ao padrão, bem como demonstrar a facilidade de implementação usando os SDKs. O assunto pode ser aprofundado mas espero ter atingido o objetivo e inspirado a pesquisa e uso da tecnologia.

Se você já usa/usou CloudEvents e quizer compartilhar suas experiências nos comentários vai ser de grande utilidade.

Os códigos apresentados neste post podem ser encontrados no repositório no Github.