Android 主流三方库源码分析:EventBus
一、简单示例
1. 定义事件实体
首先,定义要传递的事件实体类。
public class CollectEvent {
private String message;
public CollectEvent(String message) {
this.message = message;
}
public String getMessage() {
return message;
}
}
2. 准备订阅者
声明并注解你的订阅方法。
@Subscribe(threadMode = ThreadMode.MAIN)
public void onMessageEvent(CollectEvent event) {
Log.d("EventBus", "Received: " + event.getMessage());
}
3. 注册与解注册
在订阅者所在的类中(如 Activity),注册和解注册你的订阅者。
@Override
protected void onStart() {
super.onStart();
EventBus.getDefault().register(this);
}
@Override
protected void onStop() {
super.onStop();
EventBus.getDefault().unregister(this);
}
4. 发送事件
EventBus.getDefault().post(new CollectEvent("Hello World"));
二、背景与原理
在正式讲解之前,需要对一些基础性的概念进行详细的讲解。众所周知,EventBus 出现之前,开发者一般使用 Android 四大组件中的广播进行组件间的消息传递。那么为什么要使用事件总线机制来替代广播呢?主要是因为:
- 广播:耗时、容易被捕获(不安全)、全局可见。
- 事件总线:更节省资源、更高效,能将信息传递给原生以外的各种对象,且支持线程模式控制。
什么是事件总线?
事件总线机制通过记录对象、使用观察者模式来通知对象各种事件。(当然,你也可以发送基本数据类型如 int,String 等作为一个事件)。
对于 EventBus 而言,它的优缺点总结如下:
- 优点:开销小,代码更优雅、简洁,解耦发送者和接收者,可动态设置事件处理线程和优先级。
- 缺点:每个事件必须自定义一个事件类,增加了维护成本;若未正确注销可能导致内存泄漏。
观察者模式
EventBus 是基于观察者模式扩展而来的。观察者模式又可称为发布 - 订阅模式,它定义了对象间的一种 1 对多的依赖关系,每当这个对象的状态改变时,其它的对象都会接收到通知并被自动更新。
观察者模式有以下角色:
- 抽象观察者:将所有已注册的观察者对象保存在一个集合中。
- 具体观察者:当内部状态发生变化时,将会通知所有已注册的观察者。
- 抽象被观察者:定义了一个更新接口,当被观察者状态改变时更新自己。
- 具体被观察者:实现抽象观察者的更新接口。
EventBus 的扩展
EventBus 的观察者模式和一般的观察者模式不同,它使用了扩展的观察者模式对事件进行订阅和分发。这里的扩展是指使用了 EventBus 来作为中介者,抽离了许多职责。
每次我们在 register 之后,都必须进行一次 unregister,这是因为 register 是强引用,它会让对象无法得到内存回收,导致内存泄露。所以必须在 unregister 方法中释放对象所占的内存。
EventBus 版本对比
有些同学可能之前使用的是 EventBus 2.x 的版本,那么它与 EventBus 3.x 的版本有哪些区别呢?
- EventBus 2.x:使用的是运行时注解,采用了反射的方式对整个注册的类的所有方法进行扫描来完成注册,因而会对性能有一定影响。
- EventBus 3.x:使用的是编译时注解,Java 文件会编译成.class 文件,再对 class 文件进行打包等一系列处理。在编译成.class 文件时,EventBus 会使用 EventBusAnnotationProcessor 注解处理器读取 @Subscribe() 注解并解析、处理其中的信息,然后生成 Java 类来保存所有订阅者的订阅信息。这样就创建出了对文件或类的索引关系,并将其编入到 apk 中。
- 对象池:从 EventBus 3.0 开始使用了对象池缓存减少了创建对象的开销。
EventBus vs RxBus
除了 EventBus,其实现在比较流行的事件总线还有 RxBus。那么,它与 EventBus 相比又如何呢?
- RxJava 的 Observable:有 onError、onComplete 等状态回调。
- 组合方式:RxJava 使用组合而非嵌套的方式,避免了回调地狱。
- 线程调度:RxJava 的线程调度设计的更加优秀,更简单易用。
- 操作符:RxJava 可使用多种操作符来进行链式调用来实现复杂的逻辑。
- 效率:RxJava 的信息效率高于 EventBus 2.x,低于 EventBus 3.x。
在新项目选型时,如果项目中使用了 RxJava,则使用 RxBus,否则使用 EventBus 3.x。
三、源码分析:register
接下来将按以下顺序来进行 EventBus 的源码分析:
- 订阅者:
EventBus.getDefault().register(this)
- 发布者:
EventBus.getDefault().post(new CollectEvent())
- 订阅者:
EventBus.getDefault().unregister(this)
1. 获取实例
首先,我们从获取 EventBus 实例的方法 getDefault() 开始分析:
public static EventBus getDefault() {
if (defaultInstance == null) {
synchronized (EventBus.class) {
if (defaultInstance == null) {
defaultInstance = new EventBus();
}
}
}
return defaultInstance;
}
在 getDefault() 中使用了双重校验并加锁的单例模式来创建 EventBus 实例。
2. 构造方法
接着,我们看到 EventBus 的默认构造方法中做了什么:
private static final EventBusBuilder DEFAULT_BUILDER = new EventBusBuilder();
public EventBus() {
this(DEFAULT_BUILDER);
}
在 EventBus 的默认构造方法中又调用了它的另一个有参构造方法,将一个类型为 EventBusBuilder 的 DEFAULT_BUILDER 对象传递进去了。这里的 EventBusBuilder 很明显是一个 EventBus 的建造器,以便于 EventBus 能够添加自定义的参数和安装一个自定义的默认 EventBus 实例。
3. 核心字段初始化
我们再看一下 EventBus 的这个有参构造方法:
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
private final Map<Object, List<Class<?>>> typesBySubscriber;
private final Map<Class<?>, Object> stickyEvents;
EventBus(EventBusBuilder builder) {
subscriptionsByEventType = new HashMap<>();
typesBySubscriber = new HashMap<>();
stickyEvents = new ConcurrentHashMap<>();
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
backgroundPoster = new BackgroundPoster(this);
asyncPoster = new AsyncPoster(this);
subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
builder.strictMethodVerification, builder.ignoreGeneratedIndex);
executorService = builder.executorService;
}
- 注释 1:创建了一个
subscriptionsByEventType 对象,key 为 Event 类型,value 为 Subscription 链表。Subscription 保存了 subscriber(注册的对象)和 subscriberMethod(被@Subscribe 注解的方法)。
- 注释 2:新建了一个
typesBySubscriber 对象,key 为 subscriber 对象,value 为该对象中所有的 Event 类型链表,用于判断某个对象是否注册过。
- 注释 3:新建了一个
stickyEvents 对象,专用于粘性事件处理,key 为事件的 Class 对象,value 为当前的事件。
- 注释 4:新建了三个不同类型的事件发送器:
mainThreadPoster:主线程事件发送器。
backgroundPoster:后台事件发送器,保证任一时间只且仅能有一个任务会被线程池执行。
asyncPoster:异步运行的,可以同时接收多个任务。
- 注释 5:新建了一个
subscriberMethodFinder 对象,这是从 EventBus 中抽离出的订阅方法查询的一个对象。
- 注释 6:从 builder 中取出了一个默认的线程池对象,由
Executors.newCachedThreadPool() 方法创建。
4. register 流程
接着我们查看 EventBus 的 register() 方法:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
在注释 1 处,根据当前注册类获取 subscriberMethods 这个订阅方法列表。在注释 2 处,使用了增强 for 循环令 subscriber 对象对 subscriberMethods 中每个 SubscriberMethod 进行订阅。
findSubscriberMethods
接着我们查看 SubscriberMethodFinder 的 findSubscriberMethods() 方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
注释 2 处,先详细说说这个 ignoreGeneratedIndex 字段,它用来判断是否使用生成的 APT 代码去优化寻找接收事件的过程。如果我们没有在项目中接入 EventBus 的 APT,那么可以将 ignoreGeneratedIndex 字段设为 false 以提高性能。这里 ignoreGeneratedIndex 默认为 false,所以会执行 findUsingInfo() 方法。
findUsingInfo
接着我们查看 SubscriberMethodFinder 的 findUsingInfo() 方法:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
在注释 1 处,调用了 prepareFindState() 方法创建了一个新的 FindState 类。它是 SubscriberMethodFinder 的内部类,主要做初始化、回收对象等工作。这里使用了对象池模式,先从 FIND_STATE_POOL 池中取出可用的 FindState,如果没有的话,则直接新建一个新的 FindState 对象。
在注释 3 处,如果找不到预生成的索引信息,则回退到反射查找。这个方法通过反射的方式获取订阅者类中的所有声明方法,然后在这些方法里面寻找以 @Subscribe 作为注解的方法进行处理。在经过一轮检查后,将方法名、threadMode、优先级、是否为 sticky 方法等信息封装到 SubscriberMethod 对象中,最后添加到 subscriberMethods 列表中。
subscribe
最后,在 EventBus 的 register() 方法的最后会调用 subscribe 方法:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
(Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
(eventType.isAssignableFrom(candidateEventType)) {
entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} {
stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
- 注释 1:根据
eventType 在 subscriptionsByEventType 去查找一个 CopyOnWriteArrayList,如果没有则创建一个新的。
- 注释 2:添加
newSubscription 对象,按照优先级添加。优先级越高,会插到在当前 List 靠前面的位置。
- 注释 3:对
typesBySubscriber 进行添加,用于判断该 Subscriber 对象是否已被注册过。
- 注释 4:判断是否是 sticky 事件。如果是 sticky 事件的话,会调用
checkPostStickyEventToSubscription() 方法。
四、源码分析:post
1. post 入口
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
注释 1 处,currentPostingThreadState 是一个 ThreadLocal 类型的对象,里面存储了 PostingThreadState。接着把传入的 event,保存到了当前线程中的一个变量 PostingThreadState 的 eventQueue 中。在注释 2 处,最后调用了 postSingleEvent() 方法。
2. postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |=
postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
}
}
- 注释 1:判断
eventInheritance 标志位,默认为 true。如果设为 true,会在发射事件的时候判断是否需要发射父类事件,设为 false 能够提高一些性能。
- 注释 2:调用
lookupAllEventTypes() 方法,取出 Event 及其父类和接口的 class 列表。
- 注释 3:调用
postSingleEventForEventType() 方法。
3. postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
这里直接根据 Event 类型从 subscriptionsByEventType 中取出对应的 subscriptions 对象,最后调用了 postToSubscription() 方法。
4. postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
从上面可以看出,这里通过 threadMode 来判断在哪个线程中去执行方法:
- POSTING:执行
invokeSubscriber() 方法,内部直接采用反射调用。
- MAIN:首先去判断当前是否在 UI 线程,如果是的话则直接反射调用,否则调用
mainThreadPoster 的 enqueue() 方法,即把当前的方法加入到队列之中,然后通过 handler 去发送一个消息,在 handler 的 handleMessage 中去执行方法。
- MAIN_ORDERED:与 MAIN 类似,不过是确保是顺序执行的。
- BACKGROUND:判断当前是否在 UI 线程,如果不是的话则直接反射调用,是的话通过
backgroundPoster 的 enqueue() 方法将方法加入到后台的一个队列,最后通过线程池去执行。注意,backgroundPoster 在 Executor 的 execute() 方法上添加了 synchronized 关键字并设立了一个控制标记 flag,保证任一时间只且仅能有一个任务会被线程池执行。
- ASYNC:逻辑实现类似于 BACKGROUND,将任务加入到后台的一个队列,最终由 EventBus 中的一个线程池去调用。不同于
backgroundPoster 的保证任一时间只且仅能有一个任务会被线程池执行的特性,这里 asyncPoster 则是异步运行的,可以同时接收多个任务。
五、源码分析:unregister
public synchronized void unregister(Object subscriber) {
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
for (Class<?> eventType : subscribedTypes) {
unsubscribeByEventType(subscriber, eventType);
}
typesBySubscriber.remove(subscriber);
}
}
- 注释 1:
unsubscribeByEventType() 方法中对 subscriptionsByEventType 移除了该 subscriber 的所有订阅信息。
- 注释 2:移除了注册对象和其对应的所有 Event 事件链表。
六、粘性事件处理
如果想要发射 sticky 事件需要通过 EventBus 的 postSticky() 方法,内部源码如下所示:
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
在注释 1 处,先将该事件放入 stickyEvents 中,接着在注释 2 处使用 post() 发送事件。
在 register() 方法的最后部分,有关粘性事件的源码如下:
if (subscriberMethod.sticky) {
Object stickyEvent = stickyEvents.get(eventType);
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
可以看到,在这里会判断当前事件是否是 sticky 事件,如果是,则从 stickyEvents 中拿出该事件并执行 postToSubscription() 方法。
七、最佳实践与总结
1. 内存泄漏防护
EventBus 最核心的风险在于内存泄漏。由于 register 是强引用,如果 Activity 或 Fragment 销毁时未调用 unregister,会导致持有该对象的 EventBus 实例无法被 GC 回收。因此,务必在 onDestroy 或 onStop 中取消注册。
2. 线程模式选择
- POSTING:默认模式,在发送事件的线程执行。适用于轻量级操作。
- MAIN:在主线程执行。适用于更新 UI,但需注意不要在主线程阻塞太久。
- BACKGROUND:在后台线程执行。适用于耗时操作,避免 ANR。
- ASYNC:完全异步。适用于需要独立线程处理的复杂逻辑。
3. Sticky 事件的使用
粘性事件适合用于传递状态数据(如用户登录状态)。但要注意,粘性事件会一直保存在内存中,直到手动清除或应用重启。频繁发送粘性事件会增加内存负担。
4. 总结
EventBus 的源码在 Android 主流三方库源码分析系列中算是比较简单的了。但是,它其中的一些思想和设计是值得借鉴的。比如它使用 FindState 复用池来复用 FindState 对象,在各处使用了 synchronized 关键字进行代码块同步的一些优化操作。其中上面分析了这么多,EventBus 最核心的逻辑就是利用了 subscriptionsByEventType 这个重要的列表,将订阅对象,即接收事件的方法存储在这个列表,发布事件的时候在列表中查询出相对应的方法并执行。
通过合理使用 EventBus,可以显著降低组件间的耦合度,提升代码的可维护性。但在实际开发中,也需警惕其带来的潜在问题,如内存泄漏和调试困难。建议结合项目实际情况,选择合适的通信方案。