Перевод “Measuring your system’s performance using software (Go edition),” in Daniel Lemire’s blog, March 17, 2024.


Разрабатывая программы, мы работаем над абстракцией системы. Оборудование может не знать о ваших функциях, переменных и данных. Оно может видеть только биты и инструкции. И все же, чтобы писать эффективные программы, программист должен знать характеристики базовой системы. К счастью, мы можем использовать и само программное обеспечение, чтобы наблюдать за поведением системы с помощью экспериментов.

Между программным и аппаратным обеспечением существует несколько слоев, таких как компиляторы, операционная система и драйвера. Хороший программист должен учитывать эти слои, когда это необходимо. Хороший программист также должен понимать поведение своего программного обеспечения с точки зрения этих слоев.

Бенчмарки в Go

Чтобы измерить производительность, мы часто измеряем время, необходимое для выполнения некоторой функции. Поскольку большинство функций работают быстро, бывает трудно точно измерить время, которое занимает функция, если запустить ее только один раз. Вместо этого мы можем запустить функцию много раз и записать общее время. Затем мы можем разделить общее время на количество запусков. Определить, сколько раз нужно выполнить функцию, бывает непросто: отчасти это зависит от того, насколько быстрой является функция. Если функция выполняется 6 секунд, возможно, мы не хотим или не должны выполнять ее слишком часто. Более простая стратегия - указать минимальную продолжительность и многократно вызывать функцию, пока мы не достигнем или не превысим минимальную продолжительность.

Если функция имеет малое время выполнения, мы часто называем бенчмарк микробенчмарком. Мы используем микробенчмарки для сравнения различных реализаций одной и той же функциональности или для лучшего понимания системы или проблемы. При этом всегда следует помнить, что микробенчмарк сам по себе не может быть использован для обоснования оптимизации программного обеспечения. Производительность в реальном мире зависит от множества факторов, которые сложно представить в микробенчмарке.

Важно отметить, что все бенчмарки подвержены влиянию ошибок измерений и помех со стороны системы. Что еще хуже, распределение таймингов может не соответствовать нормальному распределению.

Многие языки программирования предоставляют возможность запускать бенчмарки. В Go тулинг облегчает написание бенчмарков. Вы можете импортировать пакет testing и создать функцию с префиксом Benchmark и параметром-указателем типа testing.B. Например, следующая программа определяет время, необходимое для вычисления факториала числа 10:

package main

import (
    "fmt"
    "testing"
)

var fact int

func BenchmarkFactorial(b *testing.B) {
    for n := 0; n < b.N; n++ {
        fact = 1
        for i := 1; i <= 10; i++ {
            fact *= i
        }
    }
}

func main() {
    res := testing.Benchmark(BenchmarkFactorial)
    fmt.Println("BenchmarkFactorial", res)
}

Если вы добавите функции с такой сигнатурой (BenchmarkSomething(b *testing.B)), как часть ваших тестов в проекте, вы сможите запускать их командой go test -bench .. Чтобы запустить только одну из них, можно указать паттерн, например go test -bench Factorial, который будет запускать только бенчмарки содержащие слово Factorial.

b.N указывает, сколько раз выполняется эталонная функция. Пакет тестирования настраивает это значение, увеличивая его до тех пор, пока контрольная функция не проработает хотя бы одну секунду.

Измерение выделения памяти

В Go у каждой функции есть своя “стековая память”. Как следует из названия, стековая память выделяется и удаляется в порядке очереди (LIFO). Эта память обычно используется только внутри функции, и ее размер часто ограничен. Другой тип памяти, который может использовать программа на Go, - это память на кучи(heap). Такая память выделяется и освобождается в случайном порядке. Существует только одна куча, общая для всех функций.

При использовании стековой памяти нет риска, что память может быть потеряна или использована не по назначению, поскольку она принадлежит определенной функции и может быть освобождена по ее завершении. С динамической памятью больше проблем: не всегда ясно когда память должна быть освобождена. Языки программирования вроде Go полагаются на сборщик мусора(garbage collector) для решения этой проблемы. Например, когда мы создаем новый слайс с помощью функции make, нам не нужно беспокоиться об освобождении памяти. Go автоматически освобождает ее. Тем не менее, постоянное выделение и удаление памяти может негативно сказаться на производительности. Во многих реальных системах управление памятью становится узким местом.

Поэтому иногда бывает интересно включить использование памяти в бенчмарк. Пакет тестирования Go позволяет измерить количество выделений памяти на куче. Как правило, в Go это примерно соответствует количеству вызовов и количеству объектов, которые должен обработать сборщик мусора. Следующая программа вычисляет факториал, храня его вычисления в динамически выделяемых фрагментах:

package main

import (
    "fmt"
    "testing"
)

var fact int

func BenchmarkFactorial(b *testing.B) {
    for n := 0; n < b.N; n++ {
        fact = 1
        for i := 1; i <= 10; i++ {
            fact *= i
        }
    }
}
func BenchmarkFactorialBuffer(b *testing.B) {
    for n := 0; n < b.N; n++ {
        buffer := make([]int, 11)
        buffer[0] = 1
        for i := 1; i <= 10; i++ {
            buffer[i] = i * buffer[i-1]
        }
    }
    b.ReportAllocs()
}

func BenchmarkFactorialBufferLarge(b *testing.B) {
    for n := 0; n < b.N; n++ {
        buffer := make([]int, 100001)
        buffer[0] = 1
        for i := 1; i <= 100000; i++ {
            buffer[i] = i * buffer[i-1]
        }
    }
    b.ReportAllocs()
}

func main() {
    res := testing.Benchmark(BenchmarkFactorial)
    fmt.Println("BenchmarkFactorial", res)
    resmem := testing.Benchmark(BenchmarkFactorialBuffer)
    fmt.Println("BenchmarkFactorialBuffer", resmem, resmem.MemString())
    resmem = testing.Benchmark(BenchmarkFactorialBufferLarge)
    fmt.Println("BenchmarkFactorialBufferLarge", resmem, resmem.MemString())
}

Если вы запустите такую программу, вы можете получить следующий результат:

BenchmarkFactorial 90887572             14.10 ns/op
BenchmarkFactorialBuffer 88609930               11.96 ns/op        0 B/op              0 allocs/op
BenchmarkFactorialBufferLarge     4408      249263 ns/op   802816 B/op         1 allocs/op

