龙空技术网

Android 基于Jetpack LiveData实现消息总线

A闲暇视界 100

前言:

此刻兄弟们对“android获取html数据”可能比较注意,各位老铁们都想要分析一些“android获取html数据”的相关知识。那么小编同时在网上网罗了一些关于“android获取html数据””的相关资讯,希望兄弟们能喜欢,小伙伴们一起来了解一下吧!

morning

前面的文章介绍了LiveData的基本用法,本文来介绍一下LiveData的一个进阶用法 — 基于LiveData实现消息总线

消息总线

在Android开发中,跨页面传递数据(尤其是跨多个页面传递数据)是一个很常见的操作,可以通过Handler、接口回调等方式进行传递,但这几种方式都不太优雅,消息总线传递数据的方式相比更优雅。

消息总线最大的优势就是解耦,避免了类与类之间强耦合,通常消息总线有以下几种实现方式:

EventBus:RxBus : 基于RxJava实现的消息总线LiveDataBus:基于Jetpack中的LiveData实现,也是本文主要介绍的实现方式。EventBus

EventBus整体思想如下:

eventBus.png

EventBus基于发布/订阅模式,发布者和订阅者是一对多的关系,发布者只有一个,订阅者可以有多个,他们之间都是通过EventBus这个调度中心来进行数据处理与传递。其中发布者将数据传递到调度中心,然后调度中心会找到该发布者对应的订阅者,并将数据依次传递到订阅者,从而完成了数据的传递;如果没有订阅者,那么也就不会传递数据了。整个过程发布者和订阅者不需要知道彼此的存在,即数据传递过程是解耦的。

RxBus

RxBus本身是依赖RxJava的强大功能实现的。RxJava中有一个Subject,是一种特殊的存在,它既是Observable,又是Observer,可以将其看做一个桥梁或代理。Subject有以下四种:

AsyncSubject: 无论订阅发生在什么时候,Observer只会接收AsyncSubject发送的在onComplete()之前的最后一个数据,且onComplete()是必须要调用的。BehaviorSubject:Observer会先接收BehaviorSubject被订阅之前的最后一个事件,然后接收订阅之后发送的所有事件。PublishSubject: Observer只接收PublishSubject被订阅之后发送的事件。ReplaySubject:无论subscribe订阅是何时开始的,Observer会接收ReplaySubject发送的所有事件。

具体使用方式可以参考:RxJava中关于Subject和Processor的使用:

可以通过Subject来实现一个消息总线,因为不是本文的重点介绍,就不再贴代码了,可以自行搜索其实现方式。

LiveDataBus

LiveDataBus是基于LiveData实现的,上篇文章中详细介绍了其用法及优点:

确保界面符合数据状态

LiveData 遵循观察者模式。当数据发生变化时,LiveData 会通知 Observer 对象,那么Observer回调的方法中就可以进行UI更新,即数据驱动。不会发生内存泄漏

观察者会绑定到 Lifecycle 对象,并在其关联的生命周期遭到销毁(如Activity进入ONDESTROY状态)后进行自我清理。不会因 Activity 停止而导致崩溃

如果观察者的生命周期处于非活跃状态(如返回栈中的 Activity),则它不会接收任何 LiveData 事件。不再需要手动处理生命周期

界面组件只是观察相关数据,不会停止或恢复观察。LiveData 将自动管理所有这些操作,因为它在观察时可以感知相关的生命周期状态变化。数据始终保持最新状态

如果生命周期变为非活跃状态,它会在再次变为活跃状态时接收最新的数据。例如,曾经在后台的 Activity 会在返回前台后立即接收最新的数据。配置更改时自动保存数据

如果由于配置更改(如设备旋转)而重新创建了 Activity 或 Fragment,它会立即接收最新的可用数据。共享资源

使用单例模式扩展 LiveData 对象以封装系统服务,以便在应用中共享它们。LiveData 对象连接到系统服务一次,然后需要相应资源的任何观察者只需观察 LiveData 对象。原理消息:发布者发送,订阅者接收。消息可以是基本类型,也可以是自定义类型的消息。消息通道LiveData 扮演了消息通道的角色,不同的消息通道用不同的名字区分,名字是 String 类型的,可以通过名字获取到一个LiveData 消息通道。消息总线: 消息总线通过单例实现,不同的消息通道存放在一个HashMap 中。订阅:订阅者通过get() 获取消息通道,然后调用 observe() 订阅这个通道的消息。发布:发布者通过 get() 获取消息通道,然后调用 setValue()发布消息。

LiveDataBus.png

图片来源:[LiveData实现消息总线]:

LiveData实现消息总线的优势

相比于EventBusRxBus,使用LiveData实现消息总线有下面几个优势:

EventBus、RxBus、LiveDataBus都需要对事件进行注册、解注册。不同于EventBus、RxBus手动解注册,LiveData可以自动管理生命周期,所以也能实现自动解注册,避免忘记解注册而导致内存泄漏。LiveData实现简单,其为Jetpack中重要的一员,且为官方推出,支持更好LiveData相比于EventBus、RxBus,类更少,包更小。LiveData实现消息总线存在的隐患LiveData默认是粘性消息

