UniRx 学习——操作符

参考:

PS:本文不会将所有的 Rx 操作符都做详细说明,重点在于总结 UniRxRx 文档 中的不同之处

创建操作

Return / Empty / Never / Throw

  • Return:创建一个发射指定值的 Observable,对应 Rx 文档Just

    • ReturnUnit 相当于 Observable.Return(Unit.Default)
  • Empty:创建一个不发射任何数据但是正常终止的 Observable

  • Never:创建一个不发射数据也不终止的 Observable

  • Throw:创建一个不发射数据以一个错误终止的 Observable

Create

使用一个函数从头开始创建一个 Observavle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.Create<int>(observer =>
{
try
{
observer.OnNext(1);
observer.OnNext(2);
observer.OnNext(3);

observer.OnCompleted();
}
catch (Exception e)
{
observer.OnError(e);
}

// Return disposable handler
return Disposable.Empty;
})
.Subscribe(x => Debug.Log("x = " + x));

需要返回 Disposable 对象:

  • 一般情况下,可以返回 Disposable.Empty 或者 null(内部自动使用Disposable.Empty)

  • 如果需要监听 disposed(),可以返回 Disposable.Create(()=>{})

  • 如果需要监听 disposed(),而且闭包需要访问外部变量,为了避免 closure capture(generated class),可以选择返回 Disposable.CreateWithState(state,_=>{})

CreateWithState

用于避免闭包捕获外部变量

CreateSafe

Safe 表示当 onNext 出现异常时,自动注销订阅

Defer

直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable

使用 Create VS Defer的简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private IObservable<int> subscribeValueByCreate()
{
return Observable.Create<int>(o =>
{
// 可能的耗时操作
_value = 1;
o.OnNext(_value);
o.OnCompleted();
return Disposable.Empty;
});
}

private IObservable<int> subscribeValueByDefer()
{
return Observable.Defer(() =>
{
// 可能的耗时操作
_value = 2;
return Observable.Return(_value);
});
}

详细分析可参考:【译】使用RxJava实现延迟订阅

From

将其它种类的对象和数据类型转换为 Observable

FromCoroutine —— 从协程创建 Observable

  • 内部 onNext() 只发射 Unit.Default,与 yield return 返回的值和类型都无关,默认在协程完成时触发一次,可传参设置每次调用 yield return 也都触发

  • 支持取消操作(向协程传递 CancellationToken 对象)

  • 支持向协程传递 IObserver 对象

FromCoroutineVlaue —— 从协程创建 Observable,并发射返回值

  • 内部 onNext() 发射 yield return xxx 返回的值(注意:当调用yield return null 时,不会调用 onNext()???)

  • 支持取消操作(向协程传递 CancellationToken 对象)

FromMicroCoroutine —— 从微协程创建 Observable

补充:对 MicroCoroutine 的理解

  • 微协程只支持 yield return null

  • 微协程支持帧数类型:UpdateFixedUpdateEndOfFrame

  • 内部 onNext() 只发射 Unit.Default,默认在微协程完成时触发一次,可传参设置每次调用 yield return null 也都触发

  • 支持取消操作(向微协程传递 CancellationToken 对象)

  • 支持向微协程传递 IObserver 对象

FromAsyncPattern —— 从异步模式创建 Observable

异步编程模型(APM,Asynchronous Programming Model)不推荐使用???

FromEvent —— 从事件创建 Observable

基于事件的异步模式 (EAP,Event-based Asynchronous Pattern) 不推荐使用???

FromEventPattern —— 从事件模式创建 Observable

基于事件的异步模式 (EAP,Event-based Asynchronous Pattern) 不推荐使用???

Start

从委托创建 Observable

  • 支持的委托函数: Func<T> 或者 Action

  • 支持延时

  • 注意:Start 默认在异步线程执行

1
2
3
4
5
6
7
8
9
void Start()
{
Observable.Start(() => { Debug.Log("Start"); });
Observable.Start(() =>
{
Debug.Log("Start");
return 1;
});
}

Interval

创建一个按固定时间间隔发射整数序列的 Observable

IntervalFrame

创建一个按固定帧数间隔发射整数序列的 Observable

1
2
3
4
5
void Start()
{
Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(_ => { });
Observable.IntervalFrame(16, FrameCountType.FixedUpdate).Subscribe(_ => { });
}