Последняя функция выделяет 802816 байт на одну операцию. Eсли Go определит, что на данные не будет ссылок после возврата функции (этот процесс называется “escape analysis”), и если объем памяти достаточно мал, вместо кучи может быть использован стек. Но в последней функцией объем используемой памяти слишком велик, поэтому память выделяется на куче.

Измерение расхода памяти

Операционная система предоставляет память запущенному процессу в виде страниц(pages). Операционная система не может предоставлять память в меньших единицах, чем страница. Таким образом, если вы выделяете память в программе, это может либо не потребовать дополнительной памяти, если в ней уже достаточно страниц, либо заставить операционную систему предоставить больше страниц.

Размер страницы зависит от операционной системы и ее конфигурации. Чаще всего он может составлять от 4 до 16 килобайт, хотя возможны и гораздо большие страницы (например, 1 гигабайт).

Страница - это непрерывный массив адресов виртуальной памяти. Страница может также представлять собой реальную физическую память. Однако операционные системы обычно отображают в физическую память только используемые страницы. Операционная система может предоставлять процессу практически бесконечное количество страниц, никогда не сопоставляя их с физической памятью. Таким образом, определить, сколько памяти использует программа, не так-то просто. Может показаться, что программа использует много (виртуальной) памяти, но при этом не использует много физической памяти, и наоборот.

Размер страницы влияет как на производительность, так и на использование памяти. Выделение страниц процессу не является бесплатным, оно требует определенных усилий. Кроме всего прочего, операционная система не может просто повторно использовать страницу памяти, полученную от другого процесса, как есть. Это может представлять угрозу безопасности, поскольку вы можете получить косвенный доступ к данным, хранящимся в памяти другого процесса. Этот другой процесс мог хранить в памяти ваши пароли или другую конфиденциальную информацию. Как правило, операционная система должна инициализировать (например, установить в ноль) вновь назначенную страницу. Кроме того, сопоставление страниц с их реальной физической памятью также занимает время.

Чтобы ускорить процесс отображения, современные системы часто используют буфер ассоциативной трансляции(translation lookaside buffer, TLB) для хранения информации в кэше. Когда в буфере трансляции заканчивается место, может потребоваться ручное вычисление отображения страниц, что является дорогостоящим процессом. Таким образом, большие страницы могут повысить производительность некоторых программ. Однако большие страницы заставляют операционную систему предоставлять процессу память большими кусками, что может привести к нерациональному использованию драгоценной памяти.

Вы можете написать программу на Go, которая выведет размер страницы в вашей системе:

import (
    "fmt"
    "os"
)

func main() {
    pageSize := os.Getpagesize()
    fmt.Printf("Page size: %d bytes (%d KB)\n", pageSize, pageSize/1024)
}

С помощью Go относительно легко измерить количество страниц, выделенных операционной системой для программы. Тем не менее, необходимо соблюдать некоторую осторожность. Поскольку Go использует сборщик мусора для освобождения выделенной памяти, может возникнуть задержка между моментом, когда вам больше не нужна память, и фактическим освобождением памяти. Вы можете заставить Go немедленно вызвать сборщик мусора с помощью функции runtime.GC(). На практике вы редко должны намеренно запускать сборщик мусора, но для наших целей (измерение использования памяти) это полезно.

Существует несколько метрик памяти. В Go одними из самых полезных являются HeapSys и HeapAlloc. Первая показывает, сколько памяти (в байтах) было предоставлено программе операционной системой. Второе значение, которое обычно меньше, показывает, сколько этой памяти активно используется программой.

Следующая программа выделяет все большие, а затем все меньшие слайсы. Теоретически, использование памяти должно сначала увеличиваться, а затем уменьшаться:

package main

import (
    "fmt"
    "os"
    "runtime"
)

func main() {
    pageSize := os.Getpagesize()
    var m runtime.MemStats
    runtime.GC()
    runtime.ReadMemStats(&m)
    fmt.Printf(
        "HeapSys = %.3f MiB, HeapAlloc =  %.3f MiB,  %.3f pages\n",
        float64(m.HeapSys)/1024.0/1024.0,
        float64(m.HeapAlloc)/1024.0/1024.0,
        float64(m.HeapSys)/float64(pageSize),
    )
    i := 100
    for ; i < 1000000000; i *= 10 {
        runtime.GC()
        s := make([]byte, i)
        runtime.ReadMemStats(&m)
        fmt.Printf(
            "%.3f MiB, HeapSys = %.3f MiB, HeapAlloc =  %.3f MiB,  %.3f pages\n",
            float64(len(s))/1024.0/1024.0,
            float64(m.HeapSys)/1024.0/1024.0,
            float64(m.HeapAlloc)/1024.0/1024.0,
            float64(m.HeapSys)/float64(pageSize),
        )
    }
    for ; i >= 100; i /= 10 {
        runtime.GC()
        s := make([]byte, i)
        runtime.ReadMemStats(&m)
        fmt.Printf(
            "%.3f MiB, HeapSys = %.3f MiB, HeapAlloc =  %.3f MiB,  %.3f pages\n",
            float64(len(s))/1024.0/1024.0,
            float64(m.HeapSys)/1024.0/1024.0,
            float64(m.HeapAlloc)/1024.0/1024.0,
            float64(m.HeapSys)/float64(pageSize),
        )
    }
    runtime.GC()
    runtime.ReadMemStats(&m)
    fmt.Printf(
        "HeapSys = %.3f MiB, HeapAlloc =  %.3f MiB,  %.3f pages\n",
        float64(m.HeapSys)/1024.0/1024.0,
        float64(m.HeapAlloc)/1024.0/1024.0,
        float64(m.HeapSys)/float64(pageSize),
    )
}

Программа вызывает os.Getpagesize(), чтобы получить размер страницы памяти базовой системы в байтах в виде целого числа, и присваивает его переменной pageSize. Она объявляет переменную m типа runtime.MemStats, которая представляет собой структуру, содержащую различные статистические данные о аллокаторе памяти и сборщике мусора. Программа неоднократно вызывает runtime.GC(), чтобы вручную запустить цикл сборки мусора, который может освободить часть памяти и сделать ее доступной для освобождения. Она вызывает runtime.ReadMemStats(&m), чтобы заполнить переменную m текущей статистикой памяти. Мы можем повторно использовать одну и ту же переменную m из вызова в вызов. Цель этой программы - продемонстрировать, как изменяется использование памяти программой на Go в зависимости от объема и частоты выделений и деаллокаций памяти, а также как сборщик мусора и время выполнения влияют на освобождение памяти. Программа выводит данные об использовании памяти до и после каждого выделения и показывает, как значения m.HeapSys, m.HeapAlloc и m.HeapSys / pageSize растут и уменьшаются соответственно.

