Postgres – Pooler de conexão com Cache

Postgres é sensacional, é parrudo e todo mundo sabe que é o “queridinho” para dados hoje em dia. Mas ele não faz milagre. Se você tem uma aplicação que espanca o banco com queries repetitivas (aqueles SELECTs que não mudam quase nunca, mas rodam milhares de vezes por segundo), o seu I/O vai pro espaço, a CPU sobe e a latência vira um pesadelo.

Recentemente, trabalhei em uma adaptação do PG_Dog (um proxy/pooler focado em Postgres) para resolver exatamente esse problema. O “pulo do gato”? Coloquei uma camada de cache chave-valor usando #Redis em paralelo ao pooler.

Por que colocar cache no Pooler?

Se a sua aplicação é legada ou se você tem um time de dev que não quer (ou não pode) mexer no código para implementar cache, você resolve isso no “meio do caminho”. O PG_Dog intercepta a query, vê se ela já foi feita antes e entrega o resultado na velocidade da memória.

A vantagem de usar Redis com Sharding em paralelo é que a gente não cria um novo gargalo. Se um nó de Redis ficar cheio ou sobrecarregado, a carga está distribuída entre vários shards.

Como a mágica acontece?

Basicamente, a estrutura funciona assim:

  1. Identificação: A aplicação manda a query pro PG_Dog.
  2. Hashing: O PG_Dog gera um hash único baseado no SQL e nos parâmetros. Esse hash é a chave.
  3. Check de Cache: Ele consulta os #shards do Redis em paralelo.
  4. Cache Hit: Se o dado estiver lá, ele devolve pro usuário em <5ms. O Postgres nem acorda.
  5. Cache Miss: Se não estiver, ele executa no Postgres, popula o Redis para a próxima e entrega o resultado.

Invalidação?

Claro que temos, funciona assim:

  1. Identificação: A aplicação manda o DML ou DDL pro PG_Dog.
  2. Check de Cache: Ele consulta os #shards do Redis em paralelo.
  3. Cache Delete Se a tabela estiver lá ele deleta a chave ou chaves envolvidas.
  4. No Commit: Se uma transação for aberta mas não comitada eu não limpo o cache.

Por que cache externo?

A vantagem de ter um pooler é poder escalar ele horizontalmente, afinal, não queremos gerar um ponto único de falha no ambiente.

Mas, estalar ele horizontal implica em trazer um outro problema para a mesa, um select pode criar um cache para aquele select, mas e se um DML ou DDL usar um outro pooler para fazer a alteração, como invalidar esse cache para evitar um falso positivo? por isso a ideia de usar um Redis (ou qualquer outro cache que use a mesma tecnologia) externo ao pooler.

