Перейти к содержанию

Задачи на многопоточку

10 задач, разобранных по шагам — то, что реально дают на собесах в Авито, Озоне, Тинькоффе и других. Каждая задача начинается с минимально-сломанного варианта, дальше — постепенный фикс. Ответ свёрнут.

Совет: не подсматривай сразу. Запусти у себя go run -race, посмотри что выводит, и только потом разверни ответ.

1. Базовый deadlock + race (9 шагов фикса)

Вот стартовая «битая» программа — её приносят на собесе и просят починить. Кажется простой, но в ней зашиты сразу несколько типичных косяков.

package main

import "fmt"

func main() {
    ch := make(chan int)
    counter := 0

    for i := 0; i < 10; i++ {
        go func() {
            ch <- i
        }()
    }

    for v := range ch {
        counter++
        fmt.Println(v, counter)
    }

    fmt.Println("total:", counter)
}

Что здесь не так? Подумай прежде чем разворачивать.

Ответ — 9 шагов починки

Шаг 1. Запускаем — fatal error: all goroutines are asleep - deadlock!. range ch ждёт close(ch), никто не закрывает.

Шаг 2. Закрываем ch после for-цикла? Нет — close произойдёт ДО того, как горутины успеют отправить, и они запаникуют (send on closed channel). Закрывать должна координирующая логика после того, как все senders завершились.

Шаг 3. Используем sync.WaitGroup, чтобы дождаться всех senders, и закрываем канал в отдельной горутине:

var wg sync.WaitGroup
for i := 0; i < 10; i++ {
    wg.Add(1)
    go func() {
        defer wg.Done()
        ch <- i
    }()
}
go func() { wg.Wait(); close(ch) }()

Шаг 4. Теперь нет deadlock'а, но ch <- i использует i из замыкания. До Go 1.22 — все 10 горутин видят последнее значение (обычно 10 и отправлять нечего). С 1.22 — каждая итерация имеет свежий i, ок. Если среда старая — передаём аргументом:

go func(i int) { defer wg.Done(); ch <- i }(i)

Шаг 5. Запускаем go run -race. Видим WARNING: DATA RACE на counter. Хотя counter++ в main только в одной горутине (читателе), гонки тут нет. Зато если бы было несколько читателей или мы где-то инкрементили из горутин — была бы.

Шаг 6. Усложним: счётчик инкрементим внутри sender'ов (типичный собесовский «теперь пусть каждый sender ещё и счётчик увеличивает»):

go func(i int) {
    defer wg.Done()
    ch <- i
    counter++ // ← гонка
}(i)

go run -race ловит это сразу.

Шаг 7. Чиним через sync/atomic:

var counter int64
atomic.AddInt64(&counter, 1)
fmt.Println("total:", atomic.LoadInt64(&counter))

Шаг 8. Альтернатива — sync.Mutex вокруг counter++. Чуть дороже atomic'а, но универсальнее (защищает любую критическую секцию, не только одну переменную).

Шаг 9. Финал: go run -race ./... чисто, wg.Wait() ждёт всех, close(ch) корректен, счётчик не врёт.

Эта задача — диагностический «детектор» сразу нескольких знаний: deadlock, close-by-sender, loop-variable, race-condition, atomic vs mutex.

2. Найти максимальный чётный — race + WaitGroup

package main

import "fmt"

func main() {
    nums := []int{3, 8, 1, 14, 7, 22, 5, 18, 11}
    max := 0
    for _, n := range nums {
        go func(n int) {
            if n%2 == 0 && n > max {
                max = n
            }
        }(n)
    }
    fmt.Println("max even:", max)
}

Что выведет, что не так, как починить?

Ответ

Скорее всего 0 — main не ждёт горутин и завершается раньше. Плюс гонка на max (read-modify-write без синхронизации).

Починка:

var (
    mu  sync.Mutex
    wg  sync.WaitGroup
    max int
)
for _, n := range nums {
    wg.Add(1)
    go func(n int) {
        defer wg.Done()
        if n%2 != 0 { return }
        mu.Lock()
        if n > max { max = n }
        mu.Unlock()
    }(n)
}
wg.Wait()
fmt.Println("max even:", max) // 22

Альтернатива — отдать чётные в канал, в main проагрегировать. Без mutex при таких размерах входа быстрее обычно последовательная версия, но задача — про конкурентность.

3. Ozon — for i { go fmt.Println(i) } без WG

package main

import "fmt"

func main() {
    for i := 0; i < 5; i++ {
        go fmt.Println(i)
    }
}

Что выведет?

Ответ

Скорее всего ничего: main завершается мгновенно, runtime убивает программу, горутины не успевают планироваться. Иногда увидишь 1–2 числа.

Поправка вариант A — time.Sleep(time.Second) (костыль, для демонстрации ок).

Вариант B — WaitGroup:

var wg sync.WaitGroup
for i := 0; i < 5; i++ {
    wg.Add(1)
    go func(i int) {
        defer wg.Done()
        fmt.Println(i)
    }(i)
}
wg.Wait()

