Create a Streaming Application with Kafka
This guide describes how to use the Graal Development Kit for Micronaut (GDK) to create a streaming application that demonstrates how to use the Micronaut® Streaming API. The application consists of two Micronaut microservices that use Kafka Streams to communicate with each other in an asynchronous and decoupled way.
Kafka Streams is a client library for building applications and microservices, where the input and output data are stored in an Apache Kafka cluster. It combines writing and deploying Java applications on the client with the benefits of Kafka’s server-side cluster technology.
Prerequisites
- JDK 17 or higher. See Setting up Your Desktop.
- A Docker-API compatible container runtime such as Rancher Desktop or Docker installed if you will be running Kafka in a container.
- The GDK CLI. See Setting up Your Desktop. (Optional.)
Follow the steps below to create the application from scratch. Alternatively, you can download the completed example.
A note regarding your development environment
Consider using Visual Studio Code, which provides native support for developing applications with the Graal Development Kit extension.
Note: If you use IntelliJ IDEA, enable annotation processing.
1. Create the Microservices
The two microservices are:
- Books returns a list of books. It uses a domain consisting of a book name and an International Standard Book Number (ISBN). It also publishes a message to Kafka every time a book is accessed.
- Analytics connects to Kafka to update the analytics for every book (a counter). It also exposes an endpoint to retrieve the counter.
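The division of labor between the two microservices can be sketched in plain Java before any Kafka code is involved. This is only an illustration under stated assumptions: an in-memory BlockingQueue stands in for the Kafka topic, and the names InMemoryFlowSketch, publish, drain, and countFor are hypothetical and do not appear in the generated projects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: a BlockingQueue stands in for the Kafka "analytics" topic.
final class InMemoryFlowSketch {

    // Hypothetical stand-in for the topic shared by both microservices.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();

    // One counter per ISBN, as kept by the Analytics side.
    private final Map<String, Long> counters = new HashMap<>();

    // Books side: publish the ISBN of the accessed book and return immediately.
    void publish(String isbn) {
        topic.add(isbn);
    }

    // Analytics side: consume everything currently on the topic and update the counters.
    void drain() {
        String isbn;
        while ((isbn = topic.poll()) != null) {
            counters.merge(isbn, 1L, Long::sum);
        }
    }

    // Analytics side: read the counter for one book (0 if it was never accessed).
    long countFor(String isbn) {
        return counters.getOrDefault(isbn, 0L);
    }
}
```

The publisher never waits for the consumer: the counters change only when drain runs, which mirrors the asynchronous, decoupled relationship between Books and Analytics.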
1.1. Create the Books Microservice
- Open the GDK Launcher in advanced mode.
- Create a new project using the following selections.
- Project Type: Application (Default)
- Project Name: books
- Base Package: com.example.publisher
- Clouds: None
- Build Tool: Gradle (Groovy) or Maven
- Language: Java (Default)
- Test Framework: JUnit (Default)
- Java Version: 17 (Default)
- Micronaut Version: (Default)
- Cloud Services: Streaming
- Features: Awaitility Framework, GraalVM Native Image, Micronaut Serialization Jackson Core, and Reactor
- Sample Code: No
- Click Generate Project, then click Download Zip. The GDK Launcher creates an application with the default package com.example.publisher in a directory named books. The application ZIP file will be downloaded to your default downloads directory. Unzip it, open it in your code editor, and proceed to the next steps.
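Alternatively, use the GDK CLI as follows. The first command generates a Gradle project and the second generates a Maven project; run only the one that matches your build tool: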
gdk create-app com.example.publisher.books \
--services=streaming \
--features=awaitility,graalvm,reactor,serialization-jackson \
--build=gradle \
--jdk=17 \
--lang=java \
--example-code=false
gdk create-app com.example.publisher.books \
--services=streaming \
--features=awaitility,graalvm,reactor,serialization-jackson \
--build=maven \
--jdk=17 \
--lang=java \
--example-code=false
1.1.1. Book Domain Class
The GDK Launcher created a Book domain class in a file named src/main/java/com/example/publisher/Book.java, as follows:
package com.example.publisher;
import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;
import java.util.Objects;
@Serdeable
public class Book {
private final String isbn;
private final String name;
@Creator
public Book(String isbn, String name) {
this.isbn = isbn;
this.name = name;
}
public String getIsbn() {
return isbn;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "Book{" +
"isbn='" + isbn + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Book other = (Book) o;
return Objects.equals(isbn, other.isbn) &&
Objects.equals(name, other.name);
}
@Override
public int hashCode() {
return Objects.hash(isbn, name);
}
}
1.1.2. BookService
To keep this guide simple there is no database persistence: the Books microservice keeps the list of books in memory. The GDK Launcher created a class named BookService in src/main/java/com/example/publisher/BookService.java with the following contents:
package com.example.publisher;
import jakarta.annotation.PostConstruct;
import jakarta.inject.Singleton;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@Singleton
public class BookService {
private final List<Book> bookStore = new ArrayList<>();
@PostConstruct
void init() {
bookStore.add(new Book("1491950358", "Building Microservices"));
bookStore.add(new Book("1680502395", "Release It!"));
bookStore.add(new Book("0321601912", "Continuous Delivery"));
}
public List<Book> listAll() {
return bookStore;
}
public Optional<Book> findByIsbn(String isbn) {
return bookStore.stream()
.filter(b -> b.getIsbn().equals(isbn))
.findFirst();
}
}
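The lookup in findByIsbn returns an Optional, so callers decide what a missing book means. The stand-alone sketch below repeats the same stream-based search outside Micronaut to show both outcomes. BookLookupSketch, SimpleBook, and nameOrDefault are hypothetical names used only for illustration; SimpleBook is a trimmed stand-in for the Book class.

```java
import java.util.List;
import java.util.Optional;

// Stand-alone sketch of the in-memory lookup; not part of the generated project.
final class BookLookupSketch {

    // Hypothetical, trimmed stand-in for the Book domain class.
    static final class SimpleBook {
        final String isbn;
        final String name;

        SimpleBook(String isbn, String name) {
            this.isbn = isbn;
            this.name = name;
        }
    }

    // Same three books that BookService loads in its init method.
    private static final List<SimpleBook> STORE = List.of(
            new SimpleBook("1491950358", "Building Microservices"),
            new SimpleBook("1680502395", "Release It!"),
            new SimpleBook("0321601912", "Continuous Delivery"));

    // Same stream-based search as BookService.findByIsbn.
    static Optional<SimpleBook> findByIsbn(String isbn) {
        return STORE.stream()
                .filter(b -> b.isbn.equals(isbn))
                .findFirst();
    }

    // Returns the book name, or a fallback when the ISBN is unknown.
    static String nameOrDefault(String isbn) {
        return findByIsbn(isbn).map(b -> b.name).orElse("unknown");
    }
}
```

An empty Optional is what later lets the controller answer an unknown ISBN without any explicit error handling in the service.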
1.1.3. BookController
The GDK Launcher created a controller to access Book instances in a file named src/main/java/com/example/publisher/BookController.java with the following contents:
package com.example.publisher;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import java.util.List;
import java.util.Optional;
@Controller("/books") // <1>
class BookController {
private final BookService bookService;
BookController(BookService bookService) { // <2>
this.bookService = bookService;
}
@Get // <3>
List<Book> listAll() {
return bookService.listAll();
}
@Get("/{isbn}") // <4>
Optional<Book> findBook(String isbn) {
return bookService.findByIsbn(isbn);
}
}
1 The @Controller annotation defines the class as a controller mapped to the root URI /books.
2 Use constructor injection to inject a bean of type BookService.
3 The @Get annotation maps the listAll method to an HTTP GET request on /books.
4 The @Get annotation maps the findBook method to an HTTP GET request on /books/{isbn}.
1.1.4. BookControllerTest
The GDK Launcher created a test for BookController to verify the interaction with the Analytics microservice in a file named src/test/java/com/example/publisher/BookControllerTest.java with the following contents:
package com.example.publisher;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.core.type.Argument;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.client.HttpClient;
import io.micronaut.http.client.annotation.Client;
import io.micronaut.http.client.exceptions.HttpClientResponseException;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import jakarta.inject.Inject;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.ConcurrentLinkedDeque;
import static io.micronaut.configuration.kafka.annotation.OffsetReset.EARLIEST;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.TestInstance.Lifecycle.PER_CLASS;
@MicronautTest
@TestInstance(PER_CLASS) // <1>
class BookControllerTest {
private static final Collection<Book> received = new ConcurrentLinkedDeque<>();
@Inject
AnalyticsListener analyticsListener; // <2>
@Inject
@Client("/")
HttpClient client; // <3>
@Test
void testMessageIsPublishedToKafkaWhenBookFound() {
String isbn = "1491950358";
Optional<Book> result = retrieveGet("/books/" + isbn); // <4>
assertNotNull(result);
assertTrue(result.isPresent());
assertEquals(isbn, result.get().getIsbn());
await().atMost(5, SECONDS).until(() -> !received.isEmpty()); // <5>
assertEquals(1, received.size()); // <6>
Book bookFromKafka = received.iterator().next();
assertNotNull(bookFromKafka);
assertEquals(isbn, bookFromKafka.getIsbn());
}
@Test
void testMessageIsNotPublishedToKafkaWhenBookNotFound() throws Exception {
assertThrows(HttpClientResponseException.class, () -> {
retrieveGet("/books/INVALID");
});
Thread.sleep(5_000); // <7>
assertEquals(0, received.size());
}
@AfterEach
void cleanup() {
received.clear();
}
@KafkaListener(offsetReset = EARLIEST)
static class AnalyticsListener {
@Topic("analytics")
void updateAnalytics(Book book) {
received.add(book);
}
}
private Optional<Book> retrieveGet(String url) {
return client.toBlocking().retrieve(HttpRequest.GET(url), Argument.of(Optional.class, Book.class));
}
}
1 Create a single test class instance for all tests. Classes that implement TestPropertyProvider must use this annotation.
2 Dependency injection for the AnalyticsListener class declared later in the file. This is a listener class that replicates the functionality of the class of the same name in the Analytics microservice.
3 Dependency injection for an HTTP client that the Micronaut framework will implement at compile time to make calls to BookController.
4 Use the HttpClient to retrieve the details of a Book, which will trigger sending a message.
5 Wait up to five seconds for the message to arrive; it should happen very quickly, but the message is sent on a separate thread.
6 Verify that the message was received and that it has the correct data.
7 Wait five seconds to ensure no message is received.
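The waiting step in the first test relies on polling: the condition is checked repeatedly until it holds or the time limit passes. The sketch below shows that idea in plain Java. It is an approximation for illustration, not Awaitility's implementation, and PollingSketch and waitUntil are hypothetical names. One known difference: this sketch returns false on timeout, whereas Awaitility signals a timeout with an exception.

```java
import java.util.function.BooleanSupplier;

// Illustration of the polling idea behind await().atMost(...).until(...).
final class PollingSketch {

    // Checks the condition every pollMillis until it holds or timeoutMillis has elapsed.
    // Returns true if the condition held in time, false otherwise.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

Polling returns as soon as the message arrives, which is why the positive test usually finishes well within its five-second limit, while the negative test has to sleep for the full period because there is no event to wait for.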
1.1.5. AnalyticsClient
The GDK Launcher created a client interface to send messages to the streaming service in a file named src/main/java/com/example/publisher/AnalyticsClient.java with the contents shown below. (Micronaut generates an implementation for the client interface at compilation time.)
package com.example.publisher;
import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.annotation.Topic;
import reactor.core.publisher.Mono;
@KafkaClient
public interface AnalyticsClient {
@Topic("analytics") // <1>
Mono<Book> updateAnalytics(Book book); // <2>
}
1 Set the name of the topic.
2 Send the Book POJO. Micronaut will automatically convert it to JSON before sending it.
1.1.6. AnalyticsFilter
Sending a message to the streaming service is as simple as injecting AnalyticsClient and calling its updateAnalytics method.
The goal is to send a message every time the details of a book are returned from the Books microservice or, in other words, every time there is a call to http://localhost:8080/books/{isbn}.
To achieve this, the GDK Launcher created an HTTP server filter in a file named src/main/java/com/example/publisher/AnalyticsFilter.java as follows:
package com.example.publisher;
import io.micronaut.http.HttpRequest;
import io.micronaut.http.MutableHttpResponse;
import io.micronaut.http.annotation.Filter;
import io.micronaut.http.filter.HttpServerFilter;
import io.micronaut.http.filter.ServerFilterChain;
import reactor.core.publisher.Flux;
import org.reactivestreams.Publisher;
@Filter("/books/?*") // <1>
class AnalyticsFilter implements HttpServerFilter { // <2>
private final AnalyticsClient analyticsClient;
AnalyticsFilter(AnalyticsClient analyticsClient) { // <3>
this.analyticsClient = analyticsClient;
}
@Override
public Publisher<MutableHttpResponse<?>> doFilter(HttpRequest<?> request,
ServerFilterChain chain) { // <4>
return Flux
.from(chain.proceed(request)) // <5>
.flatMap(response -> {
Book book = response.getBody(Book.class).orElse(null); // <6>
if (book == null) {
return Flux.just(response);
}
return Flux.from(analyticsClient.updateAnalytics(book)).map(b -> response); // <7>
});
}
}
1 Annotate the class with @Filter and define the Ant-style matcher pattern to intercept all calls to the desired URIs.
2 The class must implement HttpServerFilter.
3 Dependency injection for AnalyticsClient.
4 Implement the doFilter method.
5 Proceed with the request; this will invoke the controller action.
6 Get the response from the controller and read its body as an instance of the Book class.
7 If a book was retrieved, use the client to send a message.
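The decision the filter makes can be reduced to a few lines of plain Java: pass every response through unchanged, and publish only when the body is a single book. The sketch below leaves out Reactor and the Micronaut HTTP types entirely. FilterLogicSketch, Response, and filter are hypothetical names, and a Consumer stands in for AnalyticsClient.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Plain-Java sketch of the filter's decision; not the generated AnalyticsFilter.
final class FilterLogicSketch {

    // Hypothetical stand-in for an HTTP response.
    static final class Response {
        final int status;
        final String bookIsbn; // null when the body is not a single book

        Response(int status, String bookIsbn) {
            this.status = status;
            this.bookIsbn = bookIsbn;
        }

        // Mirrors response.getBody(Book.class): empty when no book is present.
        Optional<String> body() {
            return Optional.ofNullable(bookIsbn);
        }
    }

    // Returns the response unchanged; publishes the ISBN only when one is present.
    static Response filter(Response response, Consumer<String> publisher) {
        response.body().ifPresent(publisher);
        return response;
    }
}
```

Because the response is always returned as-is, the caller of the Books microservice sees the same result whether or not a message was published.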
1.2. Create the Analytics Microservice
- Open the GDK Launcher in advanced mode.
- Create a new project using the following selections.
- Project Type: Application (Default)
- Project Name: analytics
- Base Package: com.example.consumer
- Clouds: None
- Build Tool: Gradle (Groovy) or Maven
- Language: Java (Default)
- Test Framework: JUnit (Default)
- Java Version: 17 (Default)
- Micronaut Version: (Default)
- Cloud Services: Streaming
- Features: Awaitility Framework, GraalVM Native Image and Micronaut Serialization Jackson Core
- Sample Code: No
- Click Generate Project, then click Download Zip. The GDK Launcher creates an application with the default package com.example.consumer in a directory named analytics. The application ZIP file will be downloaded to your default downloads directory. Unzip it, open it in your code editor, and proceed to the next steps.
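Alternatively, use the GDK CLI as follows. The first command generates a Gradle project and the second generates a Maven project; run only the one that matches your build tool: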
gdk create-app com.example.consumer.analytics \
--services=streaming \
--features=awaitility,graalvm,serialization-jackson \
--build=gradle \
--jdk=17 \
--lang=java \
--example-code=false
gdk create-app com.example.consumer.analytics \
--services=streaming \
--features=awaitility,graalvm,serialization-jackson \
--build=maven \
--jdk=17 \
--lang=java \
--example-code=false
1.2.1. Domain Classes
The GDK Launcher created a Book domain class in a file named src/main/java/com/example/consumer/Book.java, as shown below. (This Book POJO is the same as the one in the Books microservice. In a real application this would be in a shared library, but to keep things simple it is duplicated here.)
package com.example.consumer;
import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;
import java.util.Objects;
@Serdeable
public class Book {
private final String isbn;
private final String name;
@Creator
public Book(String isbn, String name) {
this.isbn = isbn;
this.name = name;
}
public String getIsbn() {
return isbn;
}
public String getName() {
return name;
}
@Override
public String toString() {
return "Book{" +
"isbn='" + isbn + '\'' +
", name='" + name + '\'' +
'}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Book other = (Book) o;
return Objects.equals(isbn, other.isbn) &&
Objects.equals(name, other.name);
}
@Override
public int hashCode() {
return Objects.hash(isbn, name);
}
}
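The equals and hashCode methods matter on the consumer side because the AnalyticsService described in the next section uses Book as a map key, and each incoming message is turned into a new Book instance. The sketch below shows, under that assumption, why value-based equality makes separate instances of the same book share one counter. ValueKeySketch, Key, and count are hypothetical names; Key is a trimmed stand-in for Book.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustration of value-based map keys; not part of the generated project.
final class ValueKeySketch {

    // Hypothetical, trimmed stand-in for Book with the same equality rules.
    static final class Key {
        final String isbn;
        final String name;

        Key(String isbn, String name) {
            this.isbn = isbn;
            this.name = name;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Key other = (Key) o;
            return Objects.equals(isbn, other.isbn) && Objects.equals(name, other.name);
        }

        @Override
        public int hashCode() {
            return Objects.hash(isbn, name);
        }
    }

    // Counts how often each key occurs; equal keys share one entry.
    static Map<Key, Long> count(Key... keys) {
        Map<Key, Long> counts = new HashMap<>();
        for (Key key : keys) {
            counts.merge(key, 1L, Long::sum);
        }
        return counts;
    }
}
```

Without the overrides, each instance would be a distinct key and every counter would stay at 1.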
The GDK Launcher also created a BookAnalytics domain class in a file named src/main/java/com/example/consumer/BookAnalytics.java, as follows:
package com.example.consumer;
import io.micronaut.core.annotation.Creator;
import io.micronaut.serde.annotation.Serdeable;
@Serdeable
public class BookAnalytics {
private final String bookIsbn;
private final long count;
@Creator
public BookAnalytics(String bookIsbn, long count) {
this.bookIsbn = bookIsbn;
this.count = count;
}
public String getBookIsbn() {
return bookIsbn;
}
public long getCount() {
return count;
}
}
1.2.2. AnalyticsService
To keep this guide simple there is no database persistence: the Analytics microservice keeps book analytics in memory. The GDK Launcher created a class named AnalyticsService in src/main/java/com/example/consumer/AnalyticsService.java with the following contents:
package com.example.consumer;
import jakarta.inject.Singleton;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Singleton
public class AnalyticsService {
private final Map<Book, Long> bookAnalytics = new ConcurrentHashMap<>(); // <1>
public void updateBookAnalytics(Book book) { // <2>
bookAnalytics.compute(book, (k, v) -> {
return v == null ? 1L : v + 1;
});
}
public List<BookAnalytics> listAnalytics() { // <3>
return bookAnalytics
.entrySet()
.stream()
.map(e -> new BookAnalytics(e.getKey().getIsbn(), e.getValue()))
.collect(Collectors.toList());
}
}
1 Keep the book analytics in memory.
2 Initialize or increment the counter for the book passed as a parameter.
3 Return all the analytics.
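The counter update is safe under concurrent message delivery because ConcurrentHashMap performs compute atomically per key. The sketch below exercises the same idea from several threads. It uses merge with Long::sum, a shorter alternative to the compute lambda in AnalyticsService rather than the generated code itself, and CounterSketch, increment, get, and hammer are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Illustration of atomic per-key counting; not part of the generated project.
final class CounterSketch {

    private final Map<String, Long> counts = new ConcurrentHashMap<>();

    // Atomically initializes the counter to 1 or adds 1 to the existing value.
    void increment(String isbn) {
        counts.merge(isbn, 1L, Long::sum);
    }

    long get(String isbn) {
        return counts.getOrDefault(isbn, 0L);
    }

    // Runs `threads` workers that each increment the same key `perThread` times
    // and returns the final counter value.
    static long hammer(int threads, int perThread) {
        CounterSketch sketch = new CounterSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    sketch.increment("1491950358");
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sketch.get("1491950358");
    }
}
```

With a plain HashMap and a separate get followed by put, concurrent updates could be lost; the atomic per-key update is what prevents that here.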
1.2.3. AnalyticsServiceTest
The GDK Launcher created a test for the AnalyticsService class, in a file named src/test/java/com/example/consumer/AnalyticsServiceTest.java, as follows:
package com.example.consumer;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.micronaut.test.extensions.junit5.annotation.MicronautTest;
import org.junit.jupiter.api.Test;
import jakarta.inject.Inject;
import java.util.List;
@MicronautTest
class AnalyticsServiceTest {
@Inject
AnalyticsService analyticsService;
@Test
void testUpdateBookAnalyticsAndGetAnalytics() {
Book b1 = new Book("1491950358", "Building Microservices");
Book b2 = new Book("1680502395", "Release It!");
analyticsService.updateBookAnalytics(b1);
analyticsService.updateBookAnalytics(b1);
analyticsService.updateBookAnalytics(b1);
analyticsService.updateBookAnalytics(b2);
List<BookAnalytics> analytics = analyticsService.listAnalytics();
assertEquals(2, analytics.size());
assertEquals(3, findBookAnalytics(b1, analytics).getCount());
assertEquals(1, findBookAnalytics(b2, analytics).getCount());
}
private BookAnalytics findBookAnalytics(Book b, List<BookAnalytics> analytics) {
return analytics
.stream()
.filter(bookAnalytics -> bookAnalytics.getBookIsbn().equals(b.getIsbn()))
.findFirst()
.orElseThrow(() -> new RuntimeException("Book not found"));
}
}
1.2.4. AnalyticsController
The GDK Launcher created a Controller to create an endpoint for the Analytics microservice in a file named src/main/java/com/example/consumer/AnalyticsController.java, as follows:
package com.example.consumer;
import io.micronaut.http.annotation.Controller;
import io.micronaut.http.annotation.Get;
import java.util.List;
@Controller("/analytics") // <1>
class AnalyticsController {
private final AnalyticsService analyticsService;
AnalyticsController(AnalyticsService analyticsService) {
this.analyticsService = analyticsService;
}
@Get // <2>
List<BookAnalytics> listAnalytics() {
return analyticsService.listAnalytics();
}
}
1 The @Controller annotation defines the class as a controller mapped to the root URI /analytics.
2 The @Get annotation maps the listAnalytics method to an HTTP GET request on /analytics.
The application doesn’t expose the method updateBookAnalytics created in AnalyticsService. This method will be invoked when reading messages from Kafka.
1.2.5. AnalyticsListener
The GDK Launcher created a class to act as a consumer of the messages sent to the streaming service by the Books microservice. The Micronaut framework implements logic to invoke the consumer at compile time. The AnalyticsListener class is in a file named src/main/java/com/example/consumer/AnalyticsListener.java, as follows:
package com.example.consumer;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;
import io.micronaut.context.annotation.Requires;
import io.micronaut.context.env.Environment;
@Requires(notEnv = Environment.TEST) // <1>
@KafkaListener // <2>
class AnalyticsListener {
private final AnalyticsService analyticsService; // <3>
AnalyticsListener(AnalyticsService analyticsService) { // <3>
this.analyticsService = analyticsService;
}
@Topic("analytics") // <4>
void updateAnalytics(Book book) {
analyticsService.updateBookAnalytics(book); // <5>
}
}
1 Do not load this bean in the test environment: you can run tests without access to a streaming service.
2 Annotate the class with @KafkaListener to indicate that this bean consumes messages from Kafka.
3 Constructor injection for AnalyticsService.
4 Annotate the method with @Topic and specify the topic name.
5 Call AnalyticsService to update the analytics for the book.
1.2.6. Change the port for the Analytics Microservice
The Books and Analytics microservices are both run on your local machine, so they must run on different ports. Change the port that Analytics runs on by editing the src/main/resources/application.properties file so that it has the following contents:
micronaut.server.port=8081
kafka.enabled=true
2. Enable Kafka Streaming
In this section you enable Kafka streaming and configure both microservices to send messages in an asynchronous and decoupled way. A fast way to start Kafka is in a container; follow the steps below to create one. (Alternatively, you can install and run a local Kafka instance.)
- Copy the following contents into a file named docker-compose.yml in a directory named docker/:
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181:2181 # <1>
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092:9092 # <2>
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
1 ZooKeeper uses port 2181 by default, but you can change the value if necessary.
2 Kafka uses port 9092 by default, but you can change the value if necessary.
- Start ZooKeeper and Kafka as follows:
docker-compose up
Note: To shut down the containers, use the command docker-compose down.
- To run the microservices locally, add the bootstrap.servers configuration to each microservice’s application.properties file:
kafka.bootstrap.servers=localhost:9092
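3. Test the Microservices
Run the tests for the Books and Analytics microservices from each project’s root directory, using the test task of the build tool you selected (typically ./gradlew test for Gradle or ./mvnw test for Maven).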
4. Run the Microservices
4.1. Start the Books Microservice
To run the Books microservice, use your build tool’s run command from the books directory (typically ./gradlew run for Gradle or ./mvnw mn:run for Maven). The application starts on port 8080.
4.2. Start the Analytics Microservice
To run the Analytics microservice, use the same command from the analytics directory. The application starts on port 8081.
4.3. Test the Microservices
Use curl to test the microservices, as follows.
- Retrieve the list of books:
curl http://localhost:8080/books
[{"isbn":"1491950358","name":"Building Microservices"},{"isbn":"1680502395","name":"Release It!"},{"isbn":"0321601912","name":"Continuous Delivery"}]
- Retrieve the details of a specified book:
curl http://localhost:8080/books/1491950358
{"isbn":"1491950358","name":"Building Microservices"}
- Retrieve the analytics:
curl http://localhost:8081/analytics
[{"bookIsbn":"1491950358","count":1}]
Update the curl command to the Books microservice to retrieve other books and repeat the invocations, then re-run the curl command to the Analytics microservice to see that the counts increase.
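If you prefer to script these checks, the same requests can be issued with the JDK's built-in java.net.http client. The sketch below is one possible equivalent of the curl commands, assuming both microservices are running locally on ports 8080 and 8081 as described above. CurlEquivalentSketch, bookRequest, analyticsRequest, and fetch are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Sketch of the curl checks using the JDK HTTP client (Java 11 or later).
final class CurlEquivalentSketch {

    // Equivalent of: curl http://localhost:8080/books/{isbn}
    static HttpRequest bookRequest(String isbn) {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/books/" + isbn))
                .GET()
                .build();
    }

    // Equivalent of: curl http://localhost:8081/analytics
    static HttpRequest analyticsRequest() {
        return HttpRequest.newBuilder(URI.create("http://localhost:8081/analytics"))
                .GET()
                .build();
    }

    // Sends the request and returns the response body as text.
    // Requires the target microservice to be running.
    static String fetch(HttpRequest request) throws IOException, InterruptedException {
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .body();
    }
}
```

Calling fetch(bookRequest("1491950358")) a few times and then fetch(analyticsRequest()) should show the counter for that ISBN growing, although the analytics update is asynchronous and may lag slightly behind the book request.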
5. Generate a Native Executable Using GraalVM
The GDK supports compiling Java applications ahead-of-time into native executables using GraalVM Native Image. You can use either the Gradle plugin or the Maven plugin for GraalVM Native Image building. Packaged as a native executable, an application starts significantly faster and has a smaller memory footprint.
Prerequisites: Make sure you have installed a GraalVM JDK. The easiest way to get started is with SDKMAN!. For other installation options, visit the Downloads section.
Generate a native executable for each microservice, using the following commands.
- Generate a native executable for the Books microservice, as follows.
With Gradle:
./gradlew nativeCompile
The native executable is created in the build/native/nativeCompile/ directory and can be run with the following command:
build/native/nativeCompile/books
With Maven:
./mvnw package -Dpackaging=native-image
The native executable is created in the target/ directory and can be run with the following command:
target/books
- Generate a native executable for the Analytics microservice, as follows.
With Gradle:
./gradlew nativeCompile
The native executable is created in the build/native/nativeCompile/ directory and can be run with the following command:
build/native/nativeCompile/analytics
With Maven:
./mvnw package -Dpackaging=native-image
The native executable is created in the target/ directory and can be run with the following command:
target/analytics
Start the native executables for the two microservices and run the same curl requests as before to check that everything works as expected.
The microservices behave the same as when you run them from JAR files, but with reduced startup time and a smaller memory footprint.
Summary
In this guide you created a streaming application with the Micronaut framework and Kafka Streams. The two microservices, acting as a producer and a consumer, communicated asynchronously. Then you packaged these microservices into native executables with GraalVM Native Image for faster startup and a reduced memory footprint.