How to Set Up Kafka Integration Tests – Grape Up

Do you consider unit testing to be an insufficient solution for keeping the application reliable and stable? Are you afraid that somehow, somewhere, a potential bug is hiding in the assumption that unit tests should cover all cases? And is mocking Kafka not enough for the project's requirements? If even one answer is 'yes', then welcome to a nice and easy guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing everything needed for integration testing with external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test app functionality. All these fancy features are hooked into Docker images, defined as containers. Do we need to test the database layer with an actual MongoDB? No worries, there is a test container for that. We can't forget about UI tests either – a Selenium container will do anything we actually need.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers as usual, making our integration tests lightweight.

Before we start

The concept for our tests is simple – I want to test the Kafka consumer and producer using two different approaches and check how we can utilize them in actual cases.

Kafka messages are serialized using Avro schemas.

Embedded Kafka – Producer Test

The concept is straightforward – let's create a simple project with a controller, which invokes a service method to push a Kafka Avro-serialized message.

Dependencies:

dependencies {
    implementation "org.apache.avro:avro:1.10.1"
    implementation("io.confluent:kafka-avro-serializer:6.1.0")
    implementation 'org.springframework.boot:spring-boot-starter-validation'
    implementation 'org.springframework.kafka:spring-kafka'
    implementation('org.springframework.cloud:spring-cloud-stream:3.1.1')
    implementation('org.springframework.cloud:spring-cloud-stream-binder-kafka:3.1.1')

    implementation('org.springframework.boot:spring-boot-starter-web:2.4.3')
    implementation 'org.projectlombok:lombok:1.18.16'

    compileOnly 'org.projectlombok:lombok'
    annotationProcessor 'org.projectlombok:lombok'
    testImplementation('org.springframework.cloud:spring-cloud-stream-test-support:3.1.1')
    testImplementation 'org.springframework.boot:spring-boot-starter-test'
    testImplementation 'org.springframework.kafka:spring-kafka-test'
}

It is also worth mentioning the excellent Gradle plugin for Avro. Here is the plugins section:

plugins {
    id 'org.springframework.boot' version '2.6.8'
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
    id 'java'
    id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"
}

The Avro plugin supports schema auto-generation. It is a must-have.

Link to the plugin: https://github.com/davidmc24/gradle-avro-plugin
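Assuming the plugin's standard conventions, schemas placed under src/main/avro are picked up by the generateAvroJava task and turned into Java classes during the build. An optional configuration block can tweak the generated code, for example:

// A sketch based on the plugin's documented defaults – not part of the original build file.
avro {
    stringType = "String" // matches the "avro.java.string": "String" hint used in the schema below
}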

Now let's define the Avro schema:

  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "kind": "report",
  "title": "RegisterRequest",
  "fields": [
    "name": "id", "type": "long",
    "name": "address", "type": "string", "avro.java.string": "String"
    

  ]

Our ProducerService will be focused only on sending messages to Kafka using a template, nothing exciting about that part. The main functionality can be done just by using this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);
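For context, here is a minimal sketch of what the service and the controller could look like. The class names, the DTO-to-Avro mapping, and the endpoint path are assumptions derived from the test shown later, not the exact project code:

@Service
@RequiredArgsConstructor
public class ProducerService {

    // Assumes a KafkaTemplate<String, RegisterRequest> bean configured with the Avro serializer
    private final KafkaTemplate<String, RegisterRequest> kafkaTemplate;

    public void send(RegisterRequestDto dto) {
        // Map the incoming DTO onto the Avro-generated RegisterRequest record
        RegisterRequest kafkaMessage = RegisterRequest.newBuilder()
                .setId(dto.getId())
                .setAddress(dto.getAddress())
                .build();

        ListenableFuture<SendResult<String, RegisterRequest>> future =
                this.kafkaTemplate.send("register-request", kafkaMessage);
    }
}

@RestController
@RequiredArgsConstructor
public class ProducerController {

    private final ProducerService producerService;

    // The endpoint exercised with MockMvc in the test below
    @PostMapping("/register-request")
    public ResponseEntity<Void> register(@RequestBody @Valid RegisterRequestDto request) {
        producerService.send(request);
        return ResponseEntity.ok().build();
    }
}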

We can't forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
    producer:
      auto.register.schemas: true
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
    properties:
      specific.avro.reader: true

As we can see in the test properties above, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro – don't let plain JSONs carry the object structure; let's use a civilized mapper and object definition like Avro.

Serializer:

public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super();
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

Deserializer:

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super();
        // Mirrors the serializer: swap the real schema registry for a mock one
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

And we have everything we need to start writing our test.

@ExtendWith(SpringExtension.class)
@SpringBootTest
@AutoConfigureMockMvc
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ActiveProfiles("test")
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the listed topics and partitions. The application context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS avoids recreating the same objects/context for each test method. It is worth checking whether tests become too time-consuming.

Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}

Here we declare the test consumer, typed with the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

@Test
void whenValidInput_thenReturns200() throws Exception {
    RegisterRequestDto request = RegisterRequestDto.builder()
            .id(12)
            .address("tempAddress")
            .build();

    mockMvc.perform(
            post("/register-request")
                    .contentType("application/json")
                    .content(objectMapper.writeValueAsBytes(request)))
            .andExpect(status().isOk());

    ConsumerRecord<String, RegisterRequest> consumedRegisterRequest = KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

    RegisterRequest valueReceived = consumedRegisterRequest.value();

    assertEquals(12, valueReceived.getId());
    assertEquals("tempAddress", valueReceived.getAddress());
}

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The KafkaConsumer is used to verify that the producer worked as expected. And that's it – we have a fully working test with embedded Kafka.

TestContainers – Consumer Test

TestContainers are nothing more than independent Docker images ready to be run as containers. The following test scenario will be enhanced by a MongoDB image. Why not keep our data in the database right after anything happens in the Kafka stream?

The dependencies are not much different than in the previous example. The following steps are needed for the test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}
Let's focus now on the Consumer part. The test case will be simple – one consumer service will be responsible for getting the Kafka message and storing the parsed payload in a MongoDB collection. All that we need to know about KafkaListeners, for now, is the annotation:

@KafkaListener(topics = "register-request")

Through the functionality of the annotation processor, a KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment, our method will react to any incoming Kafka message on the mentioned topic.
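To illustrate, a minimal sketch of such a consumer service might look like the following. The listener, document, and repository names are assumptions here, loosely mirroring the taxiRepository referenced in the test further down:

@Service
@RequiredArgsConstructor
public class RegisterRequestListener {

    private final TaxiRepository taxiRepository;

    @KafkaListener(topics = "register-request")
    public void listen(RegisterRequest message) {
        // Map the Avro payload to a Mongo document and persist it
        taxiRepository.save(new TaxiDocument(message.getId(), message.getAddress()));
    }
}

// Spring Data repository backing the assertions in the test
public interface TaxiRepository extends MongoRepository<TaxiDocument, Long> {
}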

The Avro serializer and deserializer configs are the same as in the previous test.

Regarding the TestContainers, we should start with the following annotations:

@SpringBootTest
@ActiveProfiles("test")
@Testcontainers
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. This means that we will get access to a fully working environment for the selected resource. For example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Container
public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

@Container
static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two Docker containers to start with the provided configuration.

What is really important about the Mongo container – it gives us full access to the database using just a simple connection URI. With such a feature, we are able to take a look at the current state of our collections, even in debug mode at prepared breakpoints.
Take a look also at the Ryuk container – it acts as a watchdog and checks whether our containers have started correctly.

And here is the last part of the configuration:

@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
    registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
    registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);
}

static {
    kafkaContainer.start();
    mongoDBContainer.start();

    mongoDBContainer.waitingFor(Wait.forListeningPort()
            .withStartupTimeout(Duration.ofSeconds(180L)));
}

@BeforeTestClass
public void beforeTest() {
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(
            messageListenerContainer ->
                    ContainerTestUtils
                            .waitForAssignment(messageListenerContainer, 1)
    );
}

@AfterAll
static void tearDown() {
    kafkaContainer.stop();
    mongoDBContainer.stop();
}
DynamicPropertySource gives us the option to set all needed environment variables during the test lifecycle – strongly needed for any TestContainers configuration. Also, in beforeTest, kafkaListenerEndpointRegistry waits for each listener to get its expected partitions during container startup.

And the last part of the Kafka test containers journey – the main body of the test:

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
    writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

    // Wait for the KafkaListener
    TimeUnit.SECONDS.sleep(5);
    Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
    return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
    try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
        Arrays.stream(registerRequests)
                .forEach(registerRequest -> {
                    ProducerRecord<String, RegisterRequest> record = new ProducerRecord<>(topicName, registerRequest);
                    producer.send(record);
                });
    }
}
The custom producer is responsible for writing our message to the Kafka broker. Also, it is recommended to give the consumers some time to handle messages properly. As we can see, the message was not just consumed by the listener, but also stored in the MongoDB collection.
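If the fixed sleep ever turns out to be flaky, one alternative is to poll the repository until the assertion passes. This sketch assumes Awaitility is added as an extra test dependency – it is not part of the dependency list above:

// Poll for up to 10 seconds instead of sleeping for a fixed amount of time
Awaitility.await()
        .atMost(Duration.ofSeconds(10))
        .untilAsserted(() -> Assertions.assertEquals(1, taxiRepository.findAll().size()));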

Conclusions

As we can see, the current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on all lines being covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word "Embedded". As a perfect integration test, we want to get an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. The definite advantage of Embedded services is the easy way to implement such tests and maintain the configuration, since everything happens in memory.
TestContainers might look like overkill at first sight, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing Docker images – if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I firmly recommend checking the TestContainers documentation on how easily the configuration can be set up for Jenkins agents.
To sum up – if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.