Перейти к содержанию

Очереди и стримы

Если два сервиса общаются только синхронно — они становятся единой системой: падает один, падает другой. Очереди разрывают эту связь, добавляют буфер для всплесков и позволяют делать тяжёлую работу асинхронно. Здесь — про Kafka, RabbitMQ, NATS, и про главный паттерн надёжной публикации — Outbox.

Зачем нужны очереди

🧩 Простыми словами. Очередь — это «папка входящих» между сервисами. Один сервис кидает туда задачу и идёт дальше, второй когда-нибудь её заберёт и выполнит. Между ними может быть секунда, минута, или час — всё ок.

⚙️ Под капотом. Producer пишет message в queue/topic. Broker персистит и доставляет consumer'ам. Consumer обрабатывает и подтверждает (ack). Если упал — broker отдаст другому или повторит.

💥 Зачем это нужно.

  1. Decoupling. Producer не знает про consumer. Можно добавлять/менять consumer'ов независимо.
  2. Buffering. При всплеске publishers пишут быстро, consumer обрабатывает в своём темпе.
  3. Retry & DLQ. Сбой в consumer → автоматический повтор; постоянно падает → в dead-letter queue, разбираемся вручную.
  4. Async work. API возвращает 202 «принял», тяжёлая обработка идёт в background.
  5. Fan-out. Одно событие → много consumer'ов независимо.
graph LR
    P[Producer<br/>API service] --> Q[(Queue<br/>broker)]
    Q --> C1[Consumer A<br/>email]
    Q --> C2[Consumer B<br/>analytics]
    Q --> C3[Consumer C<br/>search index]

Queues vs streams

🧩 Простыми словами. Это два разных мира с похожими словами.

Queue (RabbitMQ, SQS) Stream (Kafka, Kinesis)
Модель Job queue: один забрал → другие не увидят Лог: все consumer'ы читают независимо
Storage Удаляется после ACK Хранится N дней (retention)
Throughput Тысячи ops/sec Миллионы ops/sec
Replay Невозможен (сообщение удалено) Easy: перемотал offset
Use case Tasks, notifications Event sourcing, аудит, analytics

⚙️ Streams — это commit log. Сообщения упорядочены в partition'ах, consumer запоминает свой offset. Можешь перематывать, читать с начала, читать параллельно. Идеально для event-driven архитектуры.

Сравнение брокеров

Apache Kafka

✅ Очень высокий throughput (миллионы msg/s), persistent log, репликация, exactly-once semantics, экосистема (Connect, Streams). ❌ Сложен в эксплуатации (Zookeeper / KRaft, partition tuning), latency на ack выше чем RabbitMQ.

Когда: event sourcing, analytics, audit log, межсервисная шина в крупном проде. Go: github.com/segmentio/kafka-go или github.com/twmb/franz-go.

RabbitMQ

✅ Очень гибкий routing (exchange types: direct/fanout/topic/headers), низкая latency, AMQP-стандарт, работает прозрачно. ❌ Не масштабируется как Kafka (queue per node), не log-based — replay требует дополнительной логики (streams в новых версиях есть, но не основной use case).

Когда: task queue для сервисов, RPC через очередь, сложная маршрутизация. Go: github.com/rabbitmq/amqp091-go.

NATS / NATS JetStream

✅ Очень лёгкий (один бинарь), низкая latency, JetStream даёт stream-семантику. Хорош для микросервисной шины и pub/sub. ❌ Меньшая экосистема, чем Kafka.

Когда: внутренняя шина в Kubernetes, low-latency pub/sub, lightweight stream. Go: github.com/nats-io/nats.go.

Amazon SQS / Google Pub/Sub

✅ Managed, не надо ничего админить, автомасштабирование. ❌ Lock-in в облако, ограничения (SQS — порядок только в FIFO, доплатой).

Когда: ты в AWS/GCP и не хочешь сам поднимать брокер.

📊 Чек-лист выбора.

graph TD
    Start{Что нужно?} --> Q1{Replay /<br/>event sourcing?}
    Q1 -->|Да| Kafka[Kafka / JetStream]
    Q1 -->|Нет| Q2{Сложная маршрутизация?}
    Q2 -->|Да| Rabbit[RabbitMQ]
    Q2 -->|Нет| Q3{Низкая latency<br/>в кластере?}
    Q3 -->|Да| NATS[NATS]
    Q3 -->|Нет| Q4{Managed?}
    Q4 -->|Да| SQS[SQS / Pub/Sub]

Гарантии доставки

🧩 Простыми словами. «Сколько раз сообщение точно доставится?»

At-most-once

«Хотя бы раз отправили, дошло — ок, не дошло — забыли». UDP-style. Подходит для метрик, где потеря единичных пакетов не страшна. Самый дешёвый.

At-least-once

«Доставим минимум один раз, возможно дубль». Стандарт для большинства брокеров. Consumer должен быть идемпотентным (см. resilience.md).

