Создайте свою собственную систему легко
Redis 5.0 предлагает тип Stream. Буквально это потоковый тип, но на самом деле, с функциональной точки зрения, это должна быть идеальная реализация Redis для очередей сообщений (MQ, Message Queue).
Любой, кто использовал Redis в качестве очереди сообщений, знает, что существует множество реализаций очередей сообщений на основе Reids, например:
- PUB/SUB, модель подписки/публикации.
- Реализация LPUSH+BRPOP на основе списка.
- Реализация на основе Sorted-Set.
Каждая реализация имеет типичные особенности и проблемы.
Тип Stream, выпущенный в Redis 5.0, также используется для реализации типичной очереди сообщений. Внешний вид типа Stream удовлетворяет практически всему содержимому очереди сообщений, включая, но не ограничиваясь:
- Генерация сериализации идентификатора сообщения.
- Обход сообщения.
- Блокировка и неблокировка чтения сообщений.
- Пакетное потребление сообщений.
- Обработка ожидающих сообщений.
- Мониторинг очереди сообщений.
В очереди сообщений есть производители и потребители. Давайте испытаем чудеса типа Stream.
Новости производства
Команда XADD
используется для добавления сообщения к потоку (данные потока), как показано ниже:
127.0.0.1:6379> XADD memberMessage * user reggie msg Hello
"1553439850328-0"
127.0.0.1:6379> XADD memberMessage * user dwen msg World
"1553439858868-0"
Формат синтаксиса:
XADD key ID field string [field string ...]
Вам необходимо указать key
, схему сообщения ID
и содержимое сообщения, где содержание сообщения — key-value
data.
ID
, наиболее часто используемый*
, указывает, что идентификатор сообщения генерируется Redis, что также является настоятельно рекомендуемой схемой.field string
[строка поля] — это содержимое текущего сообщения, состоящее из одного или нескольких значений ключа.
В приведенном выше примере пользователь сообщения reggie
msg Hello
добавлен к ключу memberMessage
.
Redis генерирует идентификаторы сообщений, используя метки времени в миллисекундах и порядковые номера. В этот момент сообщение доступно в очереди сообщений.
Использование сообщений
XREAD
, читал сообщения из Stream, продемонстрировал следующее:
127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
2) 1) 1) "1553439850328-0"
2) 1) "user"
2) "reggie"
3) "msg"
4) "Hello"
2) 1) "1553439858868-0"
2) 1) "user"
2) "dwen"
3) "msg"
4) "World"
Приведенная выше команда предназначена для чтения всех сообщений из очереди сообщений memberMessage
.
XREAD
поддерживает множество параметров, формат синтаксиса:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
COUNT count
используется для ограничения количества получаемых сообщений.BLOCK milliseconds
используется для установкиXREAD
в режим блокировки, по умолчанию используется неблокирующий режим.ID
, который используется для установки идентификатора сообщения, с которого следует начать чтение. Используйте0
, чтобы начать с первого сообщения. В этом примере используется0
. Здесь следует отметить, что идентификатор очереди сообщений монотонно увеличивается, поэтому, установив начальную точку, его можно прочитать в обратном направлении. В режиме блокировки$
может использоваться для представления идентификатора последнего сообщения.$
не имеет значения в неблокирующем режиме.XREAD
делится на блокирующий и неблокирующий режимы при чтении сообщений. Опцию BLOCK можно использовать для указания режима блокировки, при этом необходимо установить продолжительность блокировки. В неблокирующем режиме он возвращается сразу после чтения (даже если сообщения нет), а в блокирующем режиме, если содержимое не может быть прочитано, он блокируется и ждет.
Типичное использование режима блокировки:
127.0.0.1:6379> XREAD block 1000 streams memberMessage $
(nil)
(1.07s)
Мы используем режим блокировки с $
как ID
, чтобы прочитать последнее сообщение, если сообщения нет, команда будет заблокирована. В процессе ожидания другие клиенты добавляют сообщения в очередь, и они будут немедленно прочитаны.
Таким образом, типичная очередь XADD
завершена с XREAD
блоком. XADD
отвечает за создание сообщений, а XREAD
отвечает за потребление сообщений.
Описание идентификатора сообщения
1553439850328–0
, сгенерированный XADD
, представляет собой идентификатор сообщения, сгенерированный Redis, который состоит из двух частей: отметка времени-серийный номер.
Временная метка указывается в миллисекундах и представляет собой время сервера Redis, сгенерировавшего сообщение. Это 64-битное целое число (int64).
Порядковый номер — это порядковый номер сообщения в этот миллисекундный момент времени, а также 64-битное целое число. Серьезно говоря, порядковый номер может переполниться, но возможно ли это?
Приращение порядкового номера можно проверить с помощью многопакетной обработки:
127.0.0.1:6379> MULTI
OK
127.0.0.1:6379> XADD memberMessage * msg one
QUEUED
127.0.0.1:6379> XADD memberMessage * msg two
QUEUED
127.0.0.1:6379> XADD memberMessage * msg three
QUEUED
127.0.0.1:6379> XADD memberMessage * msg four
QUEUED
127.0.0.1:6379> XADD memberMessage * msg five
QUEUED
127.0.0.1:6379> EXEC
1) "1553441006884-0"
2) "1553441006884-1"
3) "1553441006884-2"
4) "1553441006884-3"
5) "1553441006884-4"
Поскольку выполнение команды Redis происходит очень быстро, видно, что в пределах одной и той же временной метки сообщение представлено возрастающим порядковым номером.
Чтобы обеспечить упорядоченность сообщений, идентификаторы, сгенерированные Redis, монотонно увеличиваются по порядку. Поскольку идентификатор содержит часть метки времени, во избежание проблем, вызванных ошибками времени сервера (например, время сервера задерживается), для каждого типа Stream данных Redis поддерживается атрибут latest_generated_id
, который используется для записи идентификатора последнего сообщение.
Если обнаруживается, что текущая отметка времени является обратной (меньше той, что записана latest_generated_id
), в качестве идентификатора нового сообщения используется схема, в которой отметка времени не изменяется, а порядковый номер увеличивается (по этой же причине порядковый номер использует int64 для обеспечения достаточного количества порядковых номеров), тем самым обеспечивая монотонно возрастающий характер идентификатора.
Настоятельно рекомендуется использовать схему Redis для генерации идентификаторов сообщений, потому что эта монотонно увеличивающаяся схема идентификаторов временной метки + порядковый номер может удовлетворить почти все ваши потребности.
Но в то же время помните, что идентификаторы настраиваются, не забывайте!
Модель группы потребителей
Когда несколько потребителей используют очередь сообщений одновременно, они могут повторно использовать одно и то же сообщение, то есть в очереди сообщений есть десять сообщений, и все три потребителя могут использовать эти десять сообщений.
Но иногда нам нужно, чтобы несколько потребителей взаимодействовали, чтобы использовать одну и ту же очередь сообщений, то есть в очереди сообщений десять сообщений, и три потребителя потребляют некоторые из них соответственно.
Например, потребитель A потребляет сообщения 1,2,5,8
, потребитель B потребляет сообщения 4,9,10
, а потребитель C потребляет сообщения 3,6,7
.
То есть три потребителя взаимодействуют для завершения потребления сообщений, и этот режим можно использовать, когда мощность потребления недостаточна, то есть эффективность программы обработки сообщений невысока.
Этот паттерн является паттерном потребительской группы. Как показано ниже:
Поддержка режима группы потребителей в основном реализуется двумя командами:
XGROUP
, для управления группами потребителей, предоставления таких операций, как создание групп, уничтожение групп и обновление идентификаторов сообщений начала групп.XREADGROUP
, операция группового потребления сообщений.
Для демонстрации в демонстрации используются пять сообщений. Идея состоит в том, чтобы создать очередь сообщений Stream, а производитель генерирует пять сообщений.
Создайте группу потребителей в очереди сообщений, и три получателя в группе потребляют сообщения:
#
Producer generates 5 messages127.0.0.1:6379> MULTI 127.0.0.1:6379> XADD mq * msg 1 127.0.0.1:6379> XADD mq * msg 2 127.0.0.1:6379> XADD mq * msg 3 127.0.0.1:6379> XADD mq * msg 4 127.0.0.1:6379> XADD mq * msg 5 127.0.0.1:6379> EXEC 1) "1553585533795-0" 2) "1553585533795-1" 3) "1553585533795-2" 4) "1553585533795-3" 5) "1553585533795-4" #
Create a consumer groupmqGroup 127.0.0.1:6379> XGROUP CREATE mq mqGroup 0 #
Create a consumer group mgGroup for message queue mqOK #
Consumer A, Consumption Article 1127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 streams mq >
#Consumer A in the consumer group reads a message from the message queue mq1) 1) "mq" 2) 1) 1) "1553585533795-0" 2) 1) "msg" 2) "1" #
Consumer A, Consumption Article 2127.0.0.1:6379> XREADGROUP group mqGroup consumerA COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-1" 2) 1) "msg" 2) "2" #
Consumer B, Consumption Article 3127.0.0.1:6379> XREADGROUP group mqGroup consumerB COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-2" 2) 1) "msg" 2) "3" #
Consumer A, Consumption Article 4127.0.0.1:6379> XREADGROUP group mqGroup consumerA count 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-3" 2) 1) "msg" 2) "4" #
Consumer C, Consumption Article 5127.0.0.1:6379> XREADGROUP group mqGroup consumerC COUNT 1 STREAMS mq > 1) 1) "mq" 2) 1) 1) "1553585533795-4" 2) 1) "msg" 2) "5"
В приведенном выше примере, когда три потребителя A, B и C находятся в одной группе сообщений mqGroup
using (потребитель может указать это во время потребления, нет необходимости создавать его заранее), у них действует принцип взаимного исключения. План потребления: А->1, А->2, В->3, А->4, С->5.
XGROUP create mq mqGroup 0
используется для создания группы потребления mqGroup
в очереди сообщений mq
. Последний параметр — 0
, что означает, что группа начинает потребление с первого сообщения. Значение соответствует 0
из XREAD
).
Помимо поддержки CREATE, он также поддерживает SETID
установку начального идентификатора, DESTROY
уничтожение группы, DELCONSUMER
удаление потребителей в группе и другие операции.
XREADGROUP group mqGroup consumerA count 1 streams mq >
, используемого для потребления ConsumerA в группе mqGroup в очереди mq
, параметр >
указывает на начальное сообщение, которое не было использовано в группе, а параметр count 1 указывает на получение одного сообщения.
Синтаксис в основном такой же, как XREAD
, но добавлена концепция групп.
Основной принцип внутригруппового потребления заключается в том, что тип STREAM будет записывать идентификатор последнего обработанного (доставленного) сообщения (last_delivered_id) для каждой группы, чтобы при потреблении в группе вы могли начать чтение с конца этого значения, чтобы гарантировать, что Повторите потребление.
Вышеизложенное является основной операцией группы потребителей.
Кроме того, когда группа потребителей потребляет, необходимо учитывать еще одну проблему, а именно, если потребитель получает сообщение, но не обрабатывает его успешно (например, процесс-потребитель не работает), сообщение может быть потеряно, потому что другие потребители в группе не могут снова использовать это сообщение.
Обсуждение решения продолжается ниже.
Список ожидающих сообщений
Чтобы решить проблему потери сообщений, вызванную сбоем потребителя во время чтения сообщений в группе, STREAM разработал список ожидающих для записи сообщений, которые были прочитаны, но не обработаны.
Команда XPENDING
используется для получения необработанных сообщений группы потребителей или потребителя внутри потребителя. Демонстрация выглядит следующим образом:
127.0.0.1:6379> XPENDING mq mqGroup 1) (integer) 5 #
5 messages read but not processed2) "1553585533795-0" # begin ID 3) "1553585533795-4" # end ID 4) 1) 1) "consumerA" # consumer A have 3 messages 2) "3" 2) 1) "consumerB" # consumer B have 1message 2) "1" 3) 1) "consumerC" # consumer C have 1message 2) "1" 127.0.0.1:6379> XPENDING mq mqGroup - + 10 #
Use the start end count option for details1) 1) "1553585533795-0" # Message ID 2) "consumerA" # consumer 3) (integer) 1654355 #
It has been 1654355ms from reading to now, IDLE4) (integer) 5 #
The message was read 5 times,delivery counter 2) 1) "1553585533795-1" 2) "consumerA" 3) (integer) 1654355 4) (integer) 4 #
A total of 5, the remaining 3 omitted... 127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA #
Add the consumer parameter to get the Pending list of a specific consumer1) 1) "1553585533795-0" 2) "consumerA" 3) (integer) 1641083 4) (integer) 5 #
A total of 3, the remaining 2 omitted...
Каждое ожидающее сообщение имеет четыре свойства:
Message-ID
consumer
IDLE
, время истекло- Счетчик доставки, сколько раз сообщение было прочитано
Из приведенных выше результатов мы видим, что все сообщения, которые мы читали ранее, записываются в список Ожидающие, указывая на то, что все прочитанные сообщения не были обработаны, а только прочитаны.
Итак, как это указывает на то, что потребитель закончил обработку сообщения?
Используйте команду XACK
завершение, чтобы сообщить, что обработка сообщения завершена.
Демонстрация выглядит следующим образом:
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 #
Notify message processing end, identified by message ID(integer) 1 127.0.0.1:6379> XPENDING mq mqGroup #
Check the Pending list again1) (integer) 4 #
The messages read but not processed have become 42) "1553585533795-1" 3) "1553585533795-4" 4) 1) 1) "consumerA" #
Consumer A, there are 2 message processing2) "2" 2) 1) "consumerB" 2) "1" 3) 1) "consumerC" 2) "1" 127.0.0.1:6379>
С таким механизмом ожидания это означает, что после того, как потребитель прочитает сообщение, но не обработает его, сообщение не будет потеряно.
Подождав, пока потребитель снова подключится к сети, вы можете прочитать список ожидающих и продолжить обработку сообщения, чтобы убедиться, что сообщение упорядочено и не потеряно.
В настоящее время существует еще одна проблема, то есть, если потребитель не имеет возможности выйти в сеть после того, как он вышел из строя, ему необходимо передать ожидающее сообщение потребителя другим потребителям для обработки, что является передачей сообщения.
Передача сообщений
Во время операции передачи сообщения сообщение передается в собственный список Ожидающих.
Чтобы использовать синтаксис XCLAIM
, вам необходимо установить группу, целевого потребителя и идентификатор сообщения о передаче, а также указать IDLE (продолжительность чтения). Только по прошествии этого времени его можно будет перевести.
#
The message currently belonging to consumer A is 1553585533795-1, which has been unprocessed for 15907,787ms127.0.0.1:6379> XPENDING mq mqGroup - + 10 1) 1) "1553585533795-1" 2) "consumerA" 3) (integer) 15907787 4) (integer) 4 #
Transfer message 1553585533795-1 over 3600s to consumer B's Pending list127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1 1) 1) "1553585533795-1" 2) 1) "msg" 2) "2" #
Message 1553585533795-1 has been transferred to Consumer B's Pending.127.0.0.1:6379> XPENDING mq mqGroup - + 10 1) 1) "1553585533795-1" 2) "consumerB" 3) (integer) 84404 #
IDLE, it's reset4) (integer) 5 #
The number of reads is also accumulated by 1
Приведенный выше код завершает передачу сообщения. В дополнение к указанию идентификатора, в переводе также необходимо указать IDLE
, чтобы гарантировать, что перевод не будет обрабатываться в течение длительного времени.
IDLE
переданного сообщения будет сброшен, чтобы гарантировать, что оно не будет передано повторно. Предполагается, что могут выполняться параллельные операции по передаче сообщений с истекшим сроком действия нескольким потребителям одновременно. Если установлено IDLE
, последующей передачи можно избежать. будет успешным, потому что IDLE
не удовлетворяет условию.
Например, в следующих двух последовательных передачах вторая не удастся.
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1
Это передача сообщения. До сих пор мы использовали идентификатор ожидающего сообщения, атрибуты потребителя и IDLE
, которому оно принадлежит, а еще один атрибут — количество раз, когда сообщение было прочитано, счетчик доставки. Функция этого атрибута — подсчитывать количество раз, когда сообщение было прочитано, включая передачу.
Это свойство в основном используется для определения того, являются ли данные неправильными.
Проблема недоставленных писем
Как было сказано выше, если сообщение не может быть обработано консументами, то есть не может быть подвергнуто XACK-у, оно долгое время будет находиться в списке Pending, даже если неоднократно будет передано различным консументам.
В это время счетчик доставки сообщения будет накапливаться (можно увидеть пример в предыдущем разделе), и когда он накапливается до определенного заданного нами порогового значения, мы считаем это плохой новостью (также называемой недоставленным письмом, DeadLetter, недоставлено). новости), из-за условий суждения мы можем просто разобраться с плохими новостями и удалить их.
Чтобы удалить сообщение, используйте синтаксис XDEL
, как показано ниже:
#
delete message from queue127.0.0.1:6379> XDEL mq 1553585533795-1 (integer) 1 #
Check that there is no more message in the queue127.0.0.1:6379> XRANGE mq - + 1) 1) "1553585533795-0" 2) 1) "msg" 2) "1" 2) 1) "1553585533795-2" 2) 1) "msg" 2) "3"
Обратите внимание, что в этом примере сообщения в ожидании не удаляются, поэтому, если вы просматриваете ожидание, сообщения все еще там. XACK может быть выполнен, чтобы отметить его завершение!
Мониторинг информации
Stream предоставляет XINFO
для мониторинга информации о сервере, которую можно запросить:
#
View queue information127.0.0.1:6379> xinfo stream mq ...
#
Consumer group information127.0.0.1:6379> xinfo groups mq ...
#
Consumer Group Member Information127.0.0.1:6379> xinfo consumers mq mqGroup ...
На этом описание работы очереди сообщений в целом заканчивается.
Давайте воспользуемся Golang для реализации очереди потоковых сообщений Redis.
Спасибо, что прочитали эту статью.
Следите за новостями.