Если вы запустите эту программу, то сможете заметить, что она имеет тенденцию удерживать память, которую вы выделили и позже освободили. Отчасти это вопрос оптимизации: получение памяти занимает время, и мы хотим избежать отдачи страниц только для того, чтобы вскоре запросить их снова. Это иллюстрирует, что определить, сколько памяти использует программа, может быть непросто.

Программа может вывести что-то вроде следующего:

$ go run mem.go
HeapSys = 3.719 MiB, HeapAlloc =  0.367 MiB,  238.000 pages
0.000 MiB, HeapSys = 3.719 MiB, HeapAlloc =  0.367 MiB,  238.000 pages
0.001 MiB, HeapSys = 3.719 MiB, HeapAlloc =  0.383 MiB,  238.000 pages
0.010 MiB, HeapSys = 3.688 MiB, HeapAlloc =  0.414 MiB,  236.000 pages
0.095 MiB, HeapSys = 3.688 MiB, HeapAlloc =  0.477 MiB,  236.000 pages
0.954 MiB, HeapSys = 3.688 MiB, HeapAlloc =  1.336 MiB,  236.000 pages
9.537 MiB, HeapSys = 15.688 MiB, HeapAlloc =  9.914 MiB,  1004.000 pages
95.367 MiB, HeapSys = 111.688 MiB, HeapAlloc =  95.750 MiB,  7148.000 pages
953.674 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  954.055 MiB,  68332.000 pages
95.367 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  95.750 MiB,  68332.000 pages
9.537 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  9.914 MiB,  68332.000 pages
0.954 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  1.336 MiB,  68332.000 pages
0.095 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  0.477 MiB,  68332.000 pages
0.010 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  0.414 MiB,  68332.000 pages
0.001 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  0.383 MiB,  68332.000 pages
0.000 MiB, HeapSys = 1067.688 MiB, HeapAlloc =  0.375 MiB,  68332.000 pages
HeapSys = 1067.688 MiB, HeapAlloc =  0.375 MiB,  68332.000 pages

Обратите внимание, как в самом начале и в самом конце повторяется, что используется более трети мегабайта памяти (238 страниц). Более того, в самом конце программе остается выделено более 68 000 страниц, хотя ни одна структура данных не остается в области видимости main функции.

Inlining

Один из самых мощных приемов оптимизации, который может выполнить компилятор, - это инлайнинг функций: компилятор переносит некоторые из вызываемых функций непосредственно в вызывающие функции.

Go позволяет легко определить, какие функции инлайнятся. Мы также можем легко запросить, чтобы компиляция не инлайнилась, добавив строку //go:noinline прямо перед функцией.

Рассмотрим эту программу, содержащую два бенчмарка, в которых мы суммируем все нечетные целые числа в диапазоне.

package main

import (
    "fmt"
    "testing"
)

func IsOdd(i int) bool {
    return i%2 == 1
}

//go:noinline
func IsOddNoInline(i int) bool {
    return i%2 == 1
}

func BenchmarkCountOddInline(b *testing.B) {
    for n := 0; n < b.N; n++ {
        sum := 0
        for i := 1; i < 1000; i++ {
            if IsOdd(i) {
                sum += i
            }
        }
    }
}

func BenchmarkCountOddNoinline(b *testing.B) {
    for n := 0; n < b.N; n++ {
        sum := 0
        for i := 1; i < 1000; i++ {
            if IsOddNoInline(i) {
                sum += i
            }
        }
    }
}

func main() {
    res1 := testing.Benchmark(BenchmarkCountOddInline)
    fmt.Println("BenchmarkCountOddInline", res1)
    res2 := testing.Benchmark(BenchmarkCountOddNoinline)
    fmt.Println("BenchmarkCountOddNoinline", res2)
}

В Go флаг -gcflags=-m указывает компилятору на необходимость сообщать об основных оптимизациях, которые он выполняет. Если назвать эту программу simpleinline.go и скомпилировать ее командой go build -gcflags=-m simpleinline.go, то можно увидеть следующее:

$ go build -gcflags=-m simpleinline.go
./simpleinline.go:8:6: can inline IsOdd
./simpleinline.go:21:12: inlining call to IsOdd
...

Если вы запустите бенчмарк, то увидите, что инлайновая версия работает гораздо быстрее:

$ go run simpleinline.go
BenchmarkCountOddInline  3716786           294.6 ns/op
BenchmarkCountOddNoinline  1388792         864.8 ns/op

Инлайнинг не всегда полезен: в некоторых случаях он может генерировать большие двоичные файлы и даже замедлять работу программного обеспечения. Однако в тех случаях, когда он применим, он может иметь большой положительный эффект.

Go изо всех сил старается инлайнить функции, но у него есть ограничения. Например, компиляторы часто затрудняются встраивать рекурсивные функции. Рассмотрим две факториальные функции, одна из которых рекурсивная, а другая - нет.

package main

import (
    "fmt"
    "testing"
)

var array = make([]int, 1000)

func Factorial(n int) int {
    if n < 0 {
        return 0
    }
    if n == 0 {
        return 1
    }
    return n * Factorial(n-1)
}

func FactorialLoop(n int) int {
    result := 1
    for i := 1; i <= n; i++ {
        result *= i
    }
    return result
}

func BenchmarkFillNoinline(b *testing.B) {
    for n := 0; n < b.N; n++ {
        for i := 1; i < 1000; i++ {
            array[i] = Factorial(i)
        }
    }
}

func BenchmarkFillInline(b *testing.B) {
    for n := 0; n < b.N; n++ {
        for i := 1; i < 1000; i++ {
            array[i] = FactorialLoop(i)
        }
    }
}

func main() {
    res1 := testing.Benchmark(BenchmarkFillNoinline)
    fmt.Println("BenchmarkFillNoinline", res1)
    res2 := testing.Benchmark(BenchmarkFillInline)
    fmt.Println("BenchmarkFillInline", res2)
    fmt.Println(float64(res1.NsPerOp()) / float64(res2.NsPerOp()))
}

Хотя обе функции FactorialLoop и Factorial эквивалентны, выполняя эту программу, вы должны обнаружить, что нерекурсивная функция (FactorialLoop) работает гораздо быстрее.

Cache line

