Когда одна служба синхронно вызывает другую, всегда существует вероятность того, что другая служба недоступна или имеет такую высокую задержку, что она практически непригодна.
Драгоценные ресурсы, такие как потоки, могут быть использованы вызывающей стороной в ожидании ответа другой службы. Это может привести к исчерпанию ресурсов, из-за чего вызывающая служба не сможет обрабатывать другие запросы.
Сбой одного сервиса может потенциально каскадироваться с другими сервисами по всему приложению.
Stop doing it if it hurts
Клиент должен вызывать удаленный сервис через прокси-сервер, который функционирует аналогично автоматическому выключателю.
Когда количество последовательных сбоев пересекает пороговое значение, автоматический выключатель отключается, и в течение периода времени ожидания все попытки вызвать удаленную службу немедленно завершатся неудачей. После истечения времени ожидания автоматический выключатель пропускает ограниченное количество тестовых запросов. Если эти запросы выполнены успешно, выключатель возобновляет нормальную работу. В случае сбоя период ожидания начинается снова.
Circuit Breaker выступает как прокси-сервис между приложением и удаленным сервисом.
Circuit Breaker предотвращает попытки приложения выполнить операцию, которая скорее всего завершится неудачно, что позволяет продолжить работу дальше не тратя важные ресурсы, пока известно, что проблема не устранена. Приложение должно быстро принять сбой операции и обработать его.
В отличии от паттерна Retry, Circuit Breaker рассчитан на менее ожидаемые ошибки, которые могут длиться намного дольше: обрыв сети, отказ сервиса, оборудования…
В этих ситуациях при повторной попытке отправить аналогичный запрос с большой долей вероятности мы получим аналогичную ошибку.
3 состояния
Closed
Запрос от приложения направляется напрямую к сервису.
Счетчик ошибок = 0 и приложение спокойно функционирует.
Circuit Breaker увеличивает счетчик ошибок, если операция завершилась не успешно.
Если количество ошибок за некоторый промежуток времени превышает заранее заданный порог значений, то Circuit Breaker переходит в состояние Open и запускает таймер времени ожидания.
Когда таймер истекает, он переходит в состояние Half-Open.
Назначение таймера — дать сервису время для решения проблемы, прежде чем разрешить приложению попытаться выполнить операцию еще раз.
Open
Запрос от приложения немедленно завершает с ошибкой и исключение возвращается в приложение.
Half-Open
Ограниченному количеству запросов от приложения разрешено обратиться к сервису. Если эти запросы успешны, то считаем что предыдущая ошибка исправлена и прокси-сервис переходит в состояние Closed (счетчик ошибок сбрасывается на 0).
Если любой из запросов завершился ошибкой, то считается, что ошибка все еще присутствует, тогда прокси-сервис возвращается в состояние Open и перезапускает таймер, чтобы дать системе дополнительное время на восстановление после сбоя.
Состояние Half-Open помогает предотвратить быстрый рост запросов к сервису. Т.к. после начала работы сервиса, некоторое время он может быть способен обрабатывать ограниченное число запросов до полного восстановления.
Приложение, вызывающее операцию через Circuit Breaker, должно быть подготовлено к обработке исключений, возникающих, если операция недоступна.
Способ обработки исключения будет зависеть от приложения. Например, приложение может временно понизить функциональность, вызвать альтернативную операцию для выполнения той же задачи или получения тех же данных или сообщить об исключении пользователю и попросить его повторить попытку позже.
Пишем код
Circuit Breaker является небольшой стейт-машиной.
3 состояния:
type State int
const (
StateClosed State = iota
StateOpen
StateHalfOpen
)
Структура которая будет хранить кол-во запросов, ошибок и пр.:
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
func (c *Counts) onRequest() {
c.Requests++
}
func (c *Counts) onSuccess() {
c.TotalSuccesses++
c.ConsecutiveSuccesses++
c.ConsecutiveFailures = 0
}
func (c *Counts) onFailure() {
c.TotalFailures++
c.ConsecutiveFailures++
c.ConsecutiveSuccesses = 0
}
func (c *Counts) clear() {
c.Requests = 0
c.TotalSuccesses = 0
c.TotalFailures = 0
c.ConsecutiveSuccesses = 0
c.ConsecutiveFailures = 0
}
Circuit Breaker:
type CircuitBreaker struct {
maxRequests uint32
timeout time.Duration
readyToTrip func(counts Counts) bool
state State
counts Counts
expiry time.Time
}
maxRequests
— максимальное кол-во запросов которые может пропустить через себя Circuit Breaker пока находится в состоянии Half-Open.
timeout
— период нахождения Circuit Breaker в состоянии Open до перехода в Half-Open.
readyToTrip
— стратегия перехода из состояния Closed в Open.
Например, если было больше 5 ошибок подряд:
func defaultReadyToTrip(counts Counts) bool {
return counts.ConsecutiveFailures > 5
}
Дальше нам нужна функция которую будем вызывать когда запрос прошел успешно:
func (cb *CircuitBreaker) onSuccess(state State) {
switch state {
case StateClosed: // (1)
cb.counts.onSuccess()
case StateHalfOpen: // (2)
cb.counts.onSuccess()
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
cb.setState(StateClosed)
}
}
}
-
При состоянии Closed ничего не меняется. Просто увеличиваем счетчики.
-
Если состояния Circuit Breaker Half Open и кол-во последовательно успешных запросов больше или равна установленному значению — это значит сервис восстановил стабильную работу и можно переходить в штатный режим. Меняем состояния на Closed.
Функция которую будем вызывать когда запрос вернул ошибку:
func (cb *CircuitBreaker) onFailure(state State) {
switch state {
case StateClosed: // (1)
cb.counts.onFailure()
if cb.readyToTrip(cb.counts) {
cb.expiry = time.Now().Add(cb.timeout)
cb.setState(StateOpen)
}
case StateHalfOpen: // (2)
cb.expiry = time.Now().Add(cb.timeout)
cb.setState(StateOpen)
}
}
-
При ошибки увеличиваем счетчики и проверяем нашу стратегию перехода из состояния Closed в Open. Если запросы необходимо прекратить, тогда запускаем таймер и переходим в состояние Open.
-
Ошибка в состоянии Half Open может означать, что сервис восстановился не до конца и пока нужно прекратить отправлять запросы.
Осталось отправлять запросы:
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
// before
if cb.state == StateOpen && cb.expiry.Before(time.Now()) { // (1)
cb.expiry = time.Time{} // zero time
cb.setState(StateHalfOpen)
}
if cb.state == StateOpen { // (2)
return nil, ErrOpenState
} else if cb.state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests { // (3)
return nil, ErrTooManyRequests
}
cb.counts.onRequest()
// execute
result, err := req()
// after
if err != nil {
cb.onFailure(cb.state)
} else {
cb.onSuccess(cb.state)
}
return result, err
}
- Если состояние Open и таймер истек, то переходим в Helf Open
- Если состояние Open, то возвращаем ошибку
- При Half Open проверяем лимит доступных запросов
Самое главное. Тестирование
cb := NewCircuitBreaker(Config{
Name: "test circuit breaker",
MaxRequests: 2,
ReadyToTrip: func(counts Counts) bool {
return counts.ConsecutiveFailures > 5
},
})
// 5 запусков с ошибкой
for i := 0; i < 5; i++ {
assert.Nil(t, fail(cb))
}
// состояние все еще Closed т.к. нужно > 5 ошибок подрят
assert.Equal(t, StateClosed, cb.state)
assert.Equal(t, Counts{5, 0, 5, 0, 5}, cb.counts)
// успешный запуск. должен сбросить ConsecutiveFailures
assert.Nil(t, succeed(cb))
assert.Equal(t, StateClosed, cb.state)
assert.Equal(t, Counts{6, 1, 5, 1, 0}, cb.counts)
// ошибка. статус все еще Closed т.к. ConsecutiveFailures=1
assert.Nil(t, fail(cb))
assert.Equal(t, StateClosed, cb.state)
assert.Equal(t, Counts{7, 1, 6, 0, 1}, cb.counts)
// StateClosed to StateOpen
for i := 0; i < 5; i++ {
assert.Nil(t, fail(cb)) // 6 consecutive failures
}
assert.Equal(t, StateOpen, cb.state)
assert.Equal(t, Counts{0, 0, 0, 0, 0}, cb.counts)
assert.False(t, cb.expiry.IsZero())
// в Open запросы не проходят
assert.Error(t, succeed(cb))
assert.Error(t, fail(cb))
assert.Equal(t, Counts{0, 0, 0, 0, 0}, cb.counts)
pseudoSleep(cb, time.Duration(59)*time.Second)
assert.Equal(t, StateOpen, cb.state)
// StateOpen to StateHalfOpen
pseudoSleep(cb, time.Duration(1)*time.Second) // over Timeout
assert.Nil(t, succeed(cb))
assert.Equal(t, StateHalfOpen, cb.state)
assert.True(t, cb.expiry.IsZero())
assert.Equal(t, Counts{1, 1, 0, 1, 0}, cb.counts)
// StateHalfOpen to StateOpen
assert.Nil(t, fail(cb))
assert.Equal(t, StateOpen, cb.state)
assert.False(t, cb.expiry.IsZero())
assert.Equal(t, Counts{0, 0, 0, 0, 0}, cb.counts)
// StateOpen to StateHalfOpen
pseudoSleep(cb, time.Duration(60)*time.Second) // over Timeout
assert.Nil(t, succeed(cb))
assert.Equal(t, StateHalfOpen, cb.state)
assert.True(t, cb.expiry.IsZero())
assert.Equal(t, Counts{1, 1, 0, 1, 0}, cb.counts)
// StateHalfOpen to StateClosed
assert.Nil(t, succeed(cb)) // ConsecutiveSuccesses(2) >= MaxRequests(2)
assert.Equal(t, StateClosed, cb.state)
assert.Equal(t, Counts{0, 0, 0, 0, 0}, cb.counts)
assert.True(t, cb.expiry.IsZero())
Полный код на github
https://github.com/GermanGorelkin/go-patterns/tree/master/circuit-breaker
Пример может помочь для понимания проблем и их возможных решений. Для продакшена он не предназначен.
Как можно улучшить
-
Важно сделать Circuit Breaker потокобезопасным. Добавить блокировки на критические секции.
-
Разные типы ошибок могут говорить о разных проблемах с точки зрения доступности сервиса. В таком случае имеет смысл учитывать не только кол-во ошибок, но и их тип. Можно добавить веса или осуществлять немедленный переход в Open при возникновении определенных ошибок.
-
Возможность добавить различные стратегии для переходов в другое состояние. Например, не всегда нужно дожидаться таймаута для перехода из Open в Half Open, а имеет смысл автоматически опросить health check нужного сервиса.
-
Логирования изменений состояния Circuit Breaker и их причины.
-
Мониторинг текущего статуса и возможность управлять им из вне.
gobreaker
За основу примера был взят пакет gobreaker от sony.
var cb *breaker.CircuitBreaker
func Get(url string) ([]byte, error) {
body, err := cb.Execute(func() (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
})
if err != nil {
return nil, err
}
return body.([]byte), nil
}