Kafka положить сообщение в топик

Обновлено: 02.07.2024

Компоненты системы обмена данными в Kafka

  • топик;
  • потребитель (consumer);
  • издатель (producer).

Каждый из этих компонентов мы подробнее рассмотрим далее.

Топик

Издатель

Потребитель

Освоить Apache Kafka на профессиональном уровне в качестве администратора Big Data кластеров, разработчика распределенных приложений и прочих прикладных областях Data Science вы сможете на практических курсах по Kafka в нашем лицензированном учебном центре обучения и повышения квалификации ИТ-специалистов в Москве:

Установка Кафки настолько простая, что в этот раз я отойду от своего обычного правила и в самом деле объясню, как его устанавливать. Итак, четыре шага:

Честно, это действительно всё. Но прежде, чем мы пойдём дальше, стоит осмотреться:

В комплекте с Кафкой идёт не так много папок, и нам понадобятся всего две их них: bin , где хранятся шелл-скрипты и config , где скрываются файлы конфигурации сервисов.

Как запустить Kafka

Как я упоминал в прошлый раз, даже одинокий кафкианский брокер — это кластер, так что его запуск немного отличается от запуска того же RabbitMQ. В отличие от кролика, Кафке нужен вспомогательный сервис для того, чтобы координировать работу зоопарка брокеров в кластере, и имя тому сервису — ZooKeeper. Когда создаётся новый топик, или добавляется новый брокер, или удаляется старый, ZooKeeper — это тот, кто будет со всем этим разбираться. Он решит, куда положить новый топик, чем загрузить нового брокера, и даже как сбалансировать набор реплик, если часть из них ушла вместе с павшим сервисом. Он надсмотрщик и координатор, и его запускают первым.

Как запустить Apache ZooKeeper

Инсталлер Кафки идёт в комплекте с ZooKeeper, так что поиски закончились, не успев начаться. Скрипт для запуска лежит в bin папке, а конфигурация — в config , и, собственно, больше ничего не нужно:


Хотелось бы сделать себе заметку (что-то типа cheat-sheet) с командами для работе с Kafka. Сейчас я приведу готовые примеры использования команд на готовых примера можно будет увидеть работу самой кафки.

Установка Kafka скриптов в Unix/Linux

Чтобы использовать скрипты для работы с Кафка, выполним небольшую установку:

Для простоты использования, можно добавить PATH (в ~/.bashrc):

После этого, стоит обновить файл:

Перейдем к работе!

Работа с Kafka в Unix/Linux

Вывести список топиков:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Так же, можно использовать:

  • ZooKeeperList.txt — это список хостов самого зукипера (zookeeper:port).

Получить информацию о топике:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Вот небольшой скрипт для создания топиков:

Записать в топик меседжы:

Прочитать с топика меседжы:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Так, написал небольшой скрипт для удаление:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Посмотреть конфиг для топика:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Установить время хранения (не рекомендуется) записей в топике:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Установить время хранения (современный способ) записей в топике:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Примечание: Время хранения по умолчанию составляет 24 часа (86400000 миллисекунд).

Можно вернуть все как и было:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Чтобы просмотреть offset позиции для consumer группы (для каждой из партиций):

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Чтобы начать заново (сбросьте смещение на 0):

  • your_topic_here — Топик.
  • group_ID_here — ИД группы.
  • bootstrap_host — Хост.
  • bootstrap_port — Порт.

Получить самое раннее смещение в топике:

Получить последнее смещение еще в топике:

Получить потребительские смещения (consumer offsets) для топика:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Считать из __consumer_offsets:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Вывести список consumer групп:

Просмотр сведений о consumer группе:

  • zookeeper_host — Хост зукипера, например, — это может быть localhost, 192.168.13.113 и так далее.
  • zookeeper_host_port — Порт зукипера, например по умолчанию — это 2181-й порт.

Чтобы записать в топик:

Запускаем Zookeeper shell:

Провести перформенс тесты:

Добавить комментарий Отменить ответ

