一个RxBus实现


Vimous · 2016-06-20 · Android

1. RxJava基本概念

Rxjava是一个响应式编程框架, 目的是通过观察者模式, 异步通知事件.

在RxJava的世界里,我们有四种角色: + Observable (可观察者,即被观察者) + Observer (观察者) + Subscriber (订阅) + Subjects ObservableSubjects是两个“生产”实体,ObserversSubscribers是两个“消费”实体。

2. RxJava基本语法

1. 创建Observer(new OnClickListener())
Observer<String> observer = new Observer<String>() {
    @Override
    public void onNext(String s) {
        Log.d(tag, "Item: " + s);
    }

    @Override
    public void onCompleted() {
        Log.d(tag, "Completed!");
    }

    @Override
    public void onError(Throwable e) {
        Log.d(tag, "Error!");
    }
};

除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber Subscriber 相对于 Observer有增加了控制生命周期的方法. 针对Observer每个方法 都有对应的Action方法 : Action1<String>, Action1<Throwable>, Action0()

2. 创建 Observable(new Button())
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("Hi");
        subscriber.onNext("Aloha");
        subscriber.onCompleted();
    }
});

当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用 除了create()方法之外, 还有just(),from()等等快速创建Observable的方法

3.subscribe订阅 (Button.setOnClickListener(OnClickListener))
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);

3.线程控制

1.Scheduler
Observable.just(1, 2, 3, 4)
    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
    .subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer number) {
            Log.d(tag, "number:" + number);
        }
    });
  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

4.RxBus

在Rxjava中 Subject即充当了Observable的角色又充当了Observer的角色, 因此我们可以设计一个单例模式, 通过Subject来实现一个RxBus.

public class RxBus {
    private static final String TAG = RxBus.class.getSimpleName();

    private static class RxBusHolder {
        private static RxBus instance = new RxBus();
    }

    public static RxBus getInstance() {
        return RxBusHolder.instance;
    }

    public static RxBus instance(){
        return RxBusHolder.instance;
    }

    private Subject mSubject;
    private Subject<RxEvent, RxEvent> mEventSubject;

    private RxBus() {
        mSubject = new SerializedSubject<>(PublishSubject.create());
        mEventSubject = new SerializedSubject<>(PublishSubject.<RxEvent>create());
    }

    public <T> Observable toObservable(Class<T> eventType) {
        return mSubject.ofType(eventType);
    }

    @SuppressWarnings("unchecked")
    public void post(Object o) {
        mSubject.onNext(o);
    }

    public Observable listen(final RxEvent event){
        return mEventSubject.filter(new Func1<RxEvent, Boolean>() {
            @Override
            public Boolean call(RxEvent e) {
                return e == event;
            }
        });
    }

    public void post(RxEvent event){
        mEventSubject.onNext(event);
    }
}

public interface RxEvent {
    int EVENT_BASE = 1024;
    int EVENT_BASE_EMPTY = EVENT_BASE + 1;

    int getUniqueID();
}

测试代码如下:

//P1
public class P1  {
    Subscription s1,s2;

    @SuppressWarnings("unchecked")
    public void listen(final String tag){
        s1 = RxBus.getInstance().toObservable(M1.class)
                .subscribe(new Action1<M1>() {
                    @Override
                    public void call(M1 o) {
                        Log.i(tag,o.getName());
                    }
                });

        s2 = RxBus.getInstance().toObservable(M2.class)
                .subscribe(new Action1<M2>() {
                    @Override
                    public void call(M2 o) {
                        Log.i(tag,o.getName());
                    }
                });


    }

    public void destroy(){
        if(s1 != null && !s1.isUnsubscribed()) s1.unsubscribe();
        if(s2 != null && !s2.isUnsubscribed()) s2.unsubscribe();
    }
}

//M1
public class M1 {

    public String getName(){
        return "M1111111111111111";
    }
}

//M2
public class M2 {

    public String getName(){
        return "M222222222222222";
    }
}


//Activity
public class TestActivity extends BaseActivity {
    P1 p1;
    P2 p2;

    public static void start(Context context) {
        Intent starter = new Intent(context, TestActivity.class);
        context.startActivity(starter);
    }

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.ac_test);

        p1 = new P1();
        p1.listen("--->T1");
        p2 = new P2();
        p2.listen();
    }

    @Override
    protected void onResume() {
        super.onResume();

        new Handler().postDelayed(new Runnable() {
            @Override
            public void run() {
                RxBus.getInstance().post(new M1());
                RxBus.getInstance().post(new M2());
            }
        }, 3000);
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        p1.destroy();
        p2.destroy();
    }
}