Гочча на Go < 1.22 — без захвата i параметром все горутины увидят i == 5 (или последнее значение). С 1.22 уже свежая копия на итерацию.

4. Параллельные HTTP-запросы (google + avito)

Нужно одновременно сделать два HTTP-запроса (на https://google.com и https://avito.ru), вернуть тот, который ответил быстрее. Если оба упали — вернуть ошибку.

Ответ

Классический паттерн «first response wins» — через буферизированный канал и select с ctx.Done():

type result struct {
    body []byte
    err  error
    from string
}

func fastest(ctx context.Context, urls ...string) (result, error) {
    ch := make(chan result, len(urls)) // буфер = N, чтобы медленные не висли
    for _, u := range urls {
        go func(u string) {
            req, _ := http.NewRequestWithContext(ctx, "GET", u, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                ch <- result{err: err, from: u}
                return
            }
            defer resp.Body.Close()
            b, err := io.ReadAll(resp.Body)
            ch <- result{body: b, err: err, from: u}
        }(u)
    }

    var lastErr error
    for i := 0; i < len(urls); i++ {
        r := <-ch
        if r.err == nil {
            return r, nil // первый удачный — возвращаем
        }
        lastErr = r.err
    }
    return result{}, lastErr // все упали
}

Ключевое: - Буфер len(urls) — иначе медленные горутины повиснут на send, когда никто не читает (горутина leak). - Контекст пробрасывается в HTTP, чтобы при отмене — реальная отмена, а не «главная функция вышла, а горутина живая».

5. predictableFunc — context + ticker

Дано: функция с непредсказуемым временем выполнения. Нужно обернуть так, чтобы:

  • если она возвращается за 1 секунду — отдать результат.
  • если не возвращается — отдать ctx.DeadlineExceeded.
  • внутри функции каждые 200ms тикер пишет «still working...» в лог.
Ответ
func predictableFunc(parent context.Context, work func() (int, error)) (int, error) {
    ctx, cancel := context.WithTimeout(parent, time.Second)
    defer cancel()

    type res struct {
        v   int
        err error
    }
    out := make(chan res, 1) // буфер 1 — горутина не повиснет

    go func() {
        v, err := work()
        out <- res{v, err}
    }()

    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()

    for {
        select {
        case r := <-out:
            return r.v, r.err
        case <-ticker.C:
            log.Println("still working...")
        case <-ctx.Done():
            return 0, ctx.Err()
        }
    }
}

Самое важное — буферизированный канал out. Если бы был unbuffered, и work дольше секунды — main вышла, никто не читает, send висит, горутина течёт навсегда.

6. Распараллеливание — 10 000 запросов с WG и Mutex

Дан список из 10 000 URL'ов. Нужно отправить запросы параллельно, собрать ответы в общую map (url → status_code).

Ответ

Без лимита параллельности 10k горутин — это плохая идея (DNS-резолвер задохнётся, file descriptors кончатся). Используем семафор + WG + Mutex:

func fetchAll(urls []string, parallel int) map[string]int {
    results := make(map[string]int, len(urls))
    var (
        mu  sync.Mutex
        wg  sync.WaitGroup
        sem = make(chan struct{}, parallel) // например 100
    )

    for _, u := range urls {
        wg.Add(1)
        sem <- struct{}{} // занять слот
        go func(u string) {
            defer wg.Done()
            defer func() { <-sem }() // освободить слот

            resp, err := http.Get(u)
            code := 0
            if err == nil {
                code = resp.StatusCode
                resp.Body.Close()
            }

            mu.Lock()
            results[u] = code
            mu.Unlock()
        }(u)
    }
    wg.Wait()
    return results
}

Критика на собесе любят: «а почему семафор через канал, а не через sync.Semaphore?» — потому что в стандартной либе нет sync.Semaphore, есть в golang.org/x/sync/semaphore. Канал — идиоматичный вариант без зависимостей.

7. Worker pool на 100k запросов с лимитом 300

Нужно обработать 100 000 заданий, не превышая 300 одновременных соединений. Каждое задание — короткий HTTP-запрос. Возврат — slice результатов.

Ответ

Worker pool: фиксированное число воркеров читают из канала задач, пишут в канал результатов.

func process(jobs []Job) []Result {
    const workers = 300
    in := make(chan Job)
    out := make(chan Result, workers)

    var wg sync.WaitGroup
    for w := 0; w < workers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := range in {
                out <- doJob(j)
            }
        }()
    }

    // фид-горутина
    go func() {
        for _, j := range jobs {
            in <- j
        }
        close(in)
    }()

    // закрыватель out
    go func() {
        wg.Wait()
        close(out)
    }()

    results := make([]Result, 0, len(jobs))
    for r := range out {
        results = append(results, r)
    }
    return results
}

Почему worker pool, а не «семафор на 300»? Семафор стартует горутину для каждой задачи (100k горутин на go-стеках в памяти, плюс шедулинг). Pool держит 300 long-living горутин, переиспользует их — заметно экономит память и шедулинг при большом N.

