Back in 2015 and 2016, I wrote two blogs that went step by step to develop a microservice/Netty architecture with fully working code called Whirlpool.
A lot has changed in the years since, so recently I decided to come back to the project, update it with the latest versions of Kafka and Netty, and add a React UI (rather than the vanilla JavaScript version it used before). I also added Windows Subsystem for Linux (WSL) scripts alongside the Mac and Linux scripts that were already there, and made all of the scripts more robust.
This blog is about the work that went into all of those updates, plus a look at the new React UI. It provides an excellent view into what it takes to modernize an outdated microservices application implemented with Kafka (version 0.9 -> 3.0) and Netty (4.1.3 -> 5.0.0-alpha2), bringing all versions up to date and adding a React UI. By the end you’ll be familiar with the latest versions of these frameworks, know some “gotchas” to avoid, and understand how to integrate WebSockets into React.
Diving Into The Code
Both previous posts on Whirlpool can be considered prerequisite reading before diving into this blog, since it builds on them. (You can find them here and here.)
Once you read those, download the source code from here, read the README.md, load the project in your favorite IDE so you can have the code handy, and let’s get going.
Pro tip! Like every developer, I always try to write robust code that doesn’t have memory leaks, orphaned threads, excessive CPU consumption, etc. It turns out that this project had all of those things and more due to my limited knowledge of the technologies I was learning when I wrote this project. I knew enough to get that sample running, but not enough to do it completely correctly. I’ve learned a great deal since then and applied that knowledge to make this project much better. If you’ve forked this project or grabbed any code from it (and you are welcome to do that!), you’ll want to revisit that code and compare it to the latest version.
Kafka
Even though Kafka went from version 0.9 to 3.0, the API did not change very much. The major change was moving from one Kafka producer thread per connection to just one producer thread for the entire server. All connections now share a ConcurrentLinkedQueue that they write to; the producer thread drains the queue and sends each message to the Kafka bus. There was no reason to have one producer per client connection since there is only one Kafka bus anyway.
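As a rough sketch of that arrangement (the class name, topic name, and message format below are illustrative, not the project's actual code), the connections enqueue outgoing requests and one producer thread drains the shared queue:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch: one shared queue, one Kafka producer thread for the whole server.
public class ProducerThread implements Runnable {
    private final ConcurrentLinkedQueue<String> requestQueue;
    private volatile boolean running = true;

    public ProducerThread(ConcurrentLinkedQueue<String> requestQueue) {
        this.requestQueue = requestQueue;
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            while (running) {
                String message = requestQueue.poll();   // any connection may have queued a request
                if (message != null) {
                    producer.send(new ProducerRecord<>("service-request", message)); // topic name illustrative
                } else {
                    Thread.sleep(20);                   // nothing queued yet; back off briefly
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();         // preserve interrupt status and exit
        }
    }

    public void shutdown() { running = false; }
}

Each connection then only needs a reference to the queue and calls requestQueue.offer(json) instead of owning its own producer.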
You’ll notice some increased sleep times across the service threads to keep from overwhelming the sites I’m relying on for the data that powers each service. Since the Yahoo APIs I was using have gone away, I had to rewrite both the stock service and the weather service. Both now use screen-scraping to retrieve their data because this is a sample and I didn’t want to have to register for any free services to get this working. The stock service relies on Yahoo’s finance website, whereas the weather service relies on CNN’s weather pages. Screen scraping isn’t recommended because those pages can change at any time, which will break the scrapers.
ThreadFactory Tip
I learned a few new things about using java.util.concurrent.ThreadFactory. The biggest is that each factory only needs to be created once, then reused when needed to create new threads of that factory type. I was creating a new factory from scratch every time using com.google.common.util.concurrent.ThreadFactoryBuilder (but there are many others that could have been used), which led to each thread of the factory type being identified as thread 0.
After I changed to a single instance of each factory, the thread identifiers then correctly began to increment as new threads were created. Another improvement is that shutdown() should be called on the Executor that starts each thread so that the executor goes away after the thread ends. If it doesn’t, a thread leak results because the thread is never completely ended and removed.
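Here is a small sketch of both points, assuming Guava's ThreadFactoryBuilder; the factory and method names are illustrative, not the project's exact code:

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class ConsumerThreads {
    // One factory instance, created once, so thread names increment: kafka-consumer-0, -1, -2, ...
    private static final ThreadFactory CONSUMER_FACTORY =
            new ThreadFactoryBuilder().setNameFormat("kafka-consumer-%d").build();

    public static ExecutorService startConsumer(Runnable consumerLoop) {
        ExecutorService executor = Executors.newSingleThreadExecutor(CONSUMER_FACTORY);
        executor.submit(consumerLoop);
        // Let the submitted task finish, then release the executor; without this the
        // executor lingers after the thread's work ends and threads leak over time.
        executor.shutdown();
        return executor;
    }
}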
Netty
Let’s switch over to Netty. I had several misunderstandings about how Netty went about accepting new requests, which led to quite a few problems.
The issue was that these problems were hidden; everything looked fine until I used the Mac activity monitor to watch the server more closely. Each time a connection ended, no threads would close. Each time a new connection started, many threads would start. Running the project for short periods of time to test didn’t set off any alarms because I would shut it down before the extra threads caused problems.
Another issue was that Netty’s worker thread group seemed to grow without bounds. Let’s look at how I fixed these issues.
Fixing the worker threads was pretty easy. I simply told the group what the maximum number of threads should be by asking how many processors were available, and then using that.
final EventLoopGroup workerGroup = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
You can also see that the producer’s request queue and thread are being created right at the start of the server execution because as I mentioned earlier they are now shared between all connections.
I changed the names of the items added to the channel pipeline, as well as their order, since we want the aggregator to finish building up the full message before it hands the work off to the string decoder, the encoder, and finally the custom handler that contains the main server code. At this point the producer thread is running and the queue has been created, so we pass the queue along to the server handler.
What I didn’t understand before is that initChannel() is called for each connection that is created, not once when Netty starts up. So, each connection gets its own copy of each object in the pipeline.
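Putting those two points together, the initializer looks roughly like this; the handler names, aggregator size, and the constructor that receives the queue are illustrative rather than the project's exact pipeline:

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.util.concurrent.ConcurrentLinkedQueue;

// Sketch: initChannel() runs once per client connection, so every connection
// gets its own copies of these pipeline objects.
public class WhirlpoolChannelInitializer extends ChannelInitializer<SocketChannel> {
    private final ConcurrentLinkedQueue<String> requestQueue;   // shared Kafka producer queue

    public WhirlpoolChannelInitializer(ConcurrentLinkedQueue<String> requestQueue) {
        this.requestQueue = requestQueue;
    }

    @Override
    protected void initChannel(SocketChannel ch) {
        ChannelPipeline p = ch.pipeline();
        p.addLast("codec", new HttpServerCodec());
        p.addLast("aggregator", new HttpObjectAggregator(65536));  // build the full message first
        p.addLast("decoder", new StringDecoder());
        p.addLast("encoder", new StringEncoder());
        p.addLast("handler", new WhirlpoolServerHandler(requestQueue)); // custom server logic, gets the queue
    }
}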
Whirlpool Service Handler
With that in mind, let’s look at the changes made to WhirlpoolServerHandler. Compared with the source from years ago, the class static variable ‘channels’ has disappeared. It is now handled in WhirlpoolMessageHandler, and we’ll see why when we get there.
The wsMessageHandler is now created in the constructor instead of at the class level so we can pass it the requestQueue that talks to the Kafka producer thread.
The next change to highlight is important. In the handlerRemoved() method a call is now made to wsMessageHandler to ask it to shut down. This is what was missing before that caused all of the thread leaks. handlerRemoved() is called by Netty when a client connection ends so things can be cleaned up. Since wsMessageHandler was never informed about the connection dropping, it never asked its threads to end. Because those threads were still attached, Netty could not fully release the channels for the connection.
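Inside WhirlpoolServerHandler, the shape of the fix is roughly the following; the shutdown method name is illustrative of "ask it to shut down", not necessarily the exact call:

@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    // Netty calls this when the client connection goes away; tell the message handler
    // so its per-connection threads (like the Kafka consumer thread) can stop.
    wsMessageHandler.shutdown();
    super.handlerRemoved(ctx);
}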
Most of the other changes in WhirlpoolServerHandler are just due to changes in the Netty API. There is one more fix to point out, however. When logging out, the code used to look for the first channel that had the user’s id set in the channel attribute. When it was found, the channel’s remove() method was called and the WebSocket was closed. What I didn’t realize is that multiple channels can be created for a connection. At times, after logging out, the same user couldn’t log back in because the code still thought the user was connected. The issue was only resolved after I changed the code to look through all of the channels in the group and remove every channel that had the attribute set.
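A minimal sketch of that logout fix, assuming an AttributeKey holding the user id (the key name and helper class are illustrative):

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.util.AttributeKey;

import java.util.ArrayList;
import java.util.List;

public final class LogoutHelper {
    private static final AttributeKey<String> USER_ID = AttributeKey.valueOf("userId");

    // Close *every* channel carrying this user's id, not just the first match.
    public static void removeUserChannels(ChannelGroup channels, String userId) {
        List<Channel> matches = new ArrayList<>();
        for (Channel ch : channels) {
            if (userId.equals(ch.attr(USER_ID).get())) {
                matches.add(ch);            // collect first to avoid mutating while iterating
            }
        }
        for (Channel ch : matches) {
            channels.remove(ch);
            ch.close();                     // closes the WebSocket for that channel
        }
    }
}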
Whirlpool Message Handler
So, on to WhirlpoolMessageHandler. This class receives the completed message and decides what to do with it. This is where messages are handed to the producer, which sends each request to a Kafka topic. From there the requests are routed to the separate microservices, which put response messages back on the Kafka bus, and those responses are read by the consumer thread. If a message is for the user tied to the connection, it is sent over the WebSocket back to the client.
I’ll only touch on things here that I haven’t before because some of the same fixes from earlier were also applied here.
First, a new volatile class variable shutdownHandler was added. This is used to tell the Kafka consumer thread that the connection is gone so no more messages will be going back to the client, and therefore the consumer and thread should shut down. Previously there was no way to tell that thread to go away without shutting down the server.
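The mechanism is just a volatile flag checked by the consumer loop; something along these lines (the loop body and class name are a sketch, not the actual handler code):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;

public class ConsumerLoop implements Runnable {
    private volatile boolean shutdownHandler = false;   // flipped when the connection drops
    private final KafkaConsumer<String, String> consumer;

    public ConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        while (!shutdownHandler) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
            for (ConsumerRecord<String, String> record : records) {
                // forward record.value() over the WebSocket if it belongs to this connection's user
            }
        }
        // when the loop exits, the consumer is closed with a bounded timeout (shown below)
    }

    public void shutdown() { shutdownHandler = true; }
}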
Second, a new channel group is created for each connection in the constructor. Even though all channel groups have the same name, that is fine because Netty specifically allows that to happen. Channel groups are not tracked by their names – it is just a convenience field. A channel group for each connection separates all of the channels by user, improving information hiding and logical grouping.
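Creating one is cheap, and since the name is only a label every connection can reuse it; a sketch of the constructor line (the executor choice here is illustrative):

// one group per connection; duplicate names are fine because groups are not tracked by name
ChannelGroup channelGroup = new DefaultChannelGroup("whirlpool", GlobalEventExecutor.INSTANCE);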
Third, the producer class has been extracted to its own file since it is no longer created for each thread.
Fourth, a new type of message ‘ALL’ has been added for broadcasting a command to all services at once. This is being used for the new ‘refresh’ command we’ll discuss later.
Fifth and finally, down at the bottom, closing the Kafka consumer is now limited to 10 seconds; if it takes longer, it gives up and closes anyway. Previously it could wait forever.
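With current Kafka clients that bounded close is a one-liner, roughly (assuming the consumer variable from the sketch above):

// give the consumer at most 10 seconds to commit and leave the group, then close anyway
consumer.close(Duration.ofSeconds(10));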
So, that takes care of the server changes. All of these things together have resulted in a server that is stable, fast, and uses fewer threads and less memory. If the UI seems slow to receive messages, it is because of the long sleep times I have across the server. You can play with those to see just how fast Netty and Kafka can work together. You could even add a “Counter” service: you’d pass it a number and a delay time between counts, and the service would send data to the client, incrementing from 1 to N with the delay between each send. I only thought of that service after finishing all the other changes, so I didn’t have time to add it. With the other services as a guide, I am certain you can learn a lot by adding a new one.
React
Creating a new React UI was a lot of fun. I would be remiss if I didn’t give a shout-out to the instructor of the course I took to learn React (and React-Native, too). Maximilian Schwarzmüller teaches courses on Udemy. My favorite for React is this course. I highly recommend it for anyone interested in improving their React skills!
I used the command
npx create-react-app whirlpool --template cra-template-pwa
from the src/main/ui folder to get the UI code started. This automatically sets up everything needed to build and run the project. It wasn’t 100% necessary for this project since it heavily relies on being online, but I used the cra-template-pwa template to also include PWA features that are built into React, like service workers and offline support. These settings are specified in index.js, which I did not touch.
App.js
Moving on to App.js, this is the main component for the UI. In larger applications, App would stay mostly clean, containing just a small component tree, the React-Redux setup, and other top-level logic. This is a small application, so a lot of functionality is embedded here. You’ll only find functional components using Hooks in this application, as I wanted to be consistent and not mix class and functional components.
After the user enters their username (which can be anything since we don’t have a database), loginHandler is called. This uses the built-in JavaScript fetch() call to send the info to the server as a normal HTTP POST. If a successful response is received, some state data is stored and the WebSocket is started.
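A stripped-down version of that flow looks roughly like this; the endpoint path, request body shape, and WebSocket URL are placeholders, not the exact code in App.js:

const loginHandler = async (username) => {
  // send the username to the server as a plain HTTP POST (endpoint path is illustrative)
  const response = await fetch('/api/login', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json;charset=utf-8' },
    body: JSON.stringify({ user: username }),
  });

  if (response.status === 200) {
    await response.json();
    setClientName(username);          // remember who is logged in
    // only after a successful login do we open the WebSocket back to the Netty server
    const socket = new WebSocket(`ws://${window.location.host}/whirlpool`);  // URL is illustrative
    setWebsocket(socket);
  } else {
    alert(`Error: ${response.status}`);
  }
};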
In the onopen function called by the WebSocket, the first command is sent asking for a refresh of all current subscriptions. The first time, no data will be received, but if the user reloads the page, a cookie that is set during login enables automatically logging back in. If the user previously had set up subscriptions, the services would send the current data for those subscriptions back to the UI, enabling a faster refresh than waiting for the services to wake up and check for subscriptions.
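The refresh request itself is tiny; something along these lines, where the message shape is only an approximation of the project's command format:

socket.onopen = () => {
  // ask every service to resend current data for this user's existing subscriptions
  socket.send(JSON.stringify({ type: 'ALL', command: 'refresh', user: clientName }));
};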
Each message sent by the server over the WebSocket is received by the onmessage function. This function decodes the message and updates the relevant context data. Since other components are listening to that context as well, when the data lists are updated, those components refresh.
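Conceptually the handler is a simple dispatch on the message type; the field names and update helpers below are placeholders for the project's actual message format and context setters:

socket.onmessage = (event) => {
  const message = JSON.parse(event.data);
  // route the update into the matching context list; components watching that
  // context re-render automatically when the list changes
  switch (message.type) {
    case 'stock':
      updateStockList(message);
      break;
    case 'updown':
      updateUpDownList(message);
      break;
    case 'weather':
      updateWeatherList(message);
      break;
    default:
      console.warn('Unknown message type', message.type);
  }
};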
When the user logs out or the server shuts down unexpectedly, onerror is called. This allows us to set a flag so we don’t keep looking at the cookie and endlessly trying to re-login.
// This effect checks to see if we have a cookie set so we can automatically login
useEffect(() => {
  const username = checkCookie();
  if (username && !isLoggedIn && !serverDown) {
    authenticated(username);
  }
}, [isLoggedIn, authenticated, serverDown]);
Here is the useEffect hook that allows us to automatically log in if the cookie is set. The hook only executes if the isLoggedIn or serverDown flags change. The authenticated function is wrapped in the useCallback hook, so it will never change; however, my IDE complains if I don’t include it in the dependency list.
When the user logs out, all of the subscriptions are removed, and then the logout API is called. In order to know when all of those subscriptions have been removed by the server, another useEffect hook is used.
// This effect looks to see if all subscriptions have been removed while logging out.
// When that happens, then we can finish up and close everything cleanly.
useEffect(() => {
  const checkFinishedLogout = async () => {
    // if we are logging out, check to see if all of the subscriptions have been updated
    // from the result of the websocket remove subscription calls. if everything is done,
    // finish the logout
    if (isLoggingOut && !stockList.length && !upDownList.length && !weatherList.length) {
      const response = await fetch('/api/logout', {
        method: 'POST',
        headers: {
          'Accept': 'application/json',
          'Content-Type': 'application/json;charset=utf-8',
          'Content-Length': 2
        },
        body: {}
      });

      if (response.status === 200) {
        await response.json();
        if (websocket) {
          websocket.close();
          setWebsocket(null);
        }
        deleteCookie(clientName);
        setClientName(null);
      } else {
        alert(`Error: ${response.status}`);
      }
    }
  };

  checkFinishedLogout();
}, [clientName, isLoggingOut, stockList, upDownList, weatherList, websocket]);
The dependencies on this hook (stockList, upDownList, and weatherList) are what make it work. As each subscription is stopped, the WebSocket receives that information and updates the corresponding list. Each time any list changes, the hook reruns. Once all three lists are empty, we know the server has finished removing all of the subscriptions, so we can safely log off.
The rest of the React code is pretty vanilla. The ‘Main’ component is where the logged-in UI is rendered. It uses the ServiceList component to render each of the data sets for the three services. ServiceList then uses the Stock, UpDown, and Weather components to render each item in their respective lists.
Scripts
The bash scripts I’ve included will hopefully make your life a lot easier when it comes to starting up and shutting down. The scripts check to ensure Maven and Java are installed and have the correct versions. They download Kafka for you, install it, and configure it. They then build the project and start both the services and the server. The kill script shuts everything back down and cleans up.
Note: for the WSL script, I’m using tmux. As such, you won’t see new windows appear as each process starts because they are started as background processes. If you run ps -ef, you’ll see all the Java processes running in the background. The scripts run without issues on WSL1. For WSL2, I noticed some errors when starting the scripts, but oddly they don’t seem to keep anything from working correctly.
For Mac, use the basic Mac terminal so the script can execute the osascript commands that create new tabs for you and close them as well (you may have to allow script execution, depending on which release of macOS you are using).
For Linux, x-terminal-emulator is used to kick off new shells.
Conclusion
I hope you’ve enjoyed this discussion about Netty, Kafka, and React. As you can see, React plays well with WebSockets. By using them instead of the normal REST madness, applications can be more responsive while doing away with the headaches of figuring out how to push data out to clients.
With a stable, fast server you could now take this code and add Kafka clustering, a database, HTTPS, security, and have a great start at producing a production quality microservice architecture with WebSockets all the way to the client. That sounds like a lot, and it is, but at least the foundation is solid.
If you run into any snags, please create an issue for me in my github project at https://github.com/jwboardman/whirlpool/issues.
Thanks for reading!