×

Синхронізація в Go: горутини, тести, варіанти

Привіт, мене звати Ярослав, займаюсь розробкою в компанії Evrius. Ця стаття про синхронізацію результатів від паралельно виконаних підзадач, призначена для спеціалістів-початківців та тих, хто планує перейти на Go.

На початку 2019 року, маючи досвід з Go, шукав нову роботу. Під час більшості співбесід ставили запитання, як розпаралелити виконання завдання. Приблизний опис завдання: є список посилань, треба за ними перейти, отримати результат та синхронізувати. Вирішення було достатньо для проходження технічної частини в пару аутсорс-компаній.

Вартість горутини

Кожен розробник, який використовує Go, знає, що горутини дешеві. Трохи менше знають, що розмір мінімального стека горутини змінювали в ранніх версіях Go, у версії 1.13.

 _StackMin = 2048

А щоб перевірити горутини на швидкодію, напишемо тест, у якому запустимо N горутин з простим завданням, дочекаємося завершення й подивимося результати:

package benchmarks

import (
	"sync"
	"sync/atomic"
	"testing"
)

func BenchmarkGoroutineCost(b *testing.B) {
	var value uint32
	var wg sync.WaitGroup

	wg.Add(b.N)

	for i := 0; i < b.N; i++ {
		go func() {
			atomic.AddUint32(&value, 1)

			wg.Done()
		}()
	}

	wg.Wait()

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}
}

go version

go version go1.13.3 linux/amd64

go test./benchmarks/... -v -bench=BenchmarkGoroutineCost -benchmem

BenchmarkGoroutineCost-4 3437931 351 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.563s

~350 наносекунд на створення, виконання, завершення горутини.

Спершу я в це повірив, але потім вирішив написати ще один тест, щоб перевірити відсутність оптимізації такої простої операції, а також те, що чисельність горутин під час виконання збільшується.

package benchmarks

import (
	"runtime"
	"sync"
	"sync/atomic"
	"testing"
)

type goRuntimeMaxCount struct {
	mu    sync.Mutex
	value int
}

func (c *goRuntimeMaxCount) update() {
	var value = runtime.NumGoroutine()

	c.mu.Lock()
	if value > c.value {
		c.value = value
	}
	c.mu.Unlock()
}

func (c *goRuntimeMaxCount) get() int {
	return c.value
}

func BenchmarkGoroutineCostDump(b *testing.B) {
	var (
		value          = uint32(0)
		wg             = new(sync.WaitGroup)
		goRuntimeCount = new(goRuntimeMaxCount)
	)

	b.Logf("before goroutine count %d", goRuntimeCount.get())

	wg.Add(b.N)

	for i := 0; i < b.N; i++ {
		go func() {
			atomic.AddUint32(&value, 1)

			goRuntimeCount.update()

			wg.Done()
		}()
	}

	wg.Wait()

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}

	b.Logf("after goroutine count %d for b.N = %d", goRuntimeCount.get(), b.N)
}

 go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostDump -benchmem

BenchmarkGoroutineCostDump-4 3223651 366 ns/op 0 B/op 0 allocs/op
before goroutine count 0
after goroutine count 904 for b.N = 1000000
before goroutine count 0
after goroutine count 3160 for b.N = 3223651
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 1.562s

Тест показав, що горутини справді створюються. А тепер змінимо тест так, щоб на кожній ітерації циклу горутина створювалася і завершувалася:

func BenchmarkGoroutineCostOne(b *testing.B) {
	var value uint32
	var wg sync.WaitGroup

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		go func() {
			atomic.AddUint32(&value, 1)

			wg.Done()
		}()
		wg.Wait()
	}

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}
}

func BenchmarkGoroutineCostOneOverhead(b *testing.B) {
	var value uint32
	var wg sync.WaitGroup

	for i := 0; i < b.N; i++ {
		wg.Add(1)
		atomic.AddUint32(&value, 1)
		wg.Done()
		wg.Wait()
	}

	if value != uint32(b.N) {
		b.Errorf("expected %d, got %d", b.N, value)
	}
}

go test ./benchmarks/... -v -bench=BenchmarkGoroutineCostOne -benchmem

BenchmarkGoroutineCostOne-4 1488328 841 ns/op 0 B/op 0 allocs/op
BenchmarkGoroutineCostOneOverhead-4 32435746 34.6 ns/op 0 B/op 0 allocs/op
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 3.229s

Тести проводив на домашньому Intel® Core™ i5-4210U CPU @ 1.70GHz, що показали вартість горутини ~800 наносекунд.

Для порівняння: знаходження максимального елементу в масиві з 1024 елементів ~1400 наносекунд.

