Существует как минимум два способа общения между двумя программными компонентами.

Компонент 1 может обратится к Компонент 2 для получения каких-то данных или выполнения каких-то операций. В этом случае Компонент 2 выполняет работу, когда его об этом просят.

Компонент 2 является активным, содержит собственный поток исполнения или каким-то другим способом следит за своим состоянием. Компонент 2 может уведомлять Компонент 1 о событиях.

Первая модель взаимодействия называется pull-моделью, а вторая — push-моделью.

This is an image

Pull-модель в рамках одного процесса — это вызов функции или метода. Для клиент-серверной архитектуры — это запрос клиента к серверу. Или запрос одного сервиса к другому.

This is an image

Push-модель в мире структурного программирования реализуется с помощью обратного вызова(callbacks). В мире функционального программирования push-модель представлена в виде реактивной модели. А в ООП реализуется с помощью паттерна Observer.

Паттерн PubSub(Publisher-Subscriber) является вариацией Observer. Взаимодействие между компонентами происходит через канал связи Event channel. Publisher отправляет события в event channel. Subscriber подписывается на нужное события и ждет его поступления в event channel.

This is an image

Ключевым различием между классическим Observer и PubSub является слабая связность(loose coupling). Publisher и Subscriber в PubSub не знают о существование друг друга, в отличии от Observer и Subject.

Паттерн PubSub подходит для асинхронного взаимодействия различных приложений в системе. В качестве event channal используют вариации брокером, шин событий и пр(broker, message broker,event bus, …).

Classic Observer

Определяет зависимость типа “один ко многим” между объектами таким образом, что при изменении состояния одного объекта все зависящие от него оповещаются об этом и автоматически обновляются.

This is an image

Subject имеет внутреннее состояние. На изменения этого состояния должны реагировать другие компоненты системы. Subject владеет списком компонентов подписанных на его изменения. А так же механизмом добавления/удаления подписчиков. Когда происходит события, Subject проходит по своему списку подписчиков и оповещает их об этом.

Observer должен иметь механизм получения уведомления о событиях. В классической реализации — это метод Update. Subject вызывает Update у всех своих подписчиков когда меняется его состояние.

Мы уже говорили, что есть push и pull модели взаимодействия. Паттерн Observer представляет собой push-модель взаимодействия между объектами, поскольку Subject самостоятельно “проталкивает” информацию о событии. Но у push-модели остается вопрос о кол-ве передаваемой информации.

В контексте передаваемой информации так же существует push и pull модели.

Subject может только информировать наблюдателей о событии, не передавая никаких подробностей, вызвав метод Update. Если у наблюдателя есть ссылка на издателя событий, то он может запросить дополнительную информацию. Или обратиться к другому компоненту системы. В хранилище данных или к стороннему сервису. Подписчик может даже проигнорировать событие. Такой подход можно назвать pull-модель получения данных.

При push-модель передачи данных, издатель сразу передаёт всю информацию. Например, через метод Update. Плюс такого подхода в том, что наблюдатель не связан с подписчиком. Он может получать различные события и обрабатывать их по своей логике.

Теперь Go

This is an image

Полный код

Посмотрим на классический паттерн Observer

type Observer interface {
   Update()
}

type Subject interface {
   Attach(Observer)
   Detach(Observer)
   //Notify()
}

У ConcreteSubject есть список подписчиков(observers) и его состояние(state). Реализованы методы подписки(Attach) и отписки(Detach). Состояние объекта можно изменить(SetState) или получить актуальное значение(GetState). При смене состояния вызывается метод оповещения о событии(notify). notify обходит всех подписчиков и вызывает их метод Update.

type ConcreteSubject struct {
   observers []Observer // список подписчиков
   state     interface{} // состояние
}

func (s *ConcreteSubject) Attach(o Observer) {
   s.observers = append(s.observers, o)
}
func (s *ConcreteSubject) Detach(o Observer) {
   for i, v := range s.observers {
      if o == v {
         s.observers = append(s.observers[:i], s.observers[i+1:]...)
         break
      }
   }
}
func (s *ConcreteSubject) notify() {
   for _, v := range s.observers {
      v.Update()
   }
}
func (s *ConcreteSubject) SetState(st interface{}) {
   s.state = st
   s.notify()
}
func (s *ConcreteSubject) GetState() interface{} {
   return s.state
}

