Создайте свою собственную систему легко

Redis 5.0 предлагает тип Stream. Буквально это потоковый тип, но на самом деле, с функциональной точки зрения, это должна быть идеальная реализация Redis для очередей сообщений (MQ, Message Queue).

Любой, кто использовал Redis в качестве очереди сообщений, знает, что существует множество реализаций очередей сообщений на основе Reids, например:

  • PUB/SUB, модель подписки/публикации.
  • Реализация LPUSH+BRPOP на основе списка.
  • Реализация на основе Sorted-Set.

Каждая реализация имеет типичные особенности и проблемы.

Тип Stream, выпущенный в Redis 5.0, также используется для реализации типичной очереди сообщений. Внешний вид типа Stream удовлетворяет практически всему содержимому очереди сообщений, включая, но не ограничиваясь:

  • Генерация сериализации идентификатора сообщения.
  • Обход сообщения.
  • Блокировка и неблокировка чтения сообщений.
  • Пакетное потребление сообщений.
  • Обработка ожидающих сообщений.
  • Мониторинг очереди сообщений.

В очереди сообщений есть производители и потребители. Давайте испытаем чудеса типа Stream.

Новости производства

Команда XADD используется для добавления сообщения к потоку (данные потока), как показано ниже:

127.0.0.1:6379> XADD memberMessage * user reggie msg Hello
"1553439850328-0"
127.0.0.1:6379> XADD memberMessage * user dwen msg World
"1553439858868-0"

Формат синтаксиса:

XADD key ID field string [field string ...]

Вам необходимо указать key, схему сообщения ID и содержимое сообщения, где содержание сообщения — key-valuedata.

  • ID, наиболее часто используемый *, указывает, что идентификатор сообщения генерируется Redis, что также является настоятельно рекомендуемой схемой.
  • field string [строка поля] — это содержимое текущего сообщения, состоящее из одного или нескольких значений ключа.

В приведенном выше примере пользователь сообщения reggiemsg Helloдобавлен к ключу memberMessage.

Redis генерирует идентификаторы сообщений, используя метки времени в миллисекундах и порядковые номера. В этот момент сообщение доступно в очереди сообщений.

Использование сообщений

XREAD, читал сообщения из Stream, продемонстрировал следующее:

127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
   2) 1) 1) "1553439850328-0"
         2) 1) "user"
            2) "reggie"
            3) "msg"
            4) "Hello"
      2) 1) "1553439858868-0"
         2) 1) "user"
            2) "dwen"
            3) "msg"
            4) "World"

Приведенная выше команда предназначена для чтения всех сообщений из очереди сообщений memberMessage.

XREADподдерживает множество параметров, формат синтаксиса:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  • COUNT countиспользуется для ограничения количества получаемых сообщений.
  • BLOCK milliseconds используется для установки XREADв режим блокировки, по умолчанию используется неблокирующий режим.
  • ID, который используется для установки идентификатора сообщения, с которого следует начать чтение. Используйте 0, чтобы начать с первого сообщения. В этом примере используется 0. Здесь следует отметить, что идентификатор очереди сообщений монотонно увеличивается, поэтому, установив начальную точку, его можно прочитать в обратном направлении. В режиме блокировки $ может использоваться для представления идентификатора последнего сообщения. $ не имеет значения в неблокирующем режиме.
  • XREAD делится на блокирующий и неблокирующий режимы при чтении сообщений. Опцию BLOCK можно использовать для указания режима блокировки, при этом необходимо установить продолжительность блокировки. В неблокирующем режиме он возвращается сразу после чтения (даже если сообщения нет), а в блокирующем режиме, если содержимое не может быть прочитано, он блокируется и ждет.

Типичное использование режима блокировки:

127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)

Мы используем режим блокировки с $ как ID, чтобы прочитать последнее сообщение, если сообщения нет, команда будет заблокирована. В процессе ожидания другие клиенты добавляют сообщения в очередь, и они будут немедленно прочитаны.

Таким образом, типичная очередь XADDзавершена с XREADблоком. XADDотвечает за создание сообщений, а XREADотвечает за потребление сообщений.

Описание идентификатора сообщения

1553439850328–0, сгенерированный XADD, представляет собой идентификатор сообщения, сгенерированный Redis, который состоит из двух частей: отметка времени-серийный номер.

