So erstellen Sie eine einfache Anwendung mit Akka Cluster

Wenn Sie meine vorherige Geschichte über Scalachain gelesen haben, haben Sie wahrscheinlich bemerkt, dass es weit davon entfernt ist, ein verteiltes System zu sein. Es fehlen alle Funktionen, um ordnungsgemäß mit anderen Knoten zu arbeiten. Fügen Sie hinzu, dass eine Blockchain, die aus einem einzelnen Knoten besteht, nutzlos ist. Aus diesem Grund habe ich beschlossen, dass es Zeit ist, an dem Problem zu arbeiten.

Da Scalachain von Akka unterstützt wird, sollten Sie die Chance nutzen, mit Akka Cluster zu spielen. Ich habe ein einfaches Projekt erstellt, um ein bisschen mit Akka Cluster zu basteln, und in dieser Geschichte werde ich meine Erkenntnisse teilen. Wir werden einen Cluster aus drei Knoten erstellen und dabei Cluster Aware Router verwenden, um die Last zwischen ihnen auszugleichen. Alles wird in einem Docker-Container ausgeführt, und Docker-Compose wird für eine einfache Bereitstellung verwendet.

Ok, lass uns rollen! ?

Schnelle Einführung in Akka Cluster

Akka Cluster bietet hervorragende Unterstützung bei der Erstellung verteilter Anwendungen. Der beste Anwendungsfall ist, wenn Sie einen Knoten haben, den Sie N-mal in einer verteilten Umgebung replizieren möchten. Dies bedeutet, dass alle N Knoten Peers sind, auf denen derselbe Code ausgeführt wird. Mit Akka Cluster können Sie sofort Mitglieder im selben Cluster ermitteln. Mit Cluster Aware Routern ist es möglich, die Nachrichten zwischen Akteuren auf verschiedenen Knoten auszugleichen. Es ist auch möglich, die Ausgleichsrichtlinie zu wählen, sodass der Lastausgleich ein Kinderspiel ist!

Eigentlich können Sie zwischen zwei Arten von Routern wählen:

Gruppenrouter - Die Akteure, an die die Nachrichten gesendet werden sollen - sogenannte Router - werden über ihren Akteurspfad angegeben. Die Router teilen sich die im Cluster erstellten Router. In diesem Beispiel wird ein Gruppenrouter verwendet.

Pool-Router - Die Router werden vom Router erstellt und bereitgestellt, sodass sie die untergeordneten Elemente in der Akteurhierarchie sind. Router werden nicht von Routern gemeinsam genutzt. Dies ist ideal für ein primäres Replikatszenario, bei dem jeder Router der primäre und seine Router die Replikate sind.

Dies ist nur die Spitze des Eisbergs, daher lade ich Sie ein, die offizielle Dokumentation zu lesen, um weitere Einblicke zu erhalten.

Ein Cluster für mathematische Berechnungen

Stellen wir uns ein Anwendungsszenario vor. Angenommen, Sie entwerfen ein System, um auf Anfrage mathematische Berechnungen auszuführen. Das System wird online bereitgestellt, daher benötigt es eine REST-API, um die Berechnungsanforderungen zu empfangen. Ein interner Prozessor verarbeitet diese Anforderungen, führt die Berechnung aus und gibt das Ergebnis zurück.

Derzeit kann der Prozessor nur die Fibonacci-Zahl berechnen. Wir beschließen, einen Cluster von Knoten zu verwenden, um die Last auf die Knoten zu verteilen und die Leistung zu verbessern. Akka Cluster übernimmt die Clusterdynamik und den Lastausgleich zwischen Knoten. OK, klingt gut!

Schauspielerhierarchie

Das Wichtigste zuerst: Wir müssen unsere Akteurshierarchie definieren. Das System kann in drei funktionale Teile unterteilt werden: die Geschäftslogik , die Clusterverwaltung und den Knoten selbst. Es gibt auch den Server, aber es ist kein Schauspieler, und wir werden später daran arbeiten.

Geschäftslogik

Die Anwendung sollte mathematische Berechnungen durchführen. Wir können einen einfachen ProcessorAkteur definieren , um alle Rechenaufgaben zu verwalten. Jede Berechnung, die wir unterstützen, kann in einem bestimmten Akteur implementiert werden, der ein Kind des Processoreinen ist. Auf diese Weise ist die Anwendung modular aufgebaut und einfacher zu erweitern und zu warten. Im Moment wird das einzige Kind Processorder ProcessorFibonacciSchauspieler sein. Ich nehme an, Sie können erraten, was seine Aufgabe ist. Dies sollte ausreichen, um zu beginnen.

Clusterverwaltung

