После открытия сокета вызывается так называемая регистрация открытого Netty канала, чтобы сообщения этого соединения могли быть обработаны в рамках Netty. Сегодня мы посмотрим, как происходит управление сообщениями.
Все начинается с этого метода, который вызывается после принятия открытого соединения:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
|
Обработка сообщений в Netty производится цепочкой обработчиков, которая в терминологии Netty называется pipeline
. Поэтому в первую очередь создается новый pipeline с помощью фабрики, которую мы проинициализировали в момент создания сервера. Для обработки сообщений в конкретном соединении канала создается отдельный Worker поток. За обработку операции ввода/вывода отвечают Worker'ы
. В частности nio операции управляются NioWorker'ом
. Worker'ы
создаются внутри пула воркеров. Передавая в NioServerSocketChannelFactory
пул воркер-потоков, этот пул оборачивается в NioWorkerPool
, в конструкторе которого создаются экземпляры NioWorker'ов
. Количество воркеров ограничено количеством процессоров, доступных JVM из расчета - на каждый процессор два воркера.
1
|
|
Объекты воркеров хранятся в обычном массиве внутри пула. При регистрации канала, из пула воркеров последовательно достается воркер с помощью метода nextWorker
:
1 2 3 |
|
Индекс текущего воркера хранится в AtomicInteger
переменной. Целочисленное деление индекса на количество воркеров в пуле гарантирует получение воркера в пределах массива воркеров.
Далее создается новый канал NioAcceptedSocketChannel
и регистрируется внутри воркера. За принятие соединений отвечает один канал, за получение и отправку сообщений отвечает только что созданный канал. Регистрация канала начинается с создания объекта задачи регистрации RegisterTask
. Этот объект специфичен для конкретного воркера и объявляется внутри него. Далее запускается поток воркера:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
|
Стоит отметить, что запуск потока вызывается в блоке синхронизации с объектом блокировки, который принадлежит конкретному объекту worker’а. Эта блокировка гарантирует последовательную обработку сообщений с помощью одного и того же воркера, взятого из пула.
В основе работы потока все тот же бесконечный цикл и Selector
для обеспечения эффективных пауз и обработки сообщений. Внутри process*
методов выполняется обработка задач, которые висят в очереди, причем алгоритм работы этих методов идентичен. Различаются только очереди, из которых достаются задачи:
1 2 3 4 5 6 7 8 |
|
Зачем разведен Copy/Paste в методах обработки очередей, остается загадкой.
Задачи реализуют интерфейс Runnable
. Метод регистрации регистрирует канал в Selector'е
на события чтения/записи наподобие того, как регистрировался канал
принятия соединений на события принятия соединений.
Интересен метод обработки ключей, обнаруженных селектором (принцип работы селектора описывался в предыдущей статье):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
|
Сразу бросается в глаза интересный способ обхода итератора по ключам селектора. В ключах Nio используется работа с константами на основе битов. Так метод k.readyOps()
возвращает бит готовой операции. Сравнение констант производится с помощью побитовых операций. Сообщений в канале по большому счету может быть два - чтение и запись. О них мы поговорим в следующих статьях.
После обработки всех задач выполняется проверка на наличие ключей в селекторе на всякий случай, и если ключи все обработались, то происходит закрытие селектора и прерывание цикла. В случае исключения, как и в Boss потоке, вызывается секундный sleep
для предотвращения чрезмерной загрузки процессора в случае сбоев.