Атомарність

Ми вже використовували пакет atomic, потрібний для паралельних операцій, щоб гарантувати їхню успішність.

Якщо з попередніх тестів забрати atomic і використовувати просте додавання:

func TestParallelPureIncrement(t *testing.T) {
	const n = 1000000

	var (
		value uint32
		wg    = new(sync.WaitGroup)
	)

	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			value++ // same sa "value = value + 1"

			wg.Done()
		}()
	}

	wg.Wait()

	if value != n {
		t.Errorf("expected %d, got %d", n, value)
	}
}

Отримаємо:

=== RUN TestParallelPureIncrement
--- FAIL: TestParallelPureIncrement (0.35s)
expected 1000000, got 851804
FAIL
FAIL gitlab.com/go-yp/go-sync/benchmarks 0.353s

Очікували 1 000 000, отримали 851 804, через відсутність синхронізації між горутинами.

На основі пакета atomic ґрунтуються інші структури в Go, що використовують для синхронізації. Так, у тесті замість WaitGroup.Wait, ми можемо використати циклічну перевірку:

func TestParallelPureAtomic(t *testing.T) {
	const n = 1000000

	var value uint32

	for i := 0; i < n; i++ {
		go func() {
			atomic.AddUint32(&value, 1)
		}()
	}

	for atomic.LoadUint32(&value) < n {
		// NOP
	}
}

=== RUN TestParallelPureAtomic
--- PASS: TestParallelPureAtomic (0.42s)
PASS
ok gitlab.com/go-yp/go-sync/benchmarks 0.425s

і додати перемикання на інші горутини runtime.Gosched()

for atomic.LoadUint32(&value) < n {
		runtime.Gosched()
	}

Проте ліпше використовуйте sync.WaitGroup.

Синхронізація результатів від горутин і перегони даних (data race)

Якщо ви початківець у Go, а вам треба розпаралелити завдання, то ліпше напишіть тест для цього завдання, розпаралельте й запустіть з флагом -race, щоб перевірити наявність data race.

Data race — помилка проектування, стан програми, коли один або більше потоків змінюють дані без блокування й один або більше читають ці дані без блокування, у результаті програма працює інакше, ніж очікуємо.

Знайдемо функціонал, що розробили в однопотоковому варіанті й зі збільшенням завдань — розпаралелили.

Наприклад, візьмемо сторінку bestofjs.org/projects, де навпроти кожного проекту бачимо число зірок на GitHub-і.

Раз на день ці числа треба оновлювати, а в базі всього 20 репозиторіїв.

Є функція, щоб отримати число зірок за ID репозиторію:

func fetchRepositoryStarByID(id int) int {
	// emulate slow http request by sleep
	time.Sleep(100 * time.Millisecond)

	// emulate response
	var stars = id % 32

	return stars
}

і програма, яку запускають раз на день, щоб оновити число зірочок для кожного репозиторію:

package main

import (
	"fmt"
	"time"
)

const repositoryCount = 20

type repository struct {
	id        int
	starCount int
}

func main() {
	var startTime = time.Now()

	var ids = getRepositoryIDs()
	var repositories = fetchRepositoryStarsByIDs(ids)

	updateRepositoryStars(repositories)

	var duration = time.Since(startTime)

	fmt.Printf("fetch %d from %d repositories by %d \n", len(repositories), repositoryCount, duration)
}

func getRepositoryIDs() []int {
	return make([]int, repositoryCount)
}

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var result = make([]repository, 0, len(ids))

	for _, id := range ids {
		result = append(result, repository{
			id:        id,
			starCount: fetchRepositoryStarByID(id),
		})
	}

	return result
}

func fetchRepositoryStarByID(id int) int {
	// emulate slow http request by sleep
	time.Sleep(100 * time.Millisecond)

	// emulate response
	var stars = id % 32

	return stars
}

func updateRepositoryStars(repositories []repository) {
	// NOP
}

Програма виконується за 2 секунди.

Опублікували React, Vue, Svelte й розширення до них, тепер у базі 100 репозиторіїв і програма виконується за 10 секунд.

З цим треба щось робити, бо чекаємо, що в проекті буде ще більше репозиторіїв, а отже, оновлення займатиме ще більше часу.

Вирішили розпаралелити з використанням горутин і WaitGroup.

Тепер fetchRepositoryStarsByIDs має такий вигляд:

package main

import (
	"fmt"
	"sync"
	"time"
)

const repositoryCount = 100

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, 0, length)
		wg     = new(sync.WaitGroup)
	)

	wg.Add(length)

	for _, id := range ids {
		go func() {
			result = append(result, repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			})

			wg.Done()
		}()
	}

	wg.Wait()

	return result
}

