首页 >> 大全

深入理解RxJava编程思想

2023-10-03 大全 24 作者:考证青年

深入理解编程思想 前言

在中,一个实现了接口的对象可以订阅()一个 类的实例。订阅者()对发射(emit)的任何数据或数据序列作出响应。这种模式简化了并发操作,因为它不需要阻塞等待发射数据,而是创建了一个处于待命状态的观察者哨兵,哨兵在未来某个时刻响应的通知。

为什么学习?

通过学习改变思维来提升效率

要想弄明白实现原理,就先学会怎么使用它。

应用场景 核心思想

是一种响应式编程思维 (起点--------->终点)

举一个生活中的例子:

起点(分发事件:我饿了)----------->下楼---------->去餐厅------------>点餐------------>终点(吃饭 消费事件)

程序中的例子:

起点(分发事件:点击登录)---------->登录API--------->请求服务器---------->获取响应码--------->终点(更新UI 登录成功 消费事件)

也就是说,我们的起点和终点始终是链接在一起的,没有被断掉,中间的过程是一环扣一环。

我们通过一个实现下载图片的功能的代码来比较传统思维和Rx思维

首先我们有一个需求:

网络获取图片显示在UI上。

传统的思维方式下载图片:

// 传统方式 思维 无法固定 (后面接手你写的项目,看不懂)
// A程序员:35356453 自己的思维 不同  封装方法....
// B程序员:46576576 自己的思维 不同  全部写在一起
// C程序员:43643654 自己的思维 不同  new Thread
// D程序员:66545655 自己的思维 不同  使用 线程池
// .....
// 零零散散 麻烦
public void downloadImageAction(View view) {progressDialog = new ProgressDialog(this);progressDialog.setTitle("下载图片中.....");progressDialog.show();new Thread(new Runnable() {@Overridepublic void run() {try {URL url = new URL(PATH);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setRequestMethod("GET");connection.setConnectTimeout(5000);connection.connect();int responseCode = connection.getResponseCode();Log.d(TAG, "apply: " + responseCode);if (responseCode == HttpURLConnection.HTTP_OK) {InputStream inputStream = connection.getInputStream();Bitmap bitmap = BitmapFactory.decodeStream(inputStream);Message message = handler.obtainMessage();message.obj = bitmap;handler.sendMessage(message);}} catch (Exception e) {e.printStackTrace();}}}).start();
}private final Handler handler = new Handler(new Handler.Callback() {@Overridepublic boolean handleMessage(@NonNull Message msg) {Bitmap bitmap = (Bitmap) msg.obj;imageView.setImageBitmap(bitmap);if (progressDialog != null) {progressDialog.dismiss();}return false;}
});

首先创建一个,把网络图片地址传进去,通过进行网络请求,将输入流转换成,通过将发送给主线程更新UI。这是我们用传统思维实现的功能。

接下来我们使用Rx思维:

我们首先画一个基本流程图来描述事件的流向:

接下来我们用代码实现:

public void rxJavaDownloadImageAction(View view) {//起点Observable.just(PATH) //内部会分发  //TODO 第二步//TODO 第三步  卡片式拦截  把String拦截成Bitmap.map(new Function<String, Bitmap>() {@Overridepublic Bitmap apply(String s) throws Exception {URL url = new URL(PATH);HttpURLConnection connection = (HttpURLConnection) url.openConnection();connection.setConnectTimeout(5000);int responseCode = connection.getResponseCode();Log.d(TAG, "apply: " + responseCode);if (responseCode == HttpURLConnection.HTTP_OK) {InputStream inputStream = connection.getInputStream();Bitmap bitmap = BitmapFactory.decodeStream(inputStream);return bitmap;}return null;}})//加水印.map(new Function<Bitmap, Bitmap>() {@Overridepublic Bitmap apply(Bitmap bitmap) throws Exception {Paint paint = new Paint();paint.setTextSize(88);paint.setColor(Color.RED);return drawTextToBitmap(bitmap, "同学们大家好", paint, 88, 88);}})//日志记录.map(new Function<Bitmap, Bitmap>() {@Overridepublic Bitmap apply(Bitmap bitmap) throws Exception {Log.d(TAG, "apply: 这个时候下载了图片啊: " + System.currentTimeMillis());return bitmap;}}).subscribeOn(Schedulers.io())  //给上面的代码分配异步线程.observeOn(AndroidSchedulers.mainThread())  // 给下面的代码执行主线程//订阅  起点和终点订阅起来.subscribe(//终点new Observer<Bitmap>() {//订阅开始@Overridepublic void onSubscribe(@io.reactivex.annotations.NonNull Disposable d) {//预备 开始要分发//TODO 第一步progressDialog = new ProgressDialog(DownloadActivity.this);progressDialog.setTitle("download run");progressDialog.show();}//TODO 第四步//拿到事件@Overridepublic void onNext(@io.reactivex.annotations.NonNull Bitmap bitmap) {imageView.setImageBitmap(bitmap);}//错误事件@Overridepublic void onError(@io.reactivex.annotations.NonNull Throwable e) {}//TODO 第五步//完成事件@Overridepublic void onComplete() {if (progressDialog != null)progressDialog.dismiss();}});
}//图片上绘制文字,添加水印
private final Bitmap drawTextToBitmap(Bitmap bitmap, String text, Paint paint, int paddingLeft, int paddingTop) {Bitmap.Config bitmapConfig = bitmap.getConfig();paint.setDither(true); //获取更清晰的图片采样paint.setFilterBitmap(true);// 过滤一些if (bitmapConfig == null) {bitmapConfig = Bitmap.Config.ARGB_8888;}bitmap = bitmap.copy(bitmapConfig, true);Canvas canvas = new Canvas(bitmap);canvas.drawText(text, paddingLeft, paddingTop, paint);return bitmap;}

我们使用了Rx编程思维后,从起点到终点这条线一直是连着的,没有断掉,中间需要加需求(加水印、日志记录等)我们可以进行卡片式拦截,把上游的拦截成我们需要的新的,再将新的游下去,直到终点显示UI。这种思维也叫链式思维。

配合

其实是一个管理者,控制请求网络,请求是通过,请求的结果丢给处理,拿到结果后通过起点流向下去,显示UI。

现在有一个需求:通过配合来获取 里面的项目总数据和Item数据。

同样我们先用流程图来理清思路:

现在用代码实现:

创建客户端API

public interface WanAndroidApi {// 总数据@GET("project/tree/json")Observable<ProjectBean> getProject();  // 异步线程 耗时操作// ITem数据@GET("project/list/{pageIndex}/json") // ?cid=294Observable<ProjectItemBean> getProjectItem(@Path("pageIndex") int pageIndex, @Query("cid") int cid);  // 异步线程 耗时操作
}

封装网络请求框架

public class HttpUtil {private static final String TAG = "HttpUtils";private static final String BASE_URL = "https://www.wanandroid.com/";/*** 根据各种配置创建出retrofit** @return  返回创建好的retrofit*/public static Retrofit getOnlineCookieRetrofit() {// OKHttp客户端OkHttpClient.Builder httpBuilder = new OkHttpClient.Builder();// 各种参数配置OkHttpClient okHttpClient = httpBuilder.addNetworkInterceptor(new StethoInterceptor()).readTimeout(10000, TimeUnit.SECONDS).connectTimeout(10000, TimeUnit.SECONDS).writeTimeout(10000, TimeUnit.SECONDS).build();return new Retrofit.Builder().baseUrl(BASE_URL)//TODO 请求用OKHttp.client(okHttpClient)//TODO 响应用RxJava//添加一个json解析的工具.addConverterFactory(GsonConverterFactory.create(new Gson()))//添加RxJava处理工具.addCallAdapterFactory(RxJava2CallAdapterFactory.create()).build();}
}

将服务器返回的JSON数据转换为

方式一:通过在线转化

方式二:使用 里面的插件进行转化

用来实现我们的数据流向

/*** TODO Retrofit+RxJava 查询 项目分类  (总数据查询)* @param view*/
@SuppressLint("CheckResult")
public void getProjectAction(View view) {//获取网络APIapi.getProject().subscribeOn(Schedulers.io()) //上面  异步.observeOn(AndroidSchedulers.mainThread()) //下面 主线程.subscribe(new Observer<ProjectBean>() {@Overridepublic void onSubscribe(Disposable d) {progressDialog = new ProgressDialog(UseActivity.this);progressDialog.setTitle("资源获取中....");progressDialog.show();}@Overridepublic void onNext(ProjectBean projectBean) {Log.d(TAG, "onNext: " + projectBean);textView.setText(projectBean.toString());}@Overridepublic void onError(Throwable e) {Log.d(TAG, "onError: " + e);}@Overridepublic void onComplete() {if (progressDialog != null) {progressDialog.dismiss();}}});
}/*** TODO Retrofit+RxJava 查询 项目列表数据  (Item数据查询)* @param view*/
@SuppressLint("CheckResult")
public void getProjectListAction(View view) {api.getProjectItem(3, 294).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<ProjectItemBean>() {@Overridepublic void accept(ProjectItemBean projectItemBean) throws Exception {Log.d(TAG, "accept: " + projectItemBean);}});
}

防抖+网络嵌套

我们先了解什么是抖动?

假如我有个,写一个自动化脚本来实现2s之内点击20次,那我就得请求20次网络,这样是很过分的,这就是抖动;如果我2s之内还是点击20次,可我只响应你一次,这种情况就是没有抖动。我们要做的事情就是防止抖动。

Rx家族成员有、RxJS、…,里面就有防抖的功能。

通过上面的查询我们得到了项目分类数据和项目列表数据,如果我们想通过项目id来查询项目列表数据呢?这时候我们就需要进行网络嵌套,如果我们用传统的网络嵌套,那么会造成代码堆叠。(负面教程,嵌套的太厉害了)

/*** RxBinding  防抖动  TODO  网络嵌套*/
@SuppressLint("CheckResult")
public void antiShakeActon() {//对哪个控件防抖动?Button bt_anti_shake = findViewById(R.id.bt_anti_shake);RxView.clicks(bt_anti_shake).throttleFirst(2000, TimeUnit.MICROSECONDS)  //2秒钟之内,响应一次.subscribe(new Consumer<Object>() {@Overridepublic void accept(Object o) throws Exception {api.getProject()  //查询主数据.compose(DownloadActivity.rxud()).subscribe(new Consumer<ProjectBean>() {@Overridepublic void accept(ProjectBean projectBean) throws Exception {for (ProjectBean.DataBean dataBean : projectBean.getData()) {api.getProjectItem(1, dataBean.getId()) //查询Item数据.compose(DownloadActivity.rxud()).subscribe(new Consumer<ProjectItemBean>() {@Overridepublic void accept(ProjectItemBean projectItemBean) throws Exception {Log.d(TAG, "accept: " + projectItemBean);}});}}});}});
}

这样,虽然我们实现了通过项目ID查询列表数据的功能,但是我们发现,如果我们嵌套层级太多,那我们的代码会形成一个阶梯状的局势,这样的代码是很不好看的,而且如果有问题,我们排查起来十分困难。难道没有其他办法了吗?不不不,相信我们Rx爸爸,早已为我们铺好了道路。

通过里面的就能愉快地解决网络嵌套的问题。

示意图:

这个示意图的意思就是:可以自己分发多个数据给下面,终点就得到发的多个数据。

熟悉了的原理后,我们用代码实现:

/*** TODO 功能防抖 + 网络嵌套 (解决嵌套问题) flatMap*/
@SuppressLint("CheckResult")
public void antiShakeActonUpdate() {Button bt_anti_shake = findViewById(R.id.bt_anti_shake);RxView.clicks(bt_anti_shake).throttleFirst(2000, TimeUnit.MICROSECONDS)//我只给下面切换异步线程.observeOn(Schedulers.io()).flatMap(new Function<Object, ObservableSource<ProjectBean>>() {@Overridepublic ObservableSource<ProjectBean> apply(Object o) throws Exception {return api.getProject(); //主数据}}).flatMap(new Function<ProjectBean, ObservableSource<ProjectBean.DataBean>>() {@Overridepublic ObservableSource<ProjectBean.DataBean> apply(ProjectBean projectBean) throws Exception {return Observable.fromIterable(projectBean.getData());  //我自己搞一个发射器,发送多次}}).flatMap(new Function<ProjectBean.DataBean, ObservableSource<ProjectItemBean>>() {@Overridepublic ObservableSource<ProjectItemBean> apply(ProjectBean.DataBean dataBean) throws Exception {return api.getProjectItem(1, dataBean.getId());  //Item数据}}).observeOn(AndroidSchedulers.mainThread())  //给下面切换主线程.subscribe(new Consumer<ProjectItemBean>() {@Overridepublic void accept(ProjectItemBean projectItemBean) throws Exception {Log.d(TAG, "accept-------->: " + projectItemBean);textView.setText(projectItemBean.toString());}});
}

运用

*  需求:
*  1.请求服务器注册操作
*  2.注册完成之后,更新注册UI
*  3.马上去登录服务器操作
*  4.登录完成之后,更新登录的UI

现在我们有这样一个需求,首先我们分开写:

/*** TODO 方式一  分开写* @param view*/
@SuppressLint("CheckResult")
public void request(View view) {// 1.请求服务器注册操作// 2.注册完成之后,更新注册UIMyRetrofit.createRetrofit().create(IRequestNetwork.class).registerAction(new RegisterRequest()).compose(DownloadActivity.rxud()).subscribe(new Consumer<RegisterResponse>() {@Overridepublic void accept(RegisterResponse registerResponse) throws Exception {// 更新注册UI}});// 3.请求服务器登录操作// 4.登录完成之后,更新登录UIMyRetrofit.createRetrofit().create(IRequestNetwork.class).loginAction(new LoginRequest()).compose(DownloadActivity.rxud()).subscribe(new Consumer<LoginResponse>() {@Overridepublic void accept(LoginResponse loginResponse) throws Exception {// 更新登录UI}});}

我们用来实现(一行代码)

/*** todo  方式二 doOnNext* @param view*/
@SuppressLint("CheckResult")
public void request2(View view) {MyRetrofit.createRetrofit().create(IRequestNetwork.class)// todo 1.请求服务器注册操作.registerAction(new RegisterRequest()).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).doOnNext(new Consumer<RegisterResponse>() {@Overridepublic void accept(RegisterResponse registerResponse) throws Exception {// todo 2.注册完成后,更新注册UI}})// todo 3.请求登录服务器操作.observeOn(Schedulers.io()).flatMap(new Function<RegisterResponse, ObservableSource<LoginResponse>>() {@Overridepublic ObservableSource<LoginResponse> apply(RegisterResponse registerResponse) throws Exception {return MyRetrofit.createRetrofit().create(IRequestNetwork.class).loginAction(new LoginRequest());}}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<LoginResponse>() {@Overridepublic void onSubscribe(Disposable d) {progressDialog = new ProgressDialog(RequestActivity.this);progressDialog.show();disposable = d;}@Overridepublic void onNext(LoginResponse loginResponse) {// todo 4.登录完成后,更新登录UI//}@Overridepublic void onError(Throwable e) {}@Overridepublic void onComplete() {//todo 5.杀青了if (progressDialog != null) {progressDialog.dismiss();}}});
}

为什么使用呢?因为返回的是对象,使得我们的链条没有被打断,我们可以继续向下流动,直到终点,如果我们刚开始使用,我们的链条就被打断了。

模式与原理 标准观察者与观察者 标准观察者设计模式

首先举一个生活中常见的例子:当今时代,几乎我们人人都会玩微信,而在微信里面,会有一个微信公众号可供我们订阅,当我们订阅了某一个微信公众号后,我们会不定期收到该公共号的推送消息。

在这个微信公众号服务中,我们用户充当了观察者的角色,而公众号充当了被观察者的角色,当被观察者发生改变时,观察者会立即收到被观察者的改变,一个被观察者可以同时拥有多个观察者,他们之间时一对多的关系。

我们用代码来实现这个例子:

首先抽象一个被观察者和观察者

// todo 抽象层 被观察者
public interface Observable {//关注void addObserver(Observer observer);//取消关注void removeObserver(Observer observer);//被观察者发出改变void notifyObservers();//微信公众号的服务  编辑部门  发布一条消息void pushMessage(String message);
}

// todo 抽象层 观察者
public interface Observer {void update(Object o);
}

实现被观察者(这里我们就指微信公众号)

// 被观察者 实现
public class WechatServerObservable implements Observable {private List<Observer> observers = new ArrayList<>();private final static String TAG = MainActivity.class.getSimpleName();private String message;@Overridepublic void addObserver(Observer observer) {observers.add(observer);}@Overridepublic void removeObserver(Observer observer) {observers.remove(observer);}@Overridepublic void notifyObservers() {//遍历容器for (Observer observer : observers) {observer.update(message);}}@Overridepublic void pushMessage(String message) {this.message = message;Log.d(TAG, "微信服务号更新了消息: " + message);notifyObservers();}
}

实现观察者(这里指用户)

//观察者 实现
public class UserPerson implements Observer {private static final String TAG = MainActivity.class.getSimpleName();private String name;private String message;public UserPerson(String name) {this.name = name;}@Overridepublic void update(Object o) {this.message = (String) o;readMessage();}public void readMessage() {Log.d(TAG, name + " 读取到消息: " + message);}
}

写测试程序

public static void test() {//编辑部,编辑好的文案内容String msg = "学习Android必须学习Kotlin,哈哈";//创建一个微信公众号(被观察者)Observable server = new WechatServerObservable();//创建两个用户(观察者)Observer zhangsan = new UserPerson("张三");Observer lisi = new UserPerson("李四");//关注微信公众号server.addObserver(zhangsan);server.addObserver(lisi);//lisi取消了关注server.removeObserver(lisi);//微信公众号推送消息server.pushMessage(msg);}

上面就是一个标准的观察者设计模式

Hook点

首先了解什么是Hook?

Hook 技术又叫做钩子函数,在系统没有调用该函数之前,钩子程序就先捕获该消息,钩子函数先得到控制权,这时钩子函数既可以加工处理(改变)该函数的执行行为,还可以强制结束消息的传递。简单来说,就是把系统的程序拉出来变成我们自己执行代码片段。

找到的Hook点

/*** todo RxJava Hook点*/
@SuppressLint("CheckResult")
public static void testHook() {Observable.create(new ObservableOnSubscribe<Object>() {@Overridepublic void subscribe(ObservableEmitter<Object> emitter) throws Exception {emitter.onNext("A");}}).map(new Function<Object, String>() {@Overridepublic String apply(Object o) throws Exception {return "null";}}).subscribe(new Consumer<String>() {@Overridepublic void accept(String s) throws Exception {}});
}

全局监听 hook

//Hook之前的监听
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {@Overridepublic Observable apply(Observable observable) throws Exception {Log.d(TAG, "apply: 整个项目全局监听 到底多少个地方使用RxJava:" + observable);return observable;  //todo 不破坏人家的功能}
});

结论:很多操作符,都会经过【】监听

的观察者模式 创建创建使用()订阅

首先我们分析的源码:

public interface Observer<T> {void onSubscribe(@NonNull Disposable d);  //一订阅就会执行void onNext(@NonNull T t);  //拿到上一个卡片流下来的数据void onError(@NonNull Throwable e);  //拿到上一个卡片流下来的错误数据void onComplete();  //事件结束}

的源码很简单,就是一个泛型接口,有4个函数,真正实现这个接口就在这里:

 new Observer<String>() {@Overridepublic void onSubscribe(@NonNull Disposable d) {}@Overridepublic void onNext(String s) {}@Overridepublic void onError(@NonNull Throwable e) {}@Overridepublic void onComplete() {}
}

接下来看创建过程,源码分析:

_深入理解RxJava编程思想_深入理解RxJava编程思想

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {ObjectHelper.requireNonNull(source, "source is null");return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

点进creat里面,看到creat传进去一个soure,这个我们称之为自定义,然后再将自定义丢进中,再进入看:

public final class ObservableCreate<T> extends Observable<T> {final ObservableOnSubscribe<T> source;public ObservableCreate(ObservableOnSubscribe<T> source) {this.source = source;}

这时候new 的时候,将soure = 自定义,所以最终得出结论:

new ObservableCreate() {souce = 自定义source}

再分析订阅过程,源码分析:

public final void subscribe(Observer<? super T> observer) {ObjectHelper.requireNonNull(observer, "observer is null");try {observer = RxJavaPlugins.onSubscribe(this, observer);ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");subscribeActual(observer);}

传进去一个,我们称之为自定义观察者,最终会把自定义观察者丢进中,而是一个抽象函数,实现这个抽象函数的一定是在中,因为是通过点出来的,我们再进入中看,这个函数把我们的自定义观察者包装成一个发射器

protected void subscribeActual(Observer<? super T> observer) {CreateEmitter<T> parent = new CreateEmitter<T>(observer);observer.onSubscribe(parent);try {source.subscribe(parent);} catch (Throwable ex) {Exceptions.throwIfFatal(ex);parent.onError(ex);}
}

这时候我们用图片来展示流程:

我们不难发现,整体是一个U型流程

与 订阅的过程时序图如下:

标准的观察者设计模式与观察者设计模式区别

从直观角度分析

在标准的观察者设计模式中:是一个“被观察者”,多个“观察者”,并且需要“被观察者”发出改变通知时,所有的“观察者”才能观察到。

在观察者设计模式中:是多个“被观察者”,一个“观察者”,并且需要起点和终点在”订阅“一次后,才发出改变通知,终点(观察者)才能观察到。

从模式角度分析

在标准的观察者设计模式中:当发出通知改变时,会遍历里面的容器,此容器里面有10个,就会通知10个

在观察者设计模式中:分发事件时,会拿到发射器,通过发射器关联到我们自定义的,发射器调用到我们自定义的

Map变换操作符原理

map示意图:

通过源码分析总结的流程图:

其实Map实现变换的本质就是不断进行封包裹和拆包裹,说白就是一个洋葱模型,具体实现变换的代码在类中的()函数中,通过.apply(t)将类型变换

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {.....@Overridepublic void onNext(T t) {.....try {v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");} catch (Throwable ex) {fail(ex);return;}downstream.onNext(v);}public interface Function<T, R> {/*** Apply some calculation to the input value and return some other value.* @param t the input value* @return the output value* @throws Exception on error*/R apply(@NonNull T t) throws Exception;
}

装饰模型

线程切换原理

给上面代码分配线程

首先分析.io()这个流程:

从源码看到,第一步过程很简单,就是将实例对象扔进线程池。

再分析()过程:

这时候我们发现在这个流程中,所有执行都在异步线程中,回到终点岂不是崩溃??所以应该在终点切换为主线程,那么继续看下一个线程切换:

给下面的代码切换主线程

首先分析.()的流程:

第一步流程就是创建了主线程的,把传进去。

接下来看() 的流程:

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了