Timer

创建一个 Observable,它在一个给定的延迟后发射一个特殊的值

1
2
3
4
void Start()
{
Observable.Timer(TimeSpan.FromSeconds(1)).Subscribe(l => Debug.Log(" l = " + l));
}

Range

创建一个发射特定整数序列的 Observable

1
2
3
4
void Start()
{
Observable.Range(1, 100).Subscribe(x => { });
}

Repeat

创建一个发射特定数据重复多次的 Observable

1
2
3
4
void Start()
{
Observable.Repeat("hello", 3).Subscribe(s => { });
}

RepeatInfinite

无限重复

RepeatSafe

Repeat 相同,但若连续出现 onComplete() 时,重复停止

变换操作

Select(对应 Rx 文档的 Map)

Observable 发射的每一项数据应用一个函数,执行变换操作

变换函数支持输出数据序号:

1
2
3
4
5
6
7
Observable.Range(1, 3)
.Select((x, i) =>
{
Debug.Log("i = " + i);// 序号
return x * i;
})
.Subscribe(x => Debug.Log("x = " + x));

SelectMany(对应 Rx 文档的 FlatMap)

将一个发射数据的 Observable 变换为多个 Observables,然后将它们发射的数据合并后放进一个单独的 Observable

注意:SelectMany 对这些 Observables 发射的数据做的是合并(merge)操作,因此它们可能是交错的

ContinueWith

将原 Observable 发射的最后一个值变换为一个新的 Observable,并发射新 Observable 的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.Range(0, 10)
.ContinueWith(i =>
{
Debug.Log("i = " + i);
return Observable.Range(i, 3);
})
.Subscribe(x =>
{
Debug.Log("x = " + x);
});

// 输出:
// i = 9
// x = 9,x = 10,x = 11

Buffer

定期收集 Observable 的数据放进一个数据包裹,然后发射这些数据包裹,而不是一次发射一个值

Pairwise

将原 Observable 发射的相邻的数据打包成一个 Pair 数据发射(只有 1 个数据时,不发射新数据)

GroupBy

将一个 Observable 分拆为一些 Observables 集合,它们中的每一个发射原始Observable 的一个子序列

Cast

将原始 Observable 发射的每一项数据都强制转换为一个指定的类型,然后再发射数据

PS:Cast 内部使用强转,失败会报异常

Scan

扫描:连续地对数据序列的每一项应用一个函数,然后连续发射结果

Aggregate

聚合:连续地对数据序列的每一项应用一个函数,然后只发射结果

过滤操作

Throttle(对应 Rx 文档的 Debounce)

仅在过了一段指定的时间还没发射数据时才发射一个数据

ThrottleFirst

定期发射这个时间段里源 Observable 发射的第一个数据

ThrottleFrame

仅在过了一段指定的帧数还没发射数据时才发射一个数据,可以指定帧数类型

ThrottleFirstFrame

定期发射指定的帧数时间段里源 Observable 发射的第一帧

Distinct

抑制(过滤掉)重复的数据项

DistinctUntilChanged

抑制(过滤掉)与直接前驱数据相同的数据项

Where(对应 Rx 文档的 Filter)

只发射通过了谓词测试的数据项

谓词函数支持输出数据序号

OfType

作用与 Cast 相似,但内部在强转前会用 is 先判断类型,会忽略不能转换类型的数据

First

只发射第一项(或者满足某个条件的第一项)数据

FirstOrDefault

只发射第一项数据,没有则发射 null

Single

如果原始 Observable 发射超过一个的数据,会抛出异常

SingleOrDefault

如果原始 Observable 发射超过一个的数据,会发射错误通知

IgnoreElements

不发射任何数据,只发射 Observable 的终止(或错误)通知

Last

只发射最后一项(或者满足某个条件的最后一项)数据

LastOfDefault

只发射最后一项(或者满足某个条件的最后一项)数据,没有则发射 null

Sample

采样:定期发射 Observable 最近发射的数据项

SampleFrame

帧采样

Skip

抑制 Observable 发射的前 N 项数据

SkipUntil

跳过源 Observable 发出的值,直到提供的 Observable 发出值

SkipWhile

