Задачи на многопоточку¶
10 задач, разобранных по шагам — то, что реально дают на собесах в Авито, Озоне, Тинькоффе и других. Каждая задача начинается с минимально-сломанного варианта, дальше — постепенный фикс. Ответ свёрнут.
Совет: не подсматривай сразу. Запусти у себя go run -race, посмотри
что выводит, и только потом разверни ответ.
1. Базовый deadlock + race (9 шагов фикса)¶
Вот стартовая «битая» программа — её приносят на собесе и просят починить. Кажется простой, но в ней зашиты сразу несколько типичных косяков.
package main
import "fmt"
func main() {
ch := make(chan int)
counter := 0
for i := 0; i < 10; i++ {
go func() {
ch <- i
}()
}
for v := range ch {
counter++
fmt.Println(v, counter)
}
fmt.Println("total:", counter)
}
Что здесь не так? Подумай прежде чем разворачивать.
Ответ — 9 шагов починки
Шаг 1. Запускаем — fatal error: all goroutines are asleep - deadlock!.
range ch ждёт close(ch), никто не закрывает.
Шаг 2. Закрываем ch после for-цикла? Нет — close произойдёт ДО того,
как горутины успеют отправить, и они запаникуют (send on closed channel).
Закрывать должна координирующая логика после того, как все senders
завершились.
Шаг 3. Используем sync.WaitGroup, чтобы дождаться всех senders, и
закрываем канал в отдельной горутине:
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
ch <- i
}()
}
go func() { wg.Wait(); close(ch) }()
Шаг 4. Теперь нет deadlock'а, но ch <- i использует i из замыкания.
До Go 1.22 — все 10 горутин видят последнее значение (обычно 10 и
отправлять нечего). С 1.22 — каждая итерация имеет свежий i, ок. Если
среда старая — передаём аргументом:
Шаг 5. Запускаем go run -race. Видим WARNING: DATA RACE на
counter. Хотя counter++ в main только в одной горутине (читателе),
гонки тут нет. Зато если бы было несколько читателей или мы где-то
инкрементили из горутин — была бы.
Шаг 6. Усложним: счётчик инкрементим внутри sender'ов (типичный собесовский «теперь пусть каждый sender ещё и счётчик увеличивает»):
go run -race ловит это сразу.
Шаг 7. Чиним через sync/atomic:
Шаг 8. Альтернатива — sync.Mutex вокруг counter++. Чуть дороже
atomic'а, но универсальнее (защищает любую критическую секцию, не только
одну переменную).
Шаг 9. Финал: go run -race ./... чисто, wg.Wait() ждёт всех,
close(ch) корректен, счётчик не врёт.
Эта задача — диагностический «детектор» сразу нескольких знаний: deadlock, close-by-sender, loop-variable, race-condition, atomic vs mutex.
2. Найти максимальный чётный — race + WaitGroup¶
package main
import "fmt"
func main() {
nums := []int{3, 8, 1, 14, 7, 22, 5, 18, 11}
max := 0
for _, n := range nums {
go func(n int) {
if n%2 == 0 && n > max {
max = n
}
}(n)
}
fmt.Println("max even:", max)
}
Что выведет, что не так, как починить?
Ответ
Скорее всего 0 — main не ждёт горутин и завершается раньше. Плюс гонка
на max (read-modify-write без синхронизации).
Починка:
var (
mu sync.Mutex
wg sync.WaitGroup
max int
)
for _, n := range nums {
wg.Add(1)
go func(n int) {
defer wg.Done()
if n%2 != 0 { return }
mu.Lock()
if n > max { max = n }
mu.Unlock()
}(n)
}
wg.Wait()
fmt.Println("max even:", max) // 22
Альтернатива — отдать чётные в канал, в main проагрегировать. Без mutex при таких размерах входа быстрее обычно последовательная версия, но задача — про конкурентность.
3. Ozon — for i { go fmt.Println(i) } без WG¶
Что выведет?
Ответ
Скорее всего ничего: main завершается мгновенно, runtime убивает программу, горутины не успевают планироваться. Иногда увидишь 1–2 числа.
Поправка вариант A — time.Sleep(time.Second) (костыль, для демонстрации
ок).
Вариант B — WaitGroup:
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
fmt.Println(i)
}(i)
}
wg.Wait()
Гочча на Go < 1.22 — без захвата i параметром все горутины увидят
i == 5 (или последнее значение). С 1.22 уже свежая копия на итерацию.
4. Параллельные HTTP-запросы (google + avito)¶
Нужно одновременно сделать два HTTP-запроса (на https://google.com и
https://avito.ru), вернуть тот, который ответил быстрее. Если оба упали —
вернуть ошибку.
Ответ
Классический паттерн «first response wins» — через буферизированный канал
и select с ctx.Done():
type result struct {
body []byte
err error
from string
}
func fastest(ctx context.Context, urls ...string) (result, error) {
ch := make(chan result, len(urls)) // буфер = N, чтобы медленные не висли
for _, u := range urls {
go func(u string) {
req, _ := http.NewRequestWithContext(ctx, "GET", u, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
ch <- result{err: err, from: u}
return
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
ch <- result{body: b, err: err, from: u}
}(u)
}
var lastErr error
for i := 0; i < len(urls); i++ {
r := <-ch
if r.err == nil {
return r, nil // первый удачный — возвращаем
}
lastErr = r.err
}
return result{}, lastErr // все упали
}
Ключевое:
- Буфер len(urls) — иначе медленные горутины повиснут на send,
когда никто не читает (горутина leak).
- Контекст пробрасывается в HTTP, чтобы при отмене — реальная отмена,
а не «главная функция вышла, а горутина живая».
5. predictableFunc — context + ticker¶
Дано: функция с непредсказуемым временем выполнения. Нужно обернуть так, чтобы:
- если она возвращается за 1 секунду — отдать результат.
- если не возвращается — отдать
ctx.DeadlineExceeded. - внутри функции каждые 200ms тикер пишет «still working...» в лог.
Ответ
func predictableFunc(parent context.Context, work func() (int, error)) (int, error) {
ctx, cancel := context.WithTimeout(parent, time.Second)
defer cancel()
type res struct {
v int
err error
}
out := make(chan res, 1) // буфер 1 — горутина не повиснет
go func() {
v, err := work()
out <- res{v, err}
}()
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
for {
select {
case r := <-out:
return r.v, r.err
case <-ticker.C:
log.Println("still working...")
case <-ctx.Done():
return 0, ctx.Err()
}
}
}
Самое важное — буферизированный канал out. Если бы был
unbuffered, и work дольше секунды — main вышла, никто не читает, send
висит, горутина течёт навсегда.
6. Распараллеливание — 10 000 запросов с WG и Mutex¶
Дан список из 10 000 URL'ов. Нужно отправить запросы параллельно, собрать
ответы в общую map (url → status_code).
Ответ
Без лимита параллельности 10k горутин — это плохая идея (DNS-резолвер задохнётся, file descriptors кончатся). Используем семафор + WG + Mutex:
func fetchAll(urls []string, parallel int) map[string]int {
results := make(map[string]int, len(urls))
var (
mu sync.Mutex
wg sync.WaitGroup
sem = make(chan struct{}, parallel) // например 100
)
for _, u := range urls {
wg.Add(1)
sem <- struct{}{} // занять слот
go func(u string) {
defer wg.Done()
defer func() { <-sem }() // освободить слот
resp, err := http.Get(u)
code := 0
if err == nil {
code = resp.StatusCode
resp.Body.Close()
}
mu.Lock()
results[u] = code
mu.Unlock()
}(u)
}
wg.Wait()
return results
}
Критика на собесе любят: «а почему семафор через канал, а не
через sync.Semaphore?» — потому что в стандартной либе нет
sync.Semaphore, есть в golang.org/x/sync/semaphore. Канал —
идиоматичный вариант без зависимостей.
7. Worker pool на 100k запросов с лимитом 300¶
Нужно обработать 100 000 заданий, не превышая 300 одновременных соединений. Каждое задание — короткий HTTP-запрос. Возврат — slice результатов.
Ответ
Worker pool: фиксированное число воркеров читают из канала задач, пишут в канал результатов.
func process(jobs []Job) []Result {
const workers = 300
in := make(chan Job)
out := make(chan Result, workers)
var wg sync.WaitGroup
for w := 0; w < workers; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for j := range in {
out <- doJob(j)
}
}()
}
// фид-горутина
go func() {
for _, j := range jobs {
in <- j
}
close(in)
}()
// закрыватель out
go func() {
wg.Wait()
close(out)
}()
results := make([]Result, 0, len(jobs))
for r := range out {
results = append(results, r)
}
return results
}
Почему worker pool, а не «семафор на 300»? Семафор стартует горутину для каждой задачи (100k горутин на go-стеках в памяти, плюс шедулинг). Pool держит 300 long-living горутин, переиспользует их — заметно экономит память и шедулинг при большом N.
graph LR
Producer --> InCh[in chan]
InCh --> W1[Worker 1]
InCh --> W2[Worker 2]
InCh --> Wn[Worker 300]
W1 --> OutCh[out chan]
W2 --> OutCh
Wn --> OutCh
OutCh --> Aggregator
8. AsyncMerge каналов через дженерики¶
Реализуй функцию Merge[T any](chs ...<-chan T) <-chan T — fan-in: читает
из N входных каналов, пишет всё в один выходной. Закрывает выходной, когда
все входные закрыты.
Ответ
func Merge[T any](chs ...<-chan T) <-chan T {
out := make(chan T)
var wg sync.WaitGroup
for _, ch := range chs {
wg.Add(1)
go func(ch <-chan T) {
defer wg.Done()
for v := range ch {
out <- v
}
}(ch)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
Вопрос на собесе: «А что если consumer медленный?» — все горутины
блокируются на out <- v, пока кто-то не вычитает. Backpressure по
дефолту. Если хочешь буфер — make(chan T, N).
«А если один из входных каналов никогда не закроется?» — wg.Wait()
никогда не вернётся, out не закроется. Это feature: Merge
«доверяет» отправителям корректно закрывать каналы.
9. GetFiles параллельно с context-отменой¶
Дан GetFiles(ctx context.Context, ids []int) ([]File, error). Каждый
getFile(id) — медленный (200–500ms). Нужно вызывать параллельно, на
первой ошибке — отменить остальные через context.
Ответ
Идиоматично — через errgroup:
import "golang.org/x/sync/errgroup"
func GetFiles(ctx context.Context, ids []int) ([]File, error) {
files := make([]File, len(ids))
g, ctx := errgroup.WithContext(ctx)
for i, id := range ids {
i, id := i, id // pre-1.22 capture; на 1.22+ можно убрать
g.Go(func() error {
f, err := getFile(ctx, id)
if err != nil {
return err // первая ошибка → ctx отменяется автоматом
}
files[i] = f // безопасно: каждый пишет в СВОЙ индекс
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return files, nil
}
Тонкости:
- Запись по индексу — без mutex, потому что разные горутины
пишут в разные ячейки слайса. Race detector это разрешает.
- errgroup.WithContext — на первой ошибке отменяет общий ctx,
остальные горутины должны это уважать (передаём ctx внутрь
getFile).
10. Do(ctx, []User) — параллельный fetchByName¶
Дано: func Do(ctx context.Context, users []User) ([]Profile, error). Для
каждого user нужно вызвать fetchByName(ctx, user.Name) параллельно.
Нужна сохранность порядка.
Ответ
Тот же паттерн что в задаче 9, но с явным ограничением параллельности
(используем errgroup.SetLimit или semaphore-канал):
func Do(ctx context.Context, users []User) ([]Profile, error) {
profiles := make([]Profile, len(users))
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(20) // максимум 20 одновременных fetchByName
for i, u := range users {
i, u := i, u
g.Go(func() error {
p, err := fetchByName(ctx, u.Name)
if err != nil {
return fmt.Errorf("fetchByName(%s): %w", u.Name, err)
}
profiles[i] = p
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
return profiles, nil
}
Если errgroup нельзя использовать (например, на собесе сказали «без
либ»):
sem := make(chan struct{}, 20)
var wg sync.WaitGroup
errCh := make(chan error, 1) // первая ошибка
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, u := range users {
select {
case <-ctx.Done():
return nil, ctx.Err()
case sem <- struct{}{}:
}
wg.Add(1)
go func(i int, u User) {
defer wg.Done()
defer func() { <-sem }()
p, err := fetchByName(ctx, u.Name)
if err != nil {
select {
case errCh <- err:
cancel() // отменим остальные
default:
}
return
}
profiles[i] = p
}(i, u)
}
wg.Wait()
select {
case err := <-errCh:
return nil, err
default:
return profiles, nil
}
Менее красиво, чем errgroup — это и есть аргумент «почему стоит
использовать golang.org/x/sync/errgroup».
📝 Подумай¶
- В задаче 4 (параллельные HTTP) — почему мы используем буферизированный
канал размера
len(urls), а не unbuffered? - В задаче 7 (worker pool) — почему 300 long-living воркеров лучше, чем 100 000 коротких горутин с семафором на 300?
Ответ
- Если канал unbuffered, то когда первый ответ принят и main вышла —
остальные горутины висят на
ch <- result{...}, потому что некому читать. Это утечка горутин (goroutine leak). Буферlen(urls)гарантирует, что все горутины смогут записать и завершиться, даже если их результат уже не нужен. - Каждая горутина — это аллокация стека (~2 KB старт, может расти). 100 000 горутин ≈ 200 МБ только на стеки + куча работы для шедулера (work-stealing между P, контекст-свитчи). Pool из 300 переиспользует те же горутины, держит память постоянной и снимает нагрузку с шедулера. Особенно заметно, когда задача очень короткая (миллисекунды) — оверхед создания горутины может быть сравним со временем работы.