O container do pooler fica pequeno e você escala o cache da melhor forma possível. (Shard, Cluster, Single, aí é com você).

    Configuração (Mão na massa)

    Não adianta só falar, tem que mostrar como configura. No arquivo de configuração do seu PG_Dog (que agora aceita múltiplos backends de cache), a coisa fica mais ou menos assim:

    YAML

    # Exemplo de config do PG_Dog com Redis Sharding
    [result_cache]
    enabled = true
    # Redis/Valkey/Dragonfly connection URL.
    redis_url = "redis://127.0.0.1:6379"
    # TTL (seconds). If omitted, PgDog applies a default.
    expire_seconds = 30
    # Don't cache very large results.
    max_entry_bytes = 524288
    # Redis key prefix.
    key_prefix = "pgdog:result_cache"
    # Optional allow/deny lists (regex) to control what gets cached.
    # Unsafe lists take precedence over safe lists.
    cache_safe_schema_list = []
    cache_unsafe_schema_list = []
    cache_safe_table_list = []
    cache_unsafe_table_list = []
    
    

    Na configuração acima você pode ver que da pra personalizar como no PG_Pool2, não fazer cache de tabelas específicas ou de schemas específicos, e por sinal, da pra usar regex caso precise.

    Conclusão

    Essa adaptação transforma o PG_Dog em uma ferramenta de aceleração ativa. Você ganha fôlego no banco de dados principal, economiza em instância de nuvem (RDS/CloudSQL).

    O código está aberto para quem quiser testar, quebrar ou melhorar lá no meu GitHub:

    👉 github.com/bigleka/pgdog

    Ainda estou em fase de testes totalmente Alfa, se alguém tiver coragem de começar a testar e ir encontrando erros é só avisar.

    Postgres – PG_Pool2 Sharding Cache

    Não vou entrar na discussão se o pgbouncer ou o pg_pool2 é isso ou aquilo, melhor ou pior, cada um tem suas qualidades e cada um sabe onde aperta o calo.

    A ideia desse post é mostrar que é possível montar uma estrutura de shard de cache para o pg_pool2 para poder ter balanceamento do pool e conseguir alguma consistência do cache ganhar proveito com isso.

    Começando pelo básico… O pg_pool2 tem uma configuração de cache, ele pode usar um serviço próprio ou o memcached (com tanta opção por aí eles continuam batendo nisso). Ele faz um controle até que bom do cache, invalida se alguma tabela sofre DML, não faz cache se vc escolher algum schema ou tabela específica, no final ele é bem honesto não garante 100% mas tenta entregar alguma coisa.

    Qual seria a vantagem desse cache? se sua aplicação é mais antiga, você não tem como alterar o fonte ou seus devs são preguiçosos e não entendem o porque de usar um cache você consegue fazer isso de forma precária no meio do caminho, diminuindo a carga de consultas repetitivas contra seu banco de dados.

    Existem outras formas de resolver? Até que sim, mas nesse post só vai ter essa.

    Como vamos trabalhar com uma configuração de dois #poolers de conexão, cada #pooler vai executar dois serviços do #memcached.

    O serviço na porta padrão 11211 vai trabalhar como um proxy de conexão enquanto o serviço executando na porta 11212 vai ser o serviço que vai realmente hospedar a chave valor.

    Como o serviço do #pgpool é responsável por criar, consultar e apagar as chaves ele precisa ser capaz de invalidar uma chave devido a uma operação de DML/DDL que passe por ele, se cada pool tiver seu próprio #memcached um não sabe que o outro existe e não tem como invalidar uma possível chave que possa gerar inconsistência.

    O serviço de #proxy do #memcached que vamos usar é o do próprio #memcached
    https://docs.memcached.org/features/proxy/examples/

    mas precisamos ajustar o arquivo de configuração para executar a subida do serviço em uma porta diferente da porta padrão, neste caso vamos fazer a configuração para usar a porta 11212.

    arquivo de configuração do memcached:

    # memcached default config file
    # 2003 - Jay Bonci <jaybonci@debian.org>
    # This configuration file is read by the start-memcached script provided as
    # part of the Debian GNU/Linux distribution.
    
    # Run memcached as a daemon. This command is implied, and is not needed for the
    # daemon to run. See the README.Debian that comes with this package for more
    # information.
    -d
    # Log memcached's output to /var/log/memcached
    logfile /var/log/memcached.log
    
    # Be verbose
    -v
    
    # Be even more verbose (print client commands as well)
    -vv
    
    # Start with a cap of 64 megs of memory. It's reasonable, and the daemon default
    # Note that the daemon will grow to this size, but does not start out holding this much
    # memory
    #-m 64
    -m 12288
    # Default connection port is 11211
    -p 11212
    
    # Run the daemon as root. The start-memcached will default to running as root if no
    # -u command is present in this config file
    -u memcache
    
    # Specify which IP address to listen on. The default is to listen on all IP addresses
    # This parameter is one of the only security measures that memcached has, so make sure
    # it's listening on a firewalled interface.
    #-l 127.0.0.1
    -l 0.0.0.0
    # Limit the number of simultaneous incoming connections. The daemon default is 1024
    # -c 1024
    
    # Lock down all paged memory. Consult with the README and homepage before you do this
    # -k
    
    # Return error when memory is exhausted (rather than removing items)
    # -M
    
    # Maximize core file limit
    # -r
    
    # Use a pidfile
    -P /var/run/memcached/memcached.pid
    

    agora para o ajuste de configuração de proxy, vamos fazer o seguinte:

    instalar dependências

    apt-get install gcc make libevent-dev
    

    baixar a versão compilável

    wget https://memcached.org/latest
    

    copiar, remover a versão baixada que ficou com um nome estranho e descompactar

    cp latest memcached-1.6.39.tar.gz
    rm latest
    tar xzvf memcached-1.6.39.tar.gz
    

    acessar o diretório e configurar a nova versão

    cd memcached-1.6.39.tar.gz
    ./configure --enable-proxy
    

    executar o make, make test e make install (esse processo demora)

    make
    make test
    make install
    

    confirmar que foi habilitado o proxy

    memcached --help | grep proxy
    

    criar um arquivo mc1proxy.lua com o seguinte conteúdo

    pools{
        main = {
            backends = {
                "127.0.0.1:11212",
                "OUTRO_IP:11212",
            }
        }
    }
    
    
    routes{
        default = route_direct{
            child = "main"
        }
    }
    

    **Esse OUTRO_IP é o IP usado no outro nó do instance group do #pgbouncer

    iniciar os serviços do #memcached nos 2 ou mais nós, na porta 11212

    Baixar o protocolo de roteamento no diretório do #memcached

    wget https://raw.githubusercontent.com/memcached/memcached-proxylibs/main/lib/routelib/routelib.lua
    

    iniciar o serviço para ver se tudo funciona

    memcached -o proxy_config=routelib,proxy_arg=mc1proxy.lua -p 11211 -u root
    

    até esse momento os serviços devem estar rodando.
    para testar executar telnet no IP do servidor do pgbouncer na porta 11211

    telnet IP_BOUNCER 11211
    

    a tela deve ficar escura, executar o comando

    watch proxyevents
    

    caso alguma coisa não esteja funcionando ele vai começar a colocar na tela as mensagens de erro
    para parar pressione CTRL + ]

    Então, basicamente, o seu NLB vai fazer sua aplicação acessar um dos pools, esse pool vai acessar o endereço local na porta padrão do memcached que é o proxy e quando ele fizer cache de alguma consulta ele vai fazer o shard desse cache em algum memcached.

    Bonus 1

    O script abaixo é para ser usado na GCP, ele deve ser colocado na secret para ser carregado na maquina caso você use Instance Groups, para ter flexibilidade em adicionar ou remover maquinas do pool.

    A ideia dele é ser executado de forma cíclica a cada X minutos ou segundos, ele vai identificar se existe maquina nova no pool de maquinas do Instance Group, adicionar, remover ou não fazer nada no arquivo Lua do proxy e recarregar o serviço caso precise.

    https://github.com/bigleka/gcp/blob/main/gcp_memcached_balancer.sh

    Postgres – Criar estatísticas

    O Postgres é “tão evoluído” e blá blá blá, mas não consegue ter uma rotina de criação automática de estatísticas. Parece que voltamos para antes dos anos 2000.

    Bom, não vou explicar a importância da criação de estatísticas, isso você já deveria saber, se não sabe, pergunta para sua IA favorita.

    A código abaixo vai ler a tabela pg_stat_statements e, a partir dela, eu tento fazer um monte de regex para separar das querys a parte do SARG para criar as estatísticas.

    É preciso adicionar a extensão pg_stat_statements:

    create extension pg_stat_statements;

    Dependendo do seu workload logo em seguida você já vai ter acesso a alguma coisa, mas para ter dados melhores, é melhor deixar o tempo passar para acumular mais informações.

    Depois de algum tempo, quando você rodar o script abaixo, deve ter um resultado mais interessante para criar as estatísticas:

    WITH normalized AS (SELECT queryid
                             , query
                             -- Remove DO $ ... $ e normaliza espaços
                             , lower(regexp_replace(
                regexp_replace(query, '^do \$\$|\\$\$;$', '', 'gi'),
                '\s+', ' ', 'g'
                                     )) AS norm_query
                             , total_exec_time
                             , calls
                        FROM pg_stat_statements
                        WHERE query ILIKE 'select%'
                          AND calls > 10
                          AND query ~* ' where ')
       , tables AS (SELECT n.queryid
                         , n.query
                         , n.norm_query
                         , n.total_exec_time
                         , n.calls
                         -- Captura tabela e alias
                         , (regexp_match(n.norm_query,
                                         '(?:from|join)\s+([a-z0-9_\.]+)(?:\s+as\s+|\s+)?([a-z0-9_]+)?'))[1] AS table_obj_full
                         , (regexp_match(n.norm_query,
                                         '(?:from|join)\s+([a-z0-9_\.]+)(?:\s+as\s+|\s+)?([a-z0-9_]+)?'))[2] AS table_alias
                         , COALESCE(
                (regexp_match(n.norm_query, '(?:from|join)\s+([a-z0-9_\.]+)(?:\s+as\s+|\s+)?([a-z0-9_]+)'))[2],
                split_part((regexp_match(n.norm_query, '(?:from|join)\s+([a-z0-9_\.]+)'))[1], '.', 2)
                           )                                                                                 AS main_identifier_raw
                         , split_part(
                lower(
                        regexp_replace(
                                (regexp_match(n.norm_query, '(?:from|join)\s+([a-z0-9_\.]+)'))[1],
                                '[^a-z0-9_\.]',
                                '',
                                'g'
                        )
                ),
                '.',
                CASE WHEN (regexp_match(n.norm_query, '(?:from|join)\s+([a-z0-9_\.]+)'))[1] LIKE '%.%' THEN 2 ELSE 1 END
                           )                                                                                 AS table_name
                    FROM normalized n
                    WHERE (regexp_match(n.norm_query, '(?:from|join)\s+([a-z0-9_\.]+)'))[1] IS NOT NULL)
       , where_clauses AS (SELECT t.queryid
                                , t.table_name
                                , lower(t.main_identifier_raw) AS main_identifier
                                , (regexp_match(
                t.norm_query,
                'where\s+(.*?)(?:\sgroup by|\sorder by|\slimit|;|$)'
                                   ))[1]                       AS where_block
                                , t.total_exec_time
                                , t.calls
                           FROM tables t)
       , where_columns AS (SELECT wc.table_name
                                , wc.main_identifier
                                , CASE
                                      WHEN rm.m[1] LIKE '%.%' THEN lower(split_part(rm.m[1], '.', 1))
                                      ELSE NULL END                                 AS column_prefix
                                , rm.m[1]                                           AS full_column_name
                                , regexp_replace(rm.m[1], '^[a-z0-9_]+\.', '', 'i') AS column_name
                                , wc.total_exec_time
                           FROM where_clauses wc
                                    CROSS JOIN LATERAL regexp_matches(
                                   wc.where_block,
                                   '([a-z_][a-z0-9_\.]*)\s*(=|>|<|>=|<=|<>|in\b|like\b)',
                                   'gi'
                                                       ) AS rm(m))
       , distinct_columns AS (SELECT table_name
                                   , column_name
                                   , SUM(total_exec_time) AS total_cost
                              FROM where_columns
                              WHERE column_name IS NOT NULL
                                AND (
                                  column_prefix IS NULL
                                      OR column_prefix = main_identifier
                                      OR column_prefix = table_name
                                  )
                              GROUP BY table_name, column_name)
       , validated_columns AS (SELECT dc.table_name
                                    , dc.column_name
                                    , dc.total_cost
                               FROM distinct_columns dc
                                        JOIN information_schema.columns isc
                                             ON lower(isc.table_name) = lower(dc.table_name)
                                                 AND lower(isc.column_name) = lower(dc.column_name)
                               WHERE dc.table_name NOT LIKE 'pg_%')
       , family AS (SELECT vc.table_name
                         , (SELECT array_agg(c)
                            FROM (SELECT column_name AS c
                                  FROM validated_columns sub
                                  WHERE sub.table_name = vc.table_name
                                  ORDER BY total_cost DESC
                                  LIMIT 8) sub2)       AS columns
                         , COUNT(DISTINCT column_name) AS occurrences
                         , SUM(total_cost)             AS total_table_cost
                    FROM validated_columns vc
                    GROUP BY vc.table_name
                    HAVING COUNT(DISTINCT column_name) > 1)
       , correlation_estimate AS (SELECT f.*
                                       , (array_length(f.columns, 1) ^ 1.3) * ln(total_table_cost + 5) AS score
                                  FROM family f)
       , existing_ext_stats AS (SELECT cls.relname                                         AS table_name
                                     , st.stxname                                          AS stat_name
                                     , array_agg(att.attname ORDER BY att.attname)::text[] AS stat_columns
                                FROM pg_statistic_ext st
                                         JOIN pg_class cls ON cls.oid = st.stxrelid
                                         JOIN pg_attribute att
                                              ON att.attrelid = cls.oid
                                                  AND att.attnum = ANY (st.stxkeys)
                                GROUP BY cls.relname, st.stxname)
       , dedup AS (SELECT ce.*
                        , 's_' || regexp_replace(ce.table_name, '[^a-z0-9_]', '', 'g') ||
                          '_' || substr(md5(array_to_string(ce.columns, ',')), 1, 8) AS stat_name
                        , EXISTS (SELECT 1
                                  FROM existing_ext_stats es
                                  WHERE es.table_name = ce.table_name
                                    AND es.stat_columns = ce.columns::text[])        AS already_exists
                   FROM correlation_estimate ce)
    
    SELECT table_name
         , columns
         , occurrences
         , round(score::numeric, 2)                                AS score
         , stat_name
         , 'CREATE STATISTICS IF NOT EXISTS ' ||
           quote_ident(stat_name) ||
           ' ON ' ||
           array_to_string(
                   ARRAY(SELECT quote_ident(c) FROM unnest(columns) c), ', '
           ) ||
           ' FROM ' ||
           quote_ident(table_name) ||
           ';'                                                     AS suggested_cmd
         , 'Filtros mais custosos sobre (' ||
           array_to_string(columns, ', ') ||
           ') em ' || table_name ||
           '. Ocorrências: ' || occurrences ||
           '. Custo total estimado: ' || total_table_cost || 'ms.' AS justification
    FROM dedup
    WHERE NOT already_exists
      AND table_name NOT LIKE 'pg_%'
      AND table_name NOT IN ('table_constraints', 'columns', 'tables')
      AND array_length(columns, 1) > 1
    ORDER BY score DESC, total_table_cost DESC, occurrences DESC;

    Postgres – vacuum e analyze verbose

    Depois de passar algum tempo analisando a saída do vacuum e do analyze, para entre 800 tabelas descobrir quais estão com algum possível problema, montei esse script que, novamente, abre uma tela para fazer essa análise e indicar possíveis necessidades de atenção.

    https://github.com/bigleka/pg-scripts/blob/main/vacuum_analyzer.py

    Ele consegue analisar:

    • Vacuum Analyze ou
    • Verbose Analyze ou
    • ou os dois ao mesmo tempo.

    É bem simples, execute o vacuum verbose e o analyze verbose no seu aplicativo favorito e depois copie e cole o resultado, clique em “Analisar” e pronto, temos alguma coisa para analisar.

    GCP – Clonar instâncias entre projetos

    Imagine o seguinte cenário:

    Você organizou a estrutura da sua conta GCP para trabalhar com projetos distintos;

    Fez as TAGs para poder ter os valores corretos por projeto e saber exatamente quanto cada um custa;

    Agora precisa copiar uns bancos que estão em um servidor CloudSQL de um projeto para outro projeto, fácil certo? Até descobrir que a GCP não faz de forma fácil.

    Mas tem outras formas, da pra gerar um backup do banco, mandar para o bucket, copiar o arquivo de um bucket para outro e depois restaurar, ou usar um servidor intermediário, faz a mesma coisa mas manda o arquivo para esse servidor e restaurado no outro, claro que dá, além de tomar tempo e mesmo que você consiga fazer em paralelo, quantos bancos consegue fazer? o servidor aguenta? o GCP tem limite de IO e operações de Leitura/Gravação no CloudSQL, além de inevitavelmente ser lento.

    Eles tem a opção de restaurar um backup da instância em cima de outra instância no mesmo projeto, então deveria ter a opção de fazer a mesma coisa em projetos distintos, no final das contas eles tem, mas tem que ser feito por meio de requisição de API.

    No link abaixo montei um python que vai carregar suas credenciais, listar os projetos que você tem direito de acesso, lista as instâncias, backups das instâncias e ai te ajuda a restaurar esse backup em uma outra instância em qualquer outro CloudSQL de qualquer outro projeto.

    https://github.com/bigleka/gcp/blob/main/gcp_restore_cloud_database_in_another_project.py

    O app é bem simples:

    • Clique em “Carregar Credenciais GCP”;
    • localize o arquivo json gerado pelo comando.
    • gcloud auth application-default login
    • ele vai carregar a lista dos projetos que você tem acesso nas duas listas de “Origem” e “Destino”;
    • Clicando no projeto ele vai listar as instâncias na combobox abaixo da lista dos projetos.
    • Clicando na instância de banco ele vai listar os backups disponíveis, selecione o backup que você quer restaurar na outra instância.
    • Agora faça a mesma coisa no “Destino”, selecione o projeto e a instância.
    • ELE VAI SOBRESCREVER a instância de destino, apagando e restaurando o backup por cima da instância do destino.
    • e clique em “Restaurar”.

    Como o processo é assíncrono, caso você queira acompanhar o status da restauração, vá para o console da GCP e acompanhe de lá.

    GCP – Copiar flags entre instâncias

    Para quem já trabalhou com o GCP CloudSQL sabe a dor de cabeça que é não ter uma forma fácil de gerenciar as flags das instâncias ou uma forma fácil de copiar as flags de uma instância para outra.

    Parece que eles gostam muito de dificultar uma administração que deveria ser simples.

    Bom, no link abaixo eu montei um outro python que carrega a lista de projetos que você tem acesso, lista as instâncias e as flags, ai você pode escolher quais vão ser aplicadas na outra instância e até editá-las antes de aplicar.

    Trabalho em progresso:

    Ainda estou tentando listar quais as flags que são obrigatórias de ter reboot, então, por enquanto, assuma que todas vão causar algum tipo de reboot no destino (mesmo que não causem).

    https://github.com/bigleka/gcp/blob/main/gcp_copy_cloudsql_flags.py

    Eu sou muito fã de linha de comando, mas como as pessoas gostam de interface gráfica, esse python usa o tkinter para montar uma tela zoada para você ser mais feliz.

    Basicamente, deixei todas as dependências que precisam ser instaladas na parte comentada do código.