виконується за 100 мілісекунд, але в консолі бачимо, що тільки частину репозиторіїв оновлено:

fetch 81 from 100 repositories by 112999451

Запустимо з флагом -race

go run -race main.go

й отримаємо повідомлення, у яких рядках коду є помилки (вивів тільки частину повідомлення):

WARNING: DATA RACE
Read at 0×00c00009a020 by goroutine 8:
main.fetchRepositoryStarsByIDs.func1()
gitlab.com/go-yp/go-sync/main.go:41 +0×91

У цьому коді відразу дві помилки з data race.

Перша помилка data race: змінна id змінюється в основній горутині, де виконується цикл for і читається зі створених горутин.

Ось приклад, що покаже цю помилку:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var ids = []int{1, 2, 3}
	var wg = new(sync.WaitGroup)

	wg.Add(3)
	for _, id := range ids {
		go func() {
			time.Sleep(time.Millisecond)

			fmt.Printf("id is %d\n", id)

			wg.Done()
		}()
	}

	wg.Wait()
}

id is 3
id is 3
id is 3

Варіанти розв’язання:

for _, id := range ids {
		go func(id int) {
			time.Sleep(time.Millisecond)

			fmt.Printf("id is %d\n", id)

			wg.Done()
		}(id)
	}
for _, id := range ids {
		id := id

		go func() {
			time.Sleep(time.Millisecond)

			fmt.Printf("id is %d\n", id)

			wg.Done()
		}()
	}

Друга помилка data race: це append, що змінює SliceHeader через append з багатьох горутин.

Є три відомі мені варіанти розв’язання проблеми data race під час збереження результатів від горутин.

У першому: ми ініціалізуємо slice, і кожна горутина пише у свій індекс:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)
	)

	wg.Add(length)

	for i, id := range ids {
		go func(i, id int) {
			result[i] = repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			wg.Done()
		}(i, id)
	}

	wg.Wait()

	return result
}

Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race.

fetch 100 from 100 repositories by 113026518

Якщо переглянути документацію — кожний елемент масиву як окрема змінна і в цьому прикладі з fetchRepositoryStarsByIDs кожна горутина працює зі своїм індексом (змінною), а отже, немає data race:

Structured variables of array, slice, and struct types have elements and fields that may be addressed individually. Each such element acts like a variable.

Це саме питання про запис у різні індекси є на stackoverflow.

Другий: обернути append в sync.Mutex:

package main

import (
	"fmt"
	"sync"
	"time"
)

const repositoryCount = 100

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, 0, length)
		wg     = new(sync.WaitGroup)
		mu     = new(sync.Mutex)
	)

	wg.Add(length)

	for _, id := range ids {
		go func(id int) {
			var starCount = fetchRepositoryStarByID(id)

			mu.Lock()
			result = append(result, repository{
				id:        id,
				starCount: starCount,
			})
			mu.Unlock()

			wg.Done()
		}(id)
	}

	wg.Wait()

	return result
}

Після запуску отримаємо очікуваний результат 100 зі 100 й без помилки data race (так само).

Жодної переваги перед першим варіантом.

Діє таке саме правило, що й у циклах:

  • Якщо дію можна винести за цикл, так ліпше й зробити.
  • Якщо дію можна винести за mutex, так ліпше й зробити.

Наприклад, написавши такий код:

go func(id int) {
	mu.Lock()
	var starCount = fetchRepositoryStarByID(id)

	result = append(result, repository{
		id:        id,
		starCount: starCount,
	})
	mu.Unlock()

	wg.Done()
}(id)

Програма буде заблокована під час виконання важкої операції fetchRepositoryStarByID і стане послідовною з часом виконання 10 секунд.

Третій: писати результати в канал, це стандартне рішення, бо канали створені для можливості писання й читання з багатьох горутин, без помилки data race:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var length = len(ids)
	// can also use channel with length, in this case result will be same
	// var resultChan = make(chan repository, length)
	var resultChan = make(chan repository)
	var wg = new(sync.WaitGroup)

	wg.Add(length)

	for _, id := range ids {
		go func(id int) {
			resultChan <- repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			wg.Done()
		}(id)
	}

	go func() {
		wg.Wait()
		// close chan and break read loop
		close(resultChan)
	}()

	var repositories = make([]repository, 0, length)
	// read loop, while resultChan is open
	for result := range resultChan {
		repositories = append(repositories, result)
	}

	return repositories
}

Перевага цього варіанта в тому, що замість збереження в slice ми можемо відразу починати опрацьовувати результати й навіть розпаралелити читання, якщо потрібно.