У наблюдателя будет ссылка на наблюдаемый объект(subject) и свои данные(state). Метод Update(тот самый который вызывает Subject) просто дублируют состояния издателя

type ConcreteObserver struct {
   subject *ConcreteSubject
   state   interface{}
}

func (o *ConcreteObserver) Update() {
   o.state = o.subject.GetState()
   log.Printf("new state:%s", o.state)
}

Вот и вся реализация паттерна Observer.

Напишем тесты и убедимся, что все работает, как планировали.

Создаем издателя и двух подписчиков:

subject := ConcreteSubject{}

obs1 := ConcreteObserver{subject: &subject, state: "obs1"}
obs2 := ConcreteObserver{subject: &subject, state: "obs2"}

Две подписки и проверка:

t.Run("Attach", func(t *testing.T) {
   subject.Attach(&obs1)
   subject.Attach(&obs2)

   if len(subject.observers) != 2 {
      t.Errorf("expected len: 2, got:%d", len(subject.observers))
   }
})

Меняем состояния издателя и проверяем, что оба подписчика были об этом проинформированы:

t.Run("Notify 2 obs", func(t *testing.T) {
   state := "new test"

   subject.SetState(state)

   for _, o := range subject.observers {
      obs := o.(*ConcreteObserver)
      if obs.state != state {
         t.Errorf("expected state:%v, got:%v", state, obs.state)
      }
   }
})

Тестируем отписку одного из наблюдателей:

t.Run("Detach", func(t *testing.T) {
   subject.Detach(&obs1)

   if len(subject.observers) != 1 {
      t.Errorf("expected len: 1, got:%d", len(subject.observers))
   }
})

Проверяем еще раз оповещения:

t.Run("Notify after detach", func(t *testing.T) {
   state := "new test2"

   subject.SetState(state)

   for _, o := range subject.observers {
      obs := o.(*ConcreteObserver)
      if obs.state != state {
         t.Errorf("expected state:%v, got:%v", state, obs.state)
      }
   }
})

Разобравшись с базовой идеей, можно посмотреть на различные вариации.

Event Listeners

Для начало попробуем сделать компоненты менее зависимыми друг от друга. Например, наблюдателю необязательно что-то знать про издателя. Наблюдателю достаточно получать сообщения о событии.

Полный код

Предположим есть компонент который отслеживает изменения в файловой системе и сообщает об этом. Упрощенный вариант структуры файла и типов событий:

type FileInfo struct {
   Name string
}

type EventType string
const (
   EventTypeCreate EventType = "create"
   EventTypeRemove EventType = "remove"
   EventTypeModify EventType = "modify"
)

Наблюдатель больше не будет иметь ссылок на издателя, но при уведомлении будет получать всю необходимую информацию. Push-модель передачи данных. Сразу передается тип события(EventType) и объект события(FileInfo):

type Observer interface {
   Update(EventType, FileInfo)
}

Предположим, что необходимо вести логи на все изменения в файловой системе. Добавим реализацию наблюдателя который сохраняет логи в свое хранилище:

type LoggingListener struct {
   store io.Writer
}

func (l *LoggingListener) Update(et EventType, fi FileInfo) {
   log := fmt.Sprintf("%s|%s", et, fi.Name)
   l.store.Write([]byte(log))
}

Еще может понадобиться отправлять уведомления:

type AlertListener struct {
   service io.Writer
}

func (l *AlertListener) Update(et EventType, fi FileInfo) {
   log := fmt.Sprintf("%s|%s", et, fi.Name)
   l.service.Write([]byte(log))
}

Subject очень похож на классическую реализацию, но появляется возможность подписывать наблюдателей на разные типы событий:

type Subject interface {
   Attach(EventType, Observer)
   Detach(EventType, Observer)
}

type EventManager struct {
   observers map[EventType][]Observer
}