Временная метка указывается в миллисекундах и представляет собой время сервера Redis, сгенерировавшего сообщение. Это 64-битное целое число (int64).

Порядковый номер — это порядковый номер сообщения в этот миллисекундный момент времени, а также 64-битное целое число. Серьезно говоря, порядковый номер может переполниться, но возможно ли это?

Приращение порядкового номера можно проверить с помощью многопакетной обработки:

127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"

Поскольку выполнение команды Redis происходит очень быстро, видно, что в пределах одной и той же временной метки сообщение представлено возрастающим порядковым номером.

Чтобы обеспечить упорядоченность сообщений, идентификаторы, сгенерированные Redis, монотонно увеличиваются по порядку. Поскольку идентификатор содержит часть метки времени, во избежание проблем, вызванных ошибками времени сервера (например, время сервера задерживается), для каждого типа Stream данных Redis поддерживается атрибут latest_generated_id, который используется для записи идентификатора последнего сообщение.

Если обнаруживается, что текущая отметка времени является обратной (меньше той, что записана latest_generated_id), в качестве идентификатора нового сообщения используется схема, в которой отметка времени не изменяется, а порядковый номер увеличивается (по этой же причине порядковый номер использует int64 для обеспечения достаточного количества порядковых номеров), тем самым обеспечивая монотонно возрастающий характер идентификатора.

Настоятельно рекомендуется использовать схему Redis для генерации идентификаторов сообщений, потому что эта монотонно увеличивающаяся схема идентификаторов временной метки + порядковый номер может удовлетворить почти все ваши потребности.

Но в то же время помните, что идентификаторы настраиваются, не забывайте!

Модель группы потребителей

Когда несколько потребителей используют очередь сообщений одновременно, они могут повторно использовать одно и то же сообщение, то есть в очереди сообщений есть десять сообщений, и все три потребителя могут использовать эти десять сообщений.

Но иногда нам нужно, чтобы несколько потребителей взаимодействовали, чтобы использовать одну и ту же очередь сообщений, то есть в очереди сообщений десять сообщений, и три потребителя потребляют некоторые из них соответственно.

Например, потребитель A потребляет сообщения 1,2,5,8, потребитель B потребляет сообщения 4,9,10, а потребитель C потребляет сообщения 3,6,7.

То есть три потребителя взаимодействуют для завершения потребления сообщений, и этот режим можно использовать, когда мощность потребления недостаточна, то есть эффективность программы обработки сообщений невысока.

Этот паттерн является паттерном потребительской группы. Как показано ниже:

Поддержка режима группы потребителей в основном реализуется двумя командами:

  • XGROUP, для управления группами потребителей, предоставления таких операций, как создание групп, уничтожение групп и обновление идентификаторов сообщений начала групп.
  • XREADGROUP, операция группового потребления сообщений.

Для демонстрации в демонстрации используются пять сообщений. Идея состоит в том, чтобы создать очередь сообщений Stream, а производитель генерирует пять сообщений.

Создайте группу потребителей в очереди сообщений, и три получателя в группе потребляют сообщения:

# Producer generates 5 messages
127.0.0.1:6379> MULTI
127.0.0.1:6379> XADD mq * msg 1
127.0.0.1:6379> XADD mq * msg 2
127.0.0.1:6379> XADD mq * msg 3
127.0.0.1:6379> XADD mq * msg 4
127.0.0.1:6379> XADD mq * msg 5
127.0.0.1:6379> EXEC
 1) "1553585533795-0"
 2) "1553585533795-1"
 3) "1553585533795-2"
 4) "1553585533795-3"
 5) "1553585533795-4"

# Create a consumer group mqGroup
127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 # Create a consumer group mgGroup for message queue mq
OK

# Consumer A, Consumption Article 1
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq > #Consumer A in the consumer group reads a message from the message queue mq
1) 1) "mq"
   2) 1) 1) "1553585533795-0"
         2) 1) "msg"
            2) "1"
# Consumer A, Consumption Article 2
127.0.0.1:6379> XREADGROUP group mqGroup consumerA COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-1"
         2) 1) "msg"
            2) "2"
# Consumer B, Consumption Article 3
127.0.0.1:6379> XREADGROUP group mqGroup consumerB COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-2"
         2) 1) "msg"
            2) "3"