Этот сайт использует Akismet для борьбы со спамом. Узнайте, как обрабатываются ваши данные комментариев.

Привет! Меня зовут Дмитрий Шеламов и я работаю в Vivid.Money на должности backend-разработчика в отделе Customer Care. Наша компания – европейский стартап, который создает и развивает сервис интернет-банкинга для стран Европы. Это амбициозная задача, а значит и ее техническая реализация требует продуманной инфраструктуры, способной выдерживать высокие нагрузки и масштабироваться согласно требованиям бизнеса.

В основе проекта лежит микросервисная архитектура, которая включает в себя десятки сервисов на разных языках. В их числе Scala, Java, Kotlin, Python и Go. На последнем я пишу код, поэтому практические примеры, приведенные в этой серии статей, будут задействовать по большей части Go (и немного docker-compose).

Работа с микросервисами имеет свои особенности, одна из которых – организация коммуникаций между сервисами. Модель взаимодействия в этих коммуникациях бывает синхронной или асинхронной и может оказать существенное влияние на производительность и отказоустойчивость системы в целом.

Асинхронное взаимодействие

Итак, представим что у нас есть два микросервиса (А и Б). Будем считать, что коммуникация между ними осуществляется через API и они ничего не знают о внутренней реализации друг друга, как и предписывает микросервисный подход. Формат передаваемых между ними данных заранее оговорен.

image

Задача перед нами стоит следующая: нам нужно организовать передачу данных от одного приложения к другому и, желательно, с минимальными задержками.
В самом простом случае поставленная задача достигается синхронным взаимодействием, когда А отправляет приложению Б запрос, после чего сервис Б его обрабатывает и, в зависимости от того, успешно или не успешно был обработан запрос, отправляет некоторый ответ сервису А, который этот ответ ожидает.
Если же ответ на запрос так и не был получен (например, Б рвет соединение до отправки ответа или А отваливается по таймауту), сервис А может повторить свой запрос к Б.

С одной стороны, такая модель взаимодействия дает определенность статуса доставки данных для каждого запроса, когда отправитель точно знает, были ли получены данные получателем и какие дальнейшие действия ему необходимо делать в зависимости от ответа.
С другой стороны, плата за это – ожидание. После отправки запроса сервис А (или поток, в котором выполняется запрос) блокируется до того момента, пока не получит ответ или не сочтет запрос неудавшимся согласно своей внутренней логике, после чего примет дальнейшие действия.

Все, что остается А при такой модели взаимодействия – это просто ждать. Может быть наносекунду, а может быть час. И эта цифра вполне реальна в том случае, если Б в процессе обработки данных выполняет какие-либо тяжеловесные операции, вроде обработки видео.

Возможно, вам проблема не показалась существенной – одна железка ждет пока другая ответит, велика ли потеря?
Чтобы сделать эту проблему более личной, представим, что сервис А – это приложение, запущенное на вашем телефоне, и пока оно ожидает ответ от Б, вы видите на экране анимацию загрузки. Вы не можете продолжить пользоваться приложением до тех пор, пока сервис Б не ответит, и вынуждены ждать. Неизвестное количество времени. При том, что ваше время гораздо ценнее, чем время работы куска кода.

Подобные шероховатости решаются следующим образом – вы разделяете участников взаимодействия на два “лагеря”: одни не могут работать быстрее, как бы вы их ни оптимизировали (обработка видео), а другие не могут ждать дольше определенного времени (интерфейс приложения на вашем телефоне).
Затем вы заменяете cинхронное взаимодействие между ними (когда одна часть вынуждена ждать другую, чтобы удостовериться, что данные были доставлены и обработаны сервисом-получателем) на асинхронное, то есть модель работы по принципу отправил и забыл – в этом случае сервис А продолжит свою работу, не дожидаясь ответа от Б.

