Meu nome é Elton Minetto

Processando arquivos parquet em Go

Neste post vou falar sobre um formato relativamente novo de arquivo de dados, e como usá-lo em Go.

O formato chama-se Parquet, e atualmente é um projeto apoiado pela Apache Foundation. Trata-se de um formato binário de arquivos, com a finalidade de armazenar e facilitar o processamento de dados na forma de colunas. Ele suporta diferentes tipos de compressão e é bastante usado no ambiente de data science e big data, com ferramentas como o Hadoop.

Na Codenation estamos usando este formato para armazenar dados estatísticos em buckets do S3 facilitando o processamento paralelo, usando Lambda Functions, sem sobrecarregar nossos servidores de bancos de dados.

Neste post vou mostrar como gerar e processar arquivos neste formato usando a linguagem Go.

O primeiro passo é criar uma struct que vai representar os dados que vamos processar neste exemplo:

type user struct {
  ID        string    `parquet:"name=id, type=UTF8, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

O detalhe importante neste código são as tags que declaram como cada campo da struct vai ser tratada no momento da geração do arquivo parquet. Para fazer o processamento dos dados estou usando o pacote github.com/xitongsys/parquet-go e no repositório é possível ver mais exemplos das tags disponíveis.

Vamos agora gerar o nosso primeiro arquivo no formato parquet:

package main

import (
  "fmt"
  "log"
  "time"
  "github.com/bxcodec/faker/v3"
  "github.com/xitongsys/parquet-go-source/local"
  "github.com/xitongsys/parquet-go/parquet"
  "github.com/xitongsys/parquet-go/reader"
  "github.com/xitongsys/parquet-go/writer"
)

type user struct {
  ID        string    `parquet:"name=id, type=UTF8, encoding=PLAIN_DICTIONARY"`
  FirstName string    `parquet:"name=firstname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  LastName  string    `parquet:"name=lastname, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Email     string    `parquet:"name=email, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Phone     string    `parquet:"name=phone, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Blog      string    `parquet:"name=blog, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Username  string    `parquet:"name=username, type=UTF8, encoding=PLAIN_DICTIONARY"`
  Score     float64   `parquet:"name=score, type=DOUBLE"`
  CreatedAt time.Time //wont be saved in the parquet file
}

const recordNumber = 10000

func main() {
  var data []*user
  //create fake data
  for i := 0; i < recordNumber; i++ {
    u := &user{
      ID:        faker.UUIDDigit(),
      FirstName: faker.FirstName(),
      LastName:  faker.LastName(),
      Email:     faker.Email(),
      Phone:     faker.Phonenumber(),
      Blog:      faker.URL(),
      Username:  faker.Username(),
      Score:     float64(i),
      CreatedAt: time.Now(),
    }
    data = append(data, u)
  }
  err := generateParquet(data)
  if err != nil {
    log.Fatal(err)
  }

}

func generateParquet(data []*user) error {
  log.Println("generating parquet file")
  fw, err := local.NewLocalFileWriter("output.parquet")
  if err != nil {
    return err
  }
  //parameters: writer, type of struct, size
  pw, err := writer.NewParquetWriter(fw, new(user), int64(len(data)))
  if err != nil {
    return err
  }
  //compression type
  pw.CompressionType = parquet.CompressionCodec_GZIP
  defer fw.Close()
  for _, d := range data {
    if err = pw.Write(d); err != nil {
      return err
    }
  }
  if err = pw.WriteStop(); err != nil {
    return err
  }
  return nil
}

Com o arquivo gerado podemos fazer o processo inverso, lendo como no exemplo abaixo:

func readParquet() ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }
  pr, err := reader.NewParquetReader(fr, new(user), recordNumber)
  if err != nil {
    return nil, err
  }
  u := make([]*user, recordNumber)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}

O exemplo acima é apenas didático, pois estou lendo o arquivo todo e colocando todos os 10000 registros em memória, o que pode ser um problema quando estivermos falando de gigabytes de dados. Na prática o ideal é usarmos as funções que o pacote fornece, para buscar apenas parte do arquivo:

func readPartialParquet(pageSize, page int) ([]*user, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return nil, err
  }
  pr, err := reader.NewParquetReader(fr, new(user), int64(pageSize))
  if err != nil {
    return nil, err
  }
  pr.SkipRows(int64(pageSize * page))
  u := make([]*user, pageSize)
  if err = pr.Read(&u); err != nil {
    return nil, err
  }
  pr.ReadStop()
  fr.Close()
  return u, nil
}

Outra vantagem deste formato, como a definição deixa bem claro, é o fato dela ser focada no tratamento das colunas do arquivo. Desta forma, podemos pegar apenas a coluna Score e calcular sua média:

func calcScoreAVG() (float64, error) {
  fr, err := local.NewLocalFileReader("output.parquet")
  if err != nil {
    return 0.0, err
  }
  pr, err := reader.NewParquetColumnReader(fr, recordNumber)
  if err != nil {
    return 0.0, err
  }
  num := int(pr.GetNumRows())

  data, _, _, err := pr.ReadColumnByPath("parquet_go_root.score", num)
  if err != nil {
    return 0.0, err
  }
  var result float64
  for _, i := range data {
    result += i.(float64)
  }
  return (result / float64(num)), nil
}

O objetivo deste post era apresentar este formato relativamente novo e que pode ser muito útil para a transferência de dados, substituindo arquivos csv ou json em projetos de diferentes escalas. É possível aprofundar-se na documentação do formato e também do pacote para encontrar exemplos mais complexos e detalhados, mas espero ter trazido uma novidade útil para alguns projetos em Go.

O código completo do exemplo apresentado neste post pode ser encontrado neste repositório.