Microservices with Kafka

By Nobukimi Sasaki (2023-04-23)

Introduction:
This is my study note composed of three services (Spring Boot) with a message broker (kafka).
There services are independent to each other and communicating asynchronously.

Entire project structure:
There are four Spring Boots project under one folder

Service A: order-service
Whenever receiving the HTTP request, the order-service create a event and publish the event to the message broker (Kafka).

Service B (stock-service) & C (email-service)
Just consume the message from the message broker (Kafka) and Service B & C don’t know about Service A (order-service). They are all independent to each other.

Spring Boot for DTO object (base-domains)
The DTO classes for the message that being referred from Service A, B, and C.

Service A – order-service detail:

1. OrderController::placeOrder receives the request from HTTP client
2. OrderController::placeOrder::sendMessage calls Kakfa Producer
3. OrderProducer::sendMessage calls KafkaTemplate::send

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.topic.name=order_topics

At Service A, this property configures a Kafka Server (line 1), and topic name (line 4)

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>net.javaguides</groupId>
			<artifactId>base-domains</artifactId>
			<version>0.0.1-SNAPSHOT</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

At Service A, B, and C, the dependency is Kakfa and DTO project (base-domains)

import net.javaguides.basedomains.dto.Order;
import net.javaguides.basedomains.dto.OrderEvent;
import net.javaguides.orderservice.kafka.OrderProducer;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

@RestController
@RequestMapping("/api/v1")
public class OrderController {

    private OrderProducer orderProducer;

    public OrderController(OrderProducer orderProducer) {
        this.orderProducer = orderProducer;
    }

    // Rest End Point API for testing
    @PostMapping("/orders")
    public String placeOrder(@RequestBody Order order){

        order.setOrderId(UUID.randomUUID().toString());

        OrderEvent orderEvent = new OrderEvent();
        orderEvent.setStatus("PENDING");
        orderEvent.setMessage("order status is in pending state");
        orderEvent.setOrder(order);

        orderProducer.sendMessage(orderEvent);

        return "Order placed successfully ...";
    }
}

At Service A, OrderController::placeOrder receives the request from HTTP client, and sendMessage calls Kakfa Producer

import net.javaguides.basedomains.dto.OrderEvent;

import org.apache.kafka.clients.admin.NewTopic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
public class OrderProducer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderProducer.class);

    private NewTopic topic;

    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    // Parameter constructor in this class
    public OrderProducer(NewTopic topic, KafkaTemplate<String, OrderEvent> kafkaTemplate) {
        this.topic = topic;
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendMessage(OrderEvent event){
        LOGGER.info(String.format("Order event => %s", event.toString()));

        // create Message
        Message<OrderEvent> message = MessageBuilder
                .withPayload(event)
                .setHeader(KafkaHeaders.TOPIC, topic.name())
                .build();
        kafkaTemplate.send(message);
    }
}

At Service A, OrderProducer::sendMessage calls KafkaTemplate::send

Service B (stock-service) and C (email-service) Service detail:

Service B (stock-service) and C (email-service) has kafka consumer to receive the message.

server.port=8081
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: stock
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics

At Service B (stock-service), Line 1 – the port is 8081, and group id is “stock”

server.port=8082
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: email
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.topic.name=order_topics

At Service C (email-service), Line 1 – the port is 8082, and group id is “email”
Both Service B & C, Line 2 – register the kafka server port 9092, and line 8, the topic name of “order_topics”

import net.javaguides.basedomains.dto.OrderEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class OrderConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(OrderConsumer.class);

    @KafkaListener(
            topics = "${spring.kafka.topic.name}"
            ,groupId = "${spring.kafka.consumer.group-id}"
    )
    public void consume(OrderEvent event){
        LOGGER.info(String.format("Order event received in stock service => %s", event.toString()));

        // business logic
    }
}

Both Service B & C have this consumer class.

DTO object (base-domains) Details:

This DTO classes are referenced from Service A, B, and C


Example of RESTFul client

Leave a Reply

Your email address will not be published. Required fields are marked *

This site uses Akismet to reduce spam. Learn how your comment data is processed.