LiveData发送的消息为粘性消息,即先发布后订阅也能收到消息,再把订阅observe()的逻辑贴出来:

@MainThreadpublic void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {    assertMainThread("observe");    if (owner.getLifecycle().getCurrentState() == DESTROYED) {        //如果当前观察者处于DESTROYED状态,直接返回        return;    }    //将LifecycleOwner、Observer包装成LifecycleBoundObserver    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);    //ObserverWrapper是LifecycleBoundObserver的父类    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);    //如果mObservers中存在该Observer且跟传进来的LifecycleOwner不同,直接抛异常,一个Observer只能对应一个LifecycleOwner    if (existing != null && !existing.isAttachedTo(owner)) {        throw new IllegalArgumentException("Cannot add the same observer"                + " with different lifecycles");    }    //如果已经存在Observer且跟传进来的LifecycleOwner是同一个,直接返回    if (existing != null) {        return;    }    //通过Lifecycle添加观察者    owner.getLifecycle().addObserver(wrapper);}

最后执行addObserver()后,内部通过LifecycleRegistry添加Observer,进而会执行到onStateChanged()方法,该方法辗转又调用到dispatchingValue方法(setValue/postValue最终也会调用到该方法),接着会调用到我们最关心的considerNotify():

    void dispatchingValue(@Nullable ObserverWrapper initiator) {        if (mDispatchingValue) {            mDispatchInvalidated = true;            return;        }        mDispatchingValue = true;        do {            mDispatchInvalidated = false;            if (initiator != null) {                //2、通过observe()的方式会调用这里                considerNotify(initiator);                initiator = null;            } else {                //1、通过setValue/postValue的方式会调用这里,遍历所有观察者并进行分发                for (Iterator<Map.Entry<Observer<? super T>, ObserverWrapper>> iterator =                        mObservers.iteratorWithAdditions(); iterator.hasNext(); ) {                    considerNotify(iterator.next().getValue());                    if (mDispatchInvalidated) {                        break;                    }                }            }        } while (mDispatchInvalidated);        mDispatchingValue = false;    }    private void considerNotify(ObserverWrapper observer) {        if (!observer.mActive) {            //观察者不在活跃状态 直接返回            return;        }        //如果是observe(),则是在STARTED、RESUMED状态时活跃;如果是ObserveForever(),则认为一直是活跃状态        if (!observer.shouldBeActive()) {            observer.activeStateChanged(false);            return;        }        //Observer中的Version必须小于LiveData中的Version,防止重复发送        if (observer.mLastVersion >= mVersion) {            return;        }        observer.mLastVersion = mVersion;        //回调Observer的onChange方法并接收数据        observer.mObserver.onChanged((T) mData);    }

可以看到considerNotify()里有这么一个逻辑:

if (observer.mLastVersion >= mVersion) {   return;   }

mVersion代表版本号,发送方、订阅方都有这个变量,默认是-1。发送方每发送一个消息,mVersion都会进行+1操作;而Observer中的mVersion每成功接收一次消息,都会将发送方最新的version赋值给自己的mLastVersion,当Observer中的mLastVersion>=发送方mVersion时,Observer会拒绝接收消息,防止重复发送消息。

所以,如果当发送方之前的mVersion不是默认值-1,说明LiveData发送过消息。如果此时执行LiveData.observe(),因为Observer中的mLastVersion为默认值-1,小于发送方的mVersion,所以该消息不会被拦截,Observer一定可以拿到之前发送的消息,即粘性消息。

LiveData.postValue可能会丢失消息

当频繁使用LiveData.postValue发送多个消息时,LiveData.observe()接收消息时可能会发生丢失,为什么会这样呢?来看postValue()的内部实现

//LiveData.java//postValue发送数据,可以在子线程中使用protected void postValue(T value) {    boolean postTask;    synchronized (mDataLock) {       //mPendingData默认值是NOT_SET,第一次发送时postTask是true       postTask = mPendingData == NOT_SET;       //将发送的值赋值给mPendingData       mPendingData = value;    }    //第一次发送时postTask是true,当第一个消息还未处理时,后面再发送消息时postTask会变成false,所以后面的消息都会被拦截,但是发送的值可以更新到第一次发送里里面    if (!postTask) {       return;    }    ArchTaskExecutor.getInstance().postToMainThread(mPostValueRunnable); }//setValue发送数据,只能在主线程中使用protected void setValue(T value) {    assertMainThread("setValue");    mVersion++;    mData = value;    dispatchingValue(null);  }private final Runnable mPostValueRunnable = new Runnable() {        @SuppressWarnings("unchecked")        @Override        public void run() {            Object newValue;            synchronized (mDataLock) {                //将mPendingData中的值通过setValue传给Observer,并将自身格式化为NOT_SET                newValue = mPendingData;                mPendingData = NOT_SET;            }            setValue((T) newValue);     } };

详细的过程写在注释中了,主要的原因就是postValue发送消息时,会判断之前的消息是否已经处理,如果还未处理,会将当前发送的最新值更新到之前的消息中去(之前的消息存在mPendingData中,直接更新之),所以当多次频繁使用postValue发送消息时,Observer收到的为最后一次发送的最新值。个人猜测官方这么实现的目的主要是LiveDataMVVM架构中使用,既主要为了更新UI的最新数据即可,但是当用LiveData实现的消息总线时,可能就会出现丢失消息的隐患了,这是我们不想看到的,那么怎么解决呢?放弃使用postValue,都通过setValue去发送消息,如果是在子线程中发送消息,自行构建Handler发送到主线程中即可,后续贴代码。

解决方案支持粘性、非粘性消息

因为LiveData默认即是粘性消息,我们只需要添加非粘性消息支持即可,LiveDatamVersion默认是private的,如果想在其他类中使用,可以通过反射获取,但是效率相对低;还可以通过androidx.lifecycle包名来避免反射获取LiveData.mVersion,代码如下:

//package androidx.lifecycleopen class ExternalLiveData<T> : MutableLiveData<T>() {    companion object {        //通过androidx.lifecycle包名来避免反射获取LiveData.START_VERSION        const val START_VERSION = LiveData.START_VERSION    }    override fun observe(owner: LifecycleOwner, observer: Observer<in T>) {        if (owner.lifecycle.currentState == Lifecycle.State.DESTROYED) {            // ignore            return        }        try {            val wrapper = ExternalLifecycleBoundObserver(owner, observer)            val existing =                callMethodPutIfAbsent(observer, wrapper) as? LiveData<*>.LifecycleBoundObserver            require(!(existing != null && !existing.isAttachedTo(owner))) {                ("Cannot add the same observer" + " with different lifecycles")            }            if (existing != null) return            owner.lifecycle.addObserver(wrapper)        } catch (e: Exception) {            //ignore        }    }    //继承父类并将修饰符改为public,可以对外暴露    public override fun getVersion(): Int {        return super.getVersion()    }    internal inner class ExternalLifecycleBoundObserver(        owner: LifecycleOwner,        observer: Observer<in T>?    ) : LifecycleBoundObserver(owner, observer) {        override fun shouldBeActive(): Boolean {            return mOwner.lifecycle.currentState.isAtLeast(observerActiveLevel())        }    }    /**     * @return Lifecycle.State     */    protected open fun observerActiveLevel(): Lifecycle.State {        return Lifecycle.State.STARTED    }    //反射获取LiveData.mObservers    private val fieldObservers: Any        get() {            val fieldObservers = LiveData::class.java.getDeclaredField("mObservers")            fieldObservers.isAccessible = true            return fieldObservers        }    /**     * 反射调用LiveData的putIfAbsent方法     */    private fun callMethodPutIfAbsent(observer: Any, wrapper: Any): Any? {        val mObservers = fieldObservers.javaClass        val putIfAbsent =            mObservers.getDeclaredMethod("putIfAbsent", Any::class.java, Any::class.java)        putIfAbsent.isAccessible = true        return putIfAbsent.invoke(mObservers, observer, wrapper)    }}

这样外面就可以使用mVersion了,整体思路是通过装饰者模式对Observer进行控制,如:

/** * Observer装饰者模式 */class ObserverWrapper<T>(        private val observer: Observer<T>,        var preventNextEvent: Boolean = false) : Observer<T> {    override fun onChanged(t: T) {        if (preventNextEvent) {            preventNextEvent = false            return        }        observer.onChanged(t)    }}

非粘性消息

val observerWrapper = ObserverWrapper(observer)observerWrapper.preventNextEvent = liveData.version > ExternalLiveData.START_VERSIONliveData.observe(owner, observerWrapper)

liveData.version > ExternalLiveData.START_VERSION说明liveData里发送过消息,version值已经不是初始值,如果是后注册的观察者,observerWrapper.preventNextEvent返回的是true,即会屏蔽当前消息,观察者不执行;如果是先注册的观察者,则不受影响,这样就是实现了非粘性消息。

粘性消息

val observerWrapper = ObserverWrapper(observer)liveData.observe(owner, observerWrapper)

没什么可说的,默认就是粘性的,无需特殊处理。

支持子线程发送消息

判断是否在主线程:

object ThreadUtils {    /**     * 是否是在主线程     */    fun isMainThread(): Boolean {        return Looper.myLooper() == Looper.getMainLooper()    }}

发送消息时判断当前所在线程:

private val mainHandler = Handler(Looper.getMainLooper())override fun post(value: T) {    if (ThreadUtils.isMainThread()) {        postInternal(value)    } else {        mainHandler.post(PostValueTask(value))    }}@MainThreadprivate fun postInternal(value: T) {    liveData.value = value}inner class PostValueTask(val newValue: T) : Runnable {    override fun run() {        postInternal(newValue)    }}

post消息时,先判断当前所在线程,主线程的话直接发送,在子线程的话通过MainHandler将消息发送到主线程再发送,从而支持了在子线程发送消息。

参考

【1】

【2】文中代码主要来自

标签: #android获取html数据