func (em *EventManager) Attach(et EventType, o Observer) {
   em.observers[et] = append(em.observers[et], o)
}
func (em *EventManager) Detach(et EventType, o Observer) {
   for i, v := range em.observers[et] {
      if v == o {
         em.observers[et] = append(em.observers[et][:i], em.observers[et][i+1:]...)
         break
      }
   }
}
func (em *EventManager) Notify(et EventType, fi FileInfo) {
   for _, o := range em.observers[et] {
      o.Update(et, fi)
   }
}

В данном случае Subject не имеет состояния и не генерирует события о которых нужно уведомлять. Subject управляет подписчиками и их уведомлением.

Компонент который следит за состоянием файловой системы имеет ссылку на EventManager. Когда что-то происходит с файлами он информирует об этом EventManager:

type Watcher struct {
   eventManager *EventManager
}

func (w *Watcher) OnCreate(fi FileInfo) {
   w.eventManager.Notify(EventTypeCreate, fi)
}
func (w *Watcher) OnRemove(fi FileInfo) {
   w.eventManager.Notify(EventTypeRemove, fi)
}
func (w *Watcher) OnModify(fi FileInfo) {
   w.eventManager.Notify(EventTypeModify, fi)
}

Получилась одна из вариаций паттерна Mediator. Обработчики событий и генераторы этих событий ничего не знают друг о друге. Они взаимодействуют через единый компонент, в данном случае EventManager.

Все готово, осталось протестировать.

Для тестов понадобится mock-объект в который наблюдатели будут сохранять полученные события. Mock-объекту нужно реализовать интерфейс Writer. Сделаем слайс строк и метод Write который пишет в них данные:

type MockWriter struct {
   data []string
}

func (mw *MockWriter) Write(b []byte) (n int, err error) {
   mw.data = append(mw.data, string(b))
   return len(b), nil
}

Создаем EventManager и связанный с ним Watcher:

evManager := EventManager{observers: map[EventType][]Observer{}}
fileWatcher := Watcher{eventManager: &evManager}

LoggingListener и MockWriter:

store := MockWriter{}
logger := LoggingListener{store: &store}

AlertListener и MockWriter:

service := MockWriter{}
alerts := AlertListener{service: &service}

Добавим обработчик LoggingListener на все три события и обработчик AlertListener на два. Проверим, что у create и modify по два обработчика, а у delete только один:

evManager.Attach(EventTypeCreate, &logger)
evManager.Attach(EventTypeRemove, &logger)
evManager.Attach(EventTypeModify, &logger)
evManager.Attach(EventTypeCreate, &alerts)
evManager.Attach(EventTypeModify, &alerts)

if len(evManager.observers[EventTypeCreate]) != 2 {
   t.Errorf("expected 2 listeners for EventTypeCreate, got:%d", len(evManager.observers[EventTypeCreate]))
}
if len(evManager.observers[EventTypeModify]) != 2 {
   t.Errorf("expected 2 listeners for EventTypeModify, got:%d", len(evManager.observers[EventTypeModify]))
}
if len(evManager.observers[EventTypeRemove]) != 1 {
   t.Errorf("expected 1 listeners for EventTypeRemove, got:%d", len(evManager.observers[EventTypeRemove]))
}

Для тестирования обработки событий потребуется две переменные, которые должны быть равны ожидаемому состоянию LoggingListener и AlertListener:

expectedDataInStore := make([]string, 0)
expectedDataInService := make([]string, 0)

Вспомогательная функция которая сравнивает эти слайсы со слайсами обработчиков:

eqDataOfWriter := func(t *testing.T) {
   t.Helper()

   if !reflect.DeepEqual(store.data, expectedDataInStore) {
      t.Errorf("store expected:%v, got:%v", expectedDataInStore, store.data)
   }
   if !reflect.DeepEqual(service.data, expectedDataInService) {
      t.Errorf("service expected:%v, got:%v", expectedDataInService, store.data)
   }
}

Тестируем создание файла. Событие должно обработаться в двух наблюдателях:

fi := FileInfo{Name: "testName"}
fileWatcher.OnCreate(fi)

expectedDataInStore = append(expectedDataInStore, "create|testName")
expectedDataInService = append(expectedDataInService, "create|testName")

eqDataOfWriter(t)

Таким же образом тестируем удаление и изменения:

t.Run("OnRemove", func(t *testing.T) {
   fi := FileInfo{Name: "testName"}
   fileWatcher.OnRemove(fi)

   expectedDataInStore = append(expectedDataInStore, "remove|testName")

   eqDataOfWriter(t)
})

t.Run("OnModify", func(t *testing.T) {
   fi := FileInfo{Name: "testName"}
   fileWatcher.OnModify(fi)

   expectedDataInStore = append(expectedDataInStore, "modify|testName")
   expectedDataInService = append(expectedDataInService, "modify|testName")

   eqDataOfWriter(t)
})

Publish/Subscribe

This is an image

Паттер PubSub — это вариация Observer. У pubsub есть event channal. Благодаря event channal у pubsub появляются такие свойства, как слабая связность между компонентами и асинхронность.

В Go на роль event channal отлично подходят встроенные каналы.

Интерфейсы publisher и subscriber могут выглядит следующим образом:

type Subscriber interface {
   Notify(interface{})
   Close()
}

type Publisher interface {
   start()
   AddSubscriber() chan<- Subscriber
   RemoveSubscriber() chan<- Subscriber
   PublishMessage() chan<- interface{}
   Stop()
}

Значения в каналы можно передавать на месте, при вызове методов. Так же появляется возможность получить канал и передать его в другие компоненты системы.

Полный код

Publisher

This is an image

У publisher есть список подписчиков(subscribers), канал добавления подписчиков(addSubCh), канал удаления подписчиков(removeSubCh), канал получения сообщений(inMsg) и сигнал остановки(stop):

type publisher struct {
   subscribers []Subscriber
   addSubCh    chan Subscriber
   removeSubCh chan Subscriber
   inMsg       chan interface{}
   stop        chan struct{}

   addSubHandler    func(Subscriber)
   removeSubHandler func(Subscriber)
}

Методы дают доступ к каналам:

func (p *publisher) AddSubscriber() chan<- Subscriber {
   return p.addSubCh
}
func (p *publisher) RemoveSubscribe() chan<- Subscriber {
   return p.removeSubCh
}
func (p *publisher) PublishMessage() chan<- interface{} {
   return p.inMsg
}
func (p *publisher) Stop() {
   close(p.stop)
}

Добавляем нового подписчика в список через канал addSubCh. onAddSubscriber позволяет клиенту publisher добавить дополнительную логику в процесс подписки. Например, добавить логирование. Так же можно реализовать переопределения всей логики подписки, в плоть до обработки получения данных из канала addSubCh. Через removeSubCh получаем подписчика на удаление. inMsg получает сообщения и отправляет всем подписчикам. stop вызывает Close у всех подписчиков и закрывает каналы издателя:

func (p *publisher) start() {
   for {
      select {
      case sub := <-p.addSubCh:
         {
            p.subscribers = append(p.subscribers, sub)
            p.onAddSubscriber(sub)
         }
      case sub := <-p.removeSubCh:
         {
            for i, s := range p.subscribers {
               if sub == s {
                  p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
                  s.Close()
                  p.onRemoveSubscriber(sub)
                  break
               }
            }
         }
      case msg := <-p.inMsg:
         {
            for _, sub := range p.subscribers {
               sub.Notify(msg)
            }
         }
      case <-p.stop:
         {
            for _, sub := range p.subscribers {
               sub.Close()
            }

            close(p.addSubCh)
            close(p.removeSubCh)
            close(p.inMsg)

            return
         }
      }
   }
}

Создания и инициализация:

func NewPublisher() *publisher {
   em := publisher{
      addSubCh:    make(chan Subscriber),
      removeSubCh: make(chan Subscriber),
      inMsg:       make(chan interface{}),
      stop:        make(chan struct{}),
   }
   go em.start()
   return &em
}

Publisher и subscriber могут быть совсем разными компонентами, поэтому есть смысл их тестировать по отдельности.

mock для subscriber:

type mockSubscriber struct {
   isClose    bool
   testNotify *func(string)
}

func (s *mockSubscriber) Notify(msg interface{}) {
   (*s.testNotify)(msg.(string))
}
func (s *mockSubscriber) Close() {
   s.isClose = true
}

Тестируем добавления подписчиков. Добавляем их в разных горутинах. А addSubHandler поможет определить момент завершения добавления в список. Запускаем 50 горутин с добавлением и в конце сверяем с кол-вом в pub.subscribers:

t.Run("AddSubscriber", func(t *testing.T) {
   cntSub := 50
   wg := sync.WaitGroup{}
   pub.addSubHandler = func(s Subscriber) {
      wg.Done()
   }

   for i := 0; i < cntSub; i++ {
      wg.Add(1)
      go func() {
         sub := mockSubscriber{
            isClose:    false,
            testNotify: &testFunNotify,
         }

         pub.addSubCh <- &sub
      }()
   }

   wg.Wait()

   if cntSub != len(pub.subscribers) {
      t.Errorf("expected cnt sub:%d, got:%d", cntSub, len(pub.subscribers))
   }
})

Указатель на функцию testFunNotify мы присвоили всем подписчикам. Эта функция будет вызвана при получении сообщения. Отправим сообщение и проверим его через эту функцию.

t.Run("PublishMessage", func(t *testing.T) {
   msg := "Test Msg"

   testFunNotify = func(s string) {
      if msg != s {
         t.Errorf("expected:%s got:%s", msg, s)
      }
   }

   pub.PublishMessage() <- msg
})

Удаление подписчика из списка издателя:

t.Run("RemoveSubscribe", func(t *testing.T) {
   cntSub := 40
   wg := sync.WaitGroup{}
   pub.removeSubHandler = func(s Subscriber) {
      wg.Done()
   }

   for i := 0; i < 10; i++ {
      wg.Add(1)
      go func() {
         pub.removeSubCh <- pub.subscribers[0]
      }()
   }

   wg.Wait()

   if cntSub != len(pub.subscribers) {
      t.Errorf("expected cnt sub:%d, got:%d", cntSub, len(pub.subscribers))
   }
})

Subscriber

This is an image

У subscriber будет канал получения сообщений(in), канал для сигнала остановки(stop) и хранилище с интерфейсом Writer(store):

type subscriber struct {
   in    chan interface{}
   stop  chan struct{}
   store io.Writer
}

Полученное сообщение отправляем в канал in и дальше из него в хранилище:

func (s subscriber) Notify(msg interface{}) {
   s.in <- msg
}
func (s subscriber) Close() {
   close(s.stop)
}
func (s subscriber) start() {
   for {
      select {
      case msg := <-s.in:
         {
            s.store.Write([]byte(msg.(string)))
         }
      case <-s.stop:
         {
            close(s.in)
            return
         }
      }
   }
}

Получился простой пример subscriber.

Осталось добавить тестов. Mock для store:

type mockWriter struct {
   data []string
}

func (mw *mockWriter) Write(b []byte) (n int, err error) {
   mw.data = append(mw.data, string(b))
   return len(b), nil
}

Создаем 50 подписчиков и отправляем им 50 сообщений:

func TestSubscriber_Notify(t *testing.T) {
   excepted := make([]string, 50)
   subs := make([]subscriber, 50)

   for i := 0; i < 50; i++ {
      store := mockWriter{}
      subs[i] = NewSubscriber(&store)
   }

   msg := "test msg"
   for i := 0; i < 50; i++ {
      excepted[i] = msg
      for _, sub := range subs {
         sub.Notify(msg)
      }
   }
   time.Sleep(1 * time.Second)

   for _, sub := range subs {
      if !reflect.DeepEqual(sub.store.(*mockWriter).data, excepted) {
         t.Errorf("excepted:%v got:%v", sub.store.(*mockWriter).data, excepted)
      }
   }
}

У каждого subscriber должно быть в store слайс из 50 сообщений.

PubSub в упрощенном виде готов. Дальше его можно изменять и подстраивать под конкретную задачу.


Рассмотрели концепции взаимодействия компонентов в системе. Подходы push и pull моделей. Реализацию push модели через паттерн Observer и его вариацию PubSub.

https://github.com/GermanGorelkin/go-patterns/tree/master/behavioral/observer