Alguns meses atrás foi delegado a mim uma tarefa na qual eu não possuía muita experiencia e familiaridade com tema, se tratava da escrita de um produtor (Producer) do Apache Kafka em uma aplicação Java.
Minha primeira ideia foi utilizar as próprias bibliotecas do Kafka para implementar este Producer. Porém, após trocar algumas ideias com meus pares chegamos na conclusão que a melhor maneira era utilizar os frameworks do ecossistema Spring que visam facilitar a escrita de serviços de mensageria como o Spring For Apache Kafka
Ao utilizar o Spring for Apache Kafka ganhamos diversos benefícios como a configuração de beans através arquivo de configuração independente se é através de YAML ou Properties. E as abstrações que facilitam a inserção e o consumo de mensagens em tópicos.
Outra vantagem oferecida junto ao starter Spring For Kafka são as abstrações do Spring Kafka Test que permitem que instâncias do Zookeeper e Kafka Embedded sejam levantadas para que você teste de forma integrada seus produtores e consumidores.
Spring Kafka Test prove ferramentas que auxiliam a escrita de abstrações para facilitar como times escrevam testes integrados junto ao brooker embedded.
Para melhor entendimento de como escrever bons testes utilizando a ferramenta, imagine que em um ecomerce é necessário que os dados das vendas sejam integrados a outros sistemas através de um tópico utilizando Apache Kafka.
@Component
@Validated
public class VendaProducer {
@Value("${spring.kafka.producer.topic}")
private String topic;
@Autowired
private KafkaTemplate<String, Venda> templateMessage;
public void produzir(@Valid Venda venda){
templateMessage.send(topic, venda);
}
}
Os demais detalhes do código acima estão disponíveis no seguinte repositório.
Acima está o código do Producer implementado, que tem a responsabilidade de receber os dados relativos da venda, aplicar validações, e caso nenhuma restrição seja violada a mensagem é inserida no tópico.
Agora nos concentraremos em configurar o ambiente de testes com Spring Kafka Test para utilizar KafkaEmbeddedBroker
para escrita dos testes de integração.
Configurando o KafkaEmbeddedBroker
para fazer Testes Integrados
Então inciaremos criando uma classe de teste para o VendaProducer
, definiremos algumas propriedades para habilitar o uso do EmbeddedBroker. Inicialmente iremos definir:
- Tópico que sera utilizado
- Numero de partições
- Endereço que o broker esta disponível.
As configurações acima devem ser feitas através da anotação @EmbeddedKafka
ao nível de classe de teste. O tópico a ser utilizado pode ser informado via variável de ambiente com Spring Expresion Language (SpEL) , ou texto hardcoded. Como teremos um único broker levantado junto ao Context do Spring a sugestão é que possuam uma única partição. E quanto ao endereço o Spring Boot oferece uma propriedade que faz o bind do endereço de forma automática para você, basta definir como: spring.kafka.bootstrap-servers
.
Veja o exemplo abaixo:
@SpringBootTest
@EmbeddedKafka(
topics = "vendas",
partitions = 1,
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
public class VendaProducerTest {}
Agora precisaremos de uma instância do broker para escrever um consumidor para o tópico que iremos publicar uma mensagem. Uma das formas de disponibilizar acesso à abstração do broker embedded é fazer uso da Injeção de Dependência.
@SpringBootTest
@EmbeddedKafka(
topics = "vendas",
partitions = 1,
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
public class VendaProducerTest {
@Autowired
private EmbeddedKafkaBroker broker;
}
O próximo passo é construir junto a abstração KafkaTestUtils
um Consumidor para o tópico que publicaremos as mensagens. Vamos dividir esta etapa nas seguintes fases:
- Definição das propriedades do Consumer
- Criação da DefaultConsumerFactory
- Criação do Consumer
Definição das propriedades do Consumer
Nesta etapa iremos definir configurações básicas do consumidor, como se trabalhará em modo auto_comit, quais são os Deserializer para chave (key) e value (valor) das nossas mensagens, e também quais os pacotes onde o Deserializer escolhido ira se basear para encontrar as classes que representam mensagens no projeto.
Outra configuração que deve ser feita, é definir a política de leitura de offset, como queremos que todas as mensagens enviadas em um casos de teste sejam consumidas, o ideal é que a propriedade ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
seja definida como earliest
conforme sugerido pela própria documentação da Spring Kafka Test.
Após definir as propriedades, iremos utilizar a abstração DefaultKafkaConsumerFactory
, para obter a instância do consumidor que será utilizado nos testes.
Veja o código correspondendo na classe VendaProducerTest
demostrada abaixo:
@SpringBootTest
@EmbeddedKafka(
topics = "vendas",
partitions = 1,
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
public class VendaProducerTest {
@Autowired
private EmbeddedKafkaBroker broker;
private <V> Consumer<String, V> createConsumer(Class<V> classType) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TOPIC, "true", this.broker);
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, V> consumerFactory = new DefaultKafkaConsumerFactory(
consumerProps, new StringDeserializer(), new JsonDeserializer<>()
);
return consumerFactory.createConsumer();
}
}
Agora estamos prontos para escrever testes integrados para Producers Kafka.
Então, voltamos a classe VendaProducerTest
, e criaremos o nosso teste, que cuidará de publicar uma mensagem no tópico de vendas, e em seguida utilizaremos o KafkaEmbeddedBroker
e nosso método createConsumer()
para validar se a mensagem foi inserida como deveria.
Na construção do teste, iremos dividir em 3 etapas: cenário, ação e validação. No cenário, cuidaremos de disponibilizar o ambiente necessário para o caso de teste executar, nesta etapa, instanciaremos um objeto que represente a mensagem de venda, e habilitaremos para que um Consumer criado atenda as mensagens do tópico especificado. No bloco de ação, iremos disparar o método produz do VendaProducer. E por fim, utilizaremos nosso Consumer para verificar se a mensagem foi inserida e se corresponde a mensagem esperada.
É importante ressaltar que hoje o EmbeddedKafkaBroker não possui ferramentas para limpeza do topico, então é necessário que o TestContext
seja reconstruído após cada método, a fim de garantir que os testes sejam independentes um aos outros em sua execução. Então adicionaremos ao nível de classe de teste a anotação @DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
.
@SpringBootTest
@EmbeddedKafka(
topics = "venda",
partitions = 1,
bootstrapServersProperty = "spring.kafka.bootstrap-servers"
)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class VendaProducerTest {
@Autowired
private EmbeddedKafkaBroker broker;
@Autowired
private VendaProducer producer;
@Test
@DisplayName("deve consumir uma mensagem na fila")
void t1() {
//cenario
Consumer<String, Venda> consumer = createConsumer(Venda.class);
this.broker.consumeFromEmbeddedTopics(consumer, "venda");
Venda venda = new Venda("PlayStation 5", BigDecimal.valueOf(5000), LocalDate.now());
//acao
producer.produzir(venda);
//validacao
ConsumerRecords<String, Venda> records = KafkaTestUtils.getRecords(consumer, Duration.ofSeconds(5));
assertThat(records)
.hasSize(1)
.allMatch(msg -> msg.value().equals(venda));
}
private <V> Consumer<String, V> createConsumer(Class<V> classType) {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(TOPIC, "true", this.broker);
consumerProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, V> consumerFactory = new DefaultKafkaConsumerFactory(
consumerProps, new StringDeserializer(), new JsonDeserializer<>()
);
return consumerFactory.createConsumer();
}
}
Testes de integração possibilitam que características relacionadas a integração do código criado com o Spring Context sejam validadas. Caso existam bugs e anomalias devido à má configuração e definição de beans através das properties ou de forma programática, os mesmos terão sua identificação antecipada na etapa de testes.
Ao testar de forma integrada os Producers estamos favorecendo que a aplicação se comporte o mais próximo possível do contexto de execução real, possibilitando que mais características referentes a integração da lógica e Apache Kafka sejam validadas, antecipando a identificação de bugs.
E por fim, não menos importante, os testes integrados permitem a validação dos efeitos colaterais do servidor, oferecendo garantia que a mensagem foi inserida no tópico como deveria, ao contrário de Mocks, onde fazemos todas as validações em memória.
Conclusão
Durante este artigo exploramos diversos benefícios das APIs do Spring Kafka Test. Estas APIs nos possibilitam com mínimo de configuração expor uma instância do ZookeeperEmbedded
e KafkaEmbeddedBroker
.
O KafkaEmbeddedBroker
permite que os testes sejam executados mais próximos do cenário de uso real da aplicação. Desta forma, podemos validar características e configurações a fim de antecipar o encontro de bugs e anomalias.
Top comments (0)