For a recent project, I had to integrate Akka TCP with our REST server built on the Play framework. We faced and solved several issues while trying to scale the application for high concurrency. Here are the details.

Requirement:
We were building a REST API. For one particular endpoint, the incoming HTTP request had to be transformed to XML and routed to a TCP server.
Once the TCP server sent its response back, we had to return that response to the same HTTP request.

First method

The first method was straightforward. Whenever we get an HTTP request:

  1. Convert it to XML.
  2. Send it to the server using plain Java sockets.
  3. Iteratively check for a response a pre-configured number of times.
  4. If there is no response, report failure.
  5. If we get a response from the server within the time, return the result to the user.
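The polling part of the steps above (3-5) can be sketched as a small helper. This is illustrative only; `checkForResponse` stands in for whatever blocking socket read the real code performed:

```java
import java.util.Optional;
import java.util.function.Supplier;

public class PollingClient {
    // Poll the supplier up to maxAttempts times, sleeping between attempts.
    // Returns the response if one arrived in time, otherwise empty (failure).
    public static Optional<String> pollForResponse(
            Supplier<Optional<String>> checkForResponse,
            int maxAttempts, long delayMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<String> response = checkForResponse.get();
            if (response.isPresent()) {
                return response; // got a reply within the allowed attempts
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return Optional.empty();
            }
        }
        return Optional.empty(); // no response: report failure
    }
}
```

The caller blocks for the whole loop, which is exactly why only one HTTP request can be served at a time.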
Issues
  1. Sequential: only one HTTP request is served at a time.
  2. Busy polling for the TCP server's response.
Solution

Use a reactive library for the TCP communication.

Second method

Since we were already using the Play framework, Akka was our first choice. The Akka library provides actor-based programming for concurrent computation, and it has built-in support for handling TCP (Akka TCP).

Connecting and sending messages to the server is pretty straightforward, as you can see from the example code.

public class Client extends UntypedActor {
    final InetSocketAddress remote;
    final ActorRef listener;

    public static Props props(InetSocketAddress remote, ActorRef listener) {
        return Props.create(Client.class, remote, listener);
    }

    // Constructor takes the remote server address and a listener actor
    public Client(InetSocketAddress remote, ActorRef listener) {
        this.remote = remote;
        this.listener = listener;
        // All of the Akka I/O APIs are accessed through manager objects.
        final ActorRef tcp = Tcp.get(getContext().system()).manager();
        // Using the manager object, try to connect to the server. The response
        // to this connect will be handled by the onReceive method below.
        tcp.tell(TcpMessage.connect(remote), getSelf());
    }

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof CommandFailed) {
            // The TCP connect failed
            listener.tell("failed", getSelf());
            getContext().stop(getSelf());
        } else if (msg instanceof Connected) {
            // Once the connection is made, we need to send a register message
            // to the connection to make it active.
            getSender().tell(TcpMessage.register(getSelf()), getSelf());
            // Change the behaviour of this actor to the "connected" procedure.
            getContext().become(connected(getSender()));
        }
    }

    // Actual code block responsible for server communication.
    private Procedure<Object> connected(final ActorRef connection) {
        return new Procedure<Object>() {
            @Override
            public void apply(Object msg) throws Exception {
                // Once the actor has changed its behaviour to this Procedure,
                // all messages addressed to this actor are processed here.
                if (msg instanceof String) {
                    // Upon receiving a string, write it to the connection.
                    connection.tell(TcpMessage.write(ByteString.fromString((String) msg)), getSelf());
                } else if (msg instanceof CommandFailed) {
                    // OS kernel socket buffer was full
                } else if (msg instanceof Received) {
                    // Received indicates data arrived from the server.
                    listener.tell(((Received) msg).data(), getSelf());
                } else if (msg instanceof ConnectionClosed) {
                    getContext().stop(getSelf());
                }
            }
        };
    }
}

In the initial iteration, we tried a simple, straightforward solution using Akka TCP.

  1. One TCP actor connects to the TCP server and receives its responses.
  2. On an HTTP request, send the message to the TCP actor using the ask pattern. The actor makes the request and sends it to the server.
  3. Once the response is received from the server, send it back to the message originator. We might have to keep the sender's ActorRef for this.

Alternatively, we can use a CompletionStage. While making the request, send a CompletableFuture along with the message to the actor.
The actor saves the future in memory. Once the response arrives, the actor looks up the future it saved earlier and completes it normally or exceptionally based on the result.

In controller we can have something like

public CompletionStage<Result> tcpWithFuture() {
    String xmlMessage = getXMLMessageFromHttpRequest(request);
    CompletableFuture<String> future = new CompletableFuture<>();
    MessageProtocol protocolMessage = new MessageProtocol(future, xmlMessage);
    clientActor.tell(protocolMessage, ActorRef.noSender());
    return future.thenApply(res -> ok(res));
}
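The MessageProtocol class is not shown in the article; a minimal version might look like the sketch below (the class shape is an assumption based on how it is used in the controller):

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical message class pairing the outgoing XML with the future
// the controller is waiting on. Final fields keep it effectively
// immutable, which is what you want for messages passed between actors.
public class MessageProtocol {
    public final CompletableFuture<String> future;
    public final String message;

    public MessageProtocol(CompletableFuture<String> future, String message) {
        this.future = future;
        this.message = message;
    }
}
```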

and in the TCP actor we need to keep the futures somewhere. For brevity, we can use a HashMap.

if (msg instanceof MessageProtocol) {
    MessageProtocol request = (MessageProtocol) msg;
    // Remember the future so the response can complete it later
    // (assumes the request id can be derived from the outgoing message).
    map.put(getRequestId(request.message), request.future);
    connection.tell(TcpMessage.write(ByteString.fromString(request.message)), getSelf());
} else if (msg instanceof Received) {
    String data = ((Received) msg).data().utf8String();
    int requestId = getRequestId(data);
    if (map.containsKey(requestId)) {
        CompletableFuture<String> future = map.get(requestId);
        String responseData = getResponseData(data);
        future.complete(responseData);
        map.remove(requestId);
    }
}
Issues

This solution worked fine under minimal load/concurrency. Beyond about 100 concurrent users, we started to see problems:
the CommandFailed branch from the earlier sample code began firing with a socket-buffer-full error.

else if (msg instanceof CommandFailed) {
    // OS kernel socket buffer was full
    LOG.error("OS kernel socket buffer was full");
}

The basic model of the TCP actor is that it has no internal buffering. This error usually appears when you are not handling write buffering yourself and keep sending while earlier writes are still in flight. In our case that was exactly the issue, not the system network buffer itself.

Solution

In the method above we were doing all the processing inside the same TCP actor, so much of that processing could be made parallel. Looking at the code, we found that most of the processing was to

  1. Construct XML request from http request.
  2. Create http response from the response received from TCP server.

Akka provides very easy-to-use tools for concurrency, and it encourages you to use as many actors as you need. In our case we can create one pool of actors to process the incoming requests and another pool to process the responses from the TCP server.

To create multiple actors, we can use Akka routing. First, create an actor to process the request:

public class RequestHandler extends UntypedActor {

    @Override
    public void onReceive(Object msg) throws Exception {
        if (msg instanceof String) {
            // Do request processing
            ....
            ....
            getSender().tell(processedOutput, getSelf());
        }
    }
}

Then create configuration

akka.actor.deployment {
  /RequestHandler {
  router = round-robin-pool
  nr-of-instances = 10
 }
}

then create multiple actors using

requestProcessor = system.actorOf(
    Props.create(RequestHandler.class).withRouter(new FromConfig()), "RequestHandler");

Notice the router parameter in the config, which says round-robin-pool. This means that when we run this, 10 identical actors will be created, and messages sent to the router will be delivered to them in round-robin fashion. Akka provides many other routing strategies, such as

1. akka.routing.RoundRobinRoutingLogic
2. akka.routing.RandomRoutingLogic
3. akka.routing.SmallestMailboxRoutingLogic
4. akka.routing.BroadcastRoutingLogic
5. akka.routing.ScatterGatherFirstCompletedRoutingLogic
6. akka.routing.TailChoppingRoutingLogic
7. akka.routing.ConsistentHashingRoutingLogic

Or you can even create your own routing logic.
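The round-robin idea itself is easy to picture without Akka. Below is a minimal, Akka-free sketch of the selection policy (our own illustration, not the Akka routing API):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: picks the next target in round-robin order, the same
// distribution policy that round-robin-pool applies to its routees.
public class RoundRobin<T> {
    private final List<T> targets;
    private final AtomicLong counter = new AtomicLong();

    public RoundRobin(List<T> targets) {
        this.targets = targets;
    }

    public T next() {
        // floorMod keeps the index non-negative even if the counter wraps.
        int index = (int) Math.floorMod(counter.getAndIncrement(), (long) targets.size());
        return targets.get(index);
    }
}
```

A custom Akka routing logic is essentially this kind of selection function, applied over the router's routees.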

Similarly, we can create multiple actors to process the responses from the TCP server.

These changes helped increase the concurrency of our application. But soon after we crossed 1000 concurrent users on minimal hardware, we started getting the old error again: network buffer full! Once again, we had a buffering issue. The Akka TCP connection actor has no internal buffering (i.e. it can only process one write at a time, meaning it can buffer one write until it has been passed on to the O/S kernel in full). Congestion needs to be handled at the user level, for both writes and reads. To solve this we can use any of the throttling schemes described in the Akka documentation, such as ACK-based write back-pressure. Once we implemented the throttling, we were able to manage much higher concurrency.
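A simplified, Akka-free sketch of the ACK-based idea: keep at most one write in flight, queue the rest inside the actor, and release the next queued write only when the acknowledgement for the previous one arrives. The names below are ours, not Akka's:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

// Illustrative only: user-level write buffering with one write in flight.
public class AckedWriter {
    private final Consumer<String> writeToConnection; // performs the actual write
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean writeInFlight = false;

    // Called when the actor receives a new message to send.
    public AckedWriter(Consumer<String> writeToConnection) {
        this.writeToConnection = writeToConnection;
    }

    public void send(String msg) {
        if (writeInFlight) {
            pending.add(msg); // buffer instead of overrunning the connection
        } else {
            writeInFlight = true;
            writeToConnection.accept(msg);
        }
    }

    // Called when the connection acknowledges the previous write.
    public void onAck() {
        String next = pending.poll();
        if (next != null) {
            writeToConnection.accept(next);
        } else {
            writeInFlight = false;
        }
    }

    public int pendingCount() {
        return pending.size();
    }
}
```

In the real actor, "acknowledgement" means the ack event Akka TCP sends back when a write with an ack object completes; the queue lives in the actor, so no synchronization is needed.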

A sample project implementing the methods mentioned above can be found here.