Наши компьютеры читают и записывают память, используя небольшие блоки памяти, называемые «cache lines». Длина кэш-линии обычно фиксирована и невелика (например, 64 или 128 байт). Чтобы попытаться измерить размер кэш-линии, мы можем использовать последовательное копирование(a strided copy). Из большого массива мы копируем каждые N байт в другой большой массив. Мы повторяем этот процесс N раз. Таким образом, если исходный массив содержит 1000 байт, мы всегда копируем 1024 байта, независимо от того, N = 1, N = 2, N = 4 или N = 8.

Когда N достаточно велико (скажем, N = 16), проблема должна быть по сути ограничена памятью: производительность ограничивается не количеством инструкций, а способностью системы загружать и хранить кэш-линии. Если N больше удвоенной кэш-линии, то я могу эффективно пропустить одну кэш-линию из двух. Если N меньше кэш-линии, то необходимо обращаться к каждой строке кэша. Вы ожидаете, что большая последовательность будет значительно быстрее.

Одно из ограничений этого подхода заключается в том, что процессоры могут извлекать больше кэш-линий, чем нужно, поэтому мы можем переоценить размер кэш-линии. Однако, если пропускная способность памяти не слишком велика, следует ожидать, что процессоры будут пытаться ограничить количество считываемых кэш-линии.

Давайте проведем эксперимент. Для каждого размера шага мы повторяем 10 раз и записываем максимальное, минимальное и среднее значение. Рассмотрим следующую программу.

package main

import (
    "fmt"
    "time"
)

const size = 33554432 // 32 MB
func Cpy(arr1 []uint8, arr2 []uint8, slice int) {
    for i := 0; i < len(arr1); i += slice {
        arr2[i] = arr1[i]
    }
}

func AverageMinMax(f func() float64) (float64, float64, float64) {
    var sum float64
    var minimum float64
    var maximum float64

    for i := 0; i < 10; i++ {
        arr1 = make([]uint8, size)
        arr2 = make([]uint8, size)

        v := f()
        sum += v
        if i == 0 || v < minimum {
            minimum = v
        }
        if i == 0 || v > maximum {
            maximum = v
        }
    }
    return sum / 10, minimum, maximum
}

var arr1 []uint8
var arr2 []uint8

func run(size int, slice int) float64 {
    start := time.Now()
    times := 10
    for i := 0; i < times*slice; i++ {
        Cpy(arr1, arr2, slice)
    }
    end := time.Now()
    dur := float64(end.Sub(start)) / float64(times*slice)
    return dur
}

func main() {
    for slice := 16; slice <= 4096; slice *= 2 {
        a, m, M := AverageMinMax(func() float64 { return run(size, slice-1) })
        fmt.Printf("%10d: %10.1f GB/s [%4.1f - %4.1f]\n", slice-1, float64(size)/a, float64(size)/M, float64(size)/m)
    }
}

Мы можем получить следующий результат:

$ go run cacheline.go                                  1
        15:       23.6 GB/s [21.3 - 24.4]
        31:       24.3 GB/s [23.8 - 24.5]
        63:       24.2 GB/s [23.6 - 24.6]
       127:       26.9 GB/s [23.8 - 27.9]
       255:       40.8 GB/s [37.8 - 43.6]
       511:      162.0 GB/s [130.4 - 203.4]
      1023:      710.0 GB/s [652.0 - 744.4]
      2047:      976.1 GB/s [967.1 - 983.8]
      4095:     1247.4 GB/s [1147.7 - 1267.0]

Мы видим, что производительность существенно возрастает, когда длина строки увеличивается от 127 до 255. Это говорит о том, что строка кэш-линии 128 байт. Если вы запустите этот же бенчмарк на своей системе, вы можете получить другой результат.

Полученные результаты следует интерпретировать с осторожностью: мы не оцениваем скорость копирования в 1247,4 ГБ/с. Скорее, мы можем копировать большие массивы с такой скоростью, если мы копируем только один байт из каждых 4095 байт.

CPU Cache

При программировании мы часто не думаем непосредственно о памяти. Когда мы задумываемся о том, что наши данные используют память, мы часто считаем ее однородной: память - это как большой однородный холст, на который компьютер записывает и считывает данные. Однако основная память (ОЗУ) обычно буферизируется с помощью небольшого объема памяти, расположенного рядом с ядром процессора (кэш процессора). Часто мы имеем несколько уровней кэш-памяти (например, L1, L2, L3): L1, как правило, небольшой, но очень быстрый, в то время как, например, L3 больше, но медленнее.

Вы можете эмпирически измерить влияние кэша. Если вы возьмете небольшой массив и перетасуете его случайным образом, то будете перемещать данные в основном в кэше процессора, что происходит быстро. Если вы возьмете массив большего размера, то будете перемещать данные в памяти без особой помощи кэша, а этот процесс гораздо медленнее. Таким образом, перемешивание все более крупных массивов - это способ определить размер кэша. Может оказаться сложным точно определить, сколько у вас слоев кэша и каков размер каждого из них. Однако обычно можно определить, что ваш массив значительно больше, чем кэш процессора.

Мы напишем функцию случайной перетасовки: Shuffle(arr []uint32). Она использует алгоритм под названием Fisher-Yates shuffle, который заключается в обратном проходе по массиву и замене каждого элемента другим, случайно выбранным из предшествующих ему. Функция использует переменную seed для генерации случайных чисел по математической формуле. Для наших целей мы используем упрощенный генератор чисел: умножаем seed на индекс. Функция bits.Mul64 вычисляет произведение двух 64-битных чисел и возвращает результат в виде двух 32-битных чисел: наиболее значимого (hi) и наименее значимого. Наиболее значимое значение обязательно находится в диапазоне от 0 до i (включительно). Мы используем это наиболее значимое значение в качестве случайного индекса. Затем функция меняет элементы местами, используя множественное присваивание. Мы вызываем функцию shuffle несколько раз, на входах разного размера. Мы показываем время, нормированное на размер входных данных.

package main

import (
    "fmt"
    "math/bits"
    "time"
)

func Shuffle(arr []uint32) {
    seed := uint64(1234)
    for i := len(arr) - 1; i > 0; i-- {
        seed += 0x9E3779B97F4A7C15
        hi, _ := bits.Mul64(seed, uint64(i+1))
        j := int(hi)
        arr[i], arr[j] = arr[j], arr[i]
    }
}

func AverageMinMax(f func() float64) (float64, float64, float64) {
    var sum float64
    var minimum float64
    var maximum float64

    for i := 0; i < 10; i++ {
        v := f()
        sum += v
        if i == 0 || v < minimum {
            minimum = v
        }
        if i == 0 || v > maximum {
            maximum = v
        }
    }
    return sum / 10, minimum, maximum
}