# Consumer A, Consumption Article 4
127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-3"
         2) 1) "msg"
            2) "4"
# Consumer C, Consumption Article 5
127.0.0.1:6379> XREADGROUP group mqGroup consumerC COUNT 1 STREAMS mq > 
1) 1) "mq"
   2) 1) 1) "1553585533795-4"
         2) 1) "msg"
            2) "5"

В приведенном выше примере, когда три потребителя A, B и C находятся в одной группе сообщений mqGroupusing (потребитель может указать это во время потребления, нет необходимости создавать его заранее), у них действует принцип взаимного исключения. План потребления: А->1, А->2, В->3, А->4, С->5.

XGROUP create mq mqGroup 0 используется для создания группы потребления mqGroupв очереди сообщений mq. Последний параметр — 0, что означает, что группа начинает потребление с первого сообщения. Значение соответствует 0из XREAD).

Помимо поддержки CREATE, он также поддерживает SETIDустановку начального идентификатора, DESTROYуничтожение группы, DELCONSUMER удаление потребителей в группе и другие операции.

XREADGROUP group mqGroup consumerA count 1 streams mq >, используемого для потребления ConsumerA в группе mqGroup в очереди mq, параметр > указывает на начальное сообщение, которое не было использовано в группе, а параметр count 1 указывает на получение одного сообщения.

Синтаксис в основном такой же, как XREAD, но добавлена ​​концепция групп.

Основной принцип внутригруппового потребления заключается в том, что тип STREAM будет записывать идентификатор последнего обработанного (доставленного) сообщения (last_delivered_id) для каждой группы, чтобы при потреблении в группе вы могли начать чтение с конца этого значения, чтобы гарантировать, что Повторите потребление.

Вышеизложенное является основной операцией группы потребителей.

Кроме того, когда группа потребителей потребляет, необходимо учитывать еще одну проблему, а именно, если потребитель получает сообщение, но не обрабатывает его успешно (например, процесс-потребитель не работает), сообщение может быть потеряно, потому что другие потребители в группе не могут снова использовать это сообщение.

Обсуждение решения продолжается ниже.

Список ожидающих сообщений

Чтобы решить проблему потери сообщений, вызванную сбоем потребителя во время чтения сообщений в группе, STREAM разработал список ожидающих для записи сообщений, которые были прочитаны, но не обработаны.

Команда XPENDING используется для получения необработанных сообщений группы потребителей или потребителя внутри потребителя. Демонстрация выглядит следующим образом:

127.0.0.1:6379> XPENDING mq mqGroup
1) (integer) 5 # 5 messages read but not processed
2) "1553585533795-0" # begin ID
3) "1553585533795-4" # end ID
4) 1) 1) "consumerA" # consumer A have 3 messages
      2) "3"
   2) 1) "consumerB" # consumer B have 1message
      2) "1"
   3) 1) "consumerC" # consumer C have 1message
      2) "1"

127.0.0.1:6379> XPENDING mq mqGroup - + 10 # Use the start end count option for details
1) 1) "1553585533795-0" # Message ID
   2) "consumerA" # consumer
   3) (integer) 1654355 # It has been 1654355ms from reading to now, IDLE
   4) (integer) 5 # The message was read 5 times,delivery counter
2) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 1654355
   4) (integer) 4
# A total of 5, the remaining 3 omitted ...

127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # Add the consumer parameter to get the Pending list of a specific consumer
1) 1) "1553585533795-0"
   2) "consumerA"
   3) (integer) 1641083
   4) (integer) 5
# A total of 3, the remaining 2 omitted ...

Каждое ожидающее сообщение имеет четыре свойства:

  • Message-ID
  • consumer
  • IDLE, время истекло
  • Счетчик доставки, сколько раз сообщение было прочитано

Из приведенных выше результатов мы видим, что все сообщения, которые мы читали ранее, записываются в список Ожидающие, указывая на то, что все прочитанные сообщения не были обработаны, а только прочитаны.

Итак, как это указывает на то, что потребитель закончил обработку сообщения?

Используйте команду XACK завершение, чтобы сообщить, что обработка сообщения завершена.

Демонстрация выглядит следующим образом:

127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # Notify message processing end, identified by message ID
(integer) 1

