Use RSocket to Send Data from a Cloud Pub/Sub Subscription to a Recharts UI

Create an RSocket server with Spring Boot to stream data from a Pub/Sub subscription to a Recharts (React) front end

Gabriel Fajardo
9 min read · Feb 16, 2021
https://unsplash.com/photos/7ZWVnVSaafY

In this blog post, we’ll learn a bit about RSocket. We’ll focus on its “request-stream” capability. Then we’ll create a server with Spring Boot. Spring Boot provides libraries to work with RSocket as well as Google’s Pub/Sub cloud service.

Also, we’ll use Python to create an ad-hoc program that streams “mock” data into our Pub/Sub topic. This way we’ll be able to stream this data through our RSocket server and see a real-time dashboard. We’ll build the dashboard with Recharts, a charting library for React.

If you want to follow along or just see the finished code, then here is the project repo.

For an introduction to how the Pub/Sub mechanism works, please check out the intro material from one of my previous blog posts.

Streams

Receiving real-time data is different: the typical request-response communication model doesn’t really fit, so we need to look into other forms of communication.

Sure, we could poll Pub/Sub, but that is inefficient.

In Java, there are a few ways to do this. At my previous job, we were developing a tax service back end with Spring WebFlux. WebFlux runs on Netty by default, which uses inbound and outbound channels to stream data asynchronously between clients and servers.

However, there is still the problem of how to communicate between applications. RSocket also runs on Netty, and it has implementations for several languages so that different applications can communicate in a non-blocking, “reactive” way. We’ll soon see what reactive means and the benefits of RSocket.

RSocket

RSocket supports four communication models:

  • Request-Response
  • Fire-and-Forget
  • Request-Stream
  • Request-Channel

We can send real-time data with RSocket using either “request-stream” or “request-channel”. But what exactly is RSocket?

RSocket Protocol

RSocket is an application protocol. If you’re a bit familiar with the network layers, then you know the lowest level is the physical link (the Ethernet wire), above it are transport protocols like TCP and UDP, and towards the top are application protocols, such as HTTP, that define how your application interacts with other services.

According to the docs, RSocket is “designed for and tested with TCP, WebSocket, Aeron, and HTTP/2 streams as transport protocols”. So, it runs over the transport layer and uses it to carry “RSocket frames”.

These frames specify, among other things, the interaction model to use, for example the REQUEST_STREAM frame (0x06). There are other frame types too, like KEEPALIVE and ERROR.
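
To make the idea of a frame a bit more concrete, here is a rough Python sketch of the frame header as I read it in the protocol spec: a 31-bit stream ID followed by a 6-bit frame type and 10 bits of flags. The function names are my own and purely illustrative; a real RSocket library does this for you.

```python
import struct

# Frame type codes as listed in the RSocket protocol specification.
FRAME_TYPES = {
    "KEEPALIVE": 0x03,
    "REQUEST_STREAM": 0x06,
    "ERROR": 0x0B,
}


def pack_frame_header(stream_id: int, frame_type: int, flags: int = 0) -> bytes:
    """Pack a 6-byte header: 31-bit stream ID, 6-bit frame type, 10-bit flags."""
    if not 0 <= stream_id < 2 ** 31:
        raise ValueError("stream ID must fit in 31 bits")
    if not 0 <= frame_type < 2 ** 6:
        raise ValueError("frame type must fit in 6 bits")
    if not 0 <= flags < 2 ** 10:
        raise ValueError("flags must fit in 10 bits")
    type_and_flags = (frame_type << 10) | flags
    # Big-endian: 4 bytes of stream ID, then 2 bytes of type + flags.
    return struct.pack(">IH", stream_id, type_and_flags)


def unpack_frame_header(header: bytes):
    """Return (stream_id, frame_type, flags) from the first 6 bytes of a frame."""
    stream_id, type_and_flags = struct.unpack(">IH", header[:6])
    return stream_id & 0x7FFFFFFF, type_and_flags >> 10, type_and_flags & 0x3FF


header = pack_frame_header(1, FRAME_TYPES["REQUEST_STREAM"])
print(header.hex())  # 000000011800
```

The sketch leaves out everything after the header (the initial request(n), metadata and data), which depends on the frame type.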

Once the request-stream frame is sent, the protocol still needs to ensure an interaction that doesn’t “cause ‘head of line blocking’, overwhelm network and application buffers, and produce more data on the server than the client can handle”.

This flow control is achieved with Reactive Streams semantics and Lease semantics. The Reactive Streams semantics are a “credit-based model”, or “request(n)”, in which the client tells the server how many payloads it is ready to receive. Both applications enforce this credit because they both speak the protocol.
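
The credit idea can be sketched in a few lines of Python. This is a toy model, not RSocket code: the CreditedStream class and its request() method are names I made up to show that the responder only emits as many payloads as the requester has asked for.

```python
from typing import Iterator, List


class CreditedStream:
    """Toy responder that only emits while the requester has granted credits."""

    def __init__(self, source: Iterator[str]) -> None:
        self._source = source
        self._credits = 0

    def request(self, n: int) -> List[str]:
        """Grant n more credits and return the payloads those credits allow."""
        if n <= 0:
            raise ValueError("n must be positive")
        self._credits += n
        emitted = []
        while self._credits > 0:
            try:
                payload = next(self._source)
            except StopIteration:
                break  # the source is finished; unused credits are simply left over
            emitted.append(payload)
            self._credits -= 1  # each payload sent uses up one credit
        return emitted


stream = CreditedStream(iter(["a", "b", "c"]))
first = stream.request(2)   # only two payloads may be sent
second = stream.request(5)  # more credit than data: the rest is sent
```

Here the first call yields two payloads even though three are available, which is the whole point: the producer never gets ahead of what the consumer asked for.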

What I particularly like is that it can handle large streams of data, something that TCP flow control alone handles by cycling its buffers between full and empty, which would “under-utilize the buffering drastically (as well as the throughput)”.

One thing to keep in mind is that this is a very new technology. The docs say, “final testing is being done in Java and C++ implementations with goal to release 1.0 in the near future.” Early adopters include Netflix, Pivotal, Alibaba, and Facebook.

So, now that we understand a little more what this protocol is, I want to show you how to build an RSocket server and use its “request-stream” capability.

Spring Boot + RSocket

So first we need to set up a Spring Boot project. Here is a condensed version of the pom.

Next, in the src/main/resources directory I have a properties file like this (see the repo for the directory structure):

# application.yml
spring:
  main:
    lazy-initialization: true
  rsocket:
    server:
      port: 7000
      transport: websocket

In the src/main/java directory, create a package directory structure (mine is com/fajardo/countrysentiments) and add a Spring Boot application class like this one:

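package com.fajardo.countrysentiments;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class CountrySentimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(CountrySentimentApplication.class, args);
    }
}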

Just a heads up: if you put the Spring Boot application class directly in src/main/java (the “default” package), the app won’t start. Instead, the error will say something like: you are @ComponentScanning a springframework package (e.g. if you put a @ComponentScan in the default package by mistake).

The @SpringBootApplication annotation gives us a “component scan” so we can find controllers and whatnot. But if the annotated class is in the default package, the scan covers every package on the classpath, Spring’s own included, and this error occurs.

But hopefully, you can now run your application.

Controller + Model

Now, let’s add a controller to the /controllers directory. I follow a pattern of creating an interface plus a controller implementation, which helps with unit testing and dependency injection.

Now, the data from the Pub/Sub subscription could be anything, but in this post the data we produce is a CountrySentiment. So, we’ll call this controller the CountrySentimentsController. And to quickly test something, we’ll return some static data like this:

RSocket Client CLI

With this code in place, we can install the RSocket Client CLI and test our code with this command in the terminal:

rsc --stream --data "{\"projectId\":\"CLOUD_PROJECT_ID\", \"subscriptionId\":\"CLOUD_PULL_SUBSCRIPTION_ID\"}" --route country-sentiments ws://localhost:7000/country-sentiments

The --stream flag asks for an RSocket request-stream endpoint. The --data value is the parameter accepted in our controller’s method signature, and the --route is set to the @MessageMapping value we declared.

We should see the following output in the console after we run the command:

{"country":"United States","averageSentiment":3.0}
{"country":"Costa Rica","averageSentiment":4.0}

Flux with Pub/Sub

We’re looking good at this point. Now we need to set up our own GCP project with a topic and a pull subscription. Also, we need to configure our GOOGLE_APPLICATION_CREDENTIALS and GOOGLE_CLOUD_PROJECT environment variables. The second variable is specifically necessary so Spring Boot’s Pub/Sub library can find the subscription in the correct project.

With these configurations, we can now use the Pub/Sub libraries. This is our final controller code.

The emitter is a FluxSink that adds data with the next() method when there’s a new message in the Pub/Sub Subscription. The way we’re able to get this message is by declaring the subscription’s handler within the FluxSink declaration. This way, as we’re telling the subscription how to handle the message we can also add it to the sink.
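
The same bridging idea, a callback handler that pushes each message into a stream, can be sketched in plain Python. This is only an analogy to Reactor’s FluxSink, not the controller code: stream_from_callbacks and fake_subscribe are hypothetical names, and a queue stands in for the sink.

```python
import queue
from typing import Callable, Iterator

_DONE = object()  # sentinel that marks the end of the stream


def stream_from_callbacks(register: Callable) -> Iterator[str]:
    """Turn a callback-style source into an iterator of messages.

    `register` receives two callbacks: one to call for every message
    (the counterpart of sink.next()) and one to call when the source is done.
    """
    buffer: "queue.Queue" = queue.Queue()
    register(buffer.put, lambda: buffer.put(_DONE))
    while True:
        item = buffer.get()  # blocks until the handler delivers something
        if item is _DONE:
            return
        yield item


def fake_subscribe(on_message, on_complete) -> None:
    """Stand-in for a Pub/Sub subscription that delivers two messages."""
    on_message('{"country":"United States","averageSentiment":3.0}')
    on_message('{"country":"Costa Rica","averageSentiment":4.0}')
    on_complete()


messages = list(stream_from_callbacks(fake_subscribe))
```

A real subscription would invoke the handler from another thread and never call the completion callback, which is why the stream stays open.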

These last bits of code come from Reactor, a Reactive Streams implementation for Java.

Now, if you test the endpoint again, you’ll see that the CLI tool doesn’t exit right away. Instead, it stays there waiting for more Pub/Sub data to be emitted.

Python Publisher

So now, let’s make a publisher that will continuously publish CountrySentiment data to our topic.

First, let’s create a virtual environment so we can install some Google Cloud dependencies.

Please follow the instructions here if you don’t know how. Basically, this will create a dedicated folder where we can install our Python packages without affecting other programs on your computer that also use Python.

If you’re not familiar with Python and don’t want to learn, you can either skip this section or replicate this program with the Google Cloud Java SDK.

First install Google Cloud’s Pub/Sub Python SDK:

pip install --upgrade google-cloud-pubsub

We can use the Pub/Sub library in our project now. And please remember to use the credentials file that you created.

Notice we’re reading data from a file. You can copy the file from the repo, which also has a utility for making your own.

Also, I added a sleep() call after the publish() call so the data trickles into the UI.
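
For reference, a publish-then-sleep loop could look roughly like the sketch below. It is not the repo’s script: the function names, the one-second delay and the in-memory list of records are my assumptions, and the real client is only imported inside publish_to_pubsub so the loop itself can be tried without credentials.

```python
import json
import time
from typing import Callable, Dict, Iterable


def publish_sentiments(
    records: Iterable[Dict],
    publish: Callable[[bytes], object],
    delay_seconds: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Publish each record as JSON bytes, pausing after every message."""
    count = 0
    for record in records:
        publish(json.dumps(record).encode("utf-8"))  # Pub/Sub payloads are bytes
        sleep(delay_seconds)  # lets the data trickle into the UI
        count += 1
    return count


def publish_to_pubsub(project_id: str, topic_id: str, records: Iterable[Dict]) -> int:
    """Wire the loop to the real client (needs google-cloud-pubsub and credentials)."""
    from google.cloud import pubsub_v1  # imported lazily on purpose

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    # publish() returns a future; result() blocks until the message is accepted.
    return publish_sentiments(
        records, lambda data: publisher.publish(topic_path, data).result()
    )


# Offline dry run: collect the payloads in a list instead of sending them.
sent = []
published = publish_sentiments(
    [{"country": "Costa Rica", "averageSentiment": 4.0}],
    sent.append,
    sleep=lambda seconds: None,
)
```

Passing the publish function in as a parameter is just a convenience for testing; calling publish_to_pubsub with your project and topic IDs sends the same records to the real topic.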

Before you can run the file, we need to create a settings.py file. We’ll use dotenv to create some environment variables for us. So I will show you the two files you’ll need.

# file name: .env
PROJECT_NAME=your-cloud-project-id
PUBSUB_TOPIC_NAME=your-cloud-pubsub-topic-id

This is using the dotenv package, which allows us to set environment variables through the .env file. And we’re doing this so we can exclude these settings from our version control system. In other words, we don’t want these settings to be made public. Read more about this here.
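
With python-dotenv, a settings file usually boils down to calling load_dotenv() and then reading the values with os.getenv(). To show what that amounts to, here is a standard-library-only stand-in that I wrote for illustration; parse_env_file and load_settings are hypothetical helpers, not part of dotenv, and they only handle simple KEY=value lines.

```python
import os
from typing import Dict, Mapping, Optional


def parse_env_file(text: str) -> Dict[str, str]:
    """Parse simple KEY=value lines, skipping blanks and # comments."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.strip()
    return values


def load_settings(
    env_text: str, environ: Optional[Mapping[str, str]] = None
) -> Dict[str, Optional[str]]:
    """Return the two settings, preferring variables that are already set."""
    environ = os.environ if environ is None else environ
    file_values = parse_env_file(env_text)
    names = ("PROJECT_NAME", "PUBSUB_TOPIC_NAME")
    return {name: environ.get(name, file_values.get(name)) for name in names}


ENV_TEXT = """# file name: .env
PROJECT_NAME=your-cloud-project-id
PUBSUB_TOPIC_NAME=your-cloud-pubsub-topic-id
"""

settings = load_settings(ENV_TEXT, environ={})
```

In the actual project you would keep using dotenv; the stand-in is only here to show that nothing more than key-value pairs end up in the environment.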

Finally, with a file of countries in place we can run this app. Then, in another terminal, start up the RSocket server and make a request to it with our CLI client.

If your server is logging the messages, then we’re ready to implement our React frontend.

Recharts Bar Chart

First, in our project directory, let’s create a React app. I’ll be using the create-react-app CLI.

So, in my root directory, I run:

npx create-react-app webapp

Remember, we need npm installed.

I’m setting the App.js file to simply this:

import CountrySentimentBarChart from './components/CountrySentimentBarChart';

function App() {
  return (
    <CountrySentimentBarChart />
  );
}

export default App;

The CountrySentimentBarChart has all the rendering and logic for getting the data. Let’s walk through the code.

Render

Let’s install the Recharts library in our project.

yarn add recharts

I’m making a few modifications to this basic chart. Our data will only be numbers from 1–4, so I set the domain on the YAxis tag accordingly, with a little extra headroom.

The key should be displayed differently from the camel-cased dataKey prop on the <Bar />, so I gave it an explicit name.

And most importantly, I’m making a new array from state.countrySentiments so that Recharts re-renders the <Bar /> node.

RSocket Client

Next, we’ll examine the componentDidMount() and getCountrySentiment() methods. Every time we refresh the page, React re-renders it, and once a component is mounted, the componentDidMount lifecycle method is triggered. So, at this point we have our chart displaying on the page, which makes it a good time to start receiving data.

First let’s install the rsocket-js dependencies.

yarn add rsocket-core rsocket-websocket-client

The first thing we do is create an RSocketClient and set its transport to an RSocketWebSocketClient.

The code is generally self-explanatory, but I want to emphasize how different the request is from an HTTP request. I would’ve expected to send the request to something like ws://localhost:7000/country-sentiments and pass the request data in the body. However, we’re using the request’s metadata to specify the route.
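
As far as I understand the routing metadata extension (the message/x.rsocket.routing.v0 MIME type), the route travels as a length-prefixed UTF-8 string: one byte with the length, then the route itself. I’m assuming the client uses that MIME type. Here is a small Python sketch of that encoding, with function names of my own choosing:

```python
def encode_route(route: str) -> bytes:
    """Encode a route as one length byte followed by its UTF-8 bytes."""
    encoded = route.encode("utf-8")
    if not 1 <= len(encoded) <= 255:
        raise ValueError("route must be between 1 and 255 bytes long")
    return bytes([len(encoded)]) + encoded


def decode_route(metadata: bytes) -> str:
    """Read the first route back out of routing metadata."""
    length = metadata[0]
    return metadata[1:1 + length].decode("utf-8")


metadata = encode_route("country-sentiments")
print(metadata)  # b'\x12country-sentiments'
```

So the route is not part of the URL at all; it rides along in the metadata of the request frame, and the server uses it to pick the handler.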

We also know that RSocket establishes only one connection between server and client, rather than one per request as HTTP typically does. So, once the client calls connect().subscribe(), it looks like it’s making this long-lived connection.

Also, I want to point out that we can see the “request(n)” semantics when the socket makes a subscription. We’re actually telling the server to give us the maximum number of payloads that we can ask for. And if I understood correctly, we could ask for more if we somehow ran out of these “credits”.

Conclusion

In this post, we learned RSocket is a new technology that allows our real-time applications to communicate with each other. We looked at the reactive semantics and focused specifically on the “request-stream” interaction mode.

Then we started up a Spring Boot application where we used the rsocket-starter to bootstrap the server. Then we used the gcp-starter-pubsub to subscribe to a topic on Google’s Pub/Sub service.

We first made manual publishes to this topic, but then we created our own publisher using the Cloud Pub/Sub SDK for Python.

Finally, we got to see the request from a client. We made the request in the componentDidMount lifecycle method. There we saw the request(n) as well as the metadata that actually routes to our endpoint.

If you’re interested in using real data with this dashboard, then I’ve created a data pipeline with the Twitter Streaming API. You can visit this blog post to learn more.

This is all for now. Thanks for taking the time to read and I hope you learned something new. If you have any questions or thoughts on the topic, please comment and I’ll talk to you soon. 👋

Resources

RSocket Demo Project

RSocket Docs

RSocket Client CLI

Pub/Sub in Spring Applications

Create a Data Pipeline with Apache Beam

Gabriel Fajardo

Computer Science grad from FIU. Back-end software engineer curious about data engineering. Currently doing contracting work for Cricket Wireless.