catch

在 RxJS 中的 catch 能够回传一个 observable 来送出新的值

直接看🌰:

当错误发生后就会 catch 并重新处理一个新的 observable 我们可以利用这个新的 observable 来送出我们想送的值

1
2
3
4
5
6
7
8
9
10
11
const source = Rx.observable.from(['a', 'b', 'c', 'd', 2])
.zip(Rx.Observable.interval(500), (x, y) => x)
const example = source
.map(x => x.toUpperCase())
.catch(err => Rx.Observable.of('h'))
example.subscribe({
next: value => console.log(value),
error: err => console.log(err),
complete: _ => console.log('complete!')
})
// A B C D h complete!

也可以在遇到错误后, 让 observable 结束, 如下

1
2
3
4
5
6
const source = Rx.observable.from(['a', 'b', 2, 'd', 2])
.zip(Rx.Observable.interval(500), (x, y) => x)
const example = source
.map(x => x.toUpperCase())
.catch(err => Rx.Observable.empty())
// A B complete!

catch 的第二个参数接收的是当前的 observable

1
2
3
4
5
6
const source = Rx.observable.from(['a', 'b', 2, 'd', 2])
.zip(Rx.Observable.interval(500), (x, y) => x)
const example = source
.map(x => x.toUpperCase())
.catch((err, obs) => obs)
// A B A B A B A B...

这里可以看到我们直接回传了当前的 observable ,也就是 example 来重新执行

因为只是简单的示范, 所以这里会一直无限循环, 通常会用在断线重连的情景

上面的处理方式有一个简化的写法, 叫做 retry

retry

如果我们想要实现 当 observable 发生错误时重新尝试就可以用 retry 这个方法

1
2
3
4
5
6
7
8
9
const source = Rx.Observable.from(['a', 'b', 'c', 'd', 2])
.zip(Rx.Observable.interval(500), (x, y) => x)
const example = source.map(v => v.toUpperCase()).retry()
example.subscribe({
next: v => console.log(v),
error: err => console.log(err),
complete: () => console.log('complete')
})
// A B C D A B C D A B C D...

我们也可以设定只尝试几次, 譬如retry(1)

retry 很适合用在 HTTP request 失败的场景中, 我们可以设定重新发送几次后在展示错误信息

retryWhen

一般我们会使用 retryWhen 来做错误通知或者意外收集, 🌰如下:

1
2
3
4
5
const source = Rx.Observable.from(['a', 'b', 'c', 'd', 2])
.zip(Rx.Observable.interval(500), (x, y) => x)
const example = source.map(v => v.toUpperCase())
.retryWhen(errObs => errObs.map(err => fetch('...')))
// example 订阅部分省略..

这里的errObs => errObs.map(err => fetch('...'))可以把 errObs 里的每个错误变成 api 发送.

retryWhen 实际上是建立了一个 Subject 并把错误放入, 然后对这个 Subject 进行内部的订阅. 这个 Subject 预设是无限的, 如果我们把它结束,原本的 observable 也会跟着结束

repeat

repeat 的行为跟 retry 基本一致, 只是 retry 只有在例外发生时才触发

一个实例:

1
2
3
4
5
6
7
8
9
10
11
// 错误处理
const title = document.getElementById('title')
const source = Rx.Observable.from(['a','b','c','d',2])
.zip(Rx.Observable.interval(500), (x,y) => x)
.map(x => x.toUpperCase())
// 通常 source 会建立即时同步的连线, 像是 web socket
const example = source.catch((err, obs) => {
Rx.Observable.empty()
.startWith('连线发生错误: 5s 后重连')
.concat(obs.delay(5000))
})

利用 catch 返回一个新的 observable, 这个 observable 会先送出错误信息, 并且把原本的 observable 延迟5秒在做合并