This is the final article in the series on distributed reactive apps in Erlang/Elixir. The first article covers the theoretical basis of reactive architecture. The second illustrates the main patterns and mechanisms for building such systems.

Today we will look at how the code base, and the project as a whole, evolves.

Service organization

When we develop services in real life, we often have to combine several interaction patterns in one controller. For example, the users service, which manages the project’s user profiles, must respond to req-resp requests and send notifications about profile updates using pub-sub. This case is quite simple: behind messaging, there is just one controller implementing the service logic and publishing the updates.

It gets more complicated when we have to build a fault-tolerant distributed service. Let’s imagine that the requirements for the users service have changed:

  1. The service must now process requests on 5 nodes of the cluster.
  2. It must be capable of carrying out background processing tasks.
  3. It must also be able to dynamically manage subscription lists.

Note: this article does not address consistent storage and replication of data. Let’s assume these issues have already been solved: the system has a scalable and reliable storage layer, and the handlers can interact with it.

The formal description of the users service has become more complicated, but from a programmer’s point of view the changes are minimal thanks to messaging. To meet the first requirement, we only have to define a balancing algorithm on the req-resp exchange.

The requirement to handle background tasks comes up quite often. In users, this could be checking users’ documents, processing downloaded multimedia, or synchronizing data with social networks. These tasks should be distributed within the cluster, and their execution must be monitored. That leaves two options: either use the pattern from the previous article or, if it doesn’t suit us, write a custom task scheduler that manages a pool of handlers.
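To make the second option more concrete, here is a minimal sketch of the bookkeeping such a scheduler might do. It is not the scheduler from my projects: the module name, the function names and the state shape are all assumptions made for illustration, and the actual dispatch of a task to a handler is left out.

```erlang
-module(task_scheduler_sketch).
-export([new/1, submit/2, done/2]).

%% Hypothetical state: {IdleWorkers, QueueOfWaitingTasks}.
new(Workers) when is_list(Workers) ->
    {Workers, queue:new()}.

%% Submit a task: give it to an idle worker if there is one,
%% otherwise put it at the back of the FIFO queue.
submit(Task, {[Worker | Idle], Queue}) ->
    {{assign, Worker, Task}, {Idle, Queue}};
submit(Task, {[], Queue}) ->
    {queued, {[], queue:in(Task, Queue)}}.

%% A worker reports completion: hand it the next waiting task,
%% or return it to the idle list when nothing is waiting.
done(Worker, {Idle, Queue}) ->
    case queue:out(Queue) of
        {{value, Task}, Rest} -> {{assign, Worker, Task}, {Idle, Rest}};
        {empty, _}            -> {idle, {[Worker | Idle], Queue}}
    end.
```

In a real service this state would live inside the singleton scheduler process, and an `assign` result would be turned into a message to the chosen handler.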

The third point requires an extension of the pub-sub pattern. To implement it, we first create a pub-sub exchange and then run the controller of this exchange within our service. This moves the handling of subscription events and the subscriber lists out of the messaging layer and into the users implementation.

As a result, the task decomposition shows that to meet the requirements we should run 5 copies of the service on different nodes and create an extra entity: a pub-sub controller responsible for subscriptions.

Running 5 handlers requires no change to the service code. There is just one additional action: setting the balancing rules on the exchange, which we will discuss a bit later. There is one more difficulty: the pub-sub controller and the custom scheduler must each work as a singleton. The messaging service, being fundamental, must therefore provide a mechanism for leader election.

Leader election

In distributed systems, leader election is the procedure of appointing a single process responsible for scheduling and distributing tasks and load across the cluster.

Systems that are not centralized use universal consensus-based algorithms such as Paxos or Raft.

As messaging is both a broker and a central element, it knows about all service controllers, which are the leadership candidates. Messaging can therefore appoint the leader without voting.

After starting and connecting to an exchange, all services receive a system message:

#'$leader'{exchange = ?EXCHANGE, pid = LeaderPid, servers = Servers}

If LeaderPid matches the pid of the current process, this process is appointed the leader. The Servers list includes all nodes and their parameters.

When a new cluster node appears or a running one is shut down, all service controllers get

#'$slave_up'{exchange = ?EXCHANGE, pid = SlavePid, options = SlaveOpts}

and

#'$slave_down'{exchange = ?EXCHANGE, pid = SlavePid, options = SlaveOpts}

respectively.

This way, all components are aware of all changes, and the cluster is guaranteed to have exactly one leader at any given moment.
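A service controller could track these system messages roughly as follows. This is only a sketch: the record definitions are assumptions that mirror the fields shown above (in a real project they would come from the messaging library’s header file), and the module name and state record are hypothetical.

```erlang
-module(leader_sketch).
-export([new/0, handle_system/2, is_leader/1, peers/1]).

%% Assumed record shapes, copied from the fields shown in the text.
-record('$leader',     {exchange, pid, servers = []}).
-record('$slave_up',   {exchange, pid, options = []}).
-record('$slave_down', {exchange, pid, options = []}).

%% Hypothetical controller state: the current leader and the known peers.
-record(state, {leader :: pid() | undefined, peers = [] :: [pid()]}).

new() -> #state{}.

%% Pure state transition for each of the three system messages.
handle_system(#'$leader'{pid = LeaderPid}, State) ->
    State#state{leader = LeaderPid};
handle_system(#'$slave_up'{pid = Pid}, State = #state{peers = Peers}) ->
    State#state{peers = lists:usort([Pid | Peers])};
handle_system(#'$slave_down'{pid = Pid}, State = #state{peers = Peers}) ->
    State#state{peers = lists:delete(Pid, Peers)}.

%% The calling process is the leader if the announced pid is its own.
is_leader(#state{leader = Leader}) -> Leader =:= self().

peers(#state{peers = Peers}) -> Peers.
```

In a gen_server these clauses would typically be called from handle_info/2, and singleton components such as the scheduler would do their work only while is_leader/1 returns true.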

Middleware

Middleware is convenient when we need to implement complex distributed processing or optimize an existing architecture.

If you don’t want to change the service code to deal with additional processing, routing, or message logging, you can turn on a proxy handler that does all the work.

A classic example of pub-sub optimization is a distributed app with a business core that generates update events, such as price changes on a market. Such an app also has an access layer: N servers that provide web clients with a websocket API.

Handled naively, client servicing looks like this:

  • The client establishes a connection with the platform. On the server that terminates the traffic, a process is launched to serve this connection.
  • Authorization and subscription to updates take place within this process. The process calls the subscribe method for the topics.
  • When an event is generated in the core, it is delivered to the processes that serve the connections.

Let’s imagine that we have 50000 subscribers to the topic ‘news’, evenly distributed across 5 servers. Every update that reaches the exchange is then replicated 50000 times: 10000 times per server, according to the number of subscribers. Doesn’t look like an efficient scheme, right?

To improve it, let’s introduce a proxy that has the same name as the exchange. For this to work, the global name registry must be able to resolve the nearest process in a group by name.

Then, let’s run this proxy on the access layer servers. All the processes that serve the websocket API will subscribe to it rather than to the original pub-sub exchange in the core. The proxy subscribes to the core only on the first subscription to a given topic and replicates each message it receives to all its subscribers.

As a result, 5 messages instead of 50000 will be sent between the core and the access servers for every update.
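The subscription bookkeeping inside such a proxy can be sketched as follows. This is an illustration under assumptions, not the actual proxy: the module and function names are hypothetical, and the call that subscribes to the core exchange is left to the caller, which only learns from the return value whether an upstream action is needed.

```erlang
-module(pubsub_proxy_sketch).
-export([new/0, subscribe/3, unsubscribe/3, fan_out/3]).

%% State: a map from topic to the list of local subscriber pids.
new() -> #{}.

%% Returns {upstream, State} for the first local subscriber of a topic
%% (the proxy must now subscribe to the core), {local, State} otherwise.
subscribe(Topic, Pid, Subs) ->
    case maps:find(Topic, Subs) of
        error      -> {upstream, Subs#{Topic => [Pid]}};
        {ok, Pids} -> {local, Subs#{Topic => lists:usort([Pid | Pids])}}
    end.

%% Returns {upstream, State} when the last local subscriber leaves
%% (the proxy can drop its subscription in the core).
unsubscribe(Topic, Pid, Subs) ->
    case maps:find(Topic, Subs) of
        error ->
            {local, Subs};
        {ok, Pids} ->
            case lists:delete(Pid, Pids) of
                []   -> {upstream, maps:remove(Topic, Subs)};
                Rest -> {local, Subs#{Topic => Rest}}
            end
    end.

%% One message from the core is replicated locally to every subscriber.
%% Returns the number of local deliveries.
fan_out(Topic, Msg, Subs) ->
    Pids = maps:get(Topic, Subs, []),
    lists:foreach(fun(Pid) -> Pid ! {Topic, Msg} end, Pids),
    length(Pids).
```

With 10000 websocket processes subscribed to one proxy, the core sends a single message to that server and fan_out/3 performs the 10000 deliveries locally.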

Routing and balancing

Req-Resp

The current implementation of messaging has 7 request distribution strategies:

  • default. The request goes to all controllers.
  • round-robin. Controllers are assigned to incoming requests cyclically.
  • consensus. The controllers providing the service are divided into a leader and slaves. Requests go to the leader only.
  • consensus & round-robin. There is a leader in the group, but requests are distributed among all members.
  • sticky. A hash is calculated for the request and bound to a certain handler. All subsequent requests with the same signature go to the same handler.
  • sticky-fun. The hash function for sticky balancing is supplied when the exchange is initialized.
  • fun. Similar to sticky-fun, but the function can also redirect, decline, or pre-process the request.

The distribution strategy is defined when an exchange is initialized.
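To illustrate the difference between the round-robin and sticky families, here is a small sketch of how a handler could be picked. It is not the code of the messaging layer: the module and function names are hypothetical, and the sticky variant uses a stateless hash modulo the number of handlers, which may differ from how the real exchange binds a signature to a handler.

```erlang
-module(balance_sketch).
-export([round_robin/1, sticky/2, sticky_fun/3]).

%% round-robin: take the first handler and rotate it to the back.
%% Returns {ChosenHandler, RotatedHandlerList}.
round_robin([Handler | Rest]) ->
    {Handler, Rest ++ [Handler]}.

%% sticky: the same key always maps to the same handler
%% as long as the (non-empty) handler list does not change.
sticky(Key, Handlers) when Handlers =/= [] ->
    Index = erlang:phash2(Key, length(Handlers)) + 1,
    lists:nth(Index, Handlers).

%% sticky-fun: the caller supplies the function that extracts
%% the signature from the request before it is hashed.
sticky_fun(KeyFun, Request, Handlers) when is_function(KeyFun, 1) ->
    sticky(KeyFun(Request), Handlers).
```

For example, a sticky-fun that extracts the user id from a request keeps all requests for one profile on the same handler, which is handy for per-user caches.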

Apart from balancing, messaging lets us tag entities. Let’s look at the tag types in the system:

  • Connection tag. Identifies the connection through which events have arrived. It is used when a controller process connects to the same exchange several times with different routing keys.
  • Service tag. Lets us group handlers within one service and extends the routing and balancing options. For the req-resp pattern, routing is linear: we send a request to an exchange, and the exchange passes it to the service. If we need to split handlers into logical groups, we do so with tags: when a tag is specified, the request is sent to a particular group of controllers.
  • Request tag. Lets us tell responses apart. As our system is asynchronous, we need a way to specify a RequestTag so that we can process the service responses. The tag tells us which request a received response belongs to.
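The request tag in particular is easy to picture in code. The sketch below shows one way a client process might correlate asynchronous responses with its outstanding requests. The module and function names are hypothetical, and how the tag is actually passed to and returned by the messaging layer is not shown.

```erlang
-module(request_tag_sketch).
-export([new/0, track/3, resolve/2]).

%% Pending requests: a map from request tag to whatever context
%% the caller needs when the response finally arrives.
new() -> #{}.

%% Remember the context under its tag before sending the request.
track(Tag, Context, Pending) ->
    Pending#{Tag => Context}.

%% When a response arrives, look up its tag and forget it.
resolve(Tag, Pending) ->
    case maps:take(Tag, Pending) of
        {Context, Rest} -> {ok, Context, Rest};
        error           -> {unknown_tag, Pending}
    end.
```

A unique tag can be produced with make_ref(). A response whose tag is unknown, for example because it arrived after a timeout, can then be dropped safely.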

Pub-sub

Pub-sub is a bit simpler. We have an exchange to which messages are published. The exchange distributes the messages among the subscribers that have subscribed to the routing keys they need (roughly the equivalent of topics).

Scalability and fault-tolerance

System scalability generally depends on the scalability of its layers and components:

  • Services are scaled by adding extra nodes with service handlers to the cluster. The optimal routing policy can be chosen during trial operation.
  • The messaging service itself, within a given cluster, is generally scaled either by moving the most loaded exchanges to separate cluster nodes or by adding proxy processes to the most loaded cluster zones.
  • The scalability of the whole system depends on the flexibility of the architecture and the ability to unite separate clusters into one logical entity.

Project success often depends on simplicity, fast transformation and scalability. Messaging in its current implementation grows with the app. Even if a cluster of 50–60 machines is not enough, we can use federation. Unfortunately, federation is beyond the scope of this article.

Redundancy

We have already discussed redundancy of service controllers while talking about load balancing. However, messaging itself must be redundant too. If a node or a machine fails, messaging must automatically recover as quickly as possible.

In my projects I use additional nodes which can take over part of the load in case of a failure. Erlang has a standard implementation of distributed mode for OTP apps. Distributed mode handles recovery by starting the failed app on another node that has been started beforehand. The process is transparent: after a failure, the app automatically moves to the failover node. You can get more details about this functionality here.
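For reference, distributed mode is switched on through kernel configuration parameters. The fragment below is a sketch of a sys.config for the primary node: the app name messaging and all node names are hypothetical, and the timeouts are arbitrary example values.

```erlang
%% sys.config for 'main@host1' (app and node names are assumptions).
[{kernel,
  [%% Run the app on main@host1. If that node goes down, wait 5000 ms
   %% and restart the app on one of the two backup nodes, which have
   %% equal priority because they are grouped in a tuple.
   {distributed, [{messaging, 5000,
                   ['main@host1', {'backup1@host2', 'backup2@host3'}]}]},
   %% Other nodes that must be up before this node finishes starting.
   {sync_nodes_mandatory, ['backup1@host2', 'backup2@host3']},
   %% How long to wait for them, in milliseconds.
   {sync_nodes_timeout, 30000}]}].
```

Every node in the group needs the same distributed entry, while sync_nodes_mandatory lists the other nodes from that node’s point of view. When the primary node comes back, the app is taken over by it again.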

Performance

Let’s try, at least roughly, to compare RabbitMQ performance with that of our custom messaging. I came across the official RabbitMQ test results from the OpenStack team. Point 6.14.1.2.1.2.2. of the document shows the following RPC CAST result:

We will not use any advanced tuning of the OS kernel or the Erlang VM. Testing environment:

  • erl opts: +A1 +sbtu.
  • Test within one node is run on a laptop with an old mobile version of Intel i7.
  • Cluster tests are run on servers with a 10G network.
  • The code runs in docker containers. The network is in NAT mode.

The code for the test:

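%% Benchmark: send a ping through the exchange and wait for the pong.
req_resp_bench(_) ->
    W = perftest:comprehensive(10000,
        fun() ->
            messaging:request(?EXCHANGE, default, ping, self()),
            receive
                #'$msg'{message = pong} -> ok
            after 5000 ->
                %% No response within 5 seconds: abort the run.
                throw(timeout)
            end
        end
    ),
    %% Pass if at least one of the returned values reaches 30000.
    true = lists:any(fun(E) -> E >= 30000 end, W),
    ok.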

Scenario 1: The test is run on a laptop with an old mobile version of Intel i7. The test, messaging and the service all run on the same node and in one docker container:

Sequential 10000 cycles in ~0 seconds (26987 cycles/s)
Sequential 20000 cycles in ~1 seconds (26915 cycles/s)
Sequential 100000 cycles in ~4 seconds (26957 cycles/s)
Parallel 2 100000 cycles in ~2 seconds (44240 cycles/s)
Parallel 4 100000 cycles in ~2 seconds (53459 cycles/s)
Parallel 10 100000 cycles in ~2 seconds (52283 cycles/s)
Parallel 100 100000 cycles in ~3 seconds (49317 cycles/s)

Scenario 2: 3 nodes run on different machines via docker (NAT):

Sequential 10000 cycles in ~1 seconds (8684 cycles/s)
Sequential 20000 cycles in ~2 seconds (8424 cycles/s)
Sequential 100000 cycles in ~12 seconds (8655 cycles/s)
Parallel 2 100000 cycles in ~7 seconds (15160 cycles/s)
Parallel 4 100000 cycles in ~5 seconds (19133 cycles/s)
Parallel 10 100000 cycles in ~4 seconds (24399 cycles/s)
Parallel 100 100000 cycles in ~3 seconds (34517 cycles/s)

In all cases CPU utilization did not exceed 250%.

Results

I hope this series doesn’t look like a brain dump and my experience will be of real use both for distributed systems researchers and for practitioners who are just starting out. I’m sure that some of them are looking at Erlang/Elixir with interest but have their doubts.