Akka актеры как элементы Akka Stream

Akka Stream состоит из трех элементов источника (Source), поточных элементов (Flow) и приемников (Sink). Соответственно в каждом из этих элементов может использоваться актер Akka. Документация, к сожалению, не пестрит большим количеством примеров как реализовать элементы потока из актеров.

Источник Source

Начнем с источника данных потока:

В данном примере реализован актер который использует Source.queue для формирования экземпляра SourceQueueWithComplete, который и будет являться источником данных. Фактически актер отправляет данные в очередь и в ответ получает Future, который будет содержать результат выполнения операции и этот результат может иметь несколько заранее определенных значений. Фактически запуск формирования данных потока происходит с отправки первого сообщения QueueOfferResult.Enqueued и длится до тех пор пока счетчик не добежит до значения 0, после чего выполняется метод complete().

Другой вариант это применение функции Source.actorRef которая возвращает ссылку на актера, отправка сообщения этому актеру будет приводить к появлению в потоке нового сообщения. Поток завершается путем отправки актеру сообщения akka.actor.PoisonPill или akka.actor.Status.Success. В случае необходимости сигнализировать об ошибке отправляется сообщение akka.actor.Status.Failure. Главным ограничением этого способа является то, что на текущий момент стратегия OverflowStrategy.backpressure не поддерживается. Фактически единственная стратегия которая есть это отбрасывание элементов если более поздние элементы не успевают их обработать.

Актер будет остановлен в случае если поток завершиться, возникнет ошибка или будет отменен из других элементов потока.

Приемник Sink

Управляемый приемник сообщение создается через вызов Sink.actorRefWithAck при этом приемник создается из ссылки на актера, который и будет приемником данных, при этом также указываются служебные сообщения которые отвечают за инициализацию приемника, за подтверждение  обработки (это сообщение отправляется в ответ на сообщение об инициализации и в ответ на обработку данных), кроме того есть еще два вида сообщений это сообщение о завершении потока и об ошибке.

Если актер завершиться досрочно то весь поток будет отменен.

Возможен вариант без поддержки сигнала о готовности к обработке это метод Sink.actorRef для которого указывается ссылка на актера и сообщение которое получит актер при завершении потока. В этом случае реализация актера значительно проще, но имеется значительный недостаток, связанный с тем, что если поток данных превысит возможности обработки актера, то это может привести к переполнению буфера.

Промежуточные элементы Flow

Хороший способ делегировать работу актеру, это применения паттерна ask.

Управление поток осуществляется через Future, которые являются результатом операции ask. Кроме того параметр parallelism в методе mapAsync, который организует асинхронное управление определяет количество сообщений отправляемых в очередь актера. При этом стоит отметить, что порядок следования сообщений не меняется. Эффект от значения этого параметра в том, что предыдущие операции не будет ждать завершения этой операции, а смогут разместить значение в почтовом ящике актера. Актер обязан ответить отправителю sender каждого сообщения и через этот ответ отправляется результат работы для следующих участников и вместе с тем, это сигнал о том, что обработка этого элемента завершена.

Обсуждение закрыто.