Exactly-once

«Ровно один раз». Долго считалось невозможным; сейчас Kafka EOS делает близко к нему через transactional producer + idempotent send + read-committed consumer.

⚠️ EOS работает в рамках Kafka. Как только сообщение покидает Kafka и пишется в Postgres — гарантия не транзитивна. Для end-to-end exactly-once нужна идемпотентность на стороне consumer'а (отдельная dedup-таблица).

// Kafka EOS producer (segmentio/kafka-go)
w := &kafka.Writer{
    Addr:                   kafka.TCP("kafka:9092"),
    Topic:                  "orders",
    RequiredAcks:           kafka.RequireAll,
    AllowAutoTopicCreation: false,
    Transport:              &kafka.Transport{...},
}
// + transactional ID, ProducerEpoch — для full EOS

Ordering guarantees

  • Kafka. Гарантирован порядок внутри partition. Между partition'ами — нет. → Если важен порядок по user_id, кладём user_id как key, чтобы все его сообщения попадали в один partition.
  • RabbitMQ. Порядок внутри одной queue. С несколькими consumer'ами — порядок ломается на consumer-side.
  • NATS JetStream. Per-subject порядок.
  • SQS standard. Без гарантии. SQS FIFO. В рамках MessageGroupId.

Consumer groups

🧩 Простыми словами. Несколько consumer-инстансов делят работу. Каждое сообщение получает один consumer из группы — параллельность.

graph LR
    Topic[Topic: 6 partitions] --> P1[part 0] --> CA1[Consumer A1]
    Topic --> P2[part 1] --> CA1
    Topic --> P3[part 2] --> CA2[Consumer A2]
    Topic --> P4[part 3] --> CA2
    Topic --> P5[part 4] --> CA3[Consumer A3]
    Topic --> P6[part 5] --> CA3
    Topic --> CB[Group B<br/>analytics]

⚙️ В Kafka. Группа = consumer group. Partition назначается одному consumer в группе. При добавлении/удалении consumer'а — rebalance.

📌 Чтобы масштабировать throughput, увеличиваешь partition'ы. Один partition обрабатывается одним consumer в группе → больше partition'ов = больше параллельности. Но pre-create достаточно: добавить partition'ы потом без переездов сложно.

Backpressure

🧩 Простыми словами. Producer пишет быстрее, чем consumer успевает. Очередь растёт, память кончается, latency взлетает.

⚙️ Стратегии.

  1. Bounded queue + reject. При заполнении producer получает ошибку и сам решает (повторить, отбросить).
  2. Drop oldest / drop newest. Метрики, дашборды — допустимо. Финансы — никогда.
  3. Pause consumer pull. Kafka так работает: consumer pull-модель, broker не пушит больше, чем consumer запросил.
  4. Scale consumers. Главный «правильный» ответ: больше параллельности.

В Go-сервисе backpressure часто реализуется через буферизированные каналы:

queue := make(chan Job, 1000) // backpressure: 1000 jobs max in flight

func produce(j Job) error {
    select {
    case queue <- j:
        return nil
    case <-time.After(50 * time.Millisecond):
        return errBusy // явный signal: тормози
    }
}

Dead letter queue

🧩 Простыми словами. Сообщение, которое не получилось обработать N раз подряд, отправляем в специальный «карантин» — DLQ. Чтобы не блокировать обычный поток и не загромождать ретраями.

graph LR
    Q[Main queue] --> C[Consumer]
    C -->|success| OK[done]
    C -->|fail| Retry[retry queue]
    Retry --> C
    Retry -->|N attempts| DLQ[Dead Letter Queue]
    DLQ --> Manual[Manual investigation]

⚙️ Что делать с DLQ.

  • Дашборд / алёрт: «есть messages в DLQ» → разбирайся.
  • Replay: после фикса консумера — выкатил, вернул сообщения в основную очередь.
  • Никогда не игнорь — DLQ растёт молча, и у вас сюрприз через месяц.

Outbox pattern (важно)

🧩 Простыми словами. Тебе нужно: (1) записать заказ в БД, (2) отправить событие OrderCreated в Kafka. Два внешних вызова, atomic между ними не сделать. Если БД ок, Kafka упала → событие потеряно. Если Kafka ок, БД rolled back → событие фантомное.

⚙️ Решение. Записываешь в одну транзакцию запись в orders и запись в outbox (та же БД). Отдельный воркер читает outbox и публикует в Kafka, помечая отправленные.

sequenceDiagram
    API->>DB: BEGIN
    API->>DB: INSERT order
    API->>DB: INSERT outbox(event=OrderCreated)
    API->>DB: COMMIT
    Note over Worker: периодически
    Worker->>DB: SELECT outbox WHERE sent=false
    Worker->>Kafka: publish event
    Worker->>DB: UPDATE outbox SET sent=true
