Очереди и стримы¶
Если два сервиса общаются только синхронно — они становятся единой системой: падает один, падает другой. Очереди разрывают эту связь, добавляют буфер для всплесков и позволяют делать тяжёлую работу асинхронно. Здесь — про Kafka, RabbitMQ, NATS, и про главный паттерн надёжной публикации — Outbox.
Зачем нужны очереди¶
🧩 Простыми словами. Очередь — это «папка входящих» между сервисами. Один сервис кидает туда задачу и идёт дальше, второй когда-нибудь её заберёт и выполнит. Между ними может быть секунда, минута, или час — всё ок.
⚙️ Под капотом. Producer пишет message в queue/topic. Broker персистит и доставляет consumer'ам. Consumer обрабатывает и подтверждает (ack). Если упал — broker отдаст другому или повторит.
💥 Зачем это нужно.
- Decoupling. Producer не знает про consumer. Можно добавлять/менять consumer'ов независимо.
- Buffering. При всплеске publishers пишут быстро, consumer обрабатывает в своём темпе.
- Retry & DLQ. Сбой в consumer → автоматический повтор; постоянно падает → в dead-letter queue, разбираемся вручную.
- Async work. API возвращает 202 «принял», тяжёлая обработка идёт в background.
- Fan-out. Одно событие → много consumer'ов независимо.
graph LR
P[Producer<br/>API service] --> Q[(Queue<br/>broker)]
Q --> C1[Consumer A<br/>email]
Q --> C2[Consumer B<br/>analytics]
Q --> C3[Consumer C<br/>search index]
Queues vs streams¶
🧩 Простыми словами. Это два разных мира с похожими словами.
| Queue (RabbitMQ, SQS) | Stream (Kafka, Kinesis) | |
|---|---|---|
| Модель | Job queue: один забрал → другие не увидят | Лог: все consumer'ы читают независимо |
| Storage | Удаляется после ACK | Хранится N дней (retention) |
| Throughput | Тысячи ops/sec | Миллионы ops/sec |
| Replay | Невозможен (сообщение удалено) | Easy: перемотал offset |
| Use case | Tasks, notifications | Event sourcing, аудит, analytics |
⚙️ Streams — это commit log. Сообщения упорядочены в partition'ах, consumer запоминает свой offset. Можешь перематывать, читать с начала, читать параллельно. Идеально для event-driven архитектуры.
Сравнение брокеров¶
Apache Kafka¶
✅ Очень высокий throughput (миллионы msg/s), persistent log, репликация, exactly-once semantics, экосистема (Connect, Streams). ❌ Сложен в эксплуатации (Zookeeper / KRaft, partition tuning), latency на ack выше чем RabbitMQ.
Когда: event sourcing, analytics, audit log, межсервисная шина в крупном проде.
Go: github.com/segmentio/kafka-go или github.com/twmb/franz-go.
RabbitMQ¶
✅ Очень гибкий routing (exchange types: direct/fanout/topic/headers), низкая latency, AMQP-стандарт, работает прозрачно. ❌ Не масштабируется как Kafka (queue per node), не log-based — replay требует дополнительной логики (streams в новых версиях есть, но не основной use case).
Когда: task queue для сервисов, RPC через очередь, сложная маршрутизация.
Go: github.com/rabbitmq/amqp091-go.
NATS / NATS JetStream¶
✅ Очень лёгкий (один бинарь), низкая latency, JetStream даёт stream-семантику. Хорош для микросервисной шины и pub/sub. ❌ Меньшая экосистема, чем Kafka.
Когда: внутренняя шина в Kubernetes, low-latency pub/sub, lightweight stream.
Go: github.com/nats-io/nats.go.
Amazon SQS / Google Pub/Sub¶
✅ Managed, не надо ничего админить, автомасштабирование. ❌ Lock-in в облако, ограничения (SQS — порядок только в FIFO, доплатой).
Когда: ты в AWS/GCP и не хочешь сам поднимать брокер.
📊 Чек-лист выбора.
graph TD
Start{Что нужно?} --> Q1{Replay /<br/>event sourcing?}
Q1 -->|Да| Kafka[Kafka / JetStream]
Q1 -->|Нет| Q2{Сложная маршрутизация?}
Q2 -->|Да| Rabbit[RabbitMQ]
Q2 -->|Нет| Q3{Низкая latency<br/>в кластере?}
Q3 -->|Да| NATS[NATS]
Q3 -->|Нет| Q4{Managed?}
Q4 -->|Да| SQS[SQS / Pub/Sub]
Гарантии доставки¶
🧩 Простыми словами. «Сколько раз сообщение точно доставится?»
At-most-once¶
«Хотя бы раз отправили, дошло — ок, не дошло — забыли». UDP-style. Подходит для метрик, где потеря единичных пакетов не страшна. Самый дешёвый.
At-least-once¶
«Доставим минимум один раз, возможно дубль». Стандарт для большинства брокеров. Consumer должен быть идемпотентным (см. resilience.md).
Exactly-once¶
«Ровно один раз». Долго считалось невозможным; сейчас Kafka EOS делает близко к нему через transactional producer + idempotent send + read-committed consumer.
⚠️ EOS работает в рамках Kafka. Как только сообщение покидает Kafka и пишется в Postgres — гарантия не транзитивна. Для end-to-end exactly-once нужна идемпотентность на стороне consumer'а (отдельная dedup-таблица).
// Kafka EOS producer (segmentio/kafka-go)
w := &kafka.Writer{
Addr: kafka.TCP("kafka:9092"),
Topic: "orders",
RequiredAcks: kafka.RequireAll,
AllowAutoTopicCreation: false,
Transport: &kafka.Transport{...},
}
// + transactional ID, ProducerEpoch — для full EOS
Ordering guarantees¶
- Kafka. Гарантирован порядок внутри partition. Между partition'ами — нет.
→ Если важен порядок по
user_id, кладёмuser_idкак key, чтобы все его сообщения попадали в один partition. - RabbitMQ. Порядок внутри одной queue. С несколькими consumer'ами — порядок ломается на consumer-side.
- NATS JetStream. Per-subject порядок.
- SQS standard. Без гарантии. SQS FIFO. В рамках MessageGroupId.
Consumer groups¶
🧩 Простыми словами. Несколько consumer-инстансов делят работу. Каждое сообщение получает один consumer из группы — параллельность.
graph LR
Topic[Topic: 6 partitions] --> P1[part 0] --> CA1[Consumer A1]
Topic --> P2[part 1] --> CA1
Topic --> P3[part 2] --> CA2[Consumer A2]
Topic --> P4[part 3] --> CA2
Topic --> P5[part 4] --> CA3[Consumer A3]
Topic --> P6[part 5] --> CA3
Topic --> CB[Group B<br/>analytics]
⚙️ В Kafka. Группа = consumer group. Partition назначается одному consumer в группе. При добавлении/удалении consumer'а — rebalance.
📌 Чтобы масштабировать throughput, увеличиваешь partition'ы. Один partition обрабатывается одним consumer в группе → больше partition'ов = больше параллельности. Но pre-create достаточно: добавить partition'ы потом без переездов сложно.
Backpressure¶
🧩 Простыми словами. Producer пишет быстрее, чем consumer успевает. Очередь растёт, память кончается, latency взлетает.
⚙️ Стратегии.
- Bounded queue + reject. При заполнении producer получает ошибку и сам решает (повторить, отбросить).
- Drop oldest / drop newest. Метрики, дашборды — допустимо. Финансы — никогда.
- Pause consumer pull. Kafka так работает: consumer pull-модель, broker не пушит больше, чем consumer запросил.
- Scale consumers. Главный «правильный» ответ: больше параллельности.
В Go-сервисе backpressure часто реализуется через буферизированные каналы:
queue := make(chan Job, 1000) // backpressure: 1000 jobs max in flight
func produce(j Job) error {
select {
case queue <- j:
return nil
case <-time.After(50 * time.Millisecond):
return errBusy // явный signal: тормози
}
}
Dead letter queue¶
🧩 Простыми словами. Сообщение, которое не получилось обработать N раз подряд, отправляем в специальный «карантин» — DLQ. Чтобы не блокировать обычный поток и не загромождать ретраями.
graph LR
Q[Main queue] --> C[Consumer]
C -->|success| OK[done]
C -->|fail| Retry[retry queue]
Retry --> C
Retry -->|N attempts| DLQ[Dead Letter Queue]
DLQ --> Manual[Manual investigation]
⚙️ Что делать с DLQ.
- Дашборд / алёрт: «есть messages в DLQ» → разбирайся.
- Replay: после фикса консумера — выкатил, вернул сообщения в основную очередь.
- Никогда не игнорь — DLQ растёт молча, и у вас сюрприз через месяц.
Outbox pattern (важно)¶
🧩 Простыми словами. Тебе нужно: (1) записать заказ в БД, (2) отправить
событие OrderCreated в Kafka. Два внешних вызова, atomic между ними не сделать.
Если БД ок, Kafka упала → событие потеряно. Если Kafka ок, БД rolled back →
событие фантомное.
⚙️ Решение. Записываешь в одну транзакцию запись в orders и запись в
outbox (та же БД). Отдельный воркер читает outbox и публикует в Kafka,
помечая отправленные.
sequenceDiagram
API->>DB: BEGIN
API->>DB: INSERT order
API->>DB: INSERT outbox(event=OrderCreated)
API->>DB: COMMIT
Note over Worker: периодически
Worker->>DB: SELECT outbox WHERE sent=false
Worker->>Kafka: publish event
Worker->>DB: UPDATE outbox SET sent=true
CREATE TABLE outbox (
id BIGSERIAL PRIMARY KEY,
aggregate VARCHAR(100),
event_type VARCHAR(100),
payload JSONB,
created_at TIMESTAMPTZ DEFAULT now(),
sent_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unsent ON outbox(id) WHERE sent_at IS NULL;
// в одной транзакции
tx, _ := db.Begin(ctx)
defer tx.Rollback(ctx)
if _, err := tx.Exec(ctx, "INSERT INTO orders ..."); err != nil { ... }
if _, err := tx.Exec(ctx,
"INSERT INTO outbox(aggregate, event_type, payload) VALUES($1,$2,$3)",
"order", "OrderCreated", payload); err != nil { ... }
tx.Commit(ctx)
// worker сам опубликует в Kafka
✅ Гарантия at-least-once end-to-end через obviously трудолюбивый воркер. ✅ Атомарность БД-записи и события — теперь одна транзакция.
🧠 Альтернатива — Debezium / change-data-capture (CDC), читает WAL Postgres и пишет в Kafka. Сложнее, но не нужен воркер.
🔥 Пример: e-commerce checkout¶
graph LR
User --> API
API --> DB[(orders<br/>+ outbox)]
DB -.async.- W[Outbox<br/>worker]
W --> K[Kafka:<br/>order.created]
K --> Pay[Payment service]
K --> Inv[Inventory service]
K --> Email[Email service]
Pay --> KP[Kafka:<br/>payment.captured]
KP --> API2[Order status update]
API возвращает 202 «принят», async-сервисы (payment, inventory, email) независимо обрабатывают событие. Если payment упал — повторим, не теряя заказ.
❌ Типичные ошибки¶
- Sync вызов «потому что Kafka сложна» → один upstream падает = всё лежит.
- Использование Kafka как DB (long retention + редкое чтение) — не оптимально.
- Не идемпотентный consumer + at-least-once → дубли в БД.
- Отсутствие DLQ → ядовитые сообщения зацикливают consumer.
- Все события в один topic → партиции не помогают, ordering невозможен.
- Не мониторят consumer lag → сюрприз: «у вас сегодняшние данные обрабатываются через 8 часов».
🛠 Применение в Go-проекте: чек-лист¶
- Kafka producer →
kafka-go.Writer. Async batching, gzip compression. - Kafka consumer →
kafka-go.Readerилиfranz-go(быстрее). - Outbox worker → отдельный процесс, polling раз в 100 ms батчем по 100.
- DLQ → отдельный topic + alert на ненулевое значение.
- Метрики: producer success rate, consumer lag, DLQ depth.
- Retry: exponential backoff внутри consumer, заметные через лог + metric.
🤖 Что спрашивает AI-ментор¶
- Чем отличается queue от stream? Когда выбирать что?
- Что такое at-least-once и почему consumer должен быть идемпотентным?
- Расскажи про exactly-once в Kafka — как это работает и где не работает?
- Что такое outbox pattern и какую проблему он решает?
- Что такое consumer lag и как с ним бороться?
📊 Уровни глубины¶
L1. Знаешь термин очередь, видел Redis queue / Sidekiq. Понимаешь, что async вызовы быстрее.
L2. Различаешь Kafka / Rabbit / NATS, выбираешь обоснованно. Делал at-least-once consumer с идемпотентностью. Знаешь partition'ы и consumer groups.
L3. Настроил Kafka EOS. Имплементировал outbox pattern и видел в проде, как оно спасло данные. Решал backpressure (slow consumer, lag растёт). Использовал CDC через Debezium. Знаешь tradeoff log compaction, retention, segment size.
Полезные материалы¶
📺 Видео
- Outbox pattern — как и зачем (RU) — атомарность INSERT и publish через одну транзакцию.
- Брокеры — обзор и сравнение (RU) — Kafka vs RabbitMQ vs NATS.
- Kafka — внутренности (RU) — partition, offset, consumer group.
- Гарантии доставки (RU) — at-most-once / at-least-once / exactly-once.
- Идемпотентность consumer'а (RU) — почему at-least-once требует dedup на стороне consumer'а.
- Сценарии использования брокеров (RU) — когда что выбирать.
📝 Подумай¶
- У тебя event «UserRegistered», на который должны реагировать 3 сервиса (email, analytics, recommend). Какой брокер выберешь?
- Consumer обрабатывает заказ → пишет в Postgres. Как сделать его идемпотентным?
- Зачем нужен outbox, если можно после INSERT просто публиковать в Kafka?
Ответ
- Kafka. Один topic
user.registered, три consumer-группы (по одной на сервис). Каждая читает свой offset независимо. RabbitMQ тоже подойдёт (fanout exchange + 3 queue), но Kafka лучше масштабируется и даёт replay. - Уникальный ключ события (
event_id) + табличкаprocessed_events(event_id). В транзакции: проверь существование → если нет, INSERT + INSERT в processed_events. Дубли от ретрая отсекутся ON CONFLICT DO NOTHING. - Между INSERT и publish может произойти что угодно: процесс упадёт, Kafka не ответит. Если INSERT прошёл, но publish нет — событие «потеряно». Outbox делает обе записи атомарной транзакцией БД, а воркер потом гарантирует доставку (at-least-once).
Дальше: microservices.md — когда стоит пилить монолит на сервисы, и какие паттерны при этом нужны.