Привет! Меня зовут Саша Шутай, я тимлид в AGIMA. В прошлой статье я рассказывал, что делать, если на проекте Bitrix сожительствует с Vue.js и поисковые боты не видят контента сайта. А в этой помогу разобраться, как на Bitrix-проекте произвести интеграцию с брокером очередей Apache Kafka, почему этот вариант кажется мне более удобным, чем привычная система очередей в RabbitMQ, и как это можно подвязать на автотесты, не отправляя тестовые сообщения в продюсера.
Когда это пригодится
Но сначала давайте определимся, когда вообще может возникнуть необходимость в такой интеграции. Представьте: вы поддерживаете интернет-магазин, который является частью большой IT-инфраструктуры. В этой инфраструктуре много компонентов: сайты, складской учёт, хранилища данных, системы лояльности, офлайн-продажи и другие информационные системы. Все они могут быть разработаны на разных языках программирования, в разных фреймворках, представлять собой совсем разные продукты, которые ничего не знают про внутреннюю реализацию друг друга. И вот тут вопрос: как заставить все системы работать слаженным механизмом, иметь единый контекст и общую точку синхронизации и обмена данными? Причем взаимодействие должно быть асинхронным, чтобы нивелировать скачки нагрузки и гарантировать доставку информации.
По всей видимости, необходима централизованная система, которая позволяет связать все компоненты. Нужно, чтобы она обеспечивала прием информации, хранение и её отдачу по запросу сразу нескольким потребителям. Сообщения не должны удаляться из хранилища после их обработки и должны быть доступны для прочтения в любой момент времени разными потребителями.
В этой схеме обмена сообщениями между разными системами мы видим три основные роли:
- Producer — публикатор сообщений, которые должны быть считаны другими системами.
- Broker — хранилище сообщений, которое отдает сообщения по запросу, ведет учет того, кто, когда и что опубликовал или прочитал.
- Consumer — потребитель, считывающий сообщения.
Обратите внимание на стрелки, ведущие от Consumer к Broker. Так на схеме показано, что потребитель является инициатором запроса и сам регулирует нагрузку, с которой он хочет обрабатывать сообщения и сам их считывает по мере необходимости. В терминологии очередей такой подход называется Pull-model.
Почему Apache Kafka
Опытные читатели могут прервать меня и сказать, что в этом случае можно использовать хорошо знакомый всем механизм для временного хранения и распределения информации — систему очередей. Например, подошла бы популярная в PHP-стеке RabbitMQ. Эта концепция позволит быстро отправить информацию от поставщика сразу нескольким потребителям, каждому в собственную очередь.
Но практика показывает, что чем больше консьюмеров задействовано, чем больше очередей создано и чем больше информации передается, тем чаще не хватает производительности RabbitMQ, который, конечно, поддерживает горизонтальное масштабирование, но оно не очень адаптивное. При подключении нового потребителя создавать новую очередь и отправлять в неё старые данные для «истории» становится накладным. Поэтому для решения задач такого масштаба, по моему мнению, лучше использовать брокер Apache Kafka.
Вот преимущества, которые мы получим, используя эту стриминговую платформу:
- сообщения после прочтения не удаляются в течение заданного времени; это позволяет при подключении новых консьюмеров дать им возможность прочитать уже прочитанные другими системами данные;
- нет необходимости заводить поток сообщений под каждого консьюмера;
- гарантия, что сообщения будут прочитаны в порядке их попадания в хранилище;
- можно изменить позицию, с которой считываются сообщения; это позволяет при необходимости прочитать сообщения заново;
- адаптация к горизонтальному масштабированию «из коробки»;
- быстрая архитектура отдачи сообщений за счёт метода хранения информации, когда доступно последовательное считывание с физического диска;
- сбор и агрегации событий из нескольких источников.
Планирование архитектуры взаимодействия
Вернемся к нашему примеру. Вы поддерживаете интернет-магазин на PHP и фреймворке Bitrix. Вы согласились со мной и решили, что для обогащения данных в БД и отправки целевой информации в другие системы необходимо интегрироваться с Apache Kafka. В этом случае сайт будет в роли как поставщика (Producer), так и потребителя (Consumer). Давайте рассмотрим их подробнее.
Роль Consumer
Когда Kafka используется как поставщик данных, для взаимодействия с ней используется Pull-model. Поэтому сайт самостоятельно должен обращаться к Broker для получения сообщений. При чтении сначала посылается запрос receive и получается тело с информаций для обработки. А когда сайт успешно обработал информацию, отправляется запрос ack. Он сообщает, что все работы произведены и можно передвинуть Offset на следующее сообщение.
Здесь можно сразу дать определение топика. Это сущность, агрегирующая в себе сообщения одной категории. Поставщики данных публикуют сообщения в топик, а потребители подписываются на топик и считывают эти сообщения из него.
Важный момент: после запроса с типом receive метка текущего сообщения в брокере не будет сдвинута, пока не будет запроса ack. И все последующие запросы receive будут возвращать одно и то же сообщение. Учитывайте это, если планируете считывать информацию из одного топика в несколько потоков. В зависимости от настроек Kafka, вам может быть доступен ключ autoAck, когда после запроса receive система автоматически перемещает метку на следующее.
Запускаем фоновый процесс, который будет с некоторым интервалом обращаться в Kafka, получать сообщения и отправлять их на выполнение. Для управления пуллом процессов мы используем Supervisord. Поэтому и в этой задаче, чтобы процесс был стабильным, запускаем его через демона. Также в нашей реализации управления процессами можно задавать для каждого инстанса delay/частоту опроса.
Роль Producer
Здесь всё просто. Нужно отправлять сообщения send. Инициировать отправку сообщения можно на событии создания/изменения необходимой сущности Bitrix. Если важно гарантированно отправить сообщение с учётом, что сервис Kafka может быть недоступен по техническим причинам, то можно использовать такие подходы:
- С двумя очередями RabbitMQ. Если планируется, что поток сообщений очень большой и нам требуется самими регулировать/ограничивать его пропускную способность (с помощью очереди и числа воркеров, с ней работающих).
- С синхронным запросом и одной очередью RabbitMQ. Первый раз сообщение отправляется в момент его появления в системе. Если оно не было доставлено, то помещается в очередь и воркер будет его пытаться отправить до успешного раза.
Расскажу, как это работает в первом варианте. Создается две очереди. Каждую очередь обслуживает собственный фоновый процесс Worker. В первой очереди, «быстрой», Worker ожидает сообщение и после этого засыпает на более короткое время, чем во второй, «медленной» очереди.
Worker «быстрой» очереди пытается отправить сообщение в Kafka. Если отправка не удалась, то сообщение помещается в «медленную» очередь. Worker «медленной» очереди также пытается отправить сообщение в Kafka. В случае неудачи сообщение снова помещается в конец «медленной» очереди. Тем самым, в случае технической ошибки на стороне брокера, препятствующей получению данных, сообщения не будут потеряны и не будут с большой частотой отправляться в нерабочий сервис.
Построим REST клиента
Поскольку ваш стек — PHP, скорее всего, разработчики Kafka предоставят вам REST API, с помощью которого производится взаимодействия всех систем в архитектуре. Поэтому разработаем каркас нашего клиента для обмена данными. Для начала спроектируем интерфейс. Его наличие позволит в будущем реализовать несколько клиентов: основной, для тестирования и т. д. Он должен требовать реализацию основных методов работы с сообщениями: receive, ack, send. Потом добавим системный класс Topic, возвращающий наименования топиков для чтения и записи согласно необходимой внутренней логике (если такая присутствует в архитектуре).
Добавим класс RestClient и реализуем в нем все методы взаимодействия с внешней системой, а также дополнительные вспомогательные методы, касающиеся схемы конкретного протокола взаимодействия. В нашем случае это initClient, возвращающий объект битриксового Bitrix\Main\Web\HttpClient с предустановленными авторизационными кредами и заголовками.
Получить данные из внешней системы — это, конечно, хорошо, но хочется иметь строгий механизм для работы с полученными данными в любом месте нашей системы. Приведем ответ Kafka на наш запрос к классу Result, хранящему структурированную информаций: код ответа (успешен или нет), заголовок и тело сообщений.
Разработав методы обмена сообщениями, добавим в систему Actor для потребления и обработки сообщений. Реализуем чтение в абстрактном классе Consumer. Сам процесс предполагает 3 этапа:
- Receive (запрос на получение сообщения);
- Processe (валидация и обработка);
- Ack (запрос на перемещение указателя во внешней системе).
Для поллинга в методе poll() запускаем бесконечный цикл, который производит запрос на чтение, вызывает методы валидации и обработки и перемещает указатель, если сообщение было успешно обработано. После каждой итерации вызываем «засыпание» процесса. С целью оптимизации можно добавить такую механику: если receive вернул сообщение, то следующую итерацию запускать через N времени.
Если же сообщение не вернулось (очередь пуста), то «засыпать» на 10*N. Как уже говорилось выше, за работоспособность процесса следит Supervisord.
Мы хотим, чтобы конкретные реализации Consumer описывали, как именно следуют парсить тело сообщения parseMessage() и обрабатывать полученные данные processMessage(). И имели возможность отфильтровывать заведомо бесполезные для вашей системы сообщения (если такое предполагается в архитектуре) isAllowedMessage().
Финальный класс, который потребуется для базовой реализации взаимодействия, это поставщик Producer. Процедура отправки одинакова для всех поставщиков, поэтому Send реализуется в абстрактном классе. А его наследники должны реализовывать createMessage(), собирающий сообщение в нужном для потребителя формате.