Существует как минимум два способа общения между двумя программными компонентами.
Компонент 1 может обратится к Компонент 2 для получения каких-то данных или выполнения каких-то операций. В этом случае Компонент 2 выполняет работу, когда его об этом просят.
Компонент 2 является активным, содержит собственный поток исполнения или каким-то другим способом следит за своим состоянием. Компонент 2 может уведомлять Компонент 1 о событиях.
Первая модель взаимодействия называется pull-моделью, а вторая — push-моделью.
Pull-модель в рамках одного процесса — это вызов функции или метода. Для клиент-серверной архитектуры — это запрос клиента к серверу. Или запрос одного сервиса к другому.
Push-модель в мире структурного программирования реализуется с помощью обратного вызова(callbacks). В мире функционального программирования push-модель представлена в виде реактивной модели. А в ООП реализуется с помощью паттерна Observer.
Паттерн PubSub(Publisher-Subscriber) является вариацией Observer. Взаимодействие между компонентами происходит через канал связи Event channel. Publisher отправляет события в event channel. Subscriber подписывается на нужное события и ждет его поступления в event channel.
Ключевым различием между классическим Observer и PubSub является слабая связность(loose coupling). Publisher и Subscriber в PubSub не знают о существование друг друга, в отличии от Observer и Subject.
Паттерн PubSub подходит для асинхронного взаимодействия различных приложений в системе. В качестве event channal используют вариации брокером, шин событий и пр(broker, message broker,event bus, …).
Classic Observer
Определяет зависимость типа “один ко многим” между объектами таким образом, что при изменении состояния одного объекта все зависящие от него оповещаются об этом и автоматически обновляются.
Subject имеет внутреннее состояние. На изменения этого состояния должны реагировать другие компоненты системы. Subject владеет списком компонентов подписанных на его изменения. А так же механизмом добавления/удаления подписчиков. Когда происходит события, Subject проходит по своему списку подписчиков и оповещает их об этом.
Observer должен иметь механизм получения уведомления о событиях. В классической реализации — это метод Update. Subject вызывает Update у всех своих подписчиков когда меняется его состояние.
Мы уже говорили, что есть push и pull модели взаимодействия. Паттерн Observer представляет собой push-модель взаимодействия между объектами, поскольку Subject самостоятельно “проталкивает” информацию о событии. Но у push-модели остается вопрос о кол-ве передаваемой информации.
В контексте передаваемой информации так же существует push и pull модели.
Subject может только информировать наблюдателей о событии, не передавая никаких подробностей, вызвав метод Update. Если у наблюдателя есть ссылка на издателя событий, то он может запросить дополнительную информацию. Или обратиться к другому компоненту системы. В хранилище данных или к стороннему сервису. Подписчик может даже проигнорировать событие. Такой подход можно назвать pull-модель получения данных.
При push-модель передачи данных, издатель сразу передаёт всю информацию. Например, через метод Update. Плюс такого подхода в том, что наблюдатель не связан с подписчиком. Он может получать различные события и обрабатывать их по своей логике.
Теперь Go
Посмотрим на классический паттерн Observer
type Observer interface {
Update()
}
type Subject interface {
Attach(Observer)
Detach(Observer)
//Notify()
}
У ConcreteSubject
есть список подписчиков(observers
) и его состояние(state
). Реализованы методы подписки(Attach
) и отписки(Detach
). Состояние объекта можно изменить(SetState
) или получить актуальное значение(GetState
). При смене состояния вызывается метод оповещения о событии(notify
). notify
обходит всех подписчиков и вызывает их метод Update
.
type ConcreteSubject struct {
observers []Observer // список подписчиков
state interface{} // состояние
}
func (s *ConcreteSubject) Attach(o Observer) {
s.observers = append(s.observers, o)
}
func (s *ConcreteSubject) Detach(o Observer) {
for i, v := range s.observers {
if o == v {
s.observers = append(s.observers[:i], s.observers[i+1:]...)
break
}
}
}
func (s *ConcreteSubject) notify() {
for _, v := range s.observers {
v.Update()
}
}
func (s *ConcreteSubject) SetState(st interface{}) {
s.state = st
s.notify()
}
func (s *ConcreteSubject) GetState() interface{} {
return s.state
}
У наблюдателя будет ссылка на наблюдаемый объект(subject
) и свои данные(state
). Метод Update
(тот самый который вызывает Subject) просто дублируют состояния издателя
type ConcreteObserver struct {
subject *ConcreteSubject
state interface{}
}
func (o *ConcreteObserver) Update() {
o.state = o.subject.GetState()
log.Printf("new state:%s", o.state)
}
Вот и вся реализация паттерна Observer.
Напишем тесты и убедимся, что все работает, как планировали.
Создаем издателя и двух подписчиков:
subject := ConcreteSubject{}
obs1 := ConcreteObserver{subject: &subject, state: "obs1"}
obs2 := ConcreteObserver{subject: &subject, state: "obs2"}
Две подписки и проверка:
t.Run("Attach", func(t *testing.T) {
subject.Attach(&obs1)
subject.Attach(&obs2)
if len(subject.observers) != 2 {
t.Errorf("expected len: 2, got:%d", len(subject.observers))
}
})
Меняем состояния издателя и проверяем, что оба подписчика были об этом проинформированы:
t.Run("Notify 2 obs", func(t *testing.T) {
state := "new test"
subject.SetState(state)
for _, o := range subject.observers {
obs := o.(*ConcreteObserver)
if obs.state != state {
t.Errorf("expected state:%v, got:%v", state, obs.state)
}
}
})
Тестируем отписку одного из наблюдателей:
t.Run("Detach", func(t *testing.T) {
subject.Detach(&obs1)
if len(subject.observers) != 1 {
t.Errorf("expected len: 1, got:%d", len(subject.observers))
}
})
Проверяем еще раз оповещения:
t.Run("Notify after detach", func(t *testing.T) {
state := "new test2"
subject.SetState(state)
for _, o := range subject.observers {
obs := o.(*ConcreteObserver)
if obs.state != state {
t.Errorf("expected state:%v, got:%v", state, obs.state)
}
}
})
Разобравшись с базовой идеей, можно посмотреть на различные вариации.
Event Listeners
Для начало попробуем сделать компоненты менее зависимыми друг от друга. Например, наблюдателю необязательно что-то знать про издателя. Наблюдателю достаточно получать сообщения о событии.
Предположим есть компонент который отслеживает изменения в файловой системе и сообщает об этом. Упрощенный вариант структуры файла и типов событий:
type FileInfo struct {
Name string
}
type EventType string
const (
EventTypeCreate EventType = "create"
EventTypeRemove EventType = "remove"
EventTypeModify EventType = "modify"
)
Наблюдатель больше не будет иметь ссылок на издателя, но при уведомлении будет получать всю необходимую информацию. Push-модель передачи данных. Сразу передается тип события(EventType
) и объект события(FileInfo
):
type Observer interface {
Update(EventType, FileInfo)
}
Предположим, что необходимо вести логи на все изменения в файловой системе. Добавим реализацию наблюдателя который сохраняет логи в свое хранилище:
type LoggingListener struct {
store io.Writer
}
func (l *LoggingListener) Update(et EventType, fi FileInfo) {
log := fmt.Sprintf("%s|%s", et, fi.Name)
l.store.Write([]byte(log))
}
Еще может понадобиться отправлять уведомления:
type AlertListener struct {
service io.Writer
}
func (l *AlertListener) Update(et EventType, fi FileInfo) {
log := fmt.Sprintf("%s|%s", et, fi.Name)
l.service.Write([]byte(log))
}
Subject очень похож на классическую реализацию, но появляется возможность подписывать наблюдателей на разные типы событий:
type Subject interface {
Attach(EventType, Observer)
Detach(EventType, Observer)
}
type EventManager struct {
observers map[EventType][]Observer
}
func (em *EventManager) Attach(et EventType, o Observer) {
em.observers[et] = append(em.observers[et], o)
}
func (em *EventManager) Detach(et EventType, o Observer) {
for i, v := range em.observers[et] {
if v == o {
em.observers[et] = append(em.observers[et][:i], em.observers[et][i+1:]...)
break
}
}
}
func (em *EventManager) Notify(et EventType, fi FileInfo) {
for _, o := range em.observers[et] {
o.Update(et, fi)
}
}
В данном случае Subject не имеет состояния и не генерирует события о которых нужно уведомлять. Subject управляет подписчиками и их уведомлением.
Компонент который следит за состоянием файловой системы имеет ссылку на EventManager. Когда что-то происходит с файлами он информирует об этом EventManager:
type Watcher struct {
eventManager *EventManager
}
func (w *Watcher) OnCreate(fi FileInfo) {
w.eventManager.Notify(EventTypeCreate, fi)
}
func (w *Watcher) OnRemove(fi FileInfo) {
w.eventManager.Notify(EventTypeRemove, fi)
}
func (w *Watcher) OnModify(fi FileInfo) {
w.eventManager.Notify(EventTypeModify, fi)
}
Получилась одна из вариаций паттерна Mediator. Обработчики событий и генераторы этих событий ничего не знают друг о друге. Они взаимодействуют через единый компонент, в данном случае EventManager.
Все готово, осталось протестировать.
Для тестов понадобится mock-объект в который наблюдатели будут сохранять полученные события. Mock-объекту нужно реализовать интерфейс Writer. Сделаем слайс строк и метод Write который пишет в них данные:
type MockWriter struct {
data []string
}
func (mw *MockWriter) Write(b []byte) (n int, err error) {
mw.data = append(mw.data, string(b))
return len(b), nil
}
Создаем EventManager и связанный с ним Watcher:
evManager := EventManager{observers: map[EventType][]Observer{}}
fileWatcher := Watcher{eventManager: &evManager}
LoggingListener и MockWriter:
store := MockWriter{}
logger := LoggingListener{store: &store}
AlertListener и MockWriter:
service := MockWriter{}
alerts := AlertListener{service: &service}
Добавим обработчик LoggingListener на все три события и обработчик AlertListener на два. Проверим, что у create и modify по два обработчика, а у delete только один:
evManager.Attach(EventTypeCreate, &logger)
evManager.Attach(EventTypeRemove, &logger)
evManager.Attach(EventTypeModify, &logger)
evManager.Attach(EventTypeCreate, &alerts)
evManager.Attach(EventTypeModify, &alerts)
if len(evManager.observers[EventTypeCreate]) != 2 {
t.Errorf("expected 2 listeners for EventTypeCreate, got:%d", len(evManager.observers[EventTypeCreate]))
}
if len(evManager.observers[EventTypeModify]) != 2 {
t.Errorf("expected 2 listeners for EventTypeModify, got:%d", len(evManager.observers[EventTypeModify]))
}
if len(evManager.observers[EventTypeRemove]) != 1 {
t.Errorf("expected 1 listeners for EventTypeRemove, got:%d", len(evManager.observers[EventTypeRemove]))
}
Для тестирования обработки событий потребуется две переменные, которые должны быть равны ожидаемому состоянию LoggingListener и AlertListener:
expectedDataInStore := make([]string, 0)
expectedDataInService := make([]string, 0)
Вспомогательная функция которая сравнивает эти слайсы со слайсами обработчиков:
eqDataOfWriter := func(t *testing.T) {
t.Helper()
if !reflect.DeepEqual(store.data, expectedDataInStore) {
t.Errorf("store expected:%v, got:%v", expectedDataInStore, store.data)
}
if !reflect.DeepEqual(service.data, expectedDataInService) {
t.Errorf("service expected:%v, got:%v", expectedDataInService, store.data)
}
}
Тестируем создание файла. Событие должно обработаться в двух наблюдателях:
fi := FileInfo{Name: "testName"}
fileWatcher.OnCreate(fi)
expectedDataInStore = append(expectedDataInStore, "create|testName")
expectedDataInService = append(expectedDataInService, "create|testName")
eqDataOfWriter(t)
Таким же образом тестируем удаление и изменения:
t.Run("OnRemove", func(t *testing.T) {
fi := FileInfo{Name: "testName"}
fileWatcher.OnRemove(fi)
expectedDataInStore = append(expectedDataInStore, "remove|testName")
eqDataOfWriter(t)
})
t.Run("OnModify", func(t *testing.T) {
fi := FileInfo{Name: "testName"}
fileWatcher.OnModify(fi)
expectedDataInStore = append(expectedDataInStore, "modify|testName")
expectedDataInService = append(expectedDataInService, "modify|testName")
eqDataOfWriter(t)
})
Publish/Subscribe
Паттер PubSub — это вариация Observer. У pubsub есть event channal. Благодаря event channal у pubsub появляются такие свойства, как слабая связность между компонентами и асинхронность.
В Go на роль event channal отлично подходят встроенные каналы.
Интерфейсы publisher и subscriber могут выглядит следующим образом:
type Subscriber interface {
Notify(interface{})
Close()
}
type Publisher interface {
start()
AddSubscriber() chan<- Subscriber
RemoveSubscriber() chan<- Subscriber
PublishMessage() chan<- interface{}
Stop()
}
Значения в каналы можно передавать на месте, при вызове методов. Так же появляется возможность получить канал и передать его в другие компоненты системы.
Publisher
У publisher есть список подписчиков(subscribers
), канал добавления подписчиков(addSubCh
), канал удаления подписчиков(removeSubCh
), канал получения сообщений(inMsg
) и сигнал остановки(stop
):
type publisher struct {
subscribers []Subscriber
addSubCh chan Subscriber
removeSubCh chan Subscriber
inMsg chan interface{}
stop chan struct{}
addSubHandler func(Subscriber)
removeSubHandler func(Subscriber)
}
Методы дают доступ к каналам:
func (p *publisher) AddSubscriber() chan<- Subscriber {
return p.addSubCh
}
func (p *publisher) RemoveSubscribe() chan<- Subscriber {
return p.removeSubCh
}
func (p *publisher) PublishMessage() chan<- interface{} {
return p.inMsg
}
func (p *publisher) Stop() {
close(p.stop)
}
Добавляем нового подписчика в список через канал addSubCh
. onAddSubscriber
позволяет клиенту publisher
добавить дополнительную логику в процесс подписки. Например, добавить логирование. Так же можно реализовать переопределения всей логики подписки, в плоть до обработки получения данных из канала addSubCh
. Через removeSubCh
получаем подписчика на удаление. inMsg
получает сообщения и отправляет всем подписчикам. stop
вызывает Close
у всех подписчиков и закрывает каналы издателя:
func (p *publisher) start() {
for {
select {
case sub := <-p.addSubCh:
{
p.subscribers = append(p.subscribers, sub)
p.onAddSubscriber(sub)
}
case sub := <-p.removeSubCh:
{
for i, s := range p.subscribers {
if sub == s {
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
s.Close()
p.onRemoveSubscriber(sub)
break
}
}
}
case msg := <-p.inMsg:
{
for _, sub := range p.subscribers {
sub.Notify(msg)
}
}
case <-p.stop:
{
for _, sub := range p.subscribers {
sub.Close()
}
close(p.addSubCh)
close(p.removeSubCh)
close(p.inMsg)
return
}
}
}
}
Создания и инициализация:
func NewPublisher() *publisher {
em := publisher{
addSubCh: make(chan Subscriber),
removeSubCh: make(chan Subscriber),
inMsg: make(chan interface{}),
stop: make(chan struct{}),
}
go em.start()
return &em
}
Publisher и subscriber могут быть совсем разными компонентами, поэтому есть смысл их тестировать по отдельности.
mock для subscriber:
type mockSubscriber struct {
isClose bool
testNotify *func(string)
}
func (s *mockSubscriber) Notify(msg interface{}) {
(*s.testNotify)(msg.(string))
}
func (s *mockSubscriber) Close() {
s.isClose = true
}
Тестируем добавления подписчиков. Добавляем их в разных горутинах. А addSubHandler
поможет определить момент завершения добавления в список. Запускаем 50 горутин с добавлением и в конце сверяем с кол-вом в pub.subscribers
:
t.Run("AddSubscriber", func(t *testing.T) {
cntSub := 50
wg := sync.WaitGroup{}
pub.addSubHandler = func(s Subscriber) {
wg.Done()
}
for i := 0; i < cntSub; i++ {
wg.Add(1)
go func() {
sub := mockSubscriber{
isClose: false,
testNotify: &testFunNotify,
}
pub.addSubCh <- &sub
}()
}
wg.Wait()
if cntSub != len(pub.subscribers) {
t.Errorf("expected cnt sub:%d, got:%d", cntSub, len(pub.subscribers))
}
})
Указатель на функцию testFunNotify
мы присвоили всем подписчикам. Эта функция будет вызвана при получении сообщения. Отправим сообщение и проверим его через эту функцию.
t.Run("PublishMessage", func(t *testing.T) {
msg := "Test Msg"
testFunNotify = func(s string) {
if msg != s {
t.Errorf("expected:%s got:%s", msg, s)
}
}
pub.PublishMessage() <- msg
})
Удаление подписчика из списка издателя:
t.Run("RemoveSubscribe", func(t *testing.T) {
cntSub := 40
wg := sync.WaitGroup{}
pub.removeSubHandler = func(s Subscriber) {
wg.Done()
}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
pub.removeSubCh <- pub.subscribers[0]
}()
}
wg.Wait()
if cntSub != len(pub.subscribers) {
t.Errorf("expected cnt sub:%d, got:%d", cntSub, len(pub.subscribers))
}
})
Subscriber
У subscriber
будет канал получения сообщений(in
), канал для сигнала остановки(stop
) и хранилище с интерфейсом Writer(store
):
type subscriber struct {
in chan interface{}
stop chan struct{}
store io.Writer
}
Полученное сообщение отправляем в канал in и дальше из него в хранилище:
func (s subscriber) Notify(msg interface{}) {
s.in <- msg
}
func (s subscriber) Close() {
close(s.stop)
}
func (s subscriber) start() {
for {
select {
case msg := <-s.in:
{
s.store.Write([]byte(msg.(string)))
}
case <-s.stop:
{
close(s.in)
return
}
}
}
}
Получился простой пример subscriber.
Осталось добавить тестов. Mock для store:
type mockWriter struct {
data []string
}
func (mw *mockWriter) Write(b []byte) (n int, err error) {
mw.data = append(mw.data, string(b))
return len(b), nil
}
Создаем 50 подписчиков и отправляем им 50 сообщений:
func TestSubscriber_Notify(t *testing.T) {
excepted := make([]string, 50)
subs := make([]subscriber, 50)
for i := 0; i < 50; i++ {
store := mockWriter{}
subs[i] = NewSubscriber(&store)
}
msg := "test msg"
for i := 0; i < 50; i++ {
excepted[i] = msg
for _, sub := range subs {
sub.Notify(msg)
}
}
time.Sleep(1 * time.Second)
for _, sub := range subs {
if !reflect.DeepEqual(sub.store.(*mockWriter).data, excepted) {
t.Errorf("excepted:%v got:%v", sub.store.(*mockWriter).data, excepted)
}
}
}
У каждого subscriber должно быть в store слайс из 50 сообщений.
PubSub в упрощенном виде готов. Дальше его можно изменять и подстраивать под конкретную задачу.
Рассмотрели концепции взаимодействия компонентов в системе. Подходы push и pull моделей. Реализацию push модели через паттерн Observer и его вариацию PubSub.
https://github.com/GermanGorelkin/go-patterns/tree/master/behavioral/observer
Комментарии в Telegram-группе!