Um den Cluster zu verwalten, benötigen wir a ClusterManager. Klingt einfach, oder? Dieser Akteur behandelt alles, was mit dem Cluster zu tun hat, wie die Rückgabe seiner Mitglieder, wenn er dazu aufgefordert wird. Es wäre nützlich zu protokollieren, was innerhalb des Clusters passiert, also definieren wir einen ClusterListenerAkteur. Dies ist ein untergeordnetes Element von ClusterManagerund abonniert Clusterereignisse, die diese protokollieren.

Knoten

Der NodeSchauspieler ist die Wurzel unserer Hierarchie. Es ist der Einstiegspunkt unseres Systems, der mit der API kommuniziert. Das Processorund das ClusterManagersind seine Kinder, zusammen mit dem ProcessorRouterSchauspieler. Dies ist der Load Balancer des Systems, der die Last auf Processors verteilt. Wir werden es als Cluster Aware Router konfigurieren, damit jeder auf jedem Knoten ProcessorRouterNachrichten an Processors senden kann .

Schauspieler Implementierung

Zeit, unsere Schauspieler umzusetzen! Zunächst implementieren wir die Akteure, die mit der Geschäftslogik des Systems zusammenhängen. Am Ende gehen wir dann zu den Akteuren für das Cluster-Management und dem Root-Akteur ( Node) über.

ProzessorFibonacci

Dieser Akteur führt die Berechnung der Fibonacci-Zahl durch. Es erhält eine ComputeNachricht mit der zu berechnenden Nummer und der Referenz des Akteurs, auf den geantwortet werden soll. Die Referenz ist wichtig, da es verschiedene anfragende Akteure geben kann. Denken Sie daran, dass wir in einer verteilten Umgebung arbeiten!

Sobald die ComputeNachricht empfangen wurde, fibonacciberechnet die Funktion das Ergebnis. Wir wickeln es in ein ProcessorResponseObjekt ein, um Informationen über den Knoten bereitzustellen, der die Berechnung ausgeführt hat. Dies wird später nützlich sein, um die Round-Robin-Richtlinie in Aktion zu sehen.

Das Ergebnis wird dann an den Schauspieler gesendet, auf den wir antworten sollen. Kinderleicht.

object ProcessorFibonacci { sealed trait ProcessorFibonacciMessage case class Compute(n: Int, replyTo: ActorRef) extends ProcessorFibonacciMessage def props(nodeId: String) = Props(new ProcessorFibonacci(nodeId)) def fibonacci(x: Int): BigInt = { @tailrec def fibHelper(x: Int, prev: BigInt = 0, next: BigInt = 1): BigInt = x match { case 0 => prev case 1 => next case _ => fibHelper(x - 1, next, next + prev) } fibHelper(x) } } class ProcessorFibonacci(nodeId: String) extends Actor { import ProcessorFibonacci._ override def receive: Receive = { case Compute(value, replyTo) => { replyTo ! ProcessorResponse(nodeId, fibonacci(value)) } } }

Prozessor

Der ProcessorSchauspieler verwaltet die spezifischen Subprozessoren wie den Fibonacci. Es sollte die Subprozessoren instanziieren und die Anforderungen an sie weiterleiten. Im Moment haben wir nur einen Subprozessor, so dass der Processoreine Art von Nachricht empfängt : ComputeFibonacci. Diese Nachricht enthält die zu berechnende Fibonacci-Zahl. Nach dem Empfang wird die zu berechnende Nummer FibonacciProcessorzusammen mit der Referenz des an a gesendet sender().

object Processor { sealed trait ProcessorMessage case class ComputeFibonacci(n: Int) extends ProcessorMessage def props(nodeId: String) = Props(new Processor(nodeId)) } class Processor(nodeId: String) extends Actor { import Processor._ val fibonacciProcessor: ActorRef = context.actorOf(ProcessorFibonacci.props(nodeId), "fibonacci") override def receive: Receive = { case ComputeFibonacci(value) => { val replyTo = sender() fibonacciProcessor ! Compute(value, replyTo) } } }

ClusterListener

Wir möchten nützliche Informationen darüber protokollieren, was im Cluster passiert. Dies könnte uns helfen, das System bei Bedarf zu debuggen. Dies ist der Zweck des ClusterListenerSchauspielers. Vor dem Start abonniert es die Ereignismeldungen des Clusters. Der Schauspieler reagiert auf Nachrichten wie MemberUp, UnreachableMember, oder MemberRemoved, um die entsprechende Ereignisprotokollierung. Wenn ClusterListeneres gestoppt wird, wird es von den Cluster-Ereignissen abgemeldet.