CREATE TABLE outbox (
    id BIGSERIAL PRIMARY KEY,
    aggregate VARCHAR(100),
    event_type VARCHAR(100),
    payload JSONB,
    created_at TIMESTAMPTZ DEFAULT now(),
    sent_at TIMESTAMPTZ
);
CREATE INDEX idx_outbox_unsent ON outbox(id) WHERE sent_at IS NULL;
// в одной транзакции
tx, _ := db.Begin(ctx)
defer tx.Rollback(ctx)

if _, err := tx.Exec(ctx, "INSERT INTO orders ..."); err != nil { ... }
if _, err := tx.Exec(ctx,
    "INSERT INTO outbox(aggregate, event_type, payload) VALUES($1,$2,$3)",
    "order", "OrderCreated", payload); err != nil { ... }

tx.Commit(ctx)
// worker сам опубликует в Kafka

Гарантия at-least-once end-to-end через obviously трудолюбивый воркер. ✅ Атомарность БД-записи и события — теперь одна транзакция.

🧠 Альтернатива — Debezium / change-data-capture (CDC), читает WAL Postgres и пишет в Kafka. Сложнее, но не нужен воркер.

🔥 Пример: e-commerce checkout

graph LR
    User --> API
    API --> DB[(orders<br/>+ outbox)]
    DB -.async.- W[Outbox<br/>worker]
    W --> K[Kafka:<br/>order.created]
    K --> Pay[Payment service]
    K --> Inv[Inventory service]
    K --> Email[Email service]
    Pay --> KP[Kafka:<br/>payment.captured]
    KP --> API2[Order status update]

API возвращает 202 «принят», async-сервисы (payment, inventory, email) независимо обрабатывают событие. Если payment упал — повторим, не теряя заказ.

❌ Типичные ошибки

  • Sync вызов «потому что Kafka сложна» → один upstream падает = всё лежит.
  • Использование Kafka как DB (long retention + редкое чтение) — не оптимально.
  • Не идемпотентный consumer + at-least-once → дубли в БД.
  • Отсутствие DLQ → ядовитые сообщения зацикливают consumer.
  • Все события в один topic → партиции не помогают, ordering невозможен.
  • Не мониторят consumer lag → сюрприз: «у вас сегодняшние данные обрабатываются через 8 часов».

🛠 Применение в Go-проекте: чек-лист

  • Kafka producer → kafka-go.Writer. Async batching, gzip compression.
  • Kafka consumer → kafka-go.Reader или franz-go (быстрее).
  • Outbox worker → отдельный процесс, polling раз в 100 ms батчем по 100.
  • DLQ → отдельный topic + alert на ненулевое значение.
  • Метрики: producer success rate, consumer lag, DLQ depth.
  • Retry: exponential backoff внутри consumer, заметные через лог + metric.

🤖 Что спрашивает AI-ментор

  • Чем отличается queue от stream? Когда выбирать что?
  • Что такое at-least-once и почему consumer должен быть идемпотентным?
  • Расскажи про exactly-once в Kafka — как это работает и где не работает?
  • Что такое outbox pattern и какую проблему он решает?
  • Что такое consumer lag и как с ним бороться?

📊 Уровни глубины

L1. Знаешь термин очередь, видел Redis queue / Sidekiq. Понимаешь, что async вызовы быстрее.

L2. Различаешь Kafka / Rabbit / NATS, выбираешь обоснованно. Делал at-least-once consumer с идемпотентностью. Знаешь partition'ы и consumer groups.

L3. Настроил Kafka EOS. Имплементировал outbox pattern и видел в проде, как оно спасло данные. Решал backpressure (slow consumer, lag растёт). Использовал CDC через Debezium. Знаешь tradeoff log compaction, retention, segment size.

Полезные материалы

📺 Видео

📝 Подумай

  1. У тебя event «UserRegistered», на который должны реагировать 3 сервиса (email, analytics, recommend). Какой брокер выберешь?
  2. Consumer обрабатывает заказ → пишет в Postgres. Как сделать его идемпотентным?
  3. Зачем нужен outbox, если можно после INSERT просто публиковать в Kafka?
Ответ
  1. Kafka. Один topic user.registered, три consumer-группы (по одной на сервис). Каждая читает свой offset независимо. RabbitMQ тоже подойдёт (fanout exchange + 3 queue), но Kafka лучше масштабируется и даёт replay.
  2. Уникальный ключ события (event_id) + табличка processed_events(event_id). В транзакции: проверь существование → если нет, INSERT + INSERT в processed_events. Дубли от ретрая отсекутся ON CONFLICT DO NOTHING.
  3. Между INSERT и publish может произойти что угодно: процесс упадёт, Kafka не ответит. Если INSERT прошёл, но publish нет — событие «потеряно». Outbox делает обе записи атомарной транзакцией БД, а воркер потом гарантирует доставку (at-least-once).

Дальше: microservices.md — когда стоит пилить монолит на сервисы, и какие паттерны при этом нужны.