跳过源 Observable 发出的值,直到提供的表达式结果为 false

Take

只发射前面的 N 项数据

TakeLast

只发射后面 N 项数据

TakeUntil

发射前面的 N 项数据,直到提供的 Observable 发出值

TakeWhile

发射前面的 N 项数据,直到提供的表达式结果为 false

TakeUntilDestroy

发射前面的 N 项数据,直到 Unity 组件被破环

TakeUntilDisable

发射前面的 N 项数据,直到 Unity 组件被禁用

结合操作

CombineLatest

当两个 Observables 中的任何一个发射了数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据

WithLatestFrom

类似于 CombineLatest,但是只在单个原始 Observable 发射了一条数据时才发射数据

WhenAll

// Observable.WhenAll is for parallel asynchronous operation

// (It’s like Observable.Zip but specialized for single async operations like Task.WhenAll)

只会将多个源 Observable 最后发射的数据组合成新数据并发射

Merge

合并多个 Observables 的发射物

使用 Merge 操作符你可以将多个 Observables 的输出合并,就好像它们是一个单个的 Observable 一样

Merge 可能会让合并的 Observables 发射的数据交错(有一个类似的操作符 Concat 不会让数据交错,它会按顺序一个接着一个发射多个 Observables 的发射物)

Concat

类似 Merge,但组合的数据不会发生交错

StartWith

在数据序列的开头插入一条指定的项

Switch

将一个发射多个 ObservablesObservable 转换成另一个单独的 Observable,后者发射那些 Observables 最近发射的数据项

注意:当原始 Observable 发射了一个新的 Observable 时(不是这个新的Observable 发射了一条数据时),它将取消订阅之前的那个 Observable。(简单理解:切换一个发数据的 Observable

Zip

通过一个函数将多个 Observables 的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项

只发射与发射数据项最少的那个 Observable 一样多的数据

ZipLatest

类似 Zip,不同的是:ZipLatest 会将多个源 Observables 最新发射的那个值
结合,这意味着如果某个源 Observable 此时已发射多个值也只会取最新值去结合,其余的抛弃

错误处理

Catch

onError 通知中恢复发射数据

Catch 操作符拦截原始 ObservableonError 通知,将它替换为其它的数据项或数据序列,让产生的 Observable 能够正常终止或者根本不终止

CatchIgnore

Catch 操作符拦截原始 ObservableonError 通知,将它替换为 Observable.Empty

Retry

如果原始 Observable 遇到错误,重新订阅它期望它能正常终止

Retry 操作符不会将原始 ObservableonError 通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列

Retry 总是传递 onNext 通知给观察者,由于重新订阅,可能会造成数据项重复

OnErrorRetry

Retry 类似,可进行 onError 处理

Finally

原始 Observable 正常完成或遇到错误都会调起闭包方法

辅助操作

Delay

延迟一段指定的时间再发射来自 Observable 的发射物

DelaySubscription

可以延迟订阅原始 Observable

DelayFrame

DelayFrameSubscription

Do

DoOnCancel

DoOnCompleted

DoOnError

DoOnSubscribe

DoOnTerminate

Materialize/Dematerialize

Materialize 将数据项和事件通知都当做数据项发射,Dematerialize 刚好相反

Materialize 将来自原始 Observable 的通知转换为 Notification 对象,然后它返回的 Observable 会发射这些数据

ObserveOn

指定一个观察者在哪个调度器上观察这个 Observable

ObserveOnMainThread

Subscribe

操作来自 Observable 的发射物和通知

SubscribeOn

指定Observable自身在哪个调度器上执行

TimeInterval

将一个发射数据的 Observable 转换为发射那些数据发射时间间隔的 Observable

FrameTimeInterval

Timeout

对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知

TimeoutFrame

Timestamp

Observable 发射的数据项附加一个时间戳

To

Observable 转换为另一个对象或数据结构

ToArray

ToList

ToAwaitableEnumerator

过时的,推荐使用 ToYieldInstruction

ToYieldInstruction

Observable 转为 Coroutine

条件和布尔操作

Amb

给定两个或多个 Observables,它只发射首先发射数据或通知的那个 Observable 的所有数据

DefaultIfEmpty

发射来自原始 Observable 的值,如果原始 Observable 没有发射任何值,就发射一个默认值