object ClusterListener { def props(nodeId: String, cluster: Cluster) = Props(new ClusterListener(nodeId, cluster)) } class ClusterListener(nodeId: String, cluster: Cluster) extends Actor with ActorLogging { override def preStart(): Unit = { cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember]) } override def postStop(): Unit = cluster.unsubscribe(self) def receive = { case MemberUp(member) => log.info("Node {} - Member is Up: {}", nodeId, member.address) case UnreachableMember(member) => log.info(s"Node {} - Member detected as unreachable: {}", nodeId, member) case MemberRemoved(member, previousStatus) => log.info(s"Node {} - Member is Removed: {} after {}", nodeId, member.address, previousStatus) case _: MemberEvent => // ignore } }

ClusterManager

Der für die Verwaltung des Clusters verantwortliche Akteur ist ClusterManager. Es erstellt den ClusterListenerAkteur und stellt auf Anfrage die Liste der Clustermitglieder bereit. Es könnte erweitert werden, um weitere Funktionen hinzuzufügen, aber im Moment ist dies genug.

object ClusterManager { sealed trait ClusterMessage case object GetMembers extends ClusterMessage def props(nodeId: String) = Props(new ClusterManager(nodeId)) } class ClusterManager(nodeId: String) extends Actor with ActorLogging { val cluster: Cluster = Cluster(context.system) val listener: ActorRef = context.actorOf(ClusterListener.props(nodeId, cluster), "clusterListener") override def receive: Receive = { case GetMembers => { sender() ! cluster.state.members.filter(_.status == MemberStatus.up) .map(_.address.toString) .toList } } }

ProcessorRouter

Der Lastausgleich zwischen Prozessoren wird von der ProcessorRouter. Es wird vom NodeAkteur erstellt, aber dieses Mal werden alle erforderlichen Informationen in der Konfiguration des Systems bereitgestellt.

class Node(nodeId: String) extends Actor { //... val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") //... }

Lassen Sie uns den relevanten Teil in der application.confDatei analysieren .

akka { actor { ... deployment { /node/processorRouter { router = round-robin-group routees.paths = ["/user/node/processor"] cluster { enabled = on allow-local-routees = on } } } } ... }

Als erstes müssen Sie den Pfad zum Router-Akteur angeben /node/processorRouter. Innerhalb dieser Eigenschaft können wir das Verhalten des Routers konfigurieren:

  • router: Dies ist die Richtlinie für den Lastausgleich von Nachrichten. Ich habe das gewählt round-robin-group, aber es gibt viele andere.
  • routees.paths: these are the paths to the actors that will receive the messages handled by the router. We are saying: “When you receive a message, look for the actors corresponding to these paths. Choose one according to the policy and forward the message to it.” Since we are using Cluster Aware Routers, the routees can be on any node of the cluster.
  • cluster.enabled: are we operating in a cluster? The answer is on, of course!
  • cluster.allow-local-routees: here we are allowing the router to choose a routee in its node.

Using this configuration we can create a router to load balance the work among our processors.

Node

The root of our actor hierarchy is the Node. It creates the children actors — ClusterManager, Processor, and ProcessorRouter — and forwards the messages to the right one. Nothing complex here.

object Node { sealed trait NodeMessage case class GetFibonacci(n: Int) case object GetClusterMembers def props(nodeId: String) = Props(new Node(nodeId)) } class Node(nodeId: String) extends Actor { val processor: ActorRef = context.actorOf(Processor.props(nodeId), "processor") val processorRouter: ActorRef = context.actorOf(FromConfig.props(Props.empty), "processorRouter") val clusterManager: ActorRef = context.actorOf(ClusterManager.props(nodeId), "clusterManager") override def receive: Receive = { case GetClusterMembers => clusterManager forward GetMembers case GetFibonacci(value) => processorRouter forward ComputeFibonacci(value) } }

Server and API

Every node of our cluster runs a server able to receive requests. The Server creates our actor system and is configured through the application.conf file.

object Server extends App with NodeRoutes { implicit val system: ActorSystem = ActorSystem("cluster-playground") implicit val materializer: ActorMaterializer = ActorMaterializer() val config: Config = ConfigFactory.load() val address = config.getString("http.ip") val port = config.getInt("http.port") val nodeId = config.getString("clustering.ip") val node: ActorRef = system.actorOf(Node.props(nodeId), "node") lazy val routes: Route = healthRoute ~ statusRoutes ~ processRoutes Http().bindAndHandle(routes, address, port) println(s"Node $nodeId is listening at //$address:$port") Await.result(system.whenTerminated, Duration.Inf) }

Akka HTTP powers the server itself and the REST API, exposing three simple endpoints. These endpoints are defined in the NodeRoutes trait.

The first one is /health, to check the health of a node. It responds with a 200 OK if the node is up and running

lazy val healthRoute: Route = pathPrefix("health") { concat( pathEnd { concat( get { complete(StatusCodes.OK) } ) } ) }

The /status/members endpoint responds with the current active members of the cluster.

lazy val statusRoutes: Route = pathPrefix("status") { concat( pathPrefix("members") { concat( pathEnd { concat( get { val membersFuture: Future[List[String]] = (node ? GetClusterMembers).mapTo[List[String]] onSuccess(membersFuture) { members => complete(StatusCodes.OK, members) } } ) } ) } ) }

The last (but not the least) is the /process/fibonacci/n endpoint, used to request the Fibonacci number of n.

lazy val processRoutes: Route = pathPrefix("process") { concat( pathPrefix("fibonacci") { concat( path(IntNumber) { n => pathEnd { concat( get { val processFuture: Future[ProcessorResponse] = (node ? GetFibonacci(n)).mapTo[ProcessorResponse] onSuccess(processFuture) { response => complete(StatusCodes.OK, response) } } ) } } ) } ) }

It responds with a ProcessorResponse containing the result, along with the id of the node where the computation took place.

Cluster Configuration

Once we have all our actors, we need to configure the system to run as a cluster! The application.conf file is where the magic takes place. I’m going to split it in pieces to present it better, but you can find the complete file here.

Let’s start defining some useful variables.

clustering { ip = "127.0.0.1" ip = ${?CLUSTER_IP} port = 2552 port = ${?CLUSTER_PORT} seed-ip = "127.0.0.1" seed-ip = ${?CLUSTER_SEED_IP} seed-port = 2552 seed-port = ${?CLUSTER_SEED_PORT} cluster.name = "cluster-playground" }

Here we are simply defining the ip and port of the nodes and the seed, as well as the cluster name. We set a default value, then we override it if a new one is specified. The configuration of the cluster is the following.

akka { actor { provider = "cluster" ... /* router configuration */ ... } remote { log-remote-lifecycle-events = on netty.tcp { hostname = ${clustering.ip} port = ${clustering.port} } } cluster { seed-nodes = [ "akka.tcp://"${clustering.cluster.name}"@"${clustering.seed-ip}":"${clustering.seed-port} ] auto-down-unreachable-after = 10s } } ... /* server vars */ ... /* cluster vars */ }

Akka Cluster is build on top of Akka Remoting, so we need to configure it properly. First of all, we specify that we are going to use Akka Cluster saying that provider = "cluster". Then we bind cluster.ip and cluster.port to the hostname and port of the netty web framework.

The cluster requires some seed nodes as its entry points. We set them in the seed-nodes array, in the format akka.tcp://"{clustering.cluster.name}"@"{clustering.seed-ip}":”${clustering.seed-port}”. Right now we have one seed node, but we may add more later.

The auto-down-unreachable-after property sets a member as down after it is unreachable for a period of time. This should be used only during development, as explained in the official documentation.

Ok, the cluster is configured, we can move to the next step: Dockerization and deployment!

Dockerization and deployment

To create the Docker container of our node we can use sbt-native-packager. Its installation is easy: add addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.15") to the plugin.sbt file in the project/ folder. This amazing tool has a plugin for the creation of Docker containers. it allows us to configure the properties of our Dockerfile in the build.sbt file.

// other build.sbt properties enablePlugins(JavaAppPackaging) enablePlugins(DockerPlugin) enablePlugins(AshScriptPlugin) mainClass in Compile := Some("com.elleflorio.cluster.playground.Server") dockerBaseImage := "java:8-jre-alpine" version in Docker := "latest" dockerExposedPorts := Seq(8000) dockerRepository := Some("elleflorio")

Once we have setup the plugin, we can create the docker image running the command sbt docker:publishLocal. Run the command and taste the magic… ?

We have the Docker image of our node, now we need to deploy it and check that everything works fine. The easiest way is to create a docker-compose file that will spawn a seed and a couple of other nodes.

version: '3.5' networks: cluster-network: services: seed: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '2552:2552' - '8000:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: seed CLUSTER_SEED_IP: seed node1: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8001:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node1 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552 node2: networks: - cluster-network image: elleflorio/akka-cluster-playground ports: - '8002:8000' environment: SERVER_IP: 0.0.0.0 CLUSTER_IP: node2 CLUSTER_PORT: 1600 CLUSTER_SEED_IP: seed CLUSTER_SEED_PORT: 2552

I won’t spend time going through it, since it is quite simple.

Let’s run it!

Time to test our work! Once we run the docker-compose up command, we will have a cluster of three nodes up and running. The seed will respond to requests at port :8000, while node1 and node2 at port :8001 and :8002. Play a bit with the various endpoints. You will see that the requests for a Fibonacci number will be computed by a different node each time, following a round-robin policy. That’s good, we are proud of our work and can get out for a beer to celebrate! ?

Conclusion

We are done here! We learned a lot of things in these ten minutes:

  • What Akka Cluster is and what can do for us.
  • How to create a distributed application with it.
  • How to configure a Group Router for load-balancing in the cluster.
  • How to Dockerize everything and deploy it using docker-compose.

You can find the complete application in my GitHub repo. Feel free to contribute or play with it as you like! ?

See you! ?