首页 >> 大全

rxjs-流式编程

2023-09-13 大全 28 作者:考证青年

订阅是一个表示一次性资源的对象,通常是一个可观察对象的执行。

它有一个重要的方法:,顾名思义。。。

比如的例子:

var observable = Rx.Observable.create(function (observer) {observer.next(1);observer.next(2);observer.next(3);setTimeout(() => {observer.next(4);observer.complete();}, 1000);
});
var observer={next:x=>console.log('Observer got a next value: ' + x),error: err => console.error('Observer got an error: ' + err),complete: () => console.log('Observer got a complete notification')
};
observable.subscribe(observer);
//返回
Observer got a next value: 1
Observer got a next value: 2
Observer got a next value: 3
Observer got a next value: 4 //after 1s return
Observer got a complete notification

如果在最后调用.();那么4就不会执行,也不会执行,就会取消掉这个观察。

是允许值被多播到多个观察者的一种特殊的。然而纯粹的可观察对象是单播的(每一个订阅的观察者拥有单独的可观察对象的执行)。

是对象,并且自带next,error,函数,所以我们不用在定义:

var subject = new Rx.Subject();subject.subscribe({next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({next: (v) => console.log('observerB: ' + v)
});subject.next(1);
subject.next(2);
//返回
observerA: 1
observerB: 1
observerA: 2
observerB: 2

由于自带next等等的函数,所以它也是个,也可以这样用:

var subject = new Rx.Subject();subject.subscribe({next: (v) => console.log('observerA: ' + v)
});
subject.subscribe({next: (v) => console.log('observerB: ' + v)
});var observable = Rx.Observable.from([1, 2, 3]);observable.subscribe(subject); // You can subscribe providing a Subject

操作

rx因为强大,我们可以流式的处理主要因为有在。

操作符是可观察对象上定义的方法,例如.map(...),.(...),.merge(...),等等。他们类似fp,返回新的而对象也会继承。

比如

Rx.Observable.interval(500).filter(x => x%2==1).subscribe( res => console.log(res) );
// 一秒输出一个数,返回单数。

这里的就是操作符,我们通过操作符来完成一系列的神奇操作。

调度

什么是调度者?调度者控制着何时启动一个订阅和何时通知被发送。

名称类型属性描述

queue

在当前事件帧中调度队列( 调度器)。迭代操作符使用此调度器。

asap

微任务队列上的调度, 使用尽可能快的转化机制, 或者是 Node.js 的 .(),或者是 Web 的消息通道,或者 , 或者其他。异步转化使用此调度器.

async

使用 调度工作。基于时间的操作符使用此调度器。

使用 e 调度工作。与平台的重绘同步使用此调度器。

var observable = Rx.Observable.create(function (observer) {observer.next(1);observer.next(2);observer.next(3);observer.complete();
})
.observeOn(Rx.Scheduler.async);console.log('just before subscribe');
observable.subscribe({next: x => console.log('got value ' + x),error: err => console.error('something wrong occurred: ' + err),complete: () => console.log('done'),
});
console.log('just after subscribe');//返回
just before subscribe
just after subscribe
got value 1
got value 2
got value 3
done

这是因为(Rx..async)在.和最终的之间引入了一个代理。

var proxyObserver = {next: (val) => {Rx.Scheduler.async.schedule((x) => finalObserver.next(x),0 /* delay */,val /* will be the x for the function above */);},// ...
}

使用rxjs 搜索功能


之前实现一个搜索效果,其实需要这样的代码,应用到函数节流还需要写为

  clearTimeout(timer);// 定时器,在 250 毫秒后触发timer = setTimeout(() => {console.log('发起请求..');},250)

还要考虑一种情况,如果我们搜索了a,然后马上改为了b,会返回a的结果,这样我们就需要判断一下:

    clearTimeout(timer)timer = setTimeout(() => {// 声明一个当前所搜的状态变量currentSearch = '书'; var searchText = e.target.value;$.ajax({url: `xx.com/${searchText}`,success: data => {// 判断后台返回的标志与我们存的当前搜索变量是否一致if (data.search === currentSearch) {// 渲染展示render(data);} else {// ..}}           });

这种代码其实就很杂乱了。

如果用rxjs,我们的代码能简单并且清楚很多:

var text = document.querySelector('#text');
var inputStream = Rx.Observable.fromEvent(text, 'keyup').debounceTime(250).pluck('target', 'value').switchMap(url => Http.get(url)).subscribe(data => render(data));

rxjs几个操作符

rxjs版的.all

const getPostOne$ = Rx.Observable.timer(1000).mapTo({id: 1});
const getPostTwo$ = Rx.Observable.timer(2000).mapTo({id: 2});Rx.Observable.forkJoin(getPostOne$, getPostTwo$).subscribe(res => console.log(res)) 
//返回
[ { id: 1 }, { id: 2 } ]

可以保存上一个值

Rx.Observable.fromEvent(document, 'scroll').map(e => window.pageYOffset).pairwise().subscribe(pair => console.log(pair)); // pair[1] - pair[0]

v2-c430821c80912f4e204dd93b214795ff_b.gif

合并两个流的值,并只发出最新的值

const clicks$ = Rx.Observable.fromEvent(document, 'click');
const innerObservable$ = Rx.Observable.interval(1000);clicks$.switchMap(event => innerObservable$).subscribe(val => console.log(val));

每次点击触发才发送值,并且点击之后重新发送,取消掉之前的值。如果是,则不取消之前的值。

返回

let source = Rx.Observable.of(42).toPromise();source.then((value) => console.log('Value: %s', value));
// => Value: 42

将 转化为 。

var result = Rx.Observable.fromPromise(fetch('http://myserver.com/'));
result.subscribe(x => console.log(x), e => console.error(e));

有了和相互转化的api,就很方便的用rx,ng2中内置rx,用着不爽就任意改成来写。

(: ):

发出源 发出的值,直到: 发出值。

rx.Observable.interval(1000).takeUntil(rx.Observable.fromEvent(document,'click'))

触发,然后每次点击停止触发。

所以它还有一个用法就是建立一个stop流,来避免手动调用。

   const data$ = this.getData();const cancelBtn = this.element.querySelector('.cancel-button');const rangeSelector = this.element.querySelector('.rangeSelector');const cancel$ = Observable.fromEvent(cancelBtn, 'click');const range$ = Observable.fromEvent(rangeSelector, 'change').map(e => e.target.value);const stop$ = Observable.merge(cancel$, range$.filter(x => x > 500))this.subscription = data$.takeUntil(stop$).subscribe(data => this.updateData(data));

rxjs在ng2 先提

继承自类,它储存着要发射给消费者的最新的值。

无论何时一个新的观察者订阅它,都会立即接受到这个来自的"当前值"。

比如

var subject = new Rx.BehaviorSubject(0); // 0 is the initial valuesubject.subscribe({next: (v) => console.log('observerA: ' + v)
});subject.next(1);
subject.next(2);subject.subscribe({next: (v) => console.log('observerB: ' + v)
});subject.next(3);//返回
observerA: 0
observerA: 1
observerA: 2
observerB: 2
observerA: 3
observerB: 3

每次next就传一个值,在里面写函数处理。

例子

我们有一个 table的例子来看。

代码看文最后

我们做的是一个table中的功能,类似find item by name。

一般的思路就是获取这个input的值,函数节流,在我们的table数据中这个name,然后给原来绑定的data赋值。

对于rx的写法就很清楚了。

 Observable.fromEvent(this.filter.nativeElement, 'keyup').debounceTime(150).distinctUntilChanged().subscribe(() => {if (!this.dataSource) { return; }this.dataSource.filter = this.filter.nativeElement.value;});

我们获取输入的值,节流,去重,赋值给this.,this.其实是的实例。

类是生成数据的类,可以忽略,是我们做处理的一个类,暴露了一个方法,返回的直接绑定table的data。

主要的处理在里:

export class ExampleDataSource extends DataSource {_filterChange = new BehaviorSubject('');get filter(): string { return this._filterChange.value; }set filter(filter: string) { this._filterChange.next(filter); }constructor(private _exampleDatabase: ExampleDatabase) {super();}/** Connect function called by the table to retrieve one stream containing the data to render. */connect(): Observable {const displayDataChanges = [this._exampleDatabase.dataChange,this._filterChange,];return Observable.merge(...displayDataChanges).map(() => {return this._exampleDatabase.data.slice().filter((item: UserData) => {let searchStr = (item.name + item.color).toLowerCase();return searchStr.indexOf(this.filter.toLowerCase()) != -1;});});}

我们设置了这个属性的get和set,每次我们按下按键,给this..赋值的时候,实际上,我们调用了的next方法,

发了一个事件。我们还需要merge一下.事件,为了当table数据改变的时候,我们能做出相应的处理。

然后就用map操作符,一下我们的data数据。给table数据绑定已经帮我们做了。

附文:

import {Component, ElementRef, ViewChild} from '@angular/core';
import {DataSource} from '@angular/cdk';
import {BehaviorSubject} from 'rxjs/BehaviorSubject';
import {Observable} from 'rxjs/Observable';
import 'rxjs/add/operator/startWith';
import 'rxjs/add/observable/merge';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/debounceTime';
import 'rxjs/add/operator/distinctUntilChanged';
import 'rxjs/add/observable/fromEvent';@Component({selector: 'table-filtering-example',styleUrls: ['table-filtering-example.css'],templateUrl: 'table-filtering-example.html',
})
export class TableFilteringExample {displayedColumns = ['userId', 'userName', 'progress', 'color'];exampleDatabase = new ExampleDatabase();dataSource: ExampleDataSource | null;@ViewChild('filter') filter: ElementRef;ngOnInit() {this.dataSource = new ExampleDataSource(this.exampleDatabase);Observable.fromEvent(this.filter.nativeElement, 'keyup').debounceTime(150).distinctUntilChanged().subscribe(() => {if (!this.dataSource) { return; }this.dataSource.filter = this.filter.nativeElement.value;});}
}/** Constants used to fill up our data base. */
const COLORS = ['maroon', 'red', 'orange', 'yellow', 'olive', 'green', 'purple','fuchsia', 'lime', 'teal', 'aqua', 'blue', 'navy', 'black', 'gray'];
const NAMES = ['Maia', 'Asher', 'Olivia', 'Atticus', 'Amelia', 'Jack','Charlotte', 'Theodore', 'Isla', 'Oliver', 'Isabella', 'Jasper','Cora', 'Levi', 'Violet', 'Arthur', 'Mia', 'Thomas', 'Elizabeth'];export interface UserData {id: string;name: string;progress: string;color: string;
}/** An example database that the data source uses to retrieve data for the table. */
export class ExampleDatabase {/** Stream that emits whenever the data has been modified. */dataChange: BehaviorSubject = new BehaviorSubject([]);get data(): UserData[] { return this.dataChange.value; }constructor() {// Fill up the database with 100 users.for (let i = 0; i < 100; i++) { this.addUser(); }}/** Adds a new user to the database. */addUser() {const copiedData = this.data.slice();copiedData.push(this.createNewUser());this.dataChange.next(copiedData);}/** Builds and returns a new User. */private createNewUser() {const name =NAMES[Math.round(Math.random() * (NAMES.length - 1))] + ' ' +NAMES[Math.round(Math.random() * (NAMES.length - 1))].charAt(0) + '.';return {id: (this.data.length + 1).toString(),name: name,progress: Math.round(Math.random() * 100).toString(),color: COLORS[Math.round(Math.random() * (COLORS.length - 1))]};}
}/*** Data source to provide what data should be rendered in the table. Note that the data source* can retrieve its data in any way. In this case, the data source is provided a reference* to a common data base, ExampleDatabase. It is not the data source's responsibility to manage* the underlying data. Instead, it only needs to take the data and send the table exactly what* should be rendered.*/
export class ExampleDataSource extends DataSource {_filterChange = new BehaviorSubject('');get filter(): string { return this._filterChange.value; }set filter(filter: string) { this._filterChange.next(filter); }constructor(private _exampleDatabase: ExampleDatabase) {super();}/** Connect function called by the table to retrieve one stream containing the data to render. */connect(): Observable {const displayDataChanges = [this._exampleDatabase.dataChange,this._filterChange,];return Observable.merge(...displayDataChanges).map(() => {return this._exampleDatabase.data.slice().filter((item: UserData) => {let searchStr = (item.name + item.color).toLowerCase();return searchStr.indexOf(this.filter.toLowerCase()) != -1;});});}disconnect() {}
}

关于我们

最火推荐

小编推荐

联系我们


版权声明:本站内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 88@qq.com 举报,一经查实,本站将立刻删除。备案号:桂ICP备2021009421号
Powered By Z-BlogPHP.
复制成功
微信号:
我知道了