func run(size int) float64 {
    arr := make([]uint32, size)

    for i := range arr {
        arr[i] = uint32(i + 1)
    }
    start := time.Now()
    end := time.Now()
    times := 0
    for ; end.Sub(start) < 100_000_000; times++ {
        Shuffle(arr)
        end = time.Now()
    }
    dur := float64(end.Sub(start)) / float64(times)
    return dur / float64(size)
}

func main() {
    for size := 4096; size <= 33554432; size *= 2 {
        fmt.Printf("%20d KB ", size/1024*4)
        a, m, M := AverageMinMax(func() float64 { return run(size) })
        fmt.Printf(" %.2f [%.2f, %.2f]\n", a, m, M)
    }
}

Возможный результат выполнения этой программы может быть следующим:

⚡  go run cache.go 
                  16 KB  0.70 [0.66, 0.93]
                  32 KB  0.65 [0.64, 0.66]
                  64 KB  0.64 [0.64, 0.66]
                 128 KB  0.64 [0.64, 0.67]
                 256 KB  0.65 [0.64, 0.66]
                 512 KB  0.70 [0.70, 0.71]
                1024 KB  0.77 [0.76, 0.79]
                2048 KB  0.83 [0.82, 0.84]
                4096 KB  0.87 [0.86, 0.90]
                8192 KB  0.92 [0.91, 0.95]
               16384 KB  1.10 [1.06, 1.24]
               32768 KB  2.34 [2.28, 2.52]
               65536 KB  3.90 [3.70, 4.25]
              131072 KB  5.66 [4.80, 9.78]

Мы видим, что в диапазоне от 16 КБ до 16384 КБ время перетасовки одного элемента практически не увеличивается, даже если мы многократно удваиваем размер входных данных. Однако между 16384 КБ и 32768 КБ время перетасовки одного элемента удваивается. И далее оно последовательно удваивается при каждом удвоении размера массива. Это говорит о том, что размер кэша процессора в данном случае составляет около 16384 КБ.

Пропускная способность памяти

Считывать и записывать данные в память можно только с максимальной скоростью. Измерить такие пределы бывает непросто. В частности, для достижения оптимальной памяти вам может понадобиться несколько ядер в многоядерной системе. Для простоты рассмотрим максимальную скорость чтения.

Многие крупные системы не имеют единой пропускной способности. Например, многие крупные системы работают на основе NUMA: NUMA означает Non-Uniform Memory Access. В системе NUMA у каждого процессора есть своя локальная память, к которой он может обращаться быстрее, чем к памяти других процессоров.

Пропускная способность также в некоторой степени зависит от объема запрашиваемой памяти. Если память помещается в кэш процессора, то только первый доступ может быть дорогим. Очень большая область памяти может не поместиться в оперативной памяти и потребовать хранения на диске. Даже если она поместится в оперативной памяти, слишком большая область памяти может потребовать много страниц памяти, и обращение ко всем из них может вызвать хождение по страницам из-за ограничений буфер ассоциативной трансляции(translation lookaside buffer, TLB).

Если доступ к памяти осуществляется в случайных местах, системе будет сложно поддерживать максимальную пропускную способность, поскольку она не сможет легко предсказать, где произойдет следующая загрузка памяти. Чтобы получить максимальную пропускную способность, доступ к памяти должен быть линейным или по какой-то предсказуемой схеме.

Рассмотрим следующий код:

package main

import (
    "fmt"
    "time"
)

func run() float64 {
    bestbandwidth := 0.0
    arr := make([]uint8, 2*1024*1024*1024) // 4 GB
    for i := 0; i < len(arr); i++ {
        arr[i] = 1
    }
    for t := 0; t < 20; t++ {
        start := time.Now()
        acc := 0
        for i := 0; i < len(arr); i += 64 {
            acc += int(arr[i])
        }
        end := time.Now()
        if acc != len(arr)/64 {
            panic("!!!")
        }
        bandwidth := float64(len(arr)) / end.Sub(start).Seconds() / 1024 / 1024 / 1024
        if bandwidth > bestbandwidth {
            bestbandwidth = bandwidth
        }
    }
    return bestbandwidth
}

func main() {
    for i := 0; i < 10; i++ {
        fmt.Printf(" %.2f GB/s\n", run())
    }
}

В коде определены две функции: run и main. Функция main является точкой входа в программу, она вызывает функцию run 10 раз, каждый раз выводя результат. Функция run - это пользовательская функция, которая измеряет пропускную способность памяти системы. Для этого она выполняет следующие действия:

Она объявляет переменную bestbandwidth и инициализирует ее значением 0.0. В этой переменной хранится наибольшее значение пропускной способности, полученное во время выполнения функции. Создается слайс байтов (uint8) под названием arr, длина которого эквивалентна 4ГБ. Этот слайс инициализируется 1s. Цикл будет обращаться только к каждому 64-му элементу слайса, пропуская остальные. Учитывая, что в большинстве систем размер кэш-строки составляет 64 байта или более, достаточно обратиться к каждой строке кэша. Код рассчитывает пропускную способность, деля размер слайса (в байтах) на разницу между временем окончания и начала (в секундах), а затем деля на 1024 три раза, чтобы преобразовать результат в гигабайты в секунду (ГБ/с). Код повторяет измерение 20 раз и возвращает лучший результат, чтобы учесть возможные колебания производительности системы. Код распечатывает результат 10 раз, чтобы показать согласованность измерений.

Задержка памяти и параллелизм

Задержка(latency) часто описывается как временная задержка между началом запроса и моментом, когда вас обслуживают. Таким образом, если вы идете в ресторан, задержка, которая вас может интересовать, - это время, которое пройдет, прежде чем вы сможете начать есть. Задержка отличается от пропускной способности(throughput): ресторан может обслуживать сотни клиентов одновременно, но при этом иметь высокую задержку (длительные задержки для каждого клиента). Если вы поместите большое количество данных на очень большой диск, вы можете положить этот диск в грузовик и проехать на нем между двумя городами. Это может представлять собой большую пропускную способность (за единицу времени перемещается много данных), но задержка может быть весьма значительной(часы). Точно так же вы можете посветить лазером на своего партнера, когда ужин будет готов: информация может прийти без задержки, даже если вы находитесь очень далеко, но вы передаете мало информации (низкая пропускная способность).

