top of page
Blog SO Labs na Íntegra
Writer's pictureDaniel Sanches

Data Ingestion, Cloud GCP (Argo CI/CD + GitHub + Kafka (AKHQ, MirrorMaker+)+Lake+CDC+Snapshot

Updated: Sep 12, 2022



Na era da transformação dos sistemas desenvolvidos na arquitetura monolítica para a arquitetura de microsserviços, temos visto um movimento cada vez maior das empresas interessadas em adotar a tecnologia Kafka:

Isso se deve, principalmente, pelas vantagens que ela gera aos negócios, como:

  • Melhora do desempenho do processamento de grandes volumes de dados;

  • Lidar com o evento no momento adequado;

  • Antecipação de futuros eventos;

  • Facilitação da integração entre sistemas,

  • Simplificação da infraestrutura de dados, entre outras vantagens.



Para te ajudar a entender mais sobre essa tecnologia, preparamos um post completo sobre o que

é Kafka e todas as tecnologias que ele trabalhará em conjunto (Data Ingestion, Cloud GCP (Argo CI/CD + GitHub + Kakfa (AKHQ, MirrorMaker+)+Lake+CDC+Snapshot), como ele funciona e por que utilizá-la no seu negócio.



Vamos lá?


kafka On-premise


1. Verificação de Ativação de Tabelas, Se Já tem CDC Ativo

Para adicionar tavelas do MS SQL Server acesse o gerenciador de banco de dados, SGBD do Microsoft SQL Server, conecte no servidor onde a tabela se encontra e execute a consulta abaixo:


Obs.: No nosso exemplo usamos o servidor "nome do servidor" e consultamos o banco "nome do banco". O CDC das tabelas de um banco de dados Oracle por exemplo, tem o CDC ativo por padrão, ou pode não estar ativo para nenhuma tabela, mas por padrão, é ativo para todas as tabelas.


O script para checagem de CDC ativo é:



SELECT name, is_tracked_by_cdc 
FROM "nome do servidor".sys.tables 
WHERE is_tracked_by_cdc; 

Caso a tabela tenha o valor_is_tracked_by_cdc igual a 1, significa que o CDC está ativo:


Exemplo:


Colunas:

name

VENDA

VENDA_PARCELAS

VENDA_PRODUTO

VENDA_CLIENTE

VENDA_TROCA

VENDA_VENDEDOR


id_tracked_by_cdc

1

1

1

1

1

1


id = 1, logo, cdc ativo.



2. Acessar a Interface do Servidor Kafla On-primise e Encontrar o Conector

Neste passo será necessário estar com a VPN ativa (caso esteja em um ambiente corporativo) e com a rota configurada para os servidores onde está o On-primise da organização:


Navegue utilizando a barra de ferramentas a esquerda e acessa a aba de conectores


Exemplo interativo no AKHQ


Utilizando a ferramenta de pesquisa, encontre o conector no qual será necessário adicionar uma tabela.



3. Salvar Parâmetros do Conector Localmente e Criar Parâmetros para o Snapshot

Após clicar no conector, ao expandir será possível acessar um JSON com parâmetros do conector, neste momento será necessário armazenar os dados do conector de uma maneira local (VSCode), para ser modificado nos próximos passos.


Parâmetros no AKHQ


Exemplo de um arquivo de parâmetros de conector:



4. Deletando o Conector Atual

Utilizando uma API (via Postman), execute um DELETE para o caminho abaixo, adicionando o nome do conector ao endereço do banco de dados.



http://172.17.0.204:8083/connectors/space_db-debezium-sqlserver-connector 

spacedb-debezium-sqlserver-connector é um exemplo, qualquer conector já criado pode ser deletado assim.



5. Criar Conector Temporário para Gerar o Snapshot das Novas Tabelas

Altere os parâmetros do conector


  • Mude o nome do conector adicionando alguns alias que relacione com a modificação atual (ex: data e horário atual);

  • Lista de tópicos para serem conectados, remova então todos e adicione apenas as tabelas que serão adicionadas;

  • Adicione o parâmetro:


{
	"snapshot.mode": "initial_only"
}
	
  • Altere a estrutura do JSON para o novo formato abaixo:



{
	"name": "nome-do-conecot",	
	"config": {
		... parametros do Kafka do AKHQ
}
		

Após ter o novo JSON de parâmetros, após seguir as modificações requeridas acima, execute um POST para o endereço abaixo com o novo JSON como body-raw json format


POST via Postman do JSON


Após ter o novo JSON de parâmetros, e após seguir as modificações requerida acima, execute um POST para o endereço abaixo com o novo JSON como body-raw json format.



6. Verificar no Kafka On-primise (AKHQ) se a conexão foi criada

Após acessar a interface, navegue até:

  • Tópicos e busque pela conexão recém criada, e confirme os parâmetros.


7. Checar Volumetria

Checar volumetria entre a tabela no banco (count *) e o que está disponível no Kafka:


Exemplo de análise de volumetria pelo COUNT


Quando o valor for similar (pode haver uma diferença caso tenha chegado novos records no banco de dados nesse tempo), então signitica que o snapshop foi concluído.


8. Deletar Conector Temporário

Após a volumetria ter sido confirmada, delete o conector temporário. Execute um DELETE para o caminho abaixo, adicionando o nome do conector temporário.



http://172.17.0.204:8083/connectors/<nome do conector temporário>


9. Criar Conector Original

Após deletar o conector temporário, então o desenvovledor deverá recriar o conector original:


  • No table.include.list colocar então todas as tabelas iniciais e adicionar as novas. Exemplo:



{
	"nome": "nome-do-conector",
	"config": {
		...
		"table.include.list": "dbo.VENDA_PESSOA, dbo.VENDA_POR_TAMANHO",
		...
} 
		

  • Remover o parâmetro "snapshot.mode": "initial_only"


As tabelas em negrito são as novas.


Depois, devemo então ir na aba topics e checar se os tópicos dessas tabelas foram criados:


Exemplo de tópicos no AKHQ



kafka Data Lake


Nessa etapa nós vamos ativar o MirrorMaker para realizar o espelhamento dos dados que estão no Kafka de On-premise para o Kafka do Data Lake. Para isso, pe importante se certificar de que as tabelas que deseja espelhar já estão no Kafka On-premise.



1. Desligando o Sync

Para isso, vamos acessar o Argo CD utilizando-o dentro de um POD localizado no GKE. Com esse objetivo, iniicamos o VS Code, acessamos a sua extensão do Kubernetes e selecionamos o namespace do Argo CD.


Para verificar se o namespace está realmente em uso é importante verificar se apareceu um "*" (asterisco) ao lado do nome. Em seguida, vamos executar um Port Forward para que possamos acessar o POD que está rodando na Cloud diretamente da nossa máquina local. Ainda utilizando a extensão para o Kubernetes do VS Code, vamos procurar por Workloads, Pods, argocd-server-55983jawlawd-39ksa, clicando com o botão direito em Pods e selecionando Port Foward


Exemplo de acesso ao Pods



Devemos ficar atentos a porta selecionada no processo de Port Foward que, por padrão, é a porta 8080, mas pode ser alterada conforme a necessidade.


Feito isso, devemos acessar o Argo CD por meio do seu navegador utilizando o endereço localhost: 8080 e utilizar o login e senha que são:



Login: admin
Senha: 03msu39402lkajsJIDEKShahncaW3rfsL

Interface do Argo CD


Depois de realizar o login na plataforma do Argo CD vamos selecionar o kafka-mirrormaker


Exemplo de interface


Acessando o App Details


Exemplo acessando App Details


E verificamos se o Sync está ativo, caso positivo, desativamos o Sync


Em seguida deletamos o mm-data-lake-v4


Confirmamos a ação copiando e colando o nome mm-data-lake-v4 no local indicado


2. Adicionando Novas Tabelas

Para adicionar novas tabelas devemos acessar o repositório space-architecture e verificar se existem novas atualizações que foram feitas nele. Para isso, deve-se utilizar um terminal, que pode ser o do próprio VS Code, acessar a pasta do repositório e digitar o comando



git fetch --all

será necessário que você digite a sua senha. Caso haja alguma alteração deve ser trazuda para o repositório local utilizando o comando



git pull origin main

mais uma vez será solicitado a sua senha: Agora, para fazer isso via Pull Request devemos criar uma nova branch. Para isso use o comando:



git checkout -b "NomeDaBranch"

Feito isso, devemos adicionar as tabelas que desejamos. Assim vamos navegar para o diretório applications, strimzi-mirrormaker2 e abrir o arquivo yaml mm-data-lake-yaml. Com o arquivo aberto vamos alterar o parâmetro substituindo as tabelas a serem inseridas pelas antigas.


ATENÇÃO: VAMOS PRECISAR DAS TABELAS ANTIGAS NO FUTURO, ENTÃO NÃO APAGUE AS MESMAS. VOCÊ PODE SALVAR ESSA LINHA EM OUTRO DOCUMENTO OU SIMPLESMENTE COMENTÁ-LA.


Feito isso, vamos salvar as alterações no diretório local, enviar para o repositório e solicitar que essa alteração seja incluída na branch principal (main), para isso utilize os comandos



git add .
git commit -m "Feat: Add New Tables"
git pull origin <NomeDaBranch>

Vá para o repositório dentro do GitHub e clique no botão Compere & Pull Request


Exemplo de operação em Pull Request no GitHub



Depois clique no botão Create Pull Request


Feito isso deve-se aguardar alguém com a devida autorização aceitar a modificação. Depois da solicitação de alteração ser aceita deve-se retornar ao Argo CD para reativas o Sync. Para isso clique no botão Sync, e depois em SYNCHRONIZE


Botão na Íntegra


Agora devemos ativar o Auto Sync. Vá em App Details e habilite o auto sync



3. Verificar Processo e Retornar Conjunto de Tabelas

Para verificar se os dados realmente estão sendo espelhados corretamente devemos acessar o AKHQ do GCP. Para isso, retorne ao VS Code e, dentro da extensão do Kubernetes, utilize o Namespace ingestion e realize a execução do Port Foward no Pod do AKHQ.


Como você já deve estar utilizando a porta 8080 para acessar o Argo CD altere a porta para 8081.


Agora devemos acessar o AKHQ por meio do endereço localhost: 8081 e, dentro de eih-"on-premise", ir até a seção Topics e verificar se os tópicos já foram criados. Para isso, utilize a ferramenta de busca e digite o nome dos tópicos que foram criados.


Caso todos os tópicos já estejam inseridos no eih-"on-premise", devemos retornar ao VS Code para juntar as tabelas recém inseridas com as mais antigas. Em primeiro lugar devemos atualizar o repositório local visto que a solicitação de alteração na branch principal já foi aceita. Para realizar essa alteração use:



git fetch --all
git pull origin main

Crie uma nova branch para realizar as modificações e posteriormente solicitar a inclusão de modificações na branch principal:



git checkout -b "NomeDaBranch"

Agora acesse a pasta local do repositório space-architecture, e vá em application. Em seguida strimzi-kafka, 05-kafka-mirrormaker2 e abra o arquivo yaml mm2-data-lake.yaml. Com o arquivo aberto devemos procurar o parâmetro TopicsPattern onde existam dois, um novo com as tabelas que acabamos de inserir e outro com as tabelas antigas que colocamos como comentário. Vamos copiar o nome das tabelas que acabamos de inserir, colar a mesma junto com as tabelas antigas, apagar o topicsPattern com as tabelas novas que estão sem o símbolo de comentário e remover o símbolo de comentário do parâmetro topicsPattern que havia sido transformado em comentário anteriormemte.


Repita o processo de salvar e atualizar o repositório online com as novas modificações


Para isso deve-se utilizar o conjunto de comandos:



git add .
git commit -m "Feat: Add New Tables"
git push origin <NomeDaBranch>

Vá para o repositório dentro do GitHub e clique no botão Compare & Pull Request.


Acessando o botão citado



GCP Storage Data Lake


Para ativar o Sink devemos retornar ao Argo CD, entrar na aplicação kafka-connector-sink e verificar se o SYNC está desabilitado (por padrão está sempre habilitado).


Para isso, retorne ao VSCode, acesse o namespace do Argo CD.


Realize um Port Foward no Pods... argocd-server-09390daoks39-lzm405x


Devemos ficar atentos a porta selecionada no processo de Port Foward que, por padrão, é a porta 8080, mas pode ser alterada conforme a necessidade.


Feito isso, devemos acessar o Argo CD por meio do seu navegador utilizando o endereço localhost:8080 (ou a porta escolhida durante o processo de Port Foward) e utilizar o login e senha que são (caso sejam solicitadas)



Login: admin
Senha: 03msu39402lkajsJIDEKShahncaW3rfsL

Interface do Argo CD


Depois de realizar o login na plataforma do Argo CD vamos selecionar o kafka-connector-sink-gcs


kafka-connector-sink-gcs


Na sequência será redirecionado para uma página como essa:

Operando com o Argo CD


Você irá vefificar se o Sync está ativo. Para isso clique no botão App Details, role até o final da página e verifica se ele está ativo.


Se ele tiver ativo, clique em Desabilitar.


Na segunda camada de apçicações você vai encontrar uma lista de Sinks (um Sink para cada banco de dados). Procure o Sink do banco de dados cujo seja o banco que você gostaria de efetuar a ingestão das novas tabelas e delete ele.


Obs.: Aqui exibimos um exemplo para o conector do banco SPACE_DB (nome fictício de um banco de dados da Space_One Labs que usamos para contextualizar esse case) mas no seu caso use o conector ao banco de dados de origem de suas tabelas.


Conforme a ação copiando e colando o nome do conector no local indicado para tal.


Seguindo então, devemos esperar a atualização do conector pois esse processo deverá mudar.


Done!


Encerramos então o processo dessa postagem e na sequência ficaremos então com o conclusivo do Caso de Uso.


__________________________________________________________________________________


Conclusivo do Caso de Uso

Implantação do Kafka: como utilizamos esse conjunto de tecnologias?


Tivemos a oportunidade de apoiar uma implantação de tecnologia Kafka em uma empresa do segmento de saúde (exemplo fictício).


Quando um usuário, que era segurado pela empresa de saúde, precisava realizar um procedimento médico, a abertura da solicitação era feita através de um hospital ou clínica, que submetia o processo para que a empresa de saúde realizasse a análise (se o plano do usuário cobria, custos envolvidos, etc.) para a aprovação ou não da requisição.


Antes de implantar o Kafka, o processo de aprovação levava 24hs devido ao grande olume de solicitações, formatos distintos enviados pelas credenciados e capacidade de processamento.


Após a adoção do Kafka, o processo passou a levar 3 minutos!


Além do processamento próximo do tempo real, a empresa ganhou outros benefícios operacionais, como:


  • Aumento da satisfação do usuário,

  • redução de riscos com procedimentos médicos incorretos;

  • Simplificação da integração dos dados com os credenciados;

  • Redução de gargalos na transformação dos dados.


Em um contexto no qual temos a geração de um volume crescente de informações, ter acesso a elas é fundamental para as empresas. Nesse cenário, a ingestão de dados mostra-se fundamental, pois ela proporciona a absorção e a transferência das informações para um local adequado.


Desse modo, podemos dizer que esse conceito vai ajudar o seu negócio a ter uma boa estrutura e hierarquia de dados. Essas informações ajudam os gestores a fazerem projeções assertivas a curto, médio e longo prazo.


Organizações que se dedicam em estruturar os processos relacionados à estrutura de dados obtém vantagens competitivas no mercado. Isso porque, ao ter acesso a um grande volume de informações e analisá-las, é possível obter insights que poderão ajudar no crescimento do seu negócio.


Pensando na importância desse tema, neste conteúdo explicaremos o que é a ingestão de dados, abordaremos as principais formas de realizá-la, informaremos sobre os principais desafios e discutiremos sobre os benefícios proporcionados por essa prática. Continue a leitura e acompanhe a seguir.


Os principais desafios na realidade de uma ingestão de dados


Muitos dos desafios relacionados à ingestão de dados estão relacionados à complexidade do procedimento. Nesse cenário, é fundamental entendê-los para estar preparado(a) para lidar com essas questões.


Os desafios serão grandes, mas identificando-os e realizando ações assertivas é possível que nada atrapalhe o dinamismo dos seus negócios. Acompanhe:


Maior Complexidade 


Com o aumento do volume de dados, as empresas têm encontrado dificuldades para fazer a integração e extrair valor das informações.


Processos Lentos


Escrever códigos com o objetivo de ingerir, criar e armazenar dados pode ser complicado, pois há um grande volume de informações. Por isso, os processos relacionados à ingestão de dados podem ser lentos.


Menor Confiabilidade


A ingestão incorreta de informações pode comprometer a confiabilidade, o que contribui para a interrupção da comunicação e a perda de dados. Sendo assim, a sua organização fica mais vulnerável à proteção e a segurança das informações.


Custos Superiores 


A ingestão de dados gera vários custos. Entre eles estão a infraestrutura necessária para fornecer suporte para as fontes de dados, manter uma equipe completa de cientistas da tecnologia e entre outros mecanismos complementares.


Na prática: como funciona uma ingestão de dados?


O conhecimento sobre como funciona a ingestão de dados vai ajudar as empresas a implementarem a prática de forma eficiente e eficaz. Desse modo, na sequência do conteúdo abordaremos sobre os passos necessários para implementar essa prática no seu negócio.


Antecipação das Dificuldades


Essa questão acontece porque o volume de dados aumenta. Nesse cenário, é fundamental antecipar as dificuldades para estar preparado para lidar com essas situações.


Com antecipação e organização, é possível prever problemas e assim o seu negócio não fica prejudicado e/ou paralisado durante a rotina de atividades.


Automatize os Processos


Com o aumento do volume de dados é fundamental não depender somente de tarefas manuais. Automatizar processos é um fator essencial.


Desse modo, torna-se possível economizar tempo e otimizar a produtividade. A automação ajuda a reduzir o número de erros e, consequentemente, custos superiores.


Autoatendimento e Habilidades com os Dados


O autoatendimento na ingestão de dados possibilita a capacitação dos usuários. Dessa forma, lidar com os processos e os negócios se torna uma tarefa mais simples com a intervenção mínima da equipe de TI.


Com a cultura do conhecimento, é possível que seus colaboradores gerem maiores habilidades com os dados e programe planos ativos e estratégicos para insights valiosos.


Embrulhar


Como já mencionamos anteriormente, a ingestão de dados possibilita que os dados sejam “embrulhados” e, assim, permite que a empresa trabalhe com vários tipos de dados e esquemas.


Práticas eficientes de ingestão de dados possibilita ações positivas e aprimora os processos tornando-os transparentes, rápidos, dinâmicos e livres de falhas.


__________________________________________________________________________________


Notas e Relacionados


> Inicialmente o nosso propósito com o Blog é efetuar postagens diversas, porém teremos a área separada para as postagens relacionadas ao Constructor SO, que é o nosso Portfólio de Projetos, Agiles e Scrum, em que cada membro do Constructor SO possui a sua área para os seus desenvolvimentos. Dessa forma, cada atualização da área do Constructor SO é seguida de uma postagem no blog do profissional, informando os nossos leitores e criando assim um panorama extensivo de tal trabalho lançado ou versionado;


> A priori em relação aos desenvolvimentos da Space_One Labs, a nossa ideia é lançar e trabalhar de forma aleatória vários projetos da área específica relacionada, não nos tornando assim limitados por apps ou softwares específicos;


> Todos os casos aqui descritos e desenvolvidos, para este blog, no qual me pertence, que seja da categoria "BI Case", são casos de empresas fictícias, criadas e inventadas, para contextualizar e deixar o trabalho mais vivo e realista possível.



__________________________________________________________________________________


Daniel Sanches


Engenheiro de Produção, Universo - Universidade Salgado de Oliveira, especializado em Analytics e Data, Business e Tecnologia.


SO Labs Developer Member of Research, Business Intelligence Analyst, Data Analyst and Business Analyst, Data Engineer, Blogger DS Space_One Labs | Space Members


Membro SO Labs Desenvolvedor de Pesquisas, Business Intelligence, Data Engineer, Data Analyst e Negócios

Comments


bottom of page