Building a robust messaging service with real time chat

Hi everyone, I'm a front-end web developer at Sweet Mustard. On one project, lots of events had to be sent from a client to a server and then back out to some clients, and those events had to be accessible in multiple places on these clients. I needed a versatile architecture to handle real time events sent from a server: receive the events in one centralised place and share them with the rest of the application.

We’ll be building a demo application using Angular, RxJS, Socket.IO, Express and Node.js. However, the main focus is Socket.IO and RxJS. I won’t extensively tackle the server side implementation and Node.js setup.
Note that Socket.IO offers a lot of features that can make building a chat easier. Definitely take a look at rooms and namespaces.

Used technologies

What are websockets?

According to the MDN website “The WebSocket API is an advanced technology that makes it possible to open a two-way interactive communication session between the user’s browser and a server. With this API, you can send messages to a server and receive event-driven responses without having to poll the server for a reply.”
Chat apps, real time dashboards and two-way communication in general can be smooth and painless to build with websockets.


What are Angular, Socket.IO and RxJS?

Angular is a TypeScript-based open-source front-end framework.
Socket.IO is a JavaScript library that enables real time bidirectional communication between web clients and servers. It adds extra features, like broadcasting, on top of the underlying transport. It will use websockets when available, but will fall back to long-polling when needed.
And finally, as said on their website, “RxJS is a library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code.” In essence, it takes asynchronous programming to the next level.

Sample app

For demonstration purposes I made a small application which is essentially just two buttons, each with a score that shows how many times that button was clicked.


App structure

We want to keep these projects as simple as possible to focus on the Socket.IO implementation on the front-end. This is the finished application structure. Don’t worry about generating these files right now, as we will construct them along the way.

[Image: app-client project structure]
[Image: app-server project structure]

Setting up a basic server

All right, let’s get coding. First, we’ll generate and start an Express server on http://localhost:3000/.

Protip: for development, use nodemon.io or something similar. It will watch for changes and restart your server automatically.

Let’s create a badass server

First install socket.io and replace or create the following files.

npm install --save socket.io

Now everything is set up: there is a sweet homepage and the socket server is listening for connections.
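As a rough idea of what the socket side of such a server might do, here is a minimal sketch. It is written against two hypothetical interfaces (`ServerLike`, `ClientSocketLike`) that mimic only the `on`/`emit` calls of Socket.IO, so it stays self-contained. The event names `click-value-changed` and `increment-click-value` are assumptions as well, not names taken from the original server files.

```typescript
interface ClickValues { red: number; blue: number; }
type ButtonColour = 'red' | 'blue';

// Hypothetical minimal shapes; the real objects come from the socket.io package.
interface ClientSocketLike {
  emit(event: string, payload: ClickValues): void;
  on(event: string, listener: (colour: ButtonColour) => void): void;
}
interface ServerLike {
  emit(event: string, payload: ClickValues): void; // reaches every connected client
  on(event: 'connection', listener: (socket: ClientSocketLike) => void): void;
}

// Assumed event names; client and server only have to agree on them.
const VALUES_EVENT = 'click-value-changed';
const INCREMENT_EVENT = 'increment-click-value';

function registerClickHandlers(io: ServerLike): void {
  const values: ClickValues = { red: 0, blue: 0 };

  io.on('connection', (socket) => {
    // A new client immediately receives the current totals.
    socket.emit(VALUES_EVENT, { ...values });

    // A click from any client updates the totals and is sent to everyone.
    socket.on(INCREMENT_EVENT, (colour) => {
      values[colour] += 1;
      io.emit(VALUES_EVENT, { ...values });
    });
  });
}
```

Keeping the totals on the server means every client sees the same numbers, no matter when it connected.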


Building a robust socket service

I’m assuming you’re familiar with Angular and the CLI. So let’s generate an Angular project, our service and an interface.

cd ../
ng new app-client --style=scss
cd app-client/
ng generate service services/socket
ng generate interface types/clickValues
npm install --save socket.io-client
npm install --save-dev @types/socket.io-client

Create the service and the interface, which defines the TypeScript ClickValues type.
types/click-values.ts
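A minimal sketch of what this file could contain, based on how the types are used further down (`values.red`, `values.blue`, `ClickValueType.RED`, `ClickValueType.BLUE`). The string values of the enum are an assumption.

```typescript
// types/click-values.ts (sketch)

// Click totals per button, as sent by the server.
export interface ClickValues {
  red: number;
  blue: number;
}

// Identifies which button was clicked.
// The string values are assumed; any distinct values would do.
export enum ClickValueType {
  RED = 'red',
  BLUE = 'blue',
}
```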

When this service is first created, it will establish a connection with our server on port 3000 and store the socket connection object. In most cases the socket will reconnect automatically. However, “io server disconnect” means the server actively disconnected. The socket won’t reconnect automatically, so we do it manually.
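The manual reconnect described above can be sketched as follows. `SocketLike` is a hypothetical, stripped-down view of the client socket (only `on` and `connect`), used here so the snippet stands on its own. In the real service the same check would sit inside the socket’s own `disconnect` handler.

```typescript
// Hypothetical minimal view of a Socket.IO client socket.
interface SocketLike {
  on(event: 'disconnect', listener: (reason: string) => void): void;
  connect(): void;
}

// Reconnect by hand only when the server closed the connection on purpose.
// In most other cases the client retries on its own.
function reconnectOnServerDisconnect(socket: SocketLike): void {
  socket.on('disconnect', (reason: string) => {
    if (reason === 'io server disconnect') {
      socket.connect();
    }
  });
}
```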

If you aren’t familiar with RxJS you might be thinking “what the hell is this magic”.


Let’s find the logic behind this code.

The RxJS manual mentions this: “A ‘multicasted Observable’ passes notifications through a Subject which may have many subscribers, whereas a plain ‘unicast Observable’ only sends notifications to a single Observer.” (see reference for more technical details)

Perfect. We want to share our observable with multiple subscribers in the application. We also want each subscriber to receive the last emitted value. And the service should start listening for server events as soon as it is created. This is very important, because the server will emit an event as soon as the client connects.

Note that an observable by default is only activated when you subscribe, not when it is created. So we’ll have to find a way around this issue.
Problems to solve:

  1. Share observable among subscribers
  2. Replay last value
  3. Start listening immediately

The publishReplay() operator uses the multicast() operator and a ReplaySubject under the hood, which solves the first two problems. Multicast shares one observable with multiple subscribers through a subject. And ReplaySubject will… replay the last x values. Who would’ve thought. Nice, two problems solved.

// Shorthand used in the service:
.pipe(
  publishReplay(1)
)

// Roughly equivalent long form:
.pipe(
  multicast(new ReplaySubject(1))
)

All right, let’s solve the final problem. We should start listening to events as soon as the service is created. It is a quick fix with our current setup. Just call .connect() on the ConnectableObservable. This will subscribe and start receiving events. The service will now use the ReplaySubject to resend the last value when we subscribe to it elsewhere.
Replace attachWebsocketStreams() with this:

private attachWebsocketStreams() {
  this.onClickValueChanged = this.clickValueChanged();
  this.onClickValueChanged.connect();
}

Note that we immediately connect the observable after it is created.
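To make the three problems concrete without pulling in RxJS, here is a hand-rolled stand-in for the behaviour we rely on. This is not RxJS and not how publishReplay() is implemented. `ReplayLastStream` is a hypothetical class that only mimics "share, replay the last value, start on connect", and it leaves out errors, completion and unsubscribing.

```typescript
type Listener<T> = (value: T) => void;
// A source pushes values into `emit` once it has been started.
type Source<T> = (emit: Listener<T>) => void;

class ReplayLastStream<T> {
  private readonly source: Source<T>;
  private listeners: Listener<T>[] = [];
  // Wrapped in an object so "no value yet" differs from an undefined value.
  private last: { value: T } | undefined;
  private connected = false;

  constructor(source: Source<T>) {
    this.source = source;
  }

  // Problem 3: start listening now, even if nobody has subscribed yet.
  connect(): void {
    if (this.connected) return;
    this.connected = true;
    this.source((value) => {
      this.last = { value };                      // Problem 2: remember the last value
      this.listeners.forEach((l) => l(value));    // Problem 1: one source, many subscribers
    });
  }

  subscribe(listener: Listener<T>): void {
    if (this.last) listener(this.last.value);     // replay to late subscribers
    this.listeners.push(listener);
  }
}

// Usage: the "server" emits before any component subscribes.
let pushFromServer: Listener<number> = () => {};
const stream = new ReplayLastStream<number>((emit) => { pushFromServer = emit; });
stream.connect();
pushFromServer(3);                                // nobody is subscribed yet

const received: number[] = [];
stream.subscribe((v) => received.push(v));        // late subscriber gets 3 replayed
pushFromServer(4);
// received is now [3, 4]
```

Without the `connect()` call the first value would be lost, which is exactly why the service connects right after creating the observable.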
Now we just need a way to send events to the server. Add this to the socket service:
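A minimal sketch of such a method, matching the `incrementClickValue(ClickValueType.RED)` calls used later in the component. The event name `increment-click-value` is an assumption and must match whatever the server listens for. `EmitterLike` is a hypothetical one-method view of the socket, and the socket is passed in through the constructor only to keep the sketch self-contained; the real service creates it itself.

```typescript
// Enum values are assumed; see the ClickValues types.
enum ClickValueType {
  RED = 'red',
  BLUE = 'blue',
}

// Hypothetical minimal view of the client socket: just emit().
interface EmitterLike {
  emit(event: string, ...args: unknown[]): void;
}

// Assumed event name.
const INCREMENT_EVENT = 'increment-click-value';

class SocketServiceSketch {
  private readonly socket: EmitterLike;

  constructor(socket: EmitterLike) {
    this.socket = socket;
  }

  // Tell the server which button was clicked.
  // The updated totals come back through the shared stream.
  incrementClickValue(type: ClickValueType): void {
    this.socket.emit(INCREMENT_EVENT, type);
  }
}
```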

Use the service in our component

So first, paste the styles and template. The styling has been written using the BEM methodology.

There are two different kinds of elements that need different kinds of data:
- Buttons: they will display the number of clicks
- Score meter at the bottom: this will display the two scores relative to each other.

The SIP principle is a great approach to handle asynchronous data and present it in a reactive way. I highly recommend reading this article from StrongBrew about the [SIP principle](https://blog.strongbrew.io/the-sip-principle/). However, since this complexity is not needed for our demo application, we’ll take a simpler approach.

First we inject our service and initialize the websocket stream. Remember that when we create a new websocket service it will immediately start listening and remember the last received value. The server will send the current values to a client as soon as it connects.

**Inject the service:**

constructor(private socketService: SocketService) {}

Create two new methods and call them when initializing.

clickValuesStream$: Observable<ClickValues>;
relativePercentageClickValuesStream$: Observable<ClickValues>;

ngOnInit() {
  this.initClickValuesStream();
  this.convertClickValuesToPercentageStream();
}

`initClickValuesStream()` will simply place the socket stream in a variable which we can connect to our GUI or manipulate in other ways.

private initClickValuesStream() {
  this.clickValuesStream$ = this.socketService.onClickValueChanged;
}

`convertClickValuesToPercentageStream()` takes the `clickValuesStream$` and prepares it for the score meter. The values received from the server are quite simple: blue has received x clicks and red has received y clicks (see the `ClickValues` interface). We want to convert these values to a relative ratio. We’ll use the [map operator](https://www.learnrxjs.io/operators/transformation/map.html) to achieve this. We can simply calculate a ratio in the operator function and return it to a new stream, `relativePercentageClickValuesStream$`.

private convertClickValuesToPercentageStream() {
  this.relativePercentageClickValuesStream$ = this.clickValuesStream$.pipe(
    map((values: ClickValues) => {
      // Note: if both counts are 0, these divisions yield NaN.
      const total = values.red + values.blue;
      const relativePercentages: ClickValues = {
        red: (values.red / total) * 50,
        blue: (values.blue / total) * 50
      };
      return relativePercentages;
    })
  );
}
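To see the arithmetic with real numbers, the same calculation can be pulled out into a pure function. This is only a sketch: it keeps the factor of 50 from the code above, and the 25/25 fallback for "no clicks yet" is my own assumption to avoid dividing zero by zero.

```typescript
interface ClickValues {
  red: number;
  blue: number;
}

// Same ratio calculation as in the map() callback, as a standalone function.
function toRelativePercentages(values: ClickValues): ClickValues {
  const total = values.red + values.blue;
  if (total === 0) {
    // Assumed fallback: split evenly while nobody has clicked.
    return { red: 25, blue: 25 };
  }
  return {
    red: (values.red / total) * 50,
    blue: (values.blue / total) * 50,
  };
}

// 3 red clicks and 1 blue click:
// red  = 3 / 4 * 50 = 37.5
// blue = 1 / 4 * 50 = 12.5
const example = toRelativePercentages({ red: 3, blue: 1 });
```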

Add these methods which will inform the socket service a button has been clicked. The socket service will inform the server of these changes which will propagate to all connected clients.

onRedClicked(): void {
  this.socketService.incrementClickValue(ClickValueType.RED);
}

onBlueClicked(): void {
  this.socketService.incrementClickValue(ClickValueType.BLUE);
}

Final code in app.component.ts:

Change the html file to look like this:

We use the created streams in combination with the async pipe to automatically manage our subscriptions and make the data inside the observable available to the template. We also call the clicked method from the template using a click event.
Now build the Angular app by executing the following command:

ng build

And on the server side, change the content of the get method in app.module.js to this:

res.sendFile(
  path.join(__dirname, '../../app-client/dist/app-client', 'index.html')
);

Don’t forget to import the path module.
When we start our backend server, the buttons application is available on http://localhost:3000/.

Wrap up and conclusion

We’ve learned a lot today. Our solution definitely shines where multiple output streams need to be generated from a single input stream, as in real time dashboards. And with a little bit of tweaking, it can be used as a foundation for a real time chat.
We used the publishReplay operator to share an observable among its subscribers and to replay the last value it received. This operator uses multicast with a ReplaySubject under the hood. We call .connect() on the ConnectableObservable this operator creates, which subscribes to the underlying observable.

The sample application is a low-level solution to see the technologies in action. There is so much more you can achieve. And if you want to download the sample application it can be found on my repository.