Один из способов выразить этот компромисс между задержкой и пропускной способностью - закон Литтла: L = λW, где L - среднее количество элементов в системе, λ - пропускная способность (долгосрочная средняя скорость поступления новых элементов), а W - задержка, или среднее количество времени, которое элементы проводят в ожидании. Таким образом, если вы хотите, чтобы в вашем ресторане постоянно находилось L клиентов, а их прибывает все меньше, вам следует обслуживать клиентов с большими задержками. И так далее. Закон Литтла работает и с нашими подсистемами памяти: компьютеры могут выдержать максимальное количество запросов к памяти, каждый запрос имеет задержкуи и общую пропускную способность. Если задержка не улучшится, мы все равно можем улучшить пропускную способность, увеличив количество запросов, которые могут обрабатываться одновременно.

К сожалению, разработчики систем часто вынуждены делать такой выбор, поэтому нередко можно наблюдать застой или ухудшение задержек памяти, несмотря на быстрое улучшение ее пропускной способности. Распространенной иллюстрацией концепции задержки памяти является обход связанного списка. В информатике связный список - это структура данных, состоящая из узлов, каждый из которых связан (указателем) со следующим узлом. Узлы могут быть расположены в памяти не последовательно, но даже если это так, доступ к каждому следующему узлу требует, по крайней мере, небольшой задержки. На современных процессорах загрузка данных из памяти часто занимает не менее 3 тактов, даже если память находится в кэше. Таким образом, определение длины списка путем обхода всего связного списка может занять время, причем большая часть этого времени складывается из последовательных задержек. Следующий код показывает время, необходимое для обхода связанного списка, состоящего из миллиона узлов. Хотя это время зависит от вашей системы, оно может составлять значительные доли миллисекунды.

package main

import (
    "fmt"
    "testing"
)

type Node struct {
    data int
    next *Node
}

func build(volume int) *Node {
    var head *Node
    for i := 0; i < volume; i++ {
        head = &Node{i, head}
    }
    return head
}

var list *Node
var N int

func BenchmarkLen(b *testing.B) {
    for n := 0; n < b.N; n++ {
        len := 0
        for p := list; p != nil; p = p.next {
            len++
        }
        if len != N {
            b.Fatalf("invalid length: %d", len)
        }
    }
}

func main() {
    N = 1000000
    list = build(N)
    res := testing.Benchmark(BenchmarkLen)
    fmt.Println("milliseconds: ", float64(res.NsPerOp())/1e6)

    fmt.Println("nanoseconds per el.", float64(res.NsPerOp())/float64(N))
}

В этом коде определена структура Node с двумя полями: data - целое число, представляющее значение, хранящееся в узле, next - указатель на следующий узел в связанном списке. Можно также добавить указатель на предыдущий узел, но в нашем случае в этом нет необходимости. Функция build создает односвязный список узлов из целочисленного volume в качестве аргумента. Этот код демонстрирует, как создать связанный список, вычислить его длину и проверить производительность вычисления длины. Наши вычисления почти полностью ограничены (лимитированы) задержкой памяти, временем, которое требуется для доступа к памяти. Вычисления, которые мы выполняем (сравнения, инкременты), не имеют значения для производительности. Чтобы проиллюстрировать наше наблюдение, мы можем попробовать обойти два связанных списка одновременно, как в этом примере:

package main

import (
    "fmt"
    "testing"
)

type Node struct {
    data int
    next *Node
}

func build(volume int) *Node {
    var head *Node
    for i := 0; i < volume; i++ {
        head = &Node{i, head}
    }
    return head
}

var list1 *Node
var list2 *Node

var N int

func BenchmarkLen(b *testing.B) {
    for n := 0; n < b.N; n++ {
        len := 0
        for p1, p2 := list1, list2; p1 != nil && p2 != nil; p1, p2 = p1.next, p2.next {
            len++
        }
        if len != N {
            b.Fatalf("invalid length: %d", len)
        }
    }
}

func main() {
    N = 1000000
    list1 = build(N)
    list2 = build(N)

    res := testing.Benchmark(BenchmarkLen)
    fmt.Println("milliseconds: ", float64(res.NsPerOp())/1e6)

    fmt.Println("nanoseconds per el.", float64(res.NsPerOp())/float64(N))
}

Если вы запустите этот новый код, то обнаружите, что результаты бенчмарка близки к односписочным. Это неудивительно: процессор в основном просто ждет следующего узла, а ожидание двух узлов не намного дороже. По этой причине при программировании следует как можно больше ограничивать обращения к памяти. Используйте простые массивы, когда это возможно, вместо связных списков или древовидных структур на основе узлов. Мы хотели бы работать с произвольно большими структурами данных, чтобы напрячь доступ к памяти вне кэша. Алгоритм Саттоло - это вариант хорошо известной случайной перестановки, которая генерирует случайную циклическую перестановку массива или списка. Алгоритм Саттоло обеспечивает перестановку данных за один цикл. То есть, начиная с одного элемента в списке размера n, мы обнаруживаем, что этот элемент перемещается на другую позицию, которая сама перемещается на другую позицию, и так далее, пока после n перемещений мы не вернемся на нашу начальную позицию. Чтобы применить алгоритм Саттоло, задав массив или список элементов, мы начинаем с индекса i от 0 до n-1, где n - длина массива. Для каждого индекса i мы выбираем случайный индекс j такой, что i < j < n. Мы меняем местами элементы в индексах i и j. Например, предположим, что у нас есть массив [0, 1, 2, 3, 4]. Алгоритм может выдать циклическую перестановку типа [2, 0, 3, 1, 4]. С помощью этого алгоритма мы можем посетить все значения в массиве ровно один раз в случайном порядке. Из массива с индексами от 0 до n-1, перестановленного по алгоритму Саттоло, мы сначала загружаем первый элемент, считываем его значение, переходим на соответствующий индекс и так далее. После n операций мы должны вернуться на исходную позицию. Поскольку каждая операция связана с загрузкой памяти, она ограничена задержкой памяти. Мы можем попытаться работать быстрее с помощью параллелизма на уровне памяти: мы можем выбрать k позиций, разбросанных по циклу, и перемещаться из этих k начальных позиций n/k раз в течение цикла. Поскольку компьютеры могут загружать множество значений параллельно, этот алгоритм должен быть быстрее при больших значениях k. Однако с увеличением k мы можем увидеть все меньший выигрыш, поскольку системы имеют ограниченный параллелизм на уровне памяти и пропускную способность.

package main

import (
    "fmt"
    "math/rand"
    "time"
)

