Motivadores para ler esse artigo.
- Experiência própria e vivenciada em momentos de caos e momentos de analise tranquila.
- O que busquei para me aprofundar.
- O que aprendi sobre como o spark funciona para otimização.
- O que o Databricks tem de 'plus' para otimização.
- Boas práticas que podem evitar necessidade de tunning e refatoração.
Introdução
Eu sempre tive um grande contato com banco de dados relacional e posteriormente com sistemas distribuídos como o Spark. Inicialmente me aprofundei mais nos SGBD tanto para montar query complexas como administração e principalmente como montar um script performatico para o SGBD. Quando comecei a trabalhar mais com Spark e posteriormente com Databricks inicialmente não tive problemas de performance para os cenários que tive que construir, mas conforme a área de bigdata foi realmente se tornando bigdata já comecei a ter problemas de performance em rotinas que aumentava 30% a cada semana e isso me fez buscar como o spark funciona 'por baixo dos panos', principalmente por que eu já conhecia como um SGBD funcionava e isso ajudou a compreender alguns conceitos que vou trazer aqui.
Breve explicação dos componentes do Apache Spark
Vamos ser breves já que quero que este artigo foque nos cenários de análise de performance, técnicas e boas práticas.
Spark Core:
Esse componente é a base do Spark, ele é responsável pelo gerenciamento de memória, tarefas, recover de falhas, gerenciamento I/O, ou seja ele manipula o RDD. Portanto, é um cara que tem boa parte do cluster.
Executors:
Esse componente é o trabalhador real do ecosistema do spark (cluster), ele quem recebe as ordens (tarefas) de escrita ou leitura, que pode ser em disco, memória ou ambos (vou explicar mais à frente por que isso entra em cenários de performance).
Workers:
Os workers são literalmente o que é para quem já esta familiarizada com computação distribuída, são os nós do cluster, portanto é ele que 'hospeda' os executors que mencionei acima, cada worker pode conter um ou mais executors. Ele é o responsável por gerenciar os recursos alocados aos executors, como se o executor fosse um auxiliar e o worker o almoxarifado. E se ele é o almoxarifado a quem ele se reporta?
Cluster Manager:
Esse é o gerente, ele gerencia recursos (Memória e CPU) para os workers, ele quem decide quantos executors será para cada aplicação e o quanto de recurso vai ser alocado, gerencia as tarefas enviadas pelo 'chefe' dele que vou explicar mais abaixo, e por ser um cargo mais elevado de responsabilidade ele monitora também o estado do cluster para recover de falhas, redistribuir tarefas conforme necessário. (OBS: existem vários tipos de gerenciadores de cluster: Yarn, mesos, kubernetes e o mais simples que é standalone).
SparkContext:
Bom esse é dígamos assim o chefão ou a porta de entrada, digo porta de entrada por que qualquer aplicação Spark vai passar por ele, ele que permite que a aplicação interaja com o cluster, ou seja, os workers e executors, ele que permite e gerência as tarefas entre os workers e desta forma ele gerencia toda a aplicação a nível de configuração, quantidade de executors e recursos como memoria. Precisa saber como está a execução das tarefas? fala com esse chefe aqui.
Portanto, de forma ilustrativa:
Agora vamos falar de peformance, tunning, rapido, veloz e tudo que você ouvir de diferentes cargos.
Quando atuava com a parte mais de banco relacional e existia problema de performance principalmente em procedures ou functions ou uma query de uma aplicação eu analisava os seguintes aspectos:
- Em que momento está rodando esse script e como está o servidor nesse momento?
- Alguém esta concorrendo em recurso ou lock de tabelas?
- Ta tudo suave, ninguém lockando (bloqueio) os recursos do servidor estão bons, beleza...
- Agora deixa eu ver o script, a lógica dele esta performática? Ou seja, quem fez pensou em leitura/escrita em conjunto ou pensou em linha a linha (vicio de programação), está consultando colunas de mais que não precisava, consultas monstruosas com subquery, CTE etc.. ? Todos esses pontos eu modificava (refatoração) e testava tanto a velocidade da resposta como o uso de recursos do servidor. Por que estou explicando isso tudo, sendo que vamos falar do Apache Spark? Então.... isso se aplica também ao Spark e de forma eu diria até mais complexa, mas vamos chegar lá.
- Acho que por último, se o script estava legal eu analisava o 'caminho das pedras', ou seja, o plano de execução estimado e o plano de execução real. A partir disso, poderia entender o que o SGBD estava fazendo com suas estatísticas (histograma) e qual caminho ele presumia seguir com suas informações e qual foi a realidade, qual caminho foi seguido. E ai poderia identificar pontos como: um filtro a mais na query, um JOIN mais performático e até a criação de indice ou tabelas temporárias.
Bom, acho que é isso, agora o que esses pontos têm em comum com o Apache Spark?
- Script não pensado em manipulação em conjunto + distribuído (falei que Spark tem um 'plus' de dificuldade rsrs).
- Horário que determinada rotina esta rodando, se um Job Spark simples estiver rodando no mesmo cluster de outro Job performático (ou até não) que esta consumindo todos os recursos. (Olha uma espécie do famoso lock de SGBD aqui).
- E por fim, sim, o Apache Spark tem um plano de execução, sendo mais preciso, ele tem os seguintes estágios:
- Plano lógico.
- Plano Físico.
- Estratégia de execução.
- Às vezes mostra o custo estimado.
Resumindo o que é cada um, apesar do nome já da para ter uma noção:
Plano Lógico:
Representa a consulta original como uma série de operações lógicas. É a forma abstrata da consulta, sem considerar como ela será realmente executada. Inclui informações sobre as operações que serão realizadas, como filtragem, seleção, junção, agregação e as 'coisinhas' erradas também rsrs.
Plano Físico:
Detalha como o Spark executará realmente a consulta. Isso inclui a ordem das operações e quais algoritmos serão utilizados (estilo os algorítimos do SGBD). Pode incluir detalhes sobre como os dados serão particionados e distribuídos entre os executores.
Estratégias de Execução:
O plano físico pode mostrar diferentes estratégias de execução que o Spark pode usar, como "Broadcast Join" ou "Shuffle Hash Join", dependendo da operação e do tamanho dos dados. Vou explicar também sobre os principais algorítimos do plano de execução, calma...
Custo Estimado:
Embora não seja sempre exibido, alguns planos podem incluir estimativas de custo para diferentes partes do plano, ajudando a entender qual parte do processamento pode ser mais custosa.
Formas de ver o plano de execução do Apache Spark
Temos a forma 'raiz' que seria textualmente, utilizando o comando explain()
e ele tera uma saída semelhante a essa abaixo mostrando um filtro simples e um dataframe:
== Physical Plan ==
*(2) Filter (Value > 1)
+- *(2) Project [Name#0, Value#1]
+- *(1) Scan ExistingRDD[Name#0, Value#1]
E objetivamente temos como analisar via interface, através da Spark UI, no databricks é faço acessar ela, seja nas execuções das células, seja no job ou no cluster. No Apache Spark é diretamente o IP na porta padrão 4040.
A Spark UI é dividida em várias seções úteis:
Jobs: Mostra uma lista de todos os jobs executados na aplicação. Cada job corresponde a uma ação (action) no seu código.
Stages: Exibe os estágios (stages) que compõem cada job. Os estágios são subdivisões do trabalho que podem ser executadas em paralelo.
Tasks: Detalha as tarefas individuais dentro de cada estágio, incluindo informações sobre o tempo de execução e o status das tarefas.
Storage: Fornece informações sobre o uso de memória e armazenamento dos RDDs (Resilient Distributed Datasets).
Environment: Mostra as propriedades do ambiente de execução, incluindo configurações do Spark e variáveis do sistema.
Executors: Exibe informações sobre os executores criados para a aplicação, incluindo uso de memória, disco e estatísticas de desempenho.
Aqui eu fui hierárquico, tá? É nessa ordem que a coisa funciona.
Eu quero imagens coloca na tela!!
Algoritimos do Spark e como saber quem são os ofensores de tunning:
Primeiramente, vou explicar os principais algorítimos que são demonstrados tanto na interface do Spark UI como no plano de execução, seja o lógico ou o plano físico:
OBS: Lembrando que datasets é o mesmo aqui que uma tabela Spark ;)
1. Vamos começar com o mais famoso Scan:
- FileScan: Lê dados de arquivos de entrada. Pode ser otimizado para diferentes formatos de arquivo como parquet, ORC, CSV, JSON e sei la quais outros.
2. Join (Esse da uns B.O):
Broadcast Hash Join: Utilizado quando um dos datasets é pequeno o suficiente para ser transmitido para todos os nós do cluster, evitando o Shuffle (vou explicar mais sobre esse danado, mas resumidamente é uma operação de embaralhamento dos dados para junção final).
Shuffle Hash Join: Ambos os datasets (tabelas se preferir) são embaralhos para que as chaves correspondentes fiquem na mesma partição. É usado quando os datasets são grandes e não podem ser transmitidos para outros nós.
Sort Merge Join: Requer que ambos os datasets sejam ordenados antes do Join. É eficiente para grandes datasets que já estão particionados e ordenados, ou seja o join esta sendo feito por colunas particionadas e também ordenadas (ex:
df.write.partitionBy("coluna1").sortBy("coluna2").parquet("caminho/para/salvar/particionado")
3. Aggregation (sum, count, group by etc...):
HashAggregate: utiliza uma tabela hash para agregar dados. É eficiente para grande conjunto de dados que cabem na memória.
SortAggregate. Agrega dados após ordená-los. É usado quando os dados não cabem na memória.
4. Shuffle (Atenção nesse cara):
- Shuffle: Redistribui dados entre partições para operações que exigem reorganização, como joins e agregações. É uma operação cara em termos de I/O e rede.
5. Exchange:
- Muda a distribuição dos dados entre as partições. Pode ser usado para balancear a carga de trabalho entre os nós do cluster. (uma estratégia para balancear e fugir do shuffle)
6. Project:
- Seleciona um subconjunto de colunas de um DataFrame ou RDD.
7. Filter:
- Aplica condições para filtrar linhas de dados.
8. Sort:
- Ordena os dados com base em uma ou mais colunas.
Todos esses algoritimos acima podem ser observados como eu disse anteriormente através do comando explain()
.
Cenários reais de problemas com Shuffle:
1. Operações de Join e GroupBy
Operações como join() e groupByKey() acionam frequentemente o shuffle, que redistribui os dados entre as partições. Isso pode resultar em:
Alto uso de E/S de disco: O shuffle gera muitos arquivos intermediários, o que pode saturar o disco local dos executores.
Carga elevada na rede: A quantidade de dados transferidos entre executores pode ser substancial, dependendo do número de conexões necessárias (número de mappers multiplicado pelo número de reducers)
- Identificação: No Spark UI, na aba Stage, verifique os valores de Shuffle Read Size/Records e Shuffle Spill (Disk). Um alto volume nessas métricas indica um problema potencial.
- Desbalanceamento de Partições (Data Skew) Quando os dados são distribuídos desigualmente entre as partições, algumas tarefas podem demorar muito mais do que outras, resultando em um desempenho geral comprometido. A identificação é a mesma, vai na spark UI vai no job referente ao trecho que esta demorando (aqui entra um ponto de boa prática que vou dizer mais abaixo) e verifica o stage travado (esta como running, mas não avança) e veja as métricas de Shuffle, em geral, alto volume em memória e começando a ter volume em disco conforme você for dando um refresh indica que esse desbalanceamento topou a memória e começou a escrita no disco e obviamente disco é mais lento, aí senta e chora se deixar esse cenário.
Mitigação
- Para mitigar problemas relacionados ao shuffle:
Reduza operações que causam shuffle: Sempre que possível, minimize o
Uso de groupByKey() e prefira reduceByKey(). Ajuste o número de
Partições: Use
spark.sql.shuffle.partitions
para ajustar o número de Partições durante operações de shuffle. Utilize técnicas como Broadcast Joins: Para unir grandes conjuntos de Dados com conjuntos menores, evitando assim o shuffle desnecessário.
Métricas do shuffle na Spark UI:
Como o shuffle atua e por que ele é custoso:
Por ultimo e talvez o mais importante - Boas práticas:
Grande maioria trabalha com notebook devido a grande popularidade do Databricks, jupyter notebook e Google Colab. Portanto, divida cada consulta ou transformação em celulas separadas, isso facilita identificar qual parte esta o problema de performance. Deixar tudo junto ficam vários jobs e dificuldade saber qual é a etapa.
Faça uso de Merge ao inves de Overwrite, eu sei que é mais trabalhoso, mas é mais lógico e performatico, visto que o Merge vai utilizar menos I/O do que um 'dump' overwrite de toda a tabela novamente no datalake.
Use cache() ou persist() para armazenar dados intermediários em memória, especialmente se eles forem reutilizados em várias operações. Isso pode reduzir o tempo de recomputação e melhorar o desempenho.
Caso você não saiba o spark roda em uma JVM portanto é nativamente Java, quando você cria as famosas UDF - User Definition Function com python você deixa uma espécie de "caixa preta" para o Spark, impedindo otimizações automáticas. Sempre que possível, use funções embutidas do Spark SQL, otimizadas para desempenho.
Bom, acho que escrevi tudo que estava em mente, gosto de escrever artigos porque me ajuda a relembrar alguns cenários. Pretendo gravar um vídeo mostrando tudo isso, na prática com algum dado público, provavelmente vou pegar no kaggle então me siga no linkedin para acompanhar tudo referente ao mundo de dados, inteligência artifical e desenvolvimento de software
--> https://www.linkedin.com/in/airton-lira-junior-6b81a661
Me segue lá no LinkedIn dá uma força, gosto de feedbacks também sou totalmente aberto para melhorar o compartilhamento de conhecimento.
Se você leu até aqui, parabéns!!! Espero que domine todos os problemas de performance. Próximo artigo, vou abordar as vantagens com o Databricks então segue lá no LinkedIn para ficar sabendo. Obrigado!!
Top comments (0)