Є ще один варіант — комбінація першого й другого:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)
		index  = int32(-1)
	)

	wg.Add(length)

	for _, id := range ids {
		go func(id int) {
			newIndex := atomic.AddInt32(&index, 1)

			result[newIndex] = repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			wg.Done()
		}(id)
	}

	wg.Wait()

	return result
}

Throttling (rate limiting)

Проект із зірочками зростає і вже має 1000 репозиторіїв.

GitHub (або інший сервіс) починає повертати HTTP status 429 (too many requests) або HTTP Timeout замість зірок, коли є багато одночасних запитів.

У нашому прикладі ми додамо додатковий time.Sleep(time.Second) у fetchRepositoryStarByID, щоб емулювати затримку HTTP Timeout:

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

const repositoryCount = 1000

// ... same

func fetchRepositoryStarByID(id int) int {
	// emulate timeout
	if runtime.NumGoroutine() > 250 {
		time.Sleep(time.Second)
	}

	// emulate slow http request by sleep
	time.Sleep(100 * time.Millisecond)

	// emulate response
	var stars = id % 32

	return stars
}

Запустимо:

go run main.go

fetch 1000 from 1000 repositories by 1232180912

Як бачимо, тепер код виконується за секунду.

Тепер обмежмо число одночасних запитів за раз до 200.

Розгляньмо теж 3 варіанти (якщо комбінувати з попередніми, то, звісно, буде більше).

Найпростіший варіант — через канали (можна також пошукати за словом semaphore):

package main

import (
	"fmt"
	"runtime"
	"sync"
	"time"
)

const (
	repositoryCount = 1000
	requestLimit    = 200
)

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)

		// WE ADD THIS
		throttler = make(chan struct{}, requestLimit)
	)

	wg.Add(length)

	for i, id := range ids {
		// WE ADD THIS
		throttler <- struct{}{}

		go func(i, id int) {
			result[i] = repository{
				id:        id,
				starCount: fetchRepositoryStarByID(id),
			}

			// WE ADD THIS
			<-throttler
			wg.Done()
		}(i, id)
	}

	wg.Wait()
	close(throttler)

	return result
}

go run -race main.go

fetch 1000 from 1000 repositories by 535724238

За півсекунди.

Ми пишемо в канал 200 разів і запускаємо 200 горутин, запустити наступну зможемо тоді, коли хтось прочитає з каналу.

Варіант через запуск воркерів (workers).

Запускаємо N воркерів (де N — наша константа request Limit), що з одного каналу читають завдання, а в інший пишуть результат:

package main

// ... same

func workerFetchRepositoryStarByID(wg *sync.WaitGroup, requestIdChannel <-chan int, responseRepositoryChannel chan<- repository) {
	// read while requestIdChannel open
	for id := range requestIdChannel {
		var starCount = fetchRepositoryStarByID(id)

		responseRepositoryChannel <- repository{
			id:        id,
			starCount: starCount,
		}
	}

	wg.Done()
}

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length      = len(ids)
		result      = make([]repository, 0, length)
		workerCount = requestLimit
	)

	if workerCount > length {
		workerCount = length
	}

	var (
		requestIdChannel          = make(chan int, workerCount)
		responseRepositoryChannel = make(chan repository, workerCount)

		workerComplete = new(sync.WaitGroup)
		readComplete   = new(sync.WaitGroup)
	)

	workerComplete.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel)
	}

	readComplete.Add(1)
	go func() {
		for responseRepository := range responseRepositoryChannel {
			result = append(result, responseRepository)
		}
		readComplete.Done()
	}()

	for _, id := range ids {
		requestIdChannel <- id
	}

	close(requestIdChannel)
	workerComplete.Wait()

	close(responseRepositoryChannel)
	readComplete.Wait()

	return result
}

go run -race main.go

fetch 1000 from 1000 repositories by 540738968

Так багато коду для синхронізації, бо ми визначили розміри каналів, що менше загальної чисельності репозиторіїв.

Цей код працюватиме навіть з каналами без буфера:

var (
	requestIdChannel          = make(chan int, 0)
	responseRepositoryChannel = make(chan repository, 0)
)

Якщо ж використовувати ресурси пам’яті сповна, то код матиме простіший вигляд:

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length      = len(ids)
		result      = make([]repository, 0, length)
		workerCount = requestLimit
	)

	if workerCount > length {
		workerCount = length
	}

	var (
		requestIdChannel          = make(chan int, length)
		responseRepositoryChannel = make(chan repository, length)
		workerComplete            = new(sync.WaitGroup)
	)

	workerComplete.Add(workerCount)
	for i := 0; i < workerCount; i++ {
		go workerFetchRepositoryStarByID(workerComplete, requestIdChannel, responseRepositoryChannel)
	}

	for _, id := range ids {
		requestIdChannel <- id
	}
	close(requestIdChannel)

	workerComplete.Wait()

	close(responseRepositoryChannel)
	for responseRepository := range responseRepositoryChannel {
		result = append(result, responseRepository)
	}

	return result
}