// makeCycle creates a cycle of a specified length starting at element 0
func makeCycle(length int) ([]uint64, []uint64) {
    array := make([]uint64, length)
    index := make([]uint64, length)
    // Create a cycle of maximum length within the big array
    for i := 0; i < length; i++ {
        array[i] = uint64(i)
    }

    // Sattolo shuffle
    for i := 0; i+1 < length; i++ {
        swapIdx := rand.Intn(length-i-1) + i + 1
        array[i], array[swapIdx] = array[swapIdx], array[i]
    }

    total := 0
    cur := uint64(0)
    for cur != 0 {
        index[total] = cur
        total++
        cur = array[cur]
    }
    return array, index
}

// setupPointers sets up pointers based on the given index
func setupPointers(index []uint64, length, mlp int) []uint64 {
    sp := make([]uint64, mlp)
    sp[0] = 0

    totalInc := 0
    for m := 1; m < mlp; m++ {
        totalInc += length / mlp
        sp[m] = index[totalInc]
    }
    return sp
}

func runBench(array []uint64, index []uint64, mlp int) time.Duration {
    length := len(array)
    sp := setupPointers(index, length, mlp)
    hits := length / mlp
    before := time.Now()
    for i := 0; i < hits; i++ {
        for m := 0; m < mlp; m++ {
            sp[m] = array[sp[m]]
        }
    }
    after := time.Now()
    return after.Sub(before)
}

func main() {
    const length = 100000000
    array, index := makeCycle(length)
    fmt.Println("Length:", length*8/1024/1024, "MB")
    base := runBench(array, index, 1)
    fmt.Println("Lanes:", 1, "Time:", base)

    for mlp := 2; mlp <= 40; mlp++ {
        t := runBench(array, index, mlp)
        fmt.Println("Lanes:", mlp, "Speedup:", fmt.Sprintf("%.1f", float64(base)/float64(t)))
    }
}

В целом, этот код генерирует цикл заданной длины, устанавливает указатели и оценивает время выполнения для разного количества дорожек. Основной целью является изучение распараллеливания с использованием нескольких дорожек. Функция runBench имитирует параллельное выполнение путем одновременного обновления указателей. Ускорение рассчитывается путем сравнения времени выполнения для разного количества дорожек. Чем больше ускорение, тем эффективнее параллельное выполнение на уровне памяти. Общий принцип заключается в том, что производительность системы с высокими задержками часто можно повысить за счет разрыва зависимостей между данными. Вместо того чтобы помещать все данные в длинную цепочку, постарайтесь обойтись без цепочек вообще или, если цепочки все-таки есть, используйте несколько небольших цепочек.

Суперскалярность и зависимость от данных

Большинство современных процессоров являются суперскалярными (в отличие от «скалярных»), то есть они могут выполнять(execute) и удалять(retire) несколько инструкций за один цикл работы процессора. То есть, даже если у вас одно ядро процессора, в нем присутствует значительный параллелизм. Некоторые процессоры могут удалять 8 инструкций за цикл или даже больше. Не все программы одинаково выигрывают от суперскалярного исполнения. Несколько факторов могут ограничивать производительность процессоров несколькими инструкциями за цикл. Одним из таких факторов является необходимость ожидания доступа к памяти. Другой распространенный фактор - зависимость от данных: когда следующая инструкция зависит от результата предыдущей… может потребоваться ожидание перед началом выполнения. В качестве иллюстрации рассмотрим функции, которые вычисляют последовательные разности между элементами массива (например, при заданных 5,7,6 вы можете получить начальное значение 5, затем 2 и -1), и обратную операцию, которая суммирует все разности для восстановления исходного значения. Вы можете реализовать эти функции следующим образом:

func successiveDifferences(arr []int) {
    base := arr[0]
    for i := 1; i < len(arr); i++ {
        base, arr[i] = arr[i], arr[i]-base
    }
}

func prefixSum(arr []int) {
    for i := 1; i < len(arr); i++ {
        arr[i] = arr[i] + arr[i-1]
    }
}

Если предположить, что компилятор не оптимизирует эти функции нетривиальным образом (например, с помощью SIMD-инструкций), мы можем относительно просто рассуждать о производительности. Для последовательных разностей нам требуется примерно одно вычитание на элемент массива. Для префиксной суммы требуется примерно одно сложение на элемент массива. С первого взгляда это выглядит довольно похоже. Однако зависимость от данных разная. Чтобы вычислить разность между любыми двумя значениями в массиве, не обязательно вычислять предыдущие разности. Однако префиксная сумма, как мы ее реализовали, требует вычисления всех предыдущих сумм, прежде чем будет вычислена следующая. Давайте напишем небольшую бенчмарк-программу, чтобы проверить разницу в производительности:

package main

import (
    "fmt"
    "math/rand"
    "testing"
)

func successiveDifferences(arr []int) {
    base := arr[0]
    for i := 1; i < len(arr); i++ {
        base, arr[i] = arr[i], arr[i]-base
    }
}

func prefixSum(arr []int) {
    for i := 1; i < len(arr); i++ {
        arr[i] = arr[i] + arr[i-1]
    }
}

var array []int

func BenchmarkPrefixSum(b *testing.B) {
    for n := 0; n < b.N; n++ {
        prefixSum(array)
    }
}

func BenchmarkSuccessiveDifferences(b *testing.B) {
    for n := 0; n < b.N; n++ {
        successiveDifferences(array)
    }
}

func main() {
    array = make([]int, 100)
    for i := range array {
        array[i] = rand.Int()
    }
    res2 := testing.Benchmark(BenchmarkSuccessiveDifferences)
    fmt.Println("BenchmarkSuccessiveDifferences", res2)
    res1 := testing.Benchmark(BenchmarkPrefixSum)
    fmt.Println("BenchmarkPrefixSum", res1)

}

Результат будет зависеть от вашей системы. Однако не стоит удивляться, если префиксная сумма займет больше времени. На системе Apple мы получили следующие результаты:

BenchmarkSuccessiveDifferences 39742334         30.04 ns/op
BenchmarkPrefixSum  8307944            142.8 ns/op

Префиксная сумма может быть в несколько раз медленнее, хотя на первый взгляд кажется, что она должна использовать сопоставимое количество инструкций. В общем, нельзя доверять поспешному анализу. Если кажется, что две функции выполняют одинаковый объем работы, это не значит, что они одинаково производительны. Необходимо учитывать несколько факторов, в том числе зависимость от данных.

Branch prediction

