参考:
PS:本文不会将所有的 Rx 操作符都做详细说明,重点在于总结 UniRx
与 Rx 文档
中的不同之处
创建操作
Return / Empty / Never / Throw
Return
:创建一个发射指定值的Observable
,对应Rx 文档
的Just
ReturnUnit
相当于Observable.Return(Unit.Default)
Empty
:创建一个不发射任何数据但是正常终止的Observable
Never
:创建一个不发射数据也不终止的Observable
Throw
:创建一个不发射数据以一个错误终止的Observable
Create
使用一个函数从头开始创建一个 Observavle
1 | Observable.Create<int>(observer => |
需要返回 Disposable
对象:
一般情况下,可以返回
Disposable.Empty
或者null
(内部自动使用Disposable.Empty
)如果需要监听
disposed()
,可以返回Disposable.Create(()=>{})
如果需要监听
disposed()
,而且闭包需要访问外部变量,为了避免 closure capture(generated class),可以选择返回Disposable.CreateWithState(state,_=>{})
CreateWithState
用于避免闭包捕获外部变量
CreateSafe
Safe
表示当 onNext
出现异常时,自动注销订阅
Defer
直到有观察者订阅时才创建 Observable
,并且为每个观察者创建一个新的 Observable
使用 Create
VS Defer
的简单例子:
1 | private IObservable<int> subscribeValueByCreate() |
详细分析可参考:【译】使用RxJava实现延迟订阅
From
将其它种类的对象和数据类型转换为 Observable
FromCoroutine
—— 从协程创建 Observable
内部
onNext()
只发射Unit.Default
,与yield return
返回的值和类型都无关,默认在协程完成时触发一次,可传参设置每次调用yield return
也都触发支持取消操作(向协程传递
CancellationToken
对象)支持向协程传递
IObserver
对象
FromCoroutineVlaue
—— 从协程创建 Observable
,并发射返回值
内部
onNext()
发射yield return xxx
返回的值(注意:当调用yield return null
时,不会调用onNext()
???)支持取消操作(向协程传递
CancellationToken
对象)
FromMicroCoroutine
—— 从微协程创建 Observable
微协程只支持
yield return null
微协程支持帧数类型:
Update
、FixedUpdate
和EndOfFrame
内部
onNext()
只发射Unit.Default
,默认在微协程完成时触发一次,可传参设置每次调用yield return null
也都触发支持取消操作(向微协程传递
CancellationToken
对象)支持向微协程传递
IObserver
对象
FromAsyncPattern
—— 从异步模式创建 Observable
异步编程模型(APM,Asynchronous Programming Model)不推荐使用???
FromEvent
—— 从事件创建 Observable
基于事件的异步模式 (EAP,Event-based Asynchronous Pattern) 不推荐使用???
FromEventPattern
—— 从事件模式创建 Observable
基于事件的异步模式 (EAP,Event-based Asynchronous Pattern) 不推荐使用???
Start
从委托创建 Observable
支持的委托函数:
Func<T>
或者Action
支持延时
注意:
Start
默认在异步线程执行
1 | void Start() |
Interval
创建一个按固定时间间隔发射整数序列的 Observable
IntervalFrame
创建一个按固定帧数间隔发射整数序列的 Observable
1 | void Start() |
Timer
创建一个 Observable
,它在一个给定的延迟后发射一个特殊的值
1 | void Start() |
Range
创建一个发射特定整数序列的 Observable
1 | void Start() |
Repeat
创建一个发射特定数据重复多次的 Observable
1 | void Start() |
RepeatInfinite
无限重复
RepeatSafe
与 Repeat
相同,但若连续出现 onComplete()
时,重复停止
变换操作
Select(对应 Rx 文档的 Map)
对 Observable
发射的每一项数据应用一个函数,执行变换操作
变换函数支持输出数据序号:
1 | Observable.Range(1, 3) |
SelectMany(对应 Rx 文档的 FlatMap)
将一个发射数据的 Observable
变换为多个 Observables
,然后将它们发射的数据合并后放进一个单独的 Observable
注意:SelectMany
对这些 Observables
发射的数据做的是合并(merge)操作,因此它们可能是交错的
ContinueWith
将原 Observable
发射的最后一个值变换为一个新的 Observable
,并发射新 Observable
的数据
1 | Observable.Range(0, 10) |
Buffer
定期收集 Observable
的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值
Pairwise
将原 Observable
发射的相邻的数据打包成一个 Pair
数据发射(只有 1 个数据时,不发射新数据)
GroupBy
将一个 Observable
分拆为一些 Observables
集合,它们中的每一个发射原始Observable
的一个子序列
Cast
将原始 Observable
发射的每一项数据都强制转换为一个指定的类型,然后再发射数据
PS:Cast
内部使用强转,失败会报异常
Scan
扫描:连续地对数据序列的每一项应用一个函数,然后连续发射结果
Aggregate
聚合:连续地对数据序列的每一项应用一个函数,然后只发射结果
过滤操作
Throttle(对应 Rx 文档的 Debounce)
仅在过了一段指定的时间还没发射数据时才发射一个数据
ThrottleFirst
定期发射这个时间段里源 Observable
发射的第一个数据
ThrottleFrame
仅在过了一段指定的帧数还没发射数据时才发射一个数据,可以指定帧数类型
ThrottleFirstFrame
定期发射指定的帧数时间段里源 Observable
发射的第一帧
Distinct
抑制(过滤掉)重复的数据项
DistinctUntilChanged
抑制(过滤掉)与直接前驱数据相同的数据项
Where(对应 Rx 文档的 Filter)
只发射通过了谓词测试的数据项
谓词函数支持输出数据序号
OfType
作用与 Cast
相似,但内部在强转前会用 is
先判断类型,会忽略不能转换类型的数据
First
只发射第一项(或者满足某个条件的第一项)数据
FirstOrDefault
只发射第一项数据,没有则发射 null
Single
如果原始 Observable
发射超过一个的数据,会抛出异常
SingleOrDefault
如果原始 Observable
发射超过一个的数据,会发射错误通知
IgnoreElements
不发射任何数据,只发射 Observable
的终止(或错误)通知
Last
只发射最后一项(或者满足某个条件的最后一项)数据
LastOfDefault
只发射最后一项(或者满足某个条件的最后一项)数据,没有则发射 null
Sample
采样:定期发射 Observable
最近发射的数据项
SampleFrame
帧采样
Skip
抑制 Observable
发射的前 N 项数据
SkipUntil
跳过源 Observable
发出的值,直到提供的 Observable
发出值
SkipWhile
跳过源 Observable
发出的值,直到提供的表达式结果为 false
Take
只发射前面的 N 项数据
TakeLast
只发射后面 N 项数据
TakeUntil
发射前面的 N 项数据,直到提供的 Observable
发出值
TakeWhile
发射前面的 N 项数据,直到提供的表达式结果为 false
TakeUntilDestroy
发射前面的 N 项数据,直到 Unity
组件被破环
TakeUntilDisable
发射前面的 N 项数据,直到 Unity
组件被禁用
结合操作
CombineLatest
当两个 Observables
中的任何一个发射了数据时,使用一个函数结合每个 Observable
发射的最近数据项,并且基于这个函数的结果发射数据
WithLatestFrom
类似于 CombineLatest
,但是只在单个原始 Observable 发射了一条数据时才发射数据
WhenAll
// Observable.WhenAll is for parallel asynchronous operation
// (It’s like Observable.Zip but specialized for single async operations like Task.WhenAll)
只会将多个源 Observable
最后发射的数据组合成新数据并发射
Merge
合并多个 Observables
的发射物
使用 Merge
操作符你可以将多个 Observables
的输出合并,就好像它们是一个单个的 Observable
一样
Merge
可能会让合并的 Observables
发射的数据交错(有一个类似的操作符 Concat
不会让数据交错,它会按顺序一个接着一个发射多个 Observables 的发射物)
Concat
类似 Merge
,但组合的数据不会发生交错
StartWith
在数据序列的开头插入一条指定的项
Switch
将一个发射多个 Observables
的 Observable
转换成另一个单独的 Observable
,后者发射那些 Observables
最近发射的数据项
注意:当原始 Observable
发射了一个新的 Observable
时(不是这个新的Observable
发射了一条数据时),它将取消订阅之前的那个 Observable
。(简单理解:切换一个发数据的 Observable
)
Zip
通过一个函数将多个 Observables 的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项
只发射与发射数据项最少的那个 Observable 一样多的数据
ZipLatest
类似 Zip
,不同的是:ZipLatest
会将多个源 Observables 最新发射的那个值
结合,这意味着如果某个源 Observable
此时已发射多个值也只会取最新值去结合,其余的抛弃
错误处理
Catch
从 onError
通知中恢复发射数据
Catch
操作符拦截原始 Observable
的 onError
通知,将它替换为其它的数据项或数据序列,让产生的 Observable
能够正常终止或者根本不终止
CatchIgnore
Catch
操作符拦截原始 Observable
的 onError
通知,将它替换为 Observable.Empty
Retry
如果原始 Observable
遇到错误,重新订阅它期望它能正常终止
Retry
操作符不会将原始 Observable
的 onError
通知传递给观察者,它会订阅这个Observable
,再给它一次机会无错误地完成它的数据序列
Retry
总是传递 onNext
通知给观察者,由于重新订阅,可能会造成数据项重复
OnErrorRetry
与 Retry
类似,可进行 onError
处理
Finally
原始 Observable
正常完成或遇到错误都会调起闭包方法
辅助操作
Delay
延迟一段指定的时间再发射来自 Observable
的发射物
DelaySubscription
可以延迟订阅原始 Observable
DelayFrame
DelayFrameSubscription
Do
DoOnCancel
DoOnCompleted
DoOnError
DoOnSubscribe
DoOnTerminate
Materialize/Dematerialize
Materialize
将数据项和事件通知都当做数据项发射,Dematerialize
刚好相反
Materialize
将来自原始 Observable
的通知转换为 Notification
对象,然后它返回的 Observable
会发射这些数据
ObserveOn
指定一个观察者在哪个调度器上观察这个 Observable
ObserveOnMainThread
Subscribe
操作来自 Observable
的发射物和通知
SubscribeOn
指定Observable自身在哪个调度器上执行
TimeInterval
将一个发射数据的 Observable
转换为发射那些数据发射时间间隔的 Observable
FrameTimeInterval
Timeout
对原始 Observable
的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知
TimeoutFrame
Timestamp
给 Observable
发射的数据项附加一个时间戳
To
将 Observable
转换为另一个对象或数据结构
ToArray
ToList
ToAwaitableEnumerator
过时的,推荐使用 ToYieldInstruction
ToYieldInstruction
将 Observable
转为 Coroutine
条件和布尔操作
Amb
给定两个或多个 Observables
,它只发射首先发射数据或通知的那个 Observable
的所有数据
DefaultIfEmpty
发射来自原始 Observable
的值,如果原始 Observable
没有发射任何值,就发射一个默认值