і якщо потім захочемо змінити:

var (
	requestIdChannel          = make(chan int, 0)
	responseRepositoryChannel = make(chan repository, 0)
)

то заблокуємо виконання назавжди, коли воркери почнуть писати в responseRepositoryChannel, а читання вже після завершення.

Останній варіант — розділити початковий slice на пачки й для кожної розпаралелити виконання.

Ми використаємо зовнішній пакет, що поділить на діапазони:

package main

import (
	"fmt"
	"github.com/gopereza/packer"
	"runtime"
	"sync"
	"time"
)

const (
	repositoryCount = 1000
	requestLimit    = 200
)

// ... same

func fetchRepositoryStarsByIDs(ids []int) []repository {
	var (
		length = len(ids)
		result = make([]repository, length)
		wg     = new(sync.WaitGroup)
	)

	var packs = packer.Pack(length, requestLimit)

	for _, pack := range packs {
		for i := pack.From; i < pack.To; i++ {
			wg.Add(1)

			go func(i int) {
				var id = ids[i]

				result[i] = repository{
					id:        id,
					starCount: fetchRepositoryStarByID(id),
				}

				wg.Done()
			}(i)
		}

		wg.Wait()
	}

	return result
}

Схожий приклад бачив у реальному проекті.

Недолік такого рішення — одна тривала операція затримає виконання всієї пачки, тому переписував на варіант з воркерами.

Епілог

Ми в проекті вибрали варіант з воркерами, що відправляють пачками.

Репозиторій, у якому тестував варіанти.

Далі буде.

Все про українське ІТ в телеграмі — підписуйтеся на канал DOU

👍ПодобаєтьсяСподобалось2
До обраногоВ обраному7
LinkedIn

Схожі статті

  • Визначаємо вартість декоратора в GolangВизначаємо вартість декоратора в Golang

    Привіт, мене звати Ярослав, займаюсь розробкою сервісу для збереження активів у криптовалюті в компанії ITAdviser, розробляємо на Go. У цій статті розглянемо декоратор, його вартість і чи варто використовувати його в розробці нових сервісів. 57

  • Приклад gRPC-мікросервісу на GoПриклад gRPC-мікросервісу на Go

    Фреймворк gRPC можна розглядати як хорошу заміну REST під час взаємодії між мікросервісами. Ярослав Характерник, Golang Developer, демонструє розробку мікросервісу для збереження статистики. 39

  • Використання Defer у GoВикористання Defer у Go

    Defer — команда для відкладеного виконання дії перед завершенням основної функції. У цій статті йдеться про добре відомі приклади використання команди defer у Go. Автор також розглядає випадки, коли defer зайвий. 7




10 коментарів

Підписатись на коментаріВідписатись від коментарів Коментарі можуть залишати тільки користувачі з підтвердженими акаунтами.

красавчик
--------------
close(ch)
for r := range ch {}

Якщо знайшов помилку то поділись, бо код тестував перед публікацією

Я только сейчас увидел что range по готовым результатам. Все ок :)

Buffered channels are best closed by producers. Немного неожиданно было видеть что за 1 строчку от ридера закрывается канал

Странно. Недавно как раз обсуждалось, как Go популярен и продвинут, а тут за 5 дней всего 3 комента.

Так, бачив в темі про перспективи Go і там вже 245 коментарів, в цій статті ще буде пару коментарів про defer

хм, а ведь правда. И в статье Сергея про Java 9 всего 38 комментариев. И если продолжать странные выводы Сергея о продвинутости языка на основе комментов, то можно сделать вывод что Go в 6.44 раза продвинутей чем Java 9

Вот правильно говорит Алексей Шипилев, что performance engineer — это отдельная профессия, и когда люди думают, что они что-то измеряют, очень часто они не измеряют ничего... Чтобы реально измерять производительность, нужно понимать как работает система совсем на другом уровне, чем просто писать код.

Я и близко себя не отношу к людям, которые разбираются в том, как правильно заниматься производительностью, но вот измерение создания рутины через atomic похоже на то, что Вы не представляете что такое atomic и синхронизация тоже.

Очень толково, большое спасибо !

Шикарная статья с отличными примерами.

Підписатись на коментарі