Akka Stream состоит из трех элементов источника (Source), поточных элементов (Flow) и приемников (Sink). Соответственно в каждом из этих элементов может использоваться актер Akka. Документация, к сожалению, не пестрит большим количеством примеров как реализовать элементы потока из актеров.
Источник Source
Начнем с источника данных потока:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
object TestSourceActor{ def props(query:SourceQueueWithComplete[Long])= Props(new TestSourceActor(query)) } class TestSourceActor(val source:SourceQueueWithComplete[Long]) extends Actor{ var currentValue=100 implicit val dispatcher = context.dispatcher override def receive: Receive = { case QueueOfferResult.Enqueued =>{ println("Enqueued, current Value= "+currentValue) currentValue -= 1; source.offer(currentValue).map(self ! _) if(currentValue<=0) { source.complete() self!PoisonPill } } case QueueOfferResult.Dropped => println("element dropped") case QueueOfferResult.Failure => { println("Query Fail") self!PoisonPill } case QueueOfferResult.QueueClosed => { println("Query Close") self!PoisonPill } case a => println("message:"+a) } } object TestStreamSource extends App { implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val queue:SourceQueueWithComplete[Long] = Source.queue[Long](bufferSize = 2, OverflowStrategy.backpressure).to(Sink.foreach(println)).run() val ref=system.actorOf(Props(classOf[TestSourceActor],queue),"test_source") //val ref=system.actorOf(TestSourceActor.props(queue),"test_source") //queue.runWith(Sink.foreach(println)) println("START PROGRAM!") ref!QueueOfferResult.Enqueued } |
В данном примере реализован актер который использует Source.queue для формирования экземпляра SourceQueueWithComplete, который и будет являться источником данных. Фактически актер отправляет данные в очередь и в ответ получает Future, который будет содержать результат выполнения операции и этот результат может иметь несколько заранее определенных значений. Фактически запуск формирования данных потока происходит с отправки первого сообщения QueueOfferResult.Enqueued и длится до тех пор пока счетчик не добежит до значения 0, после чего выполняется метод complete().
Другой вариант это применение функции Source.actorRef которая возвращает ссылку на актера, отправка сообщения этому актеру будет приводить к появлению в потоке нового сообщения. Поток завершается путем отправки актеру сообщения akka.actor.PoisonPill или akka.actor.Status.Success. В случае необходимости сигнализировать об ошибке отправляется сообщение akka.actor.Status.Failure. Главным ограничением этого способа является то, что на текущий момент стратегия OverflowStrategy.backpressure не поддерживается. Фактически единственная стратегия которая есть это отбрасывание элементов если более поздние элементы не успевают их обработать.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package ua.pp.manenok.example import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props} import akka.stream.{ActorMaterializer, OverflowStrategy} import akka.stream.scaladsl.{Sink, Source} object Next object TestSourceActor2{ def props(actor:ActorRef)= Props(new TestSourceActor2(actor)) } class TestSourceActor2(val source:ActorRef) extends Actor{ var currentValue=100 implicit val dispatcher = context.dispatcher override def receive: Receive = { case Next =>{ println("Enqueued, current Value= "+currentValue) currentValue -= 1; source!currentValue if(currentValue<=0) { source!akka.actor.Status.Success } } case a => println("message:"+a) } } object TestStreamSource2 extends App { implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val queue:ActorRef = Source.actorRef(bufferSize = 2, OverflowStrategy.backpressure).to(Sink.foreach(println)).run() val ref=system.actorOf(Props(classOf[TestSourceActor2],queue),"test_source") //val ref=system.actorOf(TestSourceActor2.props(queue),"test_source") println("START PROGRAM!") ref!Next } |
Актер будет остановлен в случае если поток завершиться, возникнет ошибка или будет отменен из других элементов потока.
Приемник Sink
Управляемый приемник сообщение создается через вызов Sink.actorRefWithAck при этом приемник создается из ссылки на актера, который и будет приемником данных, при этом также указываются служебные сообщения которые отвечают за инициализацию приемника, за подтверждение обработки (это сообщение отправляется в ответ на сообщение об инициализации и в ответ на обработку данных), кроме того есть еще два вида сообщений это сообщение о завершении потока и об ошибке.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
package ua.pp.manenok.example import akka.actor.{Actor, ActorSystem, Props} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} case object Init case object Complete case class Fail(th:Throwable) case object Ack class TestStreamSink extends Actor{ override def receive: Receive = { case _:Init.type => { println("init"); sender ! Ack } case Fail => { println("Fail") } case Complete => println("Complete ") case data=>{ println("Data:"+data) sender ! Ack } } } object TestStreamSink extends App { implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val ref=system.actorOf(Props[TestStreamSink],"test") val sink=Sink.actorRefWithAck(ref,Init,Ack,Complete,Fail) Source(1 to 100).map(a=>{ println("data="+a) a }).to(sink).run() } |
Если актер завершиться досрочно то весь поток будет отменен.
Возможен вариант без поддержки сигнала о готовности к обработке это метод Sink.actorRef для которого указывается ссылка на актера и сообщение которое получит актер при завершении потока. В этом случае реализация актера значительно проще, но имеется значительный недостаток, связанный с тем, что если поток данных превысит возможности обработки актера, то это может привести к переполнению буфера.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
package ua.pp.manenok.example import akka.actor.{Actor, ActorSystem, Props} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} case object CompleteSource class TestStreamSink2 extends Actor{ override def receive: Receive = { case CompleteSource => println("Complete ") case data=>{ println("Data:"+data) } } } object TestStreamSink2 extends App { implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() val ref=system.actorOf(Props[TestStreamSink2],"test") val sink=Sink.actorRef(ref,CompleteSource) Source(1 to 100).map(a=>{ println("data="+a) a }).to(sink).run() } |
Промежуточные элементы Flow
Хороший способ делегировать работу актеру, это применения паттерна ask.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
package ua.pp.manenok.example import akka.actor.{Actor, ActorSystem, Props} import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} import akka.pattern.ask import akka.util.Timeout import scala.concurrent.duration._ class TestStreamFlow extends Actor{ override def receive: Receive = { case data:Int=>{ println("data="+data) val res=data * 2 sender() ! res } case other=>{ println("other:"+other) sender() ! other } } } object TestStreamFlow extends App{ implicit val system = ActorSystem("QuickStart") implicit val materializer = ActorMaterializer() implicit val timeout: Timeout = Timeout(5 seconds) val ref=system.actorOf(Props[TestStreamFlow],"test") Source(1 to 100).mapAsync(5)(value => (ref ? value)).to(Sink.foreach(println)).run() } |
Управление поток осуществляется через Future, которые являются результатом операции ask. Кроме того параметр parallelism в методе mapAsync, который организует асинхронное управление определяет количество сообщений отправляемых в очередь актера. При этом стоит отметить, что порядок следования сообщений не меняется. Эффект от значения этого параметра в том, что предыдущие операции не будет ждать завершения этой операции, а смогут разместить значение в почтовом ящике актера. Актер обязан ответить отправителю sender каждого сообщения и через этот ответ отправляется результат работы для следующих участников и вместе с тем, это сигнал о том, что обработка этого элемента завершена.