127.0.0.1:6379> XPENDING mq mqGroup # Check the Pending list again
1) (integer) 4 # The messages read but not processed have become 4
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # Consumer A, there are 2 message processing
      2) "2"
   2) 1) "consumerB"
      2) "1"
   3) 1) "consumerC"
      2) "1"
127.0.0.1:6379>

С таким механизмом ожидания это означает, что после того, как потребитель прочитает сообщение, но не обработает его, сообщение не будет потеряно.

Подождав, пока потребитель снова подключится к сети, вы можете прочитать список ожидающих и продолжить обработку сообщения, чтобы убедиться, что сообщение упорядочено и не потеряно.

В настоящее время существует еще одна проблема, то есть, если потребитель не имеет возможности выйти в сеть после того, как он вышел из строя, ему необходимо передать ожидающее сообщение потребителя другим потребителям для обработки, что является передачей сообщения.

Передача сообщений

Во время операции передачи сообщения сообщение передается в собственный список Ожидающих.

Чтобы использовать синтаксис XCLAIM, вам необходимо установить группу, целевого потребителя и идентификатор сообщения о передаче, а также указать IDLE (продолжительность чтения). Только по прошествии этого времени его можно будет перевести.

# The message currently belonging to consumer A is 1553585533795-1, which has been unprocessed for 15907,787ms
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 15907787
   4) (integer) 4

# Transfer message 1553585533795-1 over 3600s to consumer B's Pending list
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
   2) 1) "msg"
      2) "2"

# Message 1553585533795-1 has been transferred to Consumer B's Pending.
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerB"
   3) (integer) 84404 # IDLE, it's reset
   4) (integer) 5 # The number of reads is also accumulated by 1

Приведенный выше код завершает передачу сообщения. В дополнение к указанию идентификатора, в переводе также необходимо указать IDLE, чтобы гарантировать, что перевод не будет обрабатываться в течение длительного времени.

IDLE переданного сообщения будет сброшен, чтобы гарантировать, что оно не будет передано повторно. Предполагается, что могут выполняться параллельные операции по передаче сообщений с истекшим сроком действия нескольким потребителям одновременно. Если установлено IDLE, последующей передачи можно избежать. будет успешным, потому что IDLE не удовлетворяет условию.

Например, в следующих двух последовательных передачах вторая не удастся.

127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

Это передача сообщения. До сих пор мы использовали идентификатор ожидающего сообщения, атрибуты потребителя и IDLE, которому оно принадлежит, а еще один атрибут — количество раз, когда сообщение было прочитано, счетчик доставки. Функция этого атрибута — подсчитывать количество раз, когда сообщение было прочитано, включая передачу.

Это свойство в основном используется для определения того, являются ли данные неправильными.

Проблема недоставленных писем

Как было сказано выше, если сообщение не может быть обработано консументами, то есть не может быть подвергнуто XACK-у, оно долгое время будет находиться в списке Pending, даже если неоднократно будет передано различным консументам.

В это время счетчик доставки сообщения будет накапливаться (можно увидеть пример в предыдущем разделе), и когда он накапливается до определенного заданного нами порогового значения, мы считаем это плохой новостью (также называемой недоставленным письмом, DeadLetter, недоставлено). новости), из-за условий суждения мы можем просто разобраться с плохими новостями и удалить их.

Чтобы удалить сообщение, используйте синтаксис XDEL, как показано ниже:

# delete message from queue
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# Check that there is no more message in the queue
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
   2) 1) "msg"
      2) "1"
2) 1) "1553585533795-2"
   2) 1) "msg"
      2) "3"

Обратите внимание, что в этом примере сообщения в ожидании не удаляются, поэтому, если вы просматриваете ожидание, сообщения все еще там. XACK может быть выполнен, чтобы отметить его завершение!

Мониторинг информации

Stream предоставляет XINFO для мониторинга информации о сервере, которую можно запросить:

# View queue information
127.0.0.1:6379> xinfo stream mq
...
# Consumer group information
127.0.0.1:6379> xinfo groups mq
...
# Consumer Group Member Information
127.0.0.1:6379> xinfo consumers mq mqGroup
...

На этом описание работы очереди сообщений в целом заканчивается.

Давайте воспользуемся Golang для реализации очереди потоковых сообщений Redis.

Спасибо, что прочитали эту статью.

Следите за новостями.