一个RxBus实现
Vimous · 2016-06-20 · Android
1. RxJava基本概念
Rxjava是一个响应式编程框架, 目的是通过观察者模式, 异步通知事件.
在RxJava的世界里,我们有四种角色:
+ Observable (可观察者,即被观察者)
+ Observer (观察者)
+ Subscriber (订阅)
+ Subjects
Observable和Subjects是两个“生产”实体,Observers和Subscribers是两个“消费”实体。
2. RxJava基本语法
1. 创建Observer(new OnClickListener())
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}
};
除了 Observer 接口之外,RxJava 还内置了一个实现了 Observer 的抽象类:Subscriber
Subscriber 相对于 Observer有增加了控制生命周期的方法.
针对Observer每个方法 都有对应的Action方法 : Action1<String>, Action1<Throwable>, Action0()
2. 创建 Observable(new Button())
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}
});
当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用 除了create()方法之外, 还有just(),from()等等快速创建Observable的方法
3.subscribe订阅 (Button.setOnClickListener(OnClickListener))
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
// 自动创建 Subscriber ,并使用 onNextAction 来定义 onNext()
observable.subscribe(onNextAction);
// 自动创建 Subscriber ,并使用 onNextAction 和 onErrorAction 来定义 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自动创建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 来定义 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
3.线程控制
1.Scheduler
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
- Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
4.RxBus
在Rxjava中 Subject即充当了Observable的角色又充当了Observer的角色, 因此我们可以设计一个单例模式, 通过Subject来实现一个RxBus.
public class RxBus {
private static final String TAG = RxBus.class.getSimpleName();
private static class RxBusHolder {
private static RxBus instance = new RxBus();
}
public static RxBus getInstance() {
return RxBusHolder.instance;
}
public static RxBus instance(){
return RxBusHolder.instance;
}
private Subject mSubject;
private Subject<RxEvent, RxEvent> mEventSubject;
private RxBus() {
mSubject = new SerializedSubject<>(PublishSubject.create());
mEventSubject = new SerializedSubject<>(PublishSubject.<RxEvent>create());
}
public <T> Observable toObservable(Class<T> eventType) {
return mSubject.ofType(eventType);
}
@SuppressWarnings("unchecked")
public void post(Object o) {
mSubject.onNext(o);
}
public Observable listen(final RxEvent event){
return mEventSubject.filter(new Func1<RxEvent, Boolean>() {
@Override
public Boolean call(RxEvent e) {
return e == event;
}
});
}
public void post(RxEvent event){
mEventSubject.onNext(event);
}
}
public interface RxEvent {
int EVENT_BASE = 1024;
int EVENT_BASE_EMPTY = EVENT_BASE + 1;
int getUniqueID();
}
测试代码如下:
//P1
public class P1 {
Subscription s1,s2;
@SuppressWarnings("unchecked")
public void listen(final String tag){
s1 = RxBus.getInstance().toObservable(M1.class)
.subscribe(new Action1<M1>() {
@Override
public void call(M1 o) {
Log.i(tag,o.getName());
}
});
s2 = RxBus.getInstance().toObservable(M2.class)
.subscribe(new Action1<M2>() {
@Override
public void call(M2 o) {
Log.i(tag,o.getName());
}
});
}
public void destroy(){
if(s1 != null && !s1.isUnsubscribed()) s1.unsubscribe();
if(s2 != null && !s2.isUnsubscribed()) s2.unsubscribe();
}
}
//M1
public class M1 {
public String getName(){
return "M1111111111111111";
}
}
//M2
public class M2 {
public String getName(){
return "M222222222222222";
}
}
//Activity
public class TestActivity extends BaseActivity {
P1 p1;
P2 p2;
public static void start(Context context) {
Intent starter = new Intent(context, TestActivity.class);
context.startActivity(starter);
}
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.ac_test);
p1 = new P1();
p1.listen("--->T1");
p2 = new P2();
p2.listen();
}
@Override
protected void onResume() {
super.onResume();
new Handler().postDelayed(new Runnable() {
@Override
public void run() {
RxBus.getInstance().post(new M1());
RxBus.getInstance().post(new M2());
}
}, 3000);
}
@Override
protected void onDestroy() {
super.onDestroy();
p1.destroy();
p2.destroy();
}
}