graph LR
    Producer --> InCh[in chan]
    InCh --> W1[Worker 1]
    InCh --> W2[Worker 2]
    InCh --> Wn[Worker 300]
    W1 --> OutCh[out chan]
    W2 --> OutCh
    Wn --> OutCh
    OutCh --> Aggregator

8. AsyncMerge каналов через дженерики

Реализуй функцию Merge[T any](chs ...<-chan T) <-chan T — fan-in: читает из N входных каналов, пишет всё в один выходной. Закрывает выходной, когда все входные закрыты.

Ответ
func Merge[T any](chs ...<-chan T) <-chan T {
    out := make(chan T)
    var wg sync.WaitGroup

    for _, ch := range chs {
        wg.Add(1)
        go func(ch <-chan T) {
            defer wg.Done()
            for v := range ch {
                out <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Вопрос на собесе: «А что если consumer медленный?» — все горутины блокируются на out <- v, пока кто-то не вычитает. Backpressure по дефолту. Если хочешь буфер — make(chan T, N).

«А если один из входных каналов никогда не закроется?» — wg.Wait() никогда не вернётся, out не закроется. Это feature: Merge «доверяет» отправителям корректно закрывать каналы.

9. GetFiles параллельно с context-отменой

Дан GetFiles(ctx context.Context, ids []int) ([]File, error). Каждый getFile(id) — медленный (200–500ms). Нужно вызывать параллельно, на первой ошибке — отменить остальные через context.

Ответ

Идиоматично — через errgroup:

import "golang.org/x/sync/errgroup"

func GetFiles(ctx context.Context, ids []int) ([]File, error) {
    files := make([]File, len(ids))
    g, ctx := errgroup.WithContext(ctx)

    for i, id := range ids {
        i, id := i, id // pre-1.22 capture; на 1.22+ можно убрать
        g.Go(func() error {
            f, err := getFile(ctx, id)
            if err != nil {
                return err // первая ошибка → ctx отменяется автоматом
            }
            files[i] = f // безопасно: каждый пишет в СВОЙ индекс
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return files, nil
}

Тонкости: - Запись по индексу — без mutex, потому что разные горутины пишут в разные ячейки слайса. Race detector это разрешает. - errgroup.WithContext — на первой ошибке отменяет общий ctx, остальные горутины должны это уважать (передаём ctx внутрь getFile).

10. Do(ctx, []User) — параллельный fetchByName

Дано: func Do(ctx context.Context, users []User) ([]Profile, error). Для каждого user нужно вызвать fetchByName(ctx, user.Name) параллельно. Нужна сохранность порядка.

Ответ

Тот же паттерн что в задаче 9, но с явным ограничением параллельности (используем errgroup.SetLimit или semaphore-канал):

func Do(ctx context.Context, users []User) ([]Profile, error) {
    profiles := make([]Profile, len(users))
    g, ctx := errgroup.WithContext(ctx)
    g.SetLimit(20) // максимум 20 одновременных fetchByName

    for i, u := range users {
        i, u := i, u
        g.Go(func() error {
            p, err := fetchByName(ctx, u.Name)
            if err != nil {
                return fmt.Errorf("fetchByName(%s): %w", u.Name, err)
            }
            profiles[i] = p
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return profiles, nil
}

Если errgroup нельзя использовать (например, на собесе сказали «без либ»):

sem := make(chan struct{}, 20)
var wg sync.WaitGroup
errCh := make(chan error, 1) // первая ошибка
ctx, cancel := context.WithCancel(ctx)
defer cancel()

for i, u := range users {
    select {
    case <-ctx.Done():
        return nil, ctx.Err()
    case sem <- struct{}{}:
    }
    wg.Add(1)
    go func(i int, u User) {
        defer wg.Done()
        defer func() { <-sem }()

        p, err := fetchByName(ctx, u.Name)
        if err != nil {
            select {
            case errCh <- err:
                cancel() // отменим остальные
            default:
            }
            return
        }
        profiles[i] = p
    }(i, u)
}
wg.Wait()
select {
case err := <-errCh:
    return nil, err
default:
    return profiles, nil
}

Менее красиво, чем errgroup — это и есть аргумент «почему стоит использовать golang.org/x/sync/errgroup».

📝 Подумай

  1. В задаче 4 (параллельные HTTP) — почему мы используем буферизированный канал размера len(urls), а не unbuffered?
  2. В задаче 7 (worker pool) — почему 300 long-living воркеров лучше, чем 100 000 коротких горутин с семафором на 300?
Ответ
  1. Если канал unbuffered, то когда первый ответ принят и main вышла — остальные горутины висят на ch <- result{...}, потому что некому читать. Это утечка горутин (goroutine leak). Буфер len(urls) гарантирует, что все горутины смогут записать и завершиться, даже если их результат уже не нужен.
  2. Каждая горутина — это аллокация стека (~2 KB старт, может расти). 100 000 горутин ≈ 200 МБ только на стеки + куча работы для шедулера (work-stealing между P, контекст-свитчи). Pool из 300 переиспользует те же горутины, держит память постоянной и снимает нагрузку с шедулера. Особенно заметно, когда задача очень короткая (миллисекунды) — оверхед создания горутины может быть сравним со временем работы.