Поскольку процессоры мультискалярные, они были разработаны для спекулятивного выполнения: сталкиваясь с ветвлением, процессор пытается угадать направление, в котором оно будет выполнено, и начинает оптимистичные вычисления. Когда процессор делает правильное предсказание, он обычно повышает производительность, иногда на значительную величину. Однако, когда процессор не может точно предсказать ветвление, предсказание может стать чистым негативом. Действительно, если ветвь предсказана неверно, процессору приходится заново начинать вычисления с того места, где он сделал неверное предсказание, а это дорогостоящий процесс, который может отнять несколько циклов процессора. Для иллюстрации рассмотрим функцию, которая копирует содержимое одного фрагмента в другой фрагмент того же размера:

func Copy(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        dest[i] = v
    }
}

Более сложная функция может копировать только нечетные элементы:

func CopyOdd(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        if v&1 == 1 {
            dest[i] = v
        }
    }
}

Мы можем попытаться скопировать массив, содержащий случайные целые числа (как четные, так и нечетные), только нечетные или только четные целые числа. Это иллюстрирует следующая программа:

package main

import (
    "fmt"
    "math/rand"
    "testing"
)

func Copy(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        dest[i] = v
    }
}

func CopyOdd(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        if v&1 == 1 {
            dest[i] = v
        }
    }
}

var array []uint
var dest []uint

func BenchmarkCopyOdd(b *testing.B) {
    for n := 0; n < b.N; n++ {
        CopyOdd(dest, array)
    }
}

func BenchmarkCopy(b *testing.B) {
    for n := 0; n < b.N; n++ {
        Copy(dest, array)
    }
}

func main() {
    array = make([]uint, 10000)
    dest = make([]uint, len(array))

    for i := range array {
        array[i] = uint(rand.Uint32())
    }
    res0 := testing.Benchmark(BenchmarkCopy)
    fmt.Println("BenchmarkCopy (random)", res0)
    res1 := testing.Benchmark(BenchmarkCopyOdd)
    fmt.Println("BenchmarkCopyOdd (random)", res1)
    for i := range array {
        array[i] = uint(rand.Uint32()) | 1
    }
    res2 := testing.Benchmark(BenchmarkCopyOdd)
    fmt.Println("BenchmarkCopyOdd (odd data)", res2)
    for i := range array {
        array[i] = uint(rand.Uint32()) &^ 1
    }
    res3 := testing.Benchmark(BenchmarkCopyOdd)
    fmt.Println("BenchmarkCopyOdd (even data)", res3)
}

На системе Apple мы получили следующие результаты:

BenchmarkCopy (random)   414158       2936 ns/op
BenchmarkCopyOdd (random)    55408           19518 ns/op
BenchmarkCopyOdd (odd data)   402670          2975 ns/op
BenchmarkCopyOdd (even data)   402738         2896 ns/op

В последних трех таймингах используется одна и та же функция, различаются только входные данные. Мы видим, что все тайминги в этом случае похожи, за исключением бенчмарка, копирующего случайные данные: он в наших тестах работает в несколько раз медленнее. Значительно большее время работы связано с наличием непредсказуемой ветви во внутреннем цикле. Обратите внимание, что одна и та же функция при одинаковом объеме данных может иметь совершенно разные характеристики производительности, даже если вычислительная сложность функции не меняется: во всех случаях мы имеем линейную временную сложность. Если мы ожидаем, что наши данные приведут к плохому предсказанию ветвлений, мы можем уменьшить количество ветвлений в коде. Полученный код может быть почти без ветвлений или вообще без ветвлений. Например, мы можем использовать арифметическое и логическое выражение для замены копии условия:

func CopyOddBranchless(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        dest[i] ^= uint(-(v & 1)) & (v ^ dest[i])
    }
}

Рассмотрим сложное выражение:

  • v & 1: Эта операция проверяет, установлен ли младший бит v (то есть, является ли v нечетным).
  • -(v & 1): Это отрицает результат предыдущей операции. Если v нечетно, то результат становится -1, в противном случае -0. Однако -1 как беззнаковое целое число(unsigned integer) становится максимальным значением - тем, у которого все биты установлены в 1.
  • v ^ dest[i]: При этом происходит XOR значения v с соответствующим элементом в слайсе dest.
  • uint(-(v & 1)) & (v ^ dest[i]): Если v нечетно, возвращается XOR v с dest[i]; в противном случае возвращается 0.
  • Наконец, dest[i] ^= uint(-(v & 1)) & (v ^ dest[i]) оставляет dest[i] неизменным, если v четное, иначе заменяет на v, используя тот факт, что dest[i] ^ (v ^ dest[i]) == v.

Мы можем использовать эту функцию в бенчмарке:

package main

import (
    "fmt"
    "math/rand"
    "testing"
)

func CopyOdd(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        if v&1 == 1 {
            dest[i] = v
        }
    }
}

func CopyOddBranchless(dest []uint, arr []uint) {
    if len(dest) < len(arr) {
        panic("dest is too small")
    }
    for i, v := range arr {
        dest[i] ^= uint(-(v & 1)) & (v ^ dest[i])
    }
}

var array []uint
var dest []uint

func BenchmarkCopyOdd(b *testing.B) {
    for n := 0; n < b.N; n++ {
        CopyOdd(dest, array)
    }
}

func BenchmarkCopyOddBranchless(b *testing.B) {
    for n := 0; n < b.N; n++ {
        CopyOddBranchless(dest, array)
    }
}
func main() {
    array = make([]uint, 10000)
    dest = make([]uint, len(array))
    for i := range array {
        array[i] = uint(rand.Uint32())
    }
    res1 := testing.Benchmark(BenchmarkCopyOdd)
    fmt.Println("BenchmarkCopyOdd (random)", res1)
    res2 := testing.Benchmark(BenchmarkCopyOddBranchless)
    fmt.Println("BenchmarkCopyOddBranchless (random)", res2)
}

На системе Apple мы получили:

BenchmarkCopyOdd (random)    60782           19254 ns/op
BenchmarkCopyOddBranchless (random)   166863          7124 ns/op

В этом тесте безветвистый подход оказался намного быстрее. Следует подчеркнуть, что не всегда код без ветвления быстрее. На самом деле, по результатам нашего общего тестирования, функция без ветвлений значительно медленнее исходной, когда результаты предсказуемы (например, 2896 ns/op против 7124 ns/op). В реальном программном обеспечении вы должны попытаться распознать плохо предсказанные ветви и действовать в этих случаях, чтобы понять, может ли безветвистый подход быть быстрее. К счастью, на практике в большинстве проектов большинство ветвей хорошо предсказуемы.


Комментарии в Telegram-группе!