type
status
date
slug
summary
tags
category
icon
password
- 都需要定义 Observer,并且通过 register() 函数注册 Observer,也都需要通过调用某个函数(比如,EventBus 中的 post() 函数)来给 Observer 发送消息(在 EventBus 中消息被称作事件 event)。
- 但在实现细节方面,它们又有些区别。基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。
一、观察者模式概述
- 创建型设计模式主要解决“对象的创建”问题
- 结构型设计模式主要解决“类或对象的组合或组装”问题
- 行为型设计模式主要解决的就是“类或对象之间的交互”问题
设计模式要干的事情就是解耦。创建型模式是将创建和使用代码解耦,结构型模式是将不同功能代码解耦,行为型模式是将不同的行为代码解耦,具体到观察者模式,它是将观察者和被观察者代码解耦。
观察者模式又被称为发布-订阅(Publish/Subscribe)模式,它定义了一种一对多的依赖关系,让多个观察者对象同时监听某一个主题对象。这个主题对象在状态变化时,会通知所有的观察者对象,使他们能够自动更新自己。
根据应用场景的不同,观察者模式会对应不同的代码实现方式:
- 有同步阻塞的实现方式
- 异步非阻塞的实现方式
- 进程内的实现方式
- 跨进程的实现方式
观察者模式中有如下角色结构:
- Subject:抽象主题(抽象被观察者),抽象主题角色把所有观察者对象保存在一个集合里,每个主题都可以有任意数量的观察者,抽象主题提供三个接口,可以增加和删除观察者对象以及通知所有观察者。
- ConcreteSubject:具体主题(具体被观察者),该角色将有关状态存入具体观察者对象,在具体主题的内部状态发生改变时,给所有注册过的观察者发送通知。
- Observer:抽象观察者,是观察者的抽象类,它定义了一个更新接口,使得在得到主题更改通知时更新自己。
- ConcrereObserver:具体观察者,实现抽象观察者定义的更新接口,以便在得到主题更改通知时更新自身的状态。
二、观察者模式实现
案例:气象站可以将每天测量到的温度,湿度,气压等等以公告的形式发布出去(比如发布到自己的网站或第三方)。
问题方案
问题如下:
- 其他第三方接入气象站获取数据的问题,无法在运行时动态的添加第三方
- 违反 ocp 原则,当加入第三方时,需要修改WeatherData代码,不利于维护。
观察者模式方案
三、观察者模式不同实现方式
- 同步阻塞
- 异步非阻塞
- 进程内的
- 跨进程的
前边讲到的实现方式就是同步阻塞的例子,观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。
如果被观察者方法是一个调用比较频繁的接口,对性能非常敏感,就需要改为异步非阻塞的实现方式,以此来减少响应时间。即可以开一个新线程额外去处理即可,将推送变为异步处理,主进程不会被阻塞。
不过还有更加优雅的实现方式,那就是基于 EventBus 来实现,是 Google Guava EventBus 框架,后面进行简单实现。
上面两种实现,同步或异步都是进程内的实现方式。如果推送结果还要推给独立系统,就需要跨进程的通信了。
- 如果独立系统提供了 RPC 调用接口,则可以沿用之前方案,直接调用 RPC 接口。
- 更加优雅实现方式则可以使用消息队列,被观察者只管发送消息到消息队列,观察者只管从消息队列中读取消息来执行相应的逻辑
- 缺点:引入新的中间件,增加维护成本
- 优点:更加优雅,被观察者和观察者彻底解耦
四、异步非阻塞框架 Google EventBus 简单实现
1、开新线程处理的问题
前置代码:
基于开新线程的两种方案:
问题?
- 第一种实现方式:频繁地创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。
- 第二种实现方式:尽管利用了线程池解决了第一种实现方式的问题,但线程池、异步执行逻辑都耦合在了 register() 函数中,增加了这部分业务代码的维护成本。
- 如果需要在同步阻塞和异步非阻塞之间灵活切换,那就要不停地修改 UserController 的代码。除此之外,如果在项目中,不止一个业务模块需要用到异步非阻塞观察者模式,那这样的代码实现也无法复用。
2、EventBus 框架功能需求介绍
框架的作用有:隐藏实现细节,降低开发难度,做到代码复用,解耦业务与非业务代码,让程序员聚焦业务开发。
EventBus 翻译为“事件总线”,它提供了实现观察者模式的骨架代码。我们可以基于此框架,非常容易地在自己的业务场景中实现观察者模式,不需要从零开始开发。其中,Google Guava EventBus 就是一个比较著名的 EventBus 框架,它不仅仅支持异步非阻塞模式,同时也支持同步阻塞模式。
使用框架进行代码重构
使用 Google EventBus 对上面代码进行一下重构:
使用 eventBus 框架与之前相比:
- 都需要定义 Observer,并且通过 register() 函数注册 Observer,也都需要通过调用某个函数(比如,EventBus 中的 post() 函数)来给 Observer 发送消息(在 EventBus 中消息被称作事件 event)。
- 实现细节方面,有些区别。基于 EventBus,我们不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。
EventBus 功能分析
1、EventBus、AsyncEventBus
EventBus 实现了同步阻塞的观察者模式,AsyncEventBus 继承自 EventBus,提供了异步非阻塞的观察者模式。
2、register() 函数
EventBus 类提供了 register() 函数用来注册观察者。它可以接受任何类型(Object)的观察者。与经典观察者模式不同。
3、unregister() 函数
相对于 register() 函数,unregister() 函数用来从 EventBus 中删除某个观察者。
4、post() 函数
EventBus 类提供了 post() 函数,用来给观察者发送消息。
与经典的观察者模式的不同之处在于,当我们调用 post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息(post 函数定义中的 event)类型的父类。
5、@Subscribe 注解
EventBus 通过 @Subscribe 注解来标明,某个函数能接收哪种类型的消息。
当通过 register() 函数将 DObserver 类对象注册到 EventBus 的时候,EventBus 会根据 @Subscribe 注解找到 f1() 和 f2(),并且将两个函数能接收的消息类型记录下来(PMsg->f1,QMsg->f2)。当我们通过 post() 函数发送消息(比如 QMsg 消息)的时候,EventBus 会通过之前的记录(QMsg->f2),调用相应的函数(f2)。
3、自己实现 EventBus 框架
简单原理
- 最关键的一个数据结构是 Observer 注册表,记录了消息类型和可接收消息函数的对应关系。
- 当调用 register() 函数注册观察者的时候,EventBus 通过解析 @Subscribe 注解,生成 Observer 注册表。
- 当调用 post() 函数发送消息的时候,EventBus 通过注册表找到相应的可接收消息的函数,然后通过 Java 的反射语法来动态地创建对象、执行函数。
- 对于同步阻塞模式,EventBus 在一个线程内依次执行相应的函数。对于异步非阻塞模式,EventBus 通过一个线程池来执行相应的函数。
- 整个小框架的代码实现包括 5 个类:EventBus、AsyncEventBus、Subscribe、ObserverAction、ObserverRegistry。
Subscribe
Subscribe 是一个注解,用于标明观察者中的哪个函数可以接收消息。
ObserverAction
ObserverAction 类用来表示 @Subscribe 注解的方法,其中,target 表示观察者类,method 表示方法。它主要用在 ObserverRegistry 观察者注册表中。
ObserverRegistry
- ObserverRegistry 类就是 Observer 注册表,是最复杂的一个类,框架中几乎所有的核心逻辑都在这个类中。这个类大量使用了 Java 的反射语法,其中,一个比较有技巧的地方是 CopyOnWriteArraySet 的使用。
- CopyOnWriteArraySet,顾名思义,在写入数据的时候,会创建一个新的 set,并且将原始数据 clone 到新的 set 中,在新的 set 中写入数据完成之后,再用新的 set 替换老的 set。这样就能保证在写入数据的时候,不影响数据的读取操作,以此来解决读写并发问题。除此之外,CopyOnWriteSet 还通过加锁的方式,避免了并发写冲突。
EventBus
EventBus 实现的是阻塞同步的观察者模式。看代码可能会有些疑问,这明明就用到了线程池 Executor 啊。实际上,MoreExecutors.directExecutor() 是 Google Guava 提供的工具类,看似是多线程,实际上是单线程。之所以要这么实现,主要还是为了跟 AsyncEventBus 统一代码逻辑,做到代码复用。
AsyncEventBus
有了 EventBus,AsyncEventBus 的实现就非常简单了。为了实现异步非阻塞的观察者模式,它就不能再继续使用 MoreExecutors.directExecutor() 了,而是需要在构造函数中,由调用者注入线程池。
- 作者:ITNXD
- 链接:https://blog.itnxd.eu.org/article/behavioral-pattern-observer-design-pattern
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。

