Данная статья посещена вопросам работы потока Akka Stream в целом и вопросу создание своих элементов для графов в частности. Изначально планировалось разобраться только с вопросом создания своего элемента графа с нужными свойствами, но параллельно получились отсылки к тому как поток работает.
Сначала два слова о том из чего состоит Akka Stream, в первую очередь это дерево элементов, которое описывает план обработки данных, причем стоит особенно подчеркнуть, что это только план обработки, сама обработка как и создание всего, что для этого потребуется будет происходить на этапе матереализации (иными словами на момент запуска), при этом этот план можно использовать несколько раз.
Сам поток состоит из трех элементов: Источника Source, Приемника Seek и промежуточных блоков Flow. При этом в самом простом случае у источника есть только один выход из которого поступают данные, у приемника есть только один вход в который поступаю данные, а между источником и приемником расположены блоки потока у которых один вход и один выход. При этом все эти элементы образуют замкнутый граф у которого нет висячих не подключенных входов и выходов у элементов.
Стоит отметить, что элементы не отправляются источником в какие то моменты времени, а будут отправлены только после запроса. Таким образом получается такая картина, что со стороны приемника отправляются запросы на получение элементов (причем тут стоит отметить, что такой запрос отправляется сразу же после создания приемника), далее он поступает на блок Flow, который или должен сам создать фрагмент данных или обратится с запросом на получение данных к предыдущему блоку и так до тех пор пока такой запрос не дойдет до источника который и будет в ответ на запрос отправлять фрагмент потока данных для обработки, после чего этот блок данных отправляется вверх по элементам пока не доберется до приемника, на котором его обработка заканчивается. При этом интенсивность запросов будет регулировать и количество проходящих по потоку данных, если какой то элемент графа будет затягивать обработку, то он просто будет тянуть и отправку следующего фрагмента данных, таким образом притормаживая поток. В целом в таком варианте скорость обработки всего графа будет определяется его самым медленным звеном.
Библиотека полна достаточно большим количеством готовых строительных блоков для графов, многие элементы можно получить при помощи готовых функций, аналогичных функциями обработки коллекций вроде filter или map. Но иногда необходимо создать, что-то свое, с требуемым не стандартным поведением.
Что бы создать свой элемент графа, причем, что очень важно с любым количеством входов или выходов, для этого необходимо наследоваться от абстрактного класса GraphStage
, который согласно документации является аналогом вызова GraphDSL.create() позволяющего создать новый элемент графа из уже существующего. GraphStage позволяет создать элемент графа, который не делится на более мелкие части, и позволяет поддерживать внутренне состояние безопасным образом.
Стандартная документация дает нам такой пример созданного источника, который генерирует последовательность чисел, до тех пор пока не будет остановлен.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
import akka.stream.SourceShape import akka.stream.Graph import akka.stream.stage.GraphStage import akka.stream.stage.OutHandler class NumbersSource extends GraphStage[SourceShape[Int]] { val out: Outlet[Int] = Outlet("NumbersSource") override val shape: SourceShape[Int] = SourceShape(out) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { // All state MUST be inside the GraphStageLogic, // never inside the enclosing GraphStage. // This state is safe to access and modify from all the // callbacks that are provided by GraphStageLogic and the // registered handlers. private var counter = 1 setHandler(out, new OutHandler { override def onPull(): Unit = { push(out, counter) counter += 1 } }) } } |
В данном классе происходит следующая “магия”, создается наследник класса GraphStage, типизированный типом описывающий форму источника SourceShape (есть еще форма приемника SinkShape и формы промежуточных блоков, например блок типа FlowShape с одним входом и одним выходом), которая в свою очередь типизирована типом элемента поток которых мы будем обрабатывать (вместо этого второго типа может быть и генерик, для того, что бы задать тип позже). При этом любой наследник данного класса обязан реализовать метод createLogic, который и отвечает за создание элемента обработки. Точнее сказать класс GraphStage при материализации вызовет метод createLogic, который должен будет вернуть экземпляр GraphStageLogic, который и будет содержать всю логику работы. При этом для каждой материализации будет создаваться своя копия GraphStageLogic, поэтому ОЧЕНЬ ВАЖНО, все изменяемые состояния, ДОЛЖНЫ НАХОДИТСЯ только внутри этого объекта. Поэтому внутри реализуемого нами метода создается экземпляр абстрактного класса GraphStageLogic, внутри которого и размещается вся логика.
Как было описано выше, что бы отправлять данные далее для обработки источник должен получать запросы на эти данные (в документации backpressured) со стороны других элементов потока (downstream), что бы реагировать на запросы нашему коду надо подписаться на события через вызов метода setHandler, для которого указывается прослушиваемый “порт” и обработчик, который является экземпляром OutHandler для исходящих портов и InHandler для входящих. При этом в нашем случае реализована реакция на запрос элемента в виде переопределенной реализации метода onPull, который говорит о том, что следующий в цепочке блок обработки готов к приему следующего элемента данных, в самом простом случае просто отправляется этот элемент. Другой доступный в классе OutHandler обработчик это onDownstreamFinish, который отменяет дальнейшую работу потока и в стандартной реализации просто завершает работу блока. Однако, что бы превратить экземпляр класса наследника GraphStage в полноценный источник Source надо вызвать метод Source.fromGraph.
1 2 3 4 5 6 7 8 9 10 |
implicit val system = ActorSystem("Test") implicit val materializer = ActorMaterializer() // A GraphStage это полноценный граф Graph, такой же как и при создании через GraphDSL.create val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource // Создание Source из графа val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph) // Returns 55 val result1: Future[Int] = mySource.take(10).runFold(0)(_ + _) |
Другой пример из документации это приемник Sink, который печатает все преступаемые элементы, правда данный пример слегка модифицирован под применение с любым типом данных, то есть класс создан типизируемым:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import akka.stream.SinkShape import akka.stream.stage.GraphStage import akka.stream.stage.InHandler class StdoutSink[T] extends GraphStage[SinkShape[T]] { val in: Inlet[T] = Inlet("StdoutSink") override val shape: SinkShape[T] = SinkShape(in) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { // This requests one element at the Sink startup. override def preStart(): Unit = pull(in) setHandler(in, new InHandler { override def onPush(): Unit = { println(grab(in)) pull(in) } }) } } |
Аналогично, свой приемник Sink регистрирует обработчик событий InHandler, для входящего порта Inlet. При этом создается свой обработчик события onPush, которое сигнализирует о поступлении нового фрагмента данных на обработку и этом элемент может быть получен через вызов метода grab и обработан. Стоит заметить, что для приемника очень важно как можно раньше отправить запрос на получение элементов через вызов pull(inlet), что и сделано в методе preStart().
Таким образом создание своих элементов графа сводится к написанию обработчиков событий от входящих и выходящих портов. При этом внутри GraphStageLogic доступны следующие операции для исходящих портов:
- push(out,elem) – отправляет фрагмент данных в порт. Вызов возможен ТОЛЬКО после получения запроса от следующих элементов графа.
- complete(out) – закрывает порт
- fail(out,exception) – закрывает порт и сигнализирует об ошибке.
Все события происходящие на исходящем порту доступны только после регистрации экземпляра обработчика типа OutHandler через вызов метода setHandler(out,handler), при этом доступны два события и соответственно методы для реакции на них:
- onPull() – вызывается тогда когда порт готов принят следующий элемент,то есть для данного порта разрешен вызов push(out, elem).
- onDownstreamFinish() – указывает, что обработка отменена и порт больше сообщений не принимает. Для такого порта больше НЕ будут наступать события onPull.
Кроме того доступны дополнительные методы:
- isAvailable(out) – возвращает true если в порт можно отправить данные, то есть был на них запрос;
- isClosed(out) – проверяет закрыт ли порт.
Диаграмма состояний:
А для входящих портов:
- pull(in) – отправляет запрос на получение порции данных в порт. Вызов возможен ТОЛЬКО после получения данных.
- grab(in) – получает элемент который был получен при событии onPush
- cancel(in) – закрывает порт.
Все события происходящие на исходящем порту доступны только после регистрации экземпляра обработчика типа InHandler через вызов метода setHandler(in,handler), при этом доступны два события и соответственно методы для реакции на них:
- onPush() – вызывается тогда в порт поступает новый фрагмент данных, то есть для данного порта разрешен вызов grab(in) и \ или pull(in), для получения пришедшего элемента или отправки запроса на получение нового элемента .
- onUpstreamFinish() – указывает, что обработка завершена и порт больше сообщений не принимает. Для такого порта больше НЕ будут наступать события onPush.
- onUpstreamFailure() – указывает, что обработка отменена с ошибкой и порт больше сообщений не принимает. Для такого порта больше НЕ будут наступать события onPush.
Кроме того доступны дополнительные методы:
- isAvailable(out) – возвращает true если в порт можно отправить данные, то есть был на них запрос;
- complete() – завершить поток;
- fail(e:Exception) – завершить с ошибкой;
- isClosed(out) – проверяет закрыт ли порт.
Исходя из выше описанного стоит заметить, что со стороны приемника приходит исключительно сигнал завершения (при этом не конкретизируется почему поток был завершен), но с со стороны источника есть отдельный сигнал указывающий на возникновение ошибки при обработки потока.
Таким образом диаграмма состояний для входящего порта:
Кроме того оба обработчика имеют методы для универсального закрытия completeStage() и failStage(exception), которые закрывают все входящие и исходящие порты успешно или с ошибкой соответственно.