В качестве одного из решений данной проблемы мы можем добавить между сервисами А и Б прослойку, которая будет выступать временным хранилищем и гарантом доставки данных в удобном для отправителя и получателя темпе. Таким образом мы сможем расцепить сервисы, синхронное взаимодействие которых потенциально может быть проблемным:

  • Данные, которые теряются при аварийном завершении сервиса-получателя теперь могут быть снова получены из промежуточного хранилища, в то время как сервис-отправитель продолжает выполнять свою работу. Таким образом мы получаем механизм гарантии доставки;
  • Эта прослойка также защищает получателей от скачков нагрузки, ведь получателю выдаются данные по мере их обработки, а не по мере их поступления;
  • Запросы на выполнение тяжеловесных операций (таких как рендеринг видео) теперь могут быть переданы через эту прослойку, обеспечивая меньшую связность между быстрыми и медленными частями приложения.

Под вышеобозначенные требования вполне подходит и обычная СУБД. Данные в ней можно хранить в течении продолжительного времени, не беспокоясь о потере информации. Также исключена и перегрузка получателей, ведь они вольны сами выбрать темп и объемы чтения предназначенных для них записей. Подтверждение же обработки можно реализовать, помечая прочитанные записи в соответствующих таблицах.

Однако выбор СУБД в качестве инструмента для обмена данными может привести к проблемам с производительностью с ростом нагрузки. Причина в том, что большинство баз данных не предназначены для такого сценария использования. Также во многих СУБД отсутствует возможность разделения подключенных клиентов на получателей и отправителей (Pub/Sub) – в этом случае, логика доставки данных должна быть реализована на клиентской стороне.
Вероятно, нам нужно нечто более узкоспециализированное, чем база данных.

image

Стоит отметить, что существует еще одна гарантия доставки, которая называется “exactly once”. Ее трудно достичь в распределенных системах, но при этом она же является наиболее желаемой.
В этом плане, Apache Kafka, о которой мы будем говорить далее, выгодно выделяется на фоне многих доступных на рынке решений. Начиная с версии 0.11, Kafka предоставляет гарантию доставки exactly once в пределах кластера и транзакций, в то время как AMQP-брокеры таких гарантий предоставить не могут.
Транзакции в Кафке – тема для отдельной публикации, сегодня же мы начнем со знакомства с Apache Kafka.

Apache Kafka

Мне кажется, что будет полезно для понимания начать рассказ о Кафке со схематичного изображения устройства кластера.

image

Отдельный сервер Кафки именуется брокером. Брокеры образуют собой кластер, в котором один из этих брокеров выступает контроллером, берущим на себя некоторые административные операции (помечен красным).

Commit log

Структура данных, лежащая в основе Kafka, называется commit log или журнал фиксации изменений.

image

Новые элементы, добавляемые в commit log, помещаются строго в конец, и их порядок после этого не меняется, благодаря чему в каждом отдельном журнале элементы всегда расположены в порядке их добавления.

Партиции и топики

image

Pull и Push

image

Какие преимущества имеет данный подход?

Недостатки

image

Однако в Кафке масштабирование происходит несколько сложнее, чем в AMQP брокерах.
Например, если вы добавите еще один экземпляр читателя и натравите его на ту же партицию, вы получите нулевой КПД, так как в этом случае оба экземпляра будут читать один и тот же набор данных.
Поэтому базовое правило масштабирования Кафки — количество конкурентных читателей (то бишь группа сервисов, реализующих одинаковую логику обработки (реплик)) топика не должно превышать количество партиций в этом топике, иначе какая-то пара читателей будут обрабатывать одинаковый набор данных.

Consumer Group

Чтобы избежать ситуации с чтением одной партиции конкурентными читателями, в Кафке принято объединять несколько реплик одного сервиса в consumer Group, в рамках которого Zookeeper будет назначать одной партиции не более одного читателя.

image

В то же время ничего не мешает вам повесить на одну партицию несколько читателей с разной логикой обработки. Например вы храните в топике список событий по действиям пользователей и хотите использовать эти события для формирования нескольких представлений одних и тех же данных (например для бизнес-аналитиков, продуктовых-аналитиков, системных-аналитиков и пакета Яровой) и последующей отправкой их в соответствующие хранилища.

image

Retention Policy

Compaction Policy

Читайте также: