转载:一缕殇流化隐半边冰霜
前言
ReactiveCocoa是一个将函数响应式编程范例带入Objective-C的开源库。ReactiveCocoa是由Josh Abernathy和Justin Spahr-Summers两位大神在对GitHub for Mac的开发过程中编写的。Justin Spahr-Summers大神在2011年11月13号下午12点35分进行的第一次提交,直到2013年2月13日上午3点05分发布了其1.0 release,达到了第一个重要里程碑。ReactiveCocoa社区也非常活跃,目前最新版已经完成了ReactiveCocoa 5.0.0-alpha.3,目前在5.0.0-alpha.4开发中。
ReactiveCocoa v2.5是公认的Objective-C最稳定的版本,因此被广大的以OC为主要语言的客户端选中使用。ReactiveCocoa v3.x主要是基于Swift 1.2的版本,而ReactiveCocoa v4.x主要基于Swift 2.x,ReactiveCocoa 5.0就全面支持Swift 3.0,也许还有以后的Swift 4.0。接下来几篇博客先以ReactiveCocoa v2.5版本为例子,分析一下OC版的RAC具体实现(也许分析完了RAC 5.0就到来了)。也算是写在ReactiveCocoa 5.0正式版到来前夕的祝福吧。
什么是ReactiveCocoa?
ReactiveCocoa(其简称为RAC)是由Github 开源的一个应用于iOS和OS X开发的新框架。RAC具有函数式编程(FP)和响应式编程(RP)的特性。它主要吸取了.Net的Reactive Extensions的设计和实现。
ReactiveCocoa的宗旨是Streams of values over time,随着时间变化而不断流动的数据流。
ReactiveCocoa主要解决了以下这些问题:
- UI数据绑定
UI控件通常需要绑定一个事件,RAC可以很方便的绑定任何数据流到控件上。
- 用户交互事件绑定
RAC为可交互的UI控件提供了一系列能发送Signal信号的方法。这些数据流会在用户交互中相互传递。
- 解决状态以及状态之间依赖过多的问题
有了RAC的绑定之后,可以不用在关心各种复杂的状态,isSelect,isFinish……也解决了这些状态在后期很难维护的问题。
- 消息传递机制的大统一
OC中编程原来消息传递机制有以下几种:Delegate,Block Callback,Target-Action,Timers,KVO,objc上有一篇关于OC中这5种消息传递方式改如何选择的文章Communication Patterns,推荐大家阅读。现在有了RAC之后,以上这5种方式都可以统一用RAC来处理。
RAC中的核心RACSignal
信号的订阅和发送
ReactiveCocoa中最核心的概念之一就是信号RACStream。RACStream中有两个子类——RACSignal和RACSequence。下面先来分析RACSignal。
我们会经常看到以下的代码:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
} error:^(NSError *error) {
NSLog(@"error: %@", error);
} completed:^{
NSLog(@"completed");
}];
[disposable dispose];
这是一个RACSignal被订阅的完整过程。被订阅的过程中,究竟发生了什么?
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
return [RACDynamicSignal createSignal:didSubscribe];
}
RACSignal调用createSignal的时候,会调用RACDynamicSignal的createSignal的方法。

RACDynamicSignal是RACSignal的子类。createSignal后面的参数是一个block。
(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe
block的返回值是RACDisposable类型,block名叫didSubscribe。block的唯一一个参数是id类型的subscriber,这个subscriber是必须遵循RACSubscriber协议的。
RACSubscriber是一个协议,其下有以下4个协议方法:
@protocol RACSubscriber <NSObject>
@required
- (void)sendNext:(id)value;
- (void)sendError:(NSError *)error;
- (void)sendCompleted;
- (void)didSubscribeWithDisposable:(RACCompoundDisposable *)disposable;
@end
所以新建Signal的任务就全部落在了RACSignal的子类RACDynamicSignal上了。
@interface RACDynamicSignal ()
// The block to invoke for each subscriber.
@property (nonatomic, copy, readonly) RACDisposable * (^didSubscribe)(id<RACSubscriber> subscriber);
@end
RACDynamicSignal这个类很简单,里面就保存了一个名字叫didSubscribe的block。
+ (RACSignal *)createSignal:(RACDisposable * (^)(id<RACSubscriber> subscriber))didSubscribe {
RACDynamicSignal *signal = [[self alloc] init];
signal->_didSubscribe = [didSubscribe copy];
return [signal setNameWithFormat:@"+createSignal:"];
}
这个方法中新建了一个RACDynamicSignal对象signal,并把传进来的didSubscribe这个block保存进刚刚新建对象signal里面的didSubscribe属性中。最后再给signal命名+createSignal:。
- (instancetype)setNameWithFormat:(NSString *)format, ... {
if (getenv("RAC_DEBUG_SIGNAL_NAMES") == NULL) return self;
NSCParameterAssert(format != nil);
va_list args;
va_start(args, format);
NSString *str = [[NSString alloc] initWithFormat:format arguments:args];
va_end(args);
self.name = str;
return self;
}
setNameWithFormat是RACStream里面的方法,由于RACDynamicSignal继承自RACSignal,所以它也能调用这个方法。
RACSignal的block就这样被保存起来了,那什么时候会被执行呢?
block闭包在订阅的时候才会被“释放”出来。
RACSignal调用subscribeNext方法,返回一个RACDisposable。
- (RACDisposable *)subscribeNext:(void (^)(id x))nextBlock error:(void (^)(NSError *error))errorBlock completed:(void (^)(void))completedBlock {
NSCParameterAssert(nextBlock != NULL);
NSCParameterAssert(errorBlock != NULL);
NSCParameterAssert(completedBlock != NULL);
RACSubscriber *o = [RACSubscriber subscriberWithNext:nextBlock error:errorBlock completed:completedBlock];
return [self subscribe:o];
}
在这个方法中会新建一个RACSubscriber对象,并传入nextBlock,errorBlock,completedBlock。
@interface RACSubscriber ()
// These callbacks should only be accessed while synchronized on self.
@property (nonatomic, copy) void (^next)(id value);
@property (nonatomic, copy) void (^error)(NSError *error);
@property (nonatomic, copy) void (^completed)(void);
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
@end
RACSubscriber这个类很简单,里面只有4个属性,分别是nextBlock,errorBlock,completedBlock和一个RACCompoundDisposable信号。
+ (instancetype)subscriberWithNext:(void (^)(id x))next error:(void (^)(NSError *error))error completed:(void (^)(void))completed {
RACSubscriber *subscriber = [[self alloc] init];
subscriber->_next = [next copy];
subscriber->_error = [error copy];
subscriber->_completed = [completed copy];
return subscriber;
}
subscriberWithNext方法把传入的3个block都保存分别保存到自己对应的block中。
RACSignal调用subscribeNext方法,最后return的时候,会调用[self subscribe:o],这里实际是调用了RACDynamicSignal类里面的subscribe方法。
- (RACDisposable *)subscribe:(id<RACSubscriber>)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
if (self.didSubscribe != NULL) {
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
}];
[disposable addDisposable:schedulingDisposable];
}
return disposable;
}
RACDisposable有3个子类,其中一个就是RACCompoundDisposable。

@interface RACCompoundDisposable : RACDisposable
+ (instancetype)compoundDisposable;
+ (instancetype)compoundDisposableWithDisposables:(NSArray *)disposables;
- (void)addDisposable:(RACDisposable *)disposable;
- (void)removeDisposable:(RACDisposable *)disposable;
@end
RACCompoundDisposable虽然是RACDisposable的子类,但是它里面可以加入多个RACDisposable对象,在必要的时候可以一口气都调用dispose方法来销毁信号。当RACCompoundDisposable对象被dispose的时候,也会自动dispose容器内的所有RACDisposable对象。
RACPassthroughSubscriber是一个私有的类。
RACPassthroughSubscriber类就只有这一个方法。目的就是为了把所有的信号事件从一个订阅者subscriber传递给另一个还没有disposed的订阅者subscriber。
RACPassthroughSubscriber类中保存了3个非常重要的对象,RACSubscriber,RACSignal,RACCompoundDisposable。RACSubscriber是待转发的信号的订阅者subscriber。RACCompoundDisposable是订阅者的销毁对象,一旦它被disposed了,innerSubscriber就再也接受不到事件流了。
这里需要注意的是内部还保存了一个RACSignal,并且它的属性是unsafe_unretained。这里和其他两个属性有区别, 其他两个属性都是strong的。这里之所以不是weak,是因为引用RACSignal仅仅只是一个DTrace probes动态跟踪技术的探针。如果设置成weak,会造成没必要的性能损失。所以这里仅仅是unsafe_unretained就够了。
- (instancetype)initWithSubscriber:(id<RACSubscriber>)subscriber signal:(RACSignal *)signal disposable:(RACCompoundDisposable *)disposable {
NSCParameterAssert(subscriber != nil);
self = [super init];
if (self == nil) return nil;
_innerSubscriber = subscriber;
_signal = signal;
_disposable = disposable;
[self.innerSubscriber didSubscribeWithDisposable:self.disposable];
return self;
}
回到RACDynamicSignal类里面的subscribe方法中,现在新建好了RACCompoundDisposable和RACPassthroughSubscriber对象了。
RACScheduler.subscriptionScheduler是一个全局的单例。
+ (instancetype)subscriptionScheduler {
static dispatch_once_t onceToken;
static RACScheduler *subscriptionScheduler;
dispatch_once(&onceToken, ^{
subscriptionScheduler = [[RACSubscriptionScheduler alloc] init];
});
return subscriptionScheduler;
}
RACScheduler再继续调用schedule方法。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
+ (BOOL)isOnMainThread {
return [NSOperationQueue.currentQueue isEqual:NSOperationQueue.mainQueue] || [NSThread isMainThread];
}
+ (instancetype)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
在取currentScheduler的过程中,会判断currentScheduler是否存在,和是否在主线程中。如果都没有,那么就会调用后台backgroundScheduler去执行schedule。
schedule的入参就是一个block,执行schedule的时候会去执行block。也就是会去执行:
RACDisposable *innerDisposable = self.didSubscribe(subscriber);
[disposable addDisposable:innerDisposable];
这两句关键的语句。之前信号里面保存的block就会在此处被“释放”执行。self.didSubscribe(subscriber)这一句就执行了信号保存的didSubscribe闭包。
在didSubscribe闭包中有sendNext,sendError,sendCompleted,执行这些语句会分别调用RACPassthroughSubscriber里面对应的方法。
- (void)sendNext:(id)value {
if (self.disposable.disposed) return;
if (RACSIGNAL_NEXT_ENABLED()) {
RACSIGNAL_NEXT(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString([value description]));
}
[self.innerSubscriber sendNext:value];
}
- (void)sendError:(NSError *)error {
if (self.disposable.disposed) return;
if (RACSIGNAL_ERROR_ENABLED()) {
RACSIGNAL_ERROR(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description), cleanedDTraceString(error.description));
}
[self.innerSubscriber sendError:error];
}
- (void)sendCompleted {
if (self.disposable.disposed) return;
if (RACSIGNAL_COMPLETED_ENABLED()) {
RACSIGNAL_COMPLETED(cleanedSignalDescription(self.signal), cleanedDTraceString(self.innerSubscriber.description));
}
[self.innerSubscriber sendCompleted];
}
这个时候的订阅者是RACPassthroughSubscriber。RACPassthroughSubscriber里面的innerSubscriber才是最终的实际订阅者,RACPassthroughSubscriber会把值再继续传递给innerSubscriber。
- (void)sendNext:(id)value {
@synchronized (self) {
void (^nextBlock)(id) = [self.next copy];
if (nextBlock == nil) return;
nextBlock(value);
}
}
- (void)sendError:(NSError *)e {
@synchronized (self) {
void (^errorBlock)(NSError *) = [self.error copy];
[self.disposable dispose];
if (errorBlock == nil) return;
errorBlock(e);
}
}
- (void)sendCompleted {
@synchronized (self) {
void (^completedBlock)(void) = [self.completed copy];
[self.disposable dispose];
if (completedBlock == nil) return;
completedBlock();
}
}
innerSubscriber是RACSubscriber,调用sendNext的时候会先把自己的self.next闭包copy一份,再调用,而且整个过程还是线程安全的,用@synchronized保护着。最终订阅者的闭包在这里被调用。
sendError和sendCompleted也都是同理。

- RACSignal调用subscribeNext方法,新建一个RACSubscriber。
- 新建的RACSubscriber会copy,nextBlock,errorBlock,completedBlock存在自己的属性变量中。
- RACSignal的子类RACDynamicSignal调用subscribe方法。
- 新建RACCompoundDisposable和RACPassthroughSubscriber对象。RACPassthroughSubscriber分别保存对RACSignal,RACSubscriber,RACCompoundDisposable的引用,注意对RACSignal的引用是unsafe_unretained的。
- RACDynamicSignal调用didSubscribe闭包。先调用RACPassthroughSubscriber的相应的sendNext,sendError,sendCompleted方法。
- RACPassthroughSubscriber再去调用self.innerSubscriber,即RACSubscriber的nextBlock,errorBlock,completedBlock。注意这里调用同样是先copy一份,再调用闭包执行。
基础操作
1. bind(变换)
概要:变换操作,可以对原始信号的值进行变换。
在RACSignal的源码里面包含了两个基本操作,concat和zipWith。不过在分析这两个操作之前,先来分析一下更加核心的一个函数,bind操作。
先来说说bind函数的作用:
- 会订阅原始的信号。
- 任何时刻原始信号发送一个值,都会在绑定的block转换一次。
- 一旦绑定的block转换了值变成信号,就立即订阅,并把值发给订阅者subscriber。
- 一旦绑定的block要终止绑定,原始的信号就complete。
- 当所有的信号都complete,发送completed信号给订阅者subscriber。
- 如果中途信号出现了任何error,都要把这个错误发送给subscriber
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != NULL);
/*
* -bind: should:
*
* 1. Subscribe to the original signal of values.
* 2. Any time the original signal sends a value, transform it using the binding block.
* 3. If the binding block returns a signal, subscribe to it, and pass all of its values through to the subscriber as they're received.
* 4. If the binding block asks the bind to terminate, complete the _original_ signal.
* 5. When _all_ signals complete, send completed to the subscriber.
*
* If any signal sends an error at any point, send that to the subscriber.
*/
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
BOOL removeDisposable = NO;
@synchronized (signals) {
[signals removeObject:signal];
if (signals.count == 0) {
[subscriber sendCompleted];
[compoundDisposable dispose];
} else {
removeDisposable = YES;
}
}
if (removeDisposable) [compoundDisposable removeDisposable:finishedDisposable];
};
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
@synchronized (signals) {
[signals addObject:signal];
}
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(signal, selfDisposable);
}
}];
selfDisposable.disposable = disposable;
};
@autoreleasepool {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
[compoundDisposable addDisposable:selfDisposable];
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// Manually check disposal to handle synchronous errors.
if (compoundDisposable.disposed) return;
BOOL stop = NO;
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
selfDisposable.disposable = bindingDisposable;
}
return compoundDisposable;
}] setNameWithFormat:@"[%@] -bind:", self.name];
}
为了弄清楚bind函数究竟做了什么,写出测试代码:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
return ^RACSignal *(NSNumber *value, BOOL *stop){
value = @(value.integerValue * 2);
return [RACSignal return:value];
};
}];
[bindSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];
由于前面第一章节详细讲解了RACSignal的创建和订阅的全过程,这个也为了方法讲解,创建RACDynamicSignal,RACCompoundDisposable,RACPassthroughSubscriber这些都略过,这里着重分析一下bind的各个闭包传递创建和订阅的过程。
为了防止接下来的分析会让读者看晕,这里先把要用到的block进行编号。
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
// block 1
}
RACSignal *bindSignal = [signal bind:^RACStreamBindBlock{
// block 2
return ^RACSignal *(NSNumber *value, BOOL *stop){
// block 3
};
}];
[bindSignal subscribeNext:^(id x) {
// block 4
}];
- (RACSignal *)bind:(RACStreamBindBlock (^)(void))block {
// block 5
return [[RACSignal createSignal:^(id subscriber) {
// block 6
RACStreamBindBlock bindingBlock = block();
NSMutableArray *signals = [NSMutableArray arrayWithObject:self];
void (^completeSignal)(RACSignal *, RACDisposable *) = ^(RACSignal *signal, RACDisposable *finishedDisposable) {
// block 7
};
void (^addSignal)(RACSignal *) = ^(RACSignal *signal) {
// block 8
RACDisposable *disposable = [signal subscribeNext:^(id x) {
// block 9
}];
};
@autoreleasepool {
RACDisposable *bindingDisposable = [self subscribeNext:^(id x) {
// block 10
id signal = bindingBlock(x, &stop);
@autoreleasepool {
if (signal != nil) addSignal(signal);
if (signal == nil || stop) {
[selfDisposable dispose];
completeSignal(self, selfDisposable);
}
}
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
@autoreleasepool {
completeSignal(self, selfDisposable);
}
}];
}
return compoundDisposable;
}] ;
}
-
先创建信号signal,didSubscribe把block1 copy保存起来。
-
当信号调用bind进行绑定,会调用block5,didSubscribe把block6 copy保存起来。
-
当订阅者开始订阅bindSignal的时候,流程如下:
-
bindSignal执行didSubscribe的block,即执行block6。
-
在block6 的第一句代码,就是调用RACStreamBindBlock bindingBlock = block(),这里的block是外面传进来的block2,于是开始调用block2。执行完block2,会返回一个RACStreamBindBlock的对象。
-
由于是signal调用的bind函数,所以bind函数里面的self就是signal,在bind内部订阅了signal的信号。subscribeNext所以会执行block1。
-
执行block1,sendNext调用订阅者subscriber的nextBlock,于是开始执行block10。
-
block10中会先调用bindingBlock,这个是之前调用block2的返回值,这个RACStreamBindBlock对象里面保存的是block3。所以开始调用block3。
-
在block3中入参是一个value,这个value是signal中sendNext中发出来的value的值,在block3中可以对value进行变换,变换完成后,返回一个新的信号signal’。
-
如果返回的signal’为空,则会调用completeSignal,即调用block7。block7中会发送sendCompleted。如果返回的signal’不为空,则会调用addSignal,即调用block8。block8中会继续订阅signal’。执行block9。
-
block9 中会sendNext,这里的subscriber是block6的入参,于是对subscriber调用sendNext,会调用到bindSignal的订阅者的block4中。
-
block9 中执行完sendNext,还会调用sendCompleted。这里的是在执行block9里面的completed闭包。completeSignal(signal, selfDisposable);然后又会调用completeSignal,即block7。
-
执行完block7,就完成了一次从signal 发送信号sendNext的全过程。
bind整个流程就完成了。
2. concat(组合)
概要:组合操作,先发送第一个信号,再发送第二个信号。
写出测试代码:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
[subscriber sendNext:@1];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *signals = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@6];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *concatSignal = [signal concat:signals];
[concatSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];
concat操作就是把两个信号合并起来。注意合并有先后顺序。

- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}
合并前,signal和signals分别都把各自的didSubscribe保存copy起来。
合并之后,合并之后新的信号的didSubscribe会把block保存copy起来。
当合并之后的信号被订阅的时候:
- 调用新的合并信号的didSubscribe。
- 由于是第一个信号调用的concat方法,所以block中的self是前一个信号signal。合并信号的didSubscribe会先订阅signal。
- 由于订阅了signal,于是开始执行signal的didSubscribe,sendNext,sendError。
- 当前一个信号signal发送sendCompleted之后,就会开始订阅后一个信号signals,调用signals的didSubscribe。
- 由于订阅了后一个信号,于是后一个信号signals开始发送sendNext,sendError,sendCompleted。
这样两个信号就前后有序的拼接到了一起。
这里有二点需要注意的是:
- 只有当第一个信号完成之后才能收到第二个信号的值,因为第二个信号是在第一个信号completed的闭包里面订阅的,所以第一个信号不结束,第二个信号也不会被订阅。
- 两个信号concat在一起之后,新的信号的结束信号在第二个信号结束的时候才结束。看上图描述,新的信号的发送长度等于前面两个信号长度之和,concat之后的新信号的结束信号也就是第二个信号的结束信号。
3. zipWith(拉链)
概要:拉链操作,两个信号从开始一一匹配,只发送匹配完成的数据。
写出测试代码:
RACSignal *signal = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose----");
}];
}];
RACSignal *signals = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
[subscriber sendNext:@"A"];
[subscriber sendNext:@"B"];
[subscriber sendNext:@"C"];
[subscriber sendNext:@"D"];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose++++");
}];
}];
RACSignal *zipSignal = [signal zipWith:signals];
[zipSignal subscribeNext:^(id x) {
NSLog(@"subscribe value = %@", x);
}];

- (RACSignal *)zipWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id subscriber) {
__block BOOL selfCompleted = NO;
NSMutableArray *selfValues = [NSMutableArray array];
__block BOOL otherCompleted = NO;
NSMutableArray *otherValues = [NSMutableArray array];
void (^sendCompletedIfNecessary)(void) = ^{
@synchronized (selfValues) {
BOOL selfEmpty = (selfCompleted && selfValues.count == 0);
BOOL otherEmpty = (otherCompleted && otherValues.count == 0);
// 如果任意一个信号完成并且数组里面空了,就整个信号算完成
if (selfEmpty || otherEmpty) [subscriber sendCompleted];
}
};
void (^sendNext)(void) = ^{
@synchronized (selfValues) {
// 数组里面的空了就返回。
if (selfValues.count == 0) return;
if (otherValues.count == 0) return;
// 每次都取出两个数组里面的第0位的值,打包成元组
RACTuple *tuple = RACTuplePack(selfValues[0], otherValues[0]);
[selfValues removeObjectAtIndex:0];
[otherValues removeObjectAtIndex:0];
// 把元组发送出去
[subscriber sendNext:tuple];
sendCompletedIfNecessary();
}
};
// 订阅第一个信号
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (selfValues) {
// 把第一个信号的值加入到数组中
[selfValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
// 订阅完成时判断是否要发送完成信号
selfCompleted = YES;
sendCompletedIfNecessary();
}
}];
// 订阅第二个信号
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (selfValues) {
// 把第二个信号加入到数组中
[otherValues addObject:x ?: RACTupleNil.tupleNil];
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (selfValues) {
// 订阅完成时判断是否要发送完成信号
otherCompleted = YES;
sendCompletedIfNecessary();
}
}];
return [RACDisposable disposableWithBlock:^{
// 销毁两个信号
[selfDisposable dispose];
[otherDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -zipWith: %@", self.name, signal];
}
当把两个信号通过zipWith之后,就像上面的那张图一样,拉链的两边被中间的拉索拉到了一起。既然是拉链,那么一一的位置是有对应的,上面的拉链第一个位置只能对着下面拉链第一个位置,这样拉链才能拉到一起去。
具体实现:
zipWith里面有两个数组,分别会存储两个信号的值。
- 一旦订阅了zipWith之后的信号,就开始执行didSubscribe闭包。
- 在闭包中会先订阅第一个信号。这里假设第一个信号比第二个信号先发出一个值。第一个信号发出来的每一个值都会被加入到第一个数组中保存起来,然后调用sendNext( )闭包。在sendNext( )闭包中,会先判断两个数组里面是否都为空,如果有一个数组里面是空的,就return。由于第二个信号还没有发送值,即第二个信号的数组里面是空的,所以这里第一个值发送不出来。于是第一个信号被订阅之后,发送的值存储到了第一个数组里面了,没有发出去。
- 第二个信号的值紧接着发出来了,第二个信号每发送一次值,也会存储到第二个数组中,但是这个时候再调用sendNext( )闭包的时候,不会再return了,因为两个数组里面都有值了,两个数组的第0号位置都有一个值了。有值以后就打包成元组RACTuple发送出去。并清空两个数组0号位置存储的值。
- 以后两个信号每次发送一个,就先存储在数组中,只要有“配对”的另一个信号,就一起打包成元组RACTuple发送出去。从图中也可以看出,zipWith之后的新信号,每个信号的发送时刻是等于两个信号最晚发出信号的时刻。
- 新信号的完成时间,是当两者任意一个信号完成并且数组里面为空,就算完成了。所以最后第一个信号发送的5的那个值就被丢弃了。
第一个信号依次发送的1,2,3,4的值和第二个信号依次发送的A,B,C,D的值,一一的合在了一起,就像拉链把他们拉在一起。由于5没法配对,所以拉链也拉不上了。
变换操作
1. Map:(简单变换)
在父类RACStream中定义的,map操作一般是用来信号变换的。
RACSignal *signal = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendCompleted];
return nil;
}];
RACSignal *mapSignal = [signal map:^id(NSNumber *value) {
return @(value.integerValue * 10);
}];
[mapSignal subscribeNext:^(NSNumber *x) {
NSLog(@"%@",x);
} completed:^{
NSLog(@"completed");
}];

来看看底层是如何实现的。
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value) {
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}
这里实现代码比较严谨,先判断self的类型。因为RACStream的子类中会有一些子类会重写这些方法,所以需要判断self的类型,在回调中可以回调到原类型的方法中去。
由于本篇文章中我们都分析RACSignal的操作,所以这里的self.class是RACDynamicSignal类型的。相应的在return返回值中也返回class,即RACDynamicSignal类型的信号。
从map实现代码上来看,map实现是用了flattenMap函数来实现的。把map的入参闭包,放到了flattenMap的返回值中。
在来看看flattenMap的实现:
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
flattenMap算是对bind函数的一种封装。bind函数的是一个返回值RACStreamBindBlock类型的闭包。而flattenMap函数的入参是一个value,返回值RACStream类型的闭包。
在flattenMap中,返回block(value)的信号,如果信号为nil,则返回[class empty]。
先来看看为空的情况。当block(value)为空,返回[RACEmptySignal empty],empty就是创建了一个RACEmptySignal类型的信号:
+ (RACSignal *)empty {
#ifdef DEBUG
// Create multiple instances of this class in DEBUG so users can set custom
// names on each.
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});
return singleton;
#endif
}
RACEmptySignal类型的信号又是什么呢?
- (RACDisposable *)subscribe:(id)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
RACEmptySignal是RACSignal的子类,一旦订阅它,它就会同步的发送completed完成信号给订阅者。
所以flattenMap返回一个信号,如果信号不存在,就返回一个completed完成信号给订阅者。
再来看看flattenMap返回的信号是怎么变换的。
block(value)会把原信号发送过来的value传递给flattenMap的入参。flattenMap的入参是一个闭包,闭包的参数也是value的:
^(id value) { return [class return:block(value)]; }
这个闭包返回一个信号,信号类型和原信号的类型一样,即RACDynamicSignal类型的,值是block(value)。这里的闭包是外面map传进来的:
^id(NSNumber *value) { return @([value intValue] * 10); }
在这个闭包中把原信号的value值传进去进行变换,变换结束之后,包装成和原信号相同类型的信号,返回。返回的信号作为bind函数的闭包的返回值。这样订阅新的map之后的信号就会拿到变换之后的值。
2. MapReplace:(全部替换)
一般用法如下:
RACSignal *signalB = [signalA mapReplace:@"A"];

效果是不管A信号发送什么值,都替换成@“A”。
- (instancetype)mapReplace:(id)object {
return [[self map:^(id _) {
return object;
}] setNameWithFormat:@"[%@] -mapReplace: %@", self.name, [object rac_description]];
}
看底层源码就知道,它并不去关心原信号发送的是什么值,原信号发送什么值,都返回入参object的值。
3. reduceEach:(元组变换)
reduce是减少,聚合在一起的意思,reduceEach就是每个信号内部都聚合在一起。
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id<RACSubscriber> subscriber) {
[subscriber sendNext:[RACTuple tupleWithObjects:@(1), @(2), nil]];
[subscriber sendNext:[RACTuple tupleWithObjects:@(3), @(4), nil]];
[subscriber sendNext:[RACTuple tupleWithObjects:@(5), @(6), nil]];
[subscriber sendCompleted];
return nil;
}];
RACSignal *singnalB = [signalA reduceEach:^id(NSNumber *num1, NSNumber *num2){
return @(num1.integerValue + num2.integerValue);
}];
[singnalB subscribeNext:^(id x) {
NSLog(@"%@",x);
}];

reduceEach后面必须传入一个元组RACTuple类型,否则会报错。
- (instancetype)reduceEach:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}
这里有两个断言,一个是判断传入的reduceBlock闭包是否为空,另一个断言是判断闭包的入参是否是RACTuple类型的。
@interface RACBlockTrampoline : NSObject
@property (nonatomic, readonly, copy) id block;
+ (id)invokeBlock:(id)block withArguments:(RACTuple *)arguments;
@end
RACBlockTrampoline就是一个保存了一个block闭包的对象,它会根据传进来的参数,动态的构造一个NSInvocation,并执行。
reduceEach把入参reduceBlock作为RACBlockTrampoline的入参invokeBlock传进去,以及每个RACTuple也传到RACBlockTrampoline中。
- (id)invokeWithArguments:(RACTuple *)arguments {
SEL selector = [self selectorForArgumentCount:arguments.count];
NSInvocation *invocation = [NSInvocation invocationWithMethodSignature:[self methodSignatureForSelector:selector]];
invocation.selector = selector;
invocation.target = self;
for (NSUInteger i = 0; i < arguments.count; i++) {
id arg = arguments[i];
NSInteger argIndex = (NSInteger)(i + 2);
[invocation setArgument:&arg atIndex:argIndex];
}
[invocation invoke];
__unsafe_unretained id returnVal;
[invocation getReturnValue:&returnVal];
return returnVal;
}
第一步就是先计算入参一个元组RACTuple里面元素的个数。
- (SEL)selectorForArgumentCount:(NSUInteger)count {
NSCParameterAssert(count > 0);
switch (count) {
case 0: return NULL;
case 1: return @selector(performWith:);
case 2: return @selector(performWith::);
case 3: return @selector(performWith:::);
case 4: return @selector(performWith::::);
case 5: return @selector(performWith:::::);
case 6: return @selector(performWith::::::);
case 7: return @selector(performWith:::::::);
case 8: return @selector(performWith::::::::);
case 9: return @selector(performWith:::::::::);
case 10: return @selector(performWith::::::::::);
case 11: return @selector(performWith:::::::::::);
case 12: return @selector(performWith::::::::::::);
case 13: return @selector(performWith:::::::::::::);
case 14: return @selector(performWith::::::::::::::);
case 15: return @selector(performWith:::::::::::::::);
}
NSCAssert(NO, @"The argument count is too damn high! Only blocks of up to 15 arguments are currently supported.");
return NULL;
}
可以看到最多支持元组内元素的个数是15个。
这里我们假设以元组里面有2个元素为例。
- (id)performWith:(id)obj1 :(id)obj2 {
id (^block)(id, id) = self.block;
return block(obj1, obj2);
}
对应的Type Encoding如下:
| ARGUMENT | RETURN VALUE | 0 | 1 | 2 | 3 |
|---|---|---|---|---|---|
| methodSignature | @ | @ | : | @ | @ |
argument0和argument1分别对应着隐藏参数self和_cmd,所以对应着的类型是@和:,从argument2开始,就是入参的Type Encoding了。
所以在构造invocation的参数的时候,argIndex是要偏移2个位置的。即从(i + 2)开始设置参数。
当动态构造了一个invocation方法之后,[invocation invoke]调用这个动态方法,也就是执行了外部的reduceBlock闭包,闭包里面是我们想要信号变换的规则。
闭包执行结束得到returnVal返回值。这个返回值就是整个RACBlockTrampoline的返回值了。这个返回值也作为了map闭包里面的返回值。
接下去的操作就完全转换成了map的操作了。上面已经分析过map操作了,这里就不赘述了。
4. reduceApply(同3,block为元组第一元素)
举个例子:
RACSignal *signalA = [RACSignal createSignal:
^RACDisposable *(id subscriber)
{
id block = ^id(NSNumber *first,NSNumber *second,NSNumber *third) {
return @(first.integerValue + second.integerValue * third.integerValue);
};
[subscriber sendNext:RACTuplePack(block,<a href='http://www.jobbole.com/members/0929outlook'>@2</a> , @3 , @8)];
[subscriber sendNext:RACTuplePack((id)(^id(NSNumber *x){return @(x.intValue * 10);}),@9,@10,@30)];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *signalB = [signalA reduceApply];
使用reduceApply的条件也是需要信号里面的值是元组RACTuple。不过这里和reduceEach不同的是,原信号中每个元祖RACTuple的第0位必须要为一个闭包,后面n位为这个闭包的入参,第0位的闭包有几个参数,后面就需要跟几个参数。
如上述例子中,第一个元组第0位的闭包有3个参数,所以第一个元组后面还要跟3个参数。第二个元组的第0位的闭包只有一个参数,所以后面只需要跟一个参数。
当然后面可以跟更多的参数,如第二个元组,闭包后面跟了3个参数,但是只有第一个参数是有效值,后面那2个参数是无效不起作用的。唯一需要注意的就是后面跟的参数个数一定不能少于第0位闭包入参的个数,否则就会报错。
上面例子输出:
26 // 26 = 2 + 3 * 8;
90 // 90 = 9 * 10;
看看底层实现:
- (RACSignal *)reduceApply {
return [[self map:^(RACTuple *tuple) {
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-reduceApply must only be used on a signal of RACTuples. Instead, received: %@", tuple);
NSCAssert(tuple.count > 1, @"-reduceApply must only be used on a signal of RACTuples, with at least a block in tuple[0] and its first argument in tuple[1]");
// We can't use -array, because we need to preserve RACTupleNil
NSMutableArray *tupleArray = [NSMutableArray arrayWithCapacity:tuple.count];
for (id val in tuple) {
[tupleArray addObject:val];
}
RACTuple *arguments = [RACTuple tupleWithObjectsFromArray:[tupleArray subarrayWithRange:NSMakeRange(1, tupleArray.count - 1)]];
return [RACBlockTrampoline invokeBlock:tuple[0] withArguments:arguments];
}] setNameWithFormat:@"[%@] -reduceApply", self.name];
}
这里也有2个断言,第一个是确保传入的参数是RACTuple类型,第二个断言是确保元组RACTuple里面的元素各种至少是2个。因为下面取参数是直接从1号位开始取的。
reduceApply做的事情和reduceEach基本是等效的,只不过变换规则的block闭包一个是外部传进去的,一个是直接打包在每个信号元组RACTuple中第0位中。
5. materialize(转化为RACEvent)
这个方法会把信号包装成RACEvent类型。
- (RACSignal *)materialize {
return [[RACSignal createSignal:^(id subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:[RACEvent eventWithValue:x]];
} error:^(NSError *error) {
[subscriber sendNext:[RACEvent eventWithError:error]];
[subscriber sendCompleted];
} completed:^{
[subscriber sendNext:RACEvent.completedEvent];
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -materialize", self.name];
}
sendNext会包装成[RACEvent eventWithValue:x],error会包装成[RACEvent eventWithError:error],completed会被包装成RACEvent.completedEvent。注意,当原信号error和completed,新信号都会发送sendCompleted。
6. dematerialize(RACEvent反转)
这个操作是materialize的逆向操作。它会把包装成RACEvent信号重新还原为正常的值信号。
- (RACSignal *)dematerialize {
return [[self bind:^{
return ^(RACEvent *event, BOOL *stop) {
switch (event.eventType) {
case RACEventTypeCompleted:
*stop = YES;
return [RACSignal empty];
case RACEventTypeError:
*stop = YES;
return [RACSignal error:event.error];
case RACEventTypeNext:
return [RACSignal return:event.value];
}
};
}] setNameWithFormat:@"[%@] -dematerialize", self.name];
}
这里的实现也用到了bind函数,它会把原信号进行一个变换。新的信号会根据event.eventType进行转换。RACEventTypeCompleted被转换成[RACSignal empty],RACEventTypeError被转换成[RACSignal error:event.error],RACEventTypeNext被转换成[RACSignal return:event.value]。
7. not(取反)
- (RACSignal *)not {
return [[self map:^(NSNumber *value) {
NSCAssert([value isKindOfClass:NSNumber.class], @"-not must only be used on a signal of NSNumbers. Instead, got: %@", value);
return @(!value.boolValue);
}] setNameWithFormat:@"[%@] -not", self.name];
}
not操作需要传入的值都是NSNumber类型的。不是NSNumber类型会报错。not操作会把每个NSNumber按照BOOL的规则,取非,当成新信号的值。
8. and(元组与操作)
- (RACSignal *)and {
return [[self map:^(RACTuple *tuple) {
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-and must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
return @([tuple.rac_sequence all:^(NSNumber *number) {
NSCAssert([number isKindOfClass:NSNumber.class], @"-and must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
return number.boolValue;
}]);
}] setNameWithFormat:@"[%@] -and", self.name];
}
and操作需要原信号的每个信号都是元组RACTuple类型的,因为只有这样,RACTuple类型里面的每个元素的值才能进行&运算。
and操作里面有3处断言。第一处,判断入参是不是元组RACTuple类型的。第二处,判断RACTuple类型里面至少包含一个NSNumber。第三处,判断RACTuple里面是否都是NSNumber类型,有一个不符合,都会报错。
- (RACSequence *)rac_sequence {
return [RACTupleSequence sequenceWithTupleBackingArray:self.backingArray offset:0];
}
RACTuple类型先转换成RACTupleSequence。
+ (instancetype)sequenceWithTupleBackingArray:(NSArray *)backingArray offset:(NSUInteger)offset {
NSCParameterAssert(offset _tupleBackingArray = backingArray;
seq->_offset = offset;
return seq;
}
RACTuple类型先转换成RACTupleSequence,即存成了一个数组。
- (BOOL)all:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
NSNumber *result = [self foldLeftWithStart:@YES reduce:^(NSNumber *accumulator, id value) {
return @(accumulator.boolValue && block(value));
}];
return result.boolValue;
}
- (id)foldLeftWithStart:(id)start reduce:(id (^)(id, id))reduce {
NSCParameterAssert(reduce != NULL);
if (self.head == nil) return start;
for (id value in self) {
start = reduce(start, value);
}
return start;
}
for会遍历RACSequence里面存的每一个值,分别都去调用reduce( )闭包。start的初始值为YES。reduce( )闭包是:
^(NSNumber *accumulator, id value) { return @(accumulator.boolValue && block(value)); }
这里又会去调用block( )闭包:
^(NSNumber *number) { return number.boolValue; }
number是原信号RACTuple的第一个值。第一次循环reduce( )闭包是拿YES和原信号RACTuple的第一个值进行&计算。第二个循环reduce( )闭包是拿原信号RACTuple的第一个值和第二个值进行&计算,得到的值参与下一次循环,与第三个值进行&计算,如此下去。这也是折叠函数的意思,foldLeft从左边开始折叠。fold函数会从左至右,把RACTuple转换成的数组里面每个值都一个接着一个进行&计算。
每个RACTuple都map成这样的一个BOOL值。接下去信号就map成了一个新的信号。
9. or(元组或操作)
- (RACSignal *)or {
return [[self map:^(RACTuple *tuple) {
NSCAssert([tuple isKindOfClass:RACTuple.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, received: %@", tuple);
NSCAssert(tuple.count > 0, @"-or must only be used on a signal of RACTuples of NSNumbers, with at least 1 value in the tuple");
return @([tuple.rac_sequence any:^(NSNumber *number) {
NSCAssert([number isKindOfClass:NSNumber.class], @"-or must only be used on a signal of RACTuples of NSNumbers. Instead, tuple contains a non-NSNumber value: %@", tuple);
return number.boolValue;
}]);
}] setNameWithFormat:@"[%@] -or", self.name];
}
or操作的实现和and操作的实现大体类似。3处断言的作用和and操作完全一致,这里就不再赘述了。or操作的重点在any函数的实现上。or操作的入参也必须是RACTuple类型的。
- (BOOL)any:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
return [self objectPassingTest:block] != nil;
}
- (id)objectPassingTest:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
return [self filter:block].head;
}
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
any会依次判断RACTupleSequence数组里面的值,依次每个进行filter。如果value对应的BOOL值是YES,就转换成一个RACTupleSequence信号。如果对应的是NO,则转换成一个empty信号。
只要RACTuple为NO,就一直返回empty信号,直到BOOL值为YES,就返回1。map变换信号后变成成1。找到了YES之后的值就不会再判断了。如果没有找到YES,中间都是NO的话,一直遍历到数组最后一个,信号只能返回0。
10. any:(查找第一个满足block条件的值)
- (RACSignal *)any:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);
return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.finished) {
*stop = YES;
return [RACSignal return:@NO];
}
if (predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@YES];
}
return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -any:", self.name];
}
原信号会先经过materialize转换包装成RACEvent事件。依次判断predicateBlock(event.value)值的BOOL值,如果返回YES,就包装成RACSignal的新信号,发送YES出去,并且stop接下来的信号。如果返回NO,就返回[RACSignal empty]空信号。直到event.finished,返回[RACSignal return:@NO]。
所以any:操作的目的是找到第一个满足predicateBlock条件的值。找到了就返回YES的RACSignal的信号,如果没有找到,返回NO的RACSignal。
11. any(上的特殊情况,block返回YES)
- (RACSignal *)any {
return [[self any:^(id x) {
return YES;
}] setNameWithFormat:@"[%@] -any", self.name];
}
any操作是any:操作中的一种情况。即predicateBlock闭包永远都返回YES,所以any操作之后永远都只能得到一个只发送一个YES的新信号。
12. all:(未出错或不满足条件之前所有数据)
- (RACSignal *)all:(BOOL (^)(id object))predicateBlock {
NSCParameterAssert(predicateBlock != NULL);
return [[[self materialize] bind:^{
return ^(RACEvent *event, BOOL *stop) {
if (event.eventType == RACEventTypeCompleted) {
*stop = YES;
return [RACSignal return:@YES];
}
if (event.eventType == RACEventTypeError || !predicateBlock(event.value)) {
*stop = YES;
return [RACSignal return:@NO];
}
return [RACSignal empty];
};
}] setNameWithFormat:@"[%@] -all:", self.name];
}
all:操作和any:有点类似。原信号会先经过materialize转换包装成RACEvent事件。对原信号发送的每个信号都依次判断predicateBlock(event.value)是否是NO 或者event.eventType == RACEventTypeError。如果predicateBlock(event.value)返回NO或者出现了错误,新的信号都返回NO。如果一直都没出现问题,在RACEventTypeCompleted的时候发送YES。
all:可以用来判断整个原信号发送过程中是否有错误事件RACEventTypeError,或者是否存在predicateBlock为NO的情况。可以把predicateBlock设置成一个正确条件。如果原信号出现错误事件,或者不满足设置的错误条件,都会发送新信号返回NO。如果全过程都没有出错,或者都满足predicateBlock设置的条件,则一直到RACEventTypeCompleted,发送YES的新信号。
13. repeat(未error,循环发送)
- (RACSignal *)repeat {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
// Resubscribe.
});
}] setNameWithFormat:@"[%@] -repeat", self.name];
}
repeat操作返回一个subscribeForever闭包,闭包里面要传入4个参数。
static RACDisposable *subscribeForever (RACSignal *signal, void (^next)(id), void (^error)(NSError *, RACDisposable *), void (^completed)(RACDisposable *)) {
next = [next copy];
error = [error copy];
completed = [completed copy];
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACSchedulerRecursiveBlock recursiveBlock = ^(void (^recurse)(void)) {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[compoundDisposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *subscriptionDisposable = [signal subscribeNext:next error:^(NSError *e) {
@autoreleasepool {
error(e, compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}
recurse();
} completed:^{
@autoreleasepool {
completed(compoundDisposable);
[compoundDisposable removeDisposable:weakSelfDisposable];
}
recurse();
}];
[selfDisposable addDisposable:subscriptionDisposable];
};
// Subscribe once immediately, and then use recursive scheduling for any
// further resubscriptions.
recursiveBlock(^{
RACScheduler *recursiveScheduler = RACScheduler.currentScheduler ?: [RACScheduler scheduler];
RACDisposable *schedulingDisposable = [recursiveScheduler scheduleRecursiveBlock:recursiveBlock];
[compoundDisposable addDisposable:schedulingDisposable];
});
return compoundDisposable;
}
subscribeForever有4个参数,第一个参数是原信号,第二个是传入的next闭包,第三个是error闭包,最后一个是completed闭包。
subscribeForever一进入这个函数就会调用recursiveBlock( )闭包,闭包中有一个recurse( )的入参的参数。在recursiveBlock( )闭包中对原信号RACSignal进行订阅。next,error,completed分别会先调用传进来的闭包。然后error,completed执行完error( )和completed( )闭包之后,还会继续再执行recurse( ),recurse( )是recursiveBlock的入参。
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
return disposable;
}
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *schedulingDisposable = [self schedule:^{ // 此处省略 }];
[selfDisposable addDisposable:schedulingDisposable];
}
}
先取到当前的currentScheduler,即recursiveScheduler,执行scheduleRecursiveBlock,在这个函数中,会调用schedule函数。这里的recursiveScheduler是RACQueueScheduler类型的。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_async(self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
如果原信号没有disposed,dispatch_async会继续执行block,在这个block中还会继续原信号的发送。所以原信号只要没有error信号,disposable.disposed就不会返回YES,就会一直调用block。所以在subscribeForever的error和completed的最后都会调用recurse( )闭包。error调用recurse( )闭包是为了结束调用block,结束所有的信号。completed调用recurse( )闭包是为了继续调用block( )闭包,也就是repeat的本质。原信号会继续发送信号,如此无限循环下去。
14. retry:(基本同上,error时重复规定次数,其他正常)
- (RACSignal *)retry:(NSInteger)retryCount {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
__block NSInteger currentRetryCount = 0;
return subscribeForever(self,
^(id x) {
[subscriber sendNext:x];
},
^(NSError *error, RACDisposable *disposable) {
if (retryCount == 0 || currentRetryCount < retryCount) {
// Resubscribe.
currentRetryCount++;
return;
}
[disposable dispose];
[subscriber sendError:error];
},
^(RACDisposable *disposable) {
[disposable dispose];
[subscriber sendCompleted];
});
}] setNameWithFormat:@"[%@] -retry: %lu", self.name, (unsigned long)retryCount];
}
在retry:的实现中,和repeat实现的区别是中间加入了一个currentRetryCount值。如果currentRetryCount > retryCount的话,就会在error中调用[disposable dispose],这样subscribeForever就不会再无限循环下去了。
所以retry:操作的用途就是在原信号在出现error的时候,重试retryCount的次数,如果依旧error,那么就会停止重试。
如果原信号没有发生错误,那么原信号在发送结束,subscribeForever也就结束了。retry:操作对于没有任何error的信号相当于什么都没有发生。
15. retry(发生错误无限重复)
- (RACSignal *)retry {
return [[self retry:0] setNameWithFormat:@"[%@] -retry", self.name];
}
这里的retry操作就是一个无限重试的操作。因为retryCount设置成0之后,在error的闭包中中,retryCount 永远等于 0,原信号永远都不会被dispose,所以subscribeForever会一直无限重试下去。
同样的,如果对一个没有error的信号调用retry操作,也是不起任何作用的。
16. scanWithStart: reduceWithIndex: (有index的终值计算过程)
先写出测试代码:
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@1];
[subscriber sendNext:@4];
[subscriber sendNext:@6];
return [RACDisposable disposableWithBlock:^{
}];
}];
RACSignal *signalB = [signalA scanWithStart:@(2) reduceWithIndex:^id(NSNumber * running, NSNumber * next, NSUInteger index) {
return @(running.intValue * next.intValue + index);
}];
结论:running最新的值,next当前信号值,index当前信号位置。
2 // 2 * 1 + 0 = 2
3 // 2 * 1 + 1 = 3
14 // 3 * 4 + 2 = 14
87 // 14 * 6 + 3 = 87

- (instancetype)scanWithStart:(id)startingValue reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
NSCParameterAssert(reduceBlock != nil);
Class class = self.class;
return [[self bind:^{
__block id running = startingValue;
__block NSUInteger index = 0;
return ^(id value, BOOL *stop) {
running = reduceBlock(running, value, index++);
return [class return:running];
};
}] setNameWithFormat:@"[%@] -scanWithStart: %@ reduceWithIndex:", self.name, [startingValue rac_description]];
}
scanWithStart这个变换由初始值,变换函数reduceBlock( ),和index步进的变量组成。原信号的每个信号都会由变换函数reduceBlock( )进行变换。index每次都是自增。变换的初始值是由入参startingValue传入的。
17. scanWithStart: reduce: (无index的终值计算过程)
- (instancetype)scanWithStart:(id)startingValue reduce:(id (^)(id running, id next))reduceBlock {
NSCParameterAssert(reduceBlock != nil);
return [[self
scanWithStart:startingValue
reduceWithIndex:^(id running, id next, NSUInteger index) {
return reduceBlock(running, next);
}]
setNameWithFormat:@"[%@] -scanWithStart: %@ reduce:", self.name, [startingValue rac_description]];
}
scanWithStart: reduce:就是scanWithStart: reduceWithIndex: 的缩略版。变换函数也是外面闭包reduceBlock( )传进来的。只不过变换过程中不会使用index自增的这个变量。
18. aggregateWithStart: reduceWithIndex:(有index的终值计算)
- (RACSignal *)aggregateWithStart:(id)start reduceWithIndex:(id (^)(id, id, NSUInteger))reduceBlock {
return [[[[self
scanWithStart:start reduceWithIndex:reduceBlock]
startWith:start]
takeLast:1]
setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduceWithIndex:", self.name, [start rac_description]];
}

aggregate是合计的意思。所以最后变换出来的信号只有最后一个值。 aggregateWithStart: reduceWithIndex:操作调用了scanWithStart: reduceWithIndex:,原理和它完全一致。不同的是多了两步额外的操作,一个是startWith:,一个是takeLast:1。startWith:是在scanWithStart: reduceWithIndex:变换之后的信号之前加上start信号。takeLast:1是取最后一个信号。takeLast:和startWith:的详细分析文章下面会详述。
值得注意的一点是,原信号如果没有发送complete信号,那么该函数就不会输出新的信号值。因为在一直等待结束。
19. aggregateWithStart: reduce:(无index的终值计算)
- (RACSignal *)aggregateWithStart:(id)start reduce:(id (^)(id running, id next))reduceBlock {
return [[self
aggregateWithStart:start
reduceWithIndex:^(id running, id next, NSUInteger index) {
return reduceBlock(running, next);
}]
setNameWithFormat:@"[%@] -aggregateWithStart: %@ reduce:", self.name, [start rac_description]];
}

aggregateWithStart: reduce:调用aggregateWithStart: reduceWithIndex:函数,只不过没有只用index值。同样,如果原信号没有发送complete信号,也不会输出任何信号。
20. aggregateWithStartFactory: reduce:(无index的终值计算)
- (RACSignal *)aggregateWithStartFactory:(id (^)(void))startFactory reduce:(id (^)(id running, id next))reduceBlock {
NSCParameterAssert(startFactory != NULL);
NSCParameterAssert(reduceBlock != NULL);
return [[RACSignal defer:^{
return [self aggregateWithStart:startFactory() reduce:reduceBlock];
}] setNameWithFormat:@"[%@] -aggregateWithStartFactory:reduce:", self.name];
}
aggregateWithStartFactory: reduce:内部实现就是调用aggregateWithStart: reduce:,只不过入参多了一个产生start的startFactory( )闭包罢了。
21. collect(收集所有值,数组输出)
- (RACSignal *)collect {
return [[self aggregateWithStartFactory:^{
return [[NSMutableArray alloc] init];
} reduce:^(NSMutableArray *collectedValues, id x) {
[collectedValues addObject:(x ?: NSNull.null)];
return collectedValues;
}] setNameWithFormat:@"[%@] -collect", self.name];
}

collect函数会调用aggregateWithStartFactory: reduce:方法。把所有原信号的值收集起来,保存在NSMutableArray中。
时间操作
1. throttle:valuesPassingTest:(节流输出,block为NO,不节流)
throttle:节流,在确定的时间间隔中,发送一个信号,之后没有信号,时间结束时发出该信号;如果发送了一个新值,则旧值被丢弃,直到时间结束,发送最新的值;但是当闭包predicate不满足时,说明当前信号不屏蔽,直接输出。
这个操作传入一个时间间隔NSTimeInterval,和一个判断条件的闭包predicate。原信号在一个时间间隔NSTimeInterval之间发送的信号,如果还能满足predicate,则原信号都被“吞”了,直到一个时间间隔NSTimeInterval结束,会再次判断predicate,如果不满足了,原信号就会被发送出来。

如上图,原信号发送1以后,间隔NSTimeInterval的时间内,没有信号发出,并且predicate也为YES,就把1变换成新的信号发出去。接下去由于原信号发送2,3,4的过程中,都在间隔NSTimeInterval的时间内,所以都被“吞”了。直到原信号发送5之后,间隔NSTimeInterval的时间内没有新的信号发出,所以把原信号的5发送出来。原信号的6也是如此。
再来看看具体实现:
- (RACSignal *)throttle:(NSTimeInterval)interval valuesPassingTest:(BOOL (^)(id next))predicate {
NSCParameterAssert(interval >= 0);
NSCParameterAssert(predicate != nil);
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACScheduler *scheduler = [RACScheduler scheduler];
__block id nextValue = nil;
__block BOOL hasNextValue = NO;
RACSerialDisposable *nextDisposable = [[RACSerialDisposable alloc] init];
void (^flushNext)(BOOL send) = ^(BOOL send) { // 暂时省略 };
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
// 暂时省略
} error:^(NSError *error) {
[compoundDisposable dispose];
[subscriber sendError:error];
} completed:^{
flushNext(YES);
[subscriber sendCompleted];
}];
[compoundDisposable addDisposable:subscriptionDisposable];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -throttle: %f valuesPassingTest:", self.name, (double)interval];
}
看这段实现,里面有2处断言。会先判断传入的interval是否大于0,小于0当然是不行的。还有一个就是传入的predicate闭包不能为空,这个是接下来用来控制流程的。
接下来的实现还是按照套路来,返回值是一个信号,新信号的闭包里面再订阅原信号进行变换。
那么整个变换的重点就落在了flushNext闭包和订阅原信号subscribeNext闭包中了。
当新的信号一旦被订阅,闭包执行到此处,就会对原信号进行订阅。
[self subscribeNext:^(id x) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
BOOL shouldThrottle = predicate(x);
@synchronized (compoundDisposable) {
flushNext(NO);
if (!shouldThrottle) {
[subscriber sendNext:x];
return;
}
nextValue = x;
hasNextValue = YES;
nextDisposable.disposable = [delayScheduler afterDelay:interval schedule:^{
flushNext(YES);
}];
}
}
- 首先先创建一个delayScheduler。先判断当前的currentScheduler是否存在,不存在就取之前创建的[RACScheduler scheduler]。这里虽然两处都是RACTargetQueueScheduler类型的,但是currentScheduler是com.ReactiveCocoa.RACScheduler.mainThreadScheduler,而[RACScheduler scheduler]创建的是com.ReactiveCocoa.RACScheduler.backgroundScheduler。
- 调用predicate( )闭包,传入原信号发来的信号值x,经过predicate判断以后,得到是否打开节流开关的BOOL变量shouldThrottle。
- 之所以把RACCompoundDisposable作为线程间互斥信号量,因为RACCompoundDisposable里面会加入所有的RACDisposable信号。接着下面的操作用@synchronized给线程间加锁。
- flushNext( )这个闭包是为了hook住原信号的发送。
void (^flushNext)(BOOL send) = ^(BOOL send) {
@synchronized (compoundDisposable) {
[nextDisposable.disposable dispose];
if (!hasNextValue) return;
if (send) [subscriber sendNext:nextValue];
nextValue = nil;
hasNextValue = NO;
}
};
这个闭包中如果传入的是NO,那么原信号就无法立即sendNext。如果传入的是YES,并且hasNextValue = YES,原信号待发送的还有值,那么就发送原信号。
shouldThrottle是一个阀门,随时控制原信号是否可以被发送。
小结一下,每个原信号发送过来,通过在throttle:valuesPassingTest:里面的did subscriber闭包中进行订阅。这个闭包中主要干了4件事情:
调用flushNext(NO)闭包判断能否发送原信号的值。入参为NO,不发送原信号的值。 判断阀门条件predicate(x)能否发送原信号的值。 如果以上两个条件都满足,nextValue中进行赋值为原信号发来的值,hasNextValue = YES代表当前有要发送的值。 开启一个delayScheduler,延迟interval的时间,发送原信号的这个值,即调用flushNext(YES)。
现在再来分析一下整个throttle:valuesPassingTest:的全过程
原信号发出第一个值,如果在interval的时间间隔内,没有新的信号发送,那么delayScheduler延迟interval的时间,执行flushNext(YES),发送原信号的这个第一个值。
- (RACDisposable *)afterDelay:(NSTimeInterval)delay schedule:(void (^)(void))block {
return [self after:[NSDate dateWithTimeIntervalSinceNow:delay] schedule:block];
}
- (RACDisposable *)after:(NSDate *)date schedule:(void (^)(void))block {
NSCParameterAssert(date != nil);
NSCParameterAssert(block != NULL);
RACDisposable *disposable = [[RACDisposable alloc] init];
dispatch_after([self.class wallTimeWithDate:date], self.queue, ^{
if (disposable.disposed) return;
[self performAsCurrentScheduler:block];
});
return disposable;
}
注意,在dispatch_after闭包里面之前[self performAsCurrentScheduler:block]之前,有一个关键的判断:
if (disposable.disposed) return;
这个判断就是用来判断从第一个信号发出,在间隔interval的时间之内,还有没有其他信号存在。如果有,第一个信号肯定会disposed,这里会执行return,所以也就不会把第一个信号发送出来了。
这样也就达到了节流的目的:原来每个信号都会创建一个delayScheduler,都会延迟interval的时间,在这个时间内,如果原信号再没有发送新值,即原信号没有disposed,就把原信号的值发出来;如果在这个时间内,原信号还发送了一个新值,那么第一个值就被丢弃。在发送过程中,每个信号都要判断一次predicate( ),这个是阀门的开关,如果随时都不节流了,原信号发的值就需要立即被发送出来。
还有二点需要注意的是,第一点,正好在interval那一时刻,有新信号发送出来,原信号也会被丢弃,即只有在>=interval的时间之内,原信号没有发送新值,原来的这个值才能发送出来。第二点,原信号发送completed时,会立即执行flushNext(YES),把原信号的最后一个值发送出来。
2. throttle:(节流输出)
- (RACSignal *)throttle:(NSTimeInterval)interval {
return [[self throttle:interval valuesPassingTest:^(id _) {
return YES;
}] setNameWithFormat:@"[%@] -throttle: %f", self.name, (double)interval];
}
这个操作其实就是调用了throttle:valuesPassingTest:方法,传入时间间隔interval,predicate( )闭包则永远返回YES,原信号的每个信号都执行节流操作。
3. bufferWithTime:onScheduler:(时间内元组打包发送)
这个操作的实现是类似于throttle:valuesPassingTest:的实现。
- (RACSignal *)bufferWithTime:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id subscriber) {
RACSerialDisposable *timerDisposable = [[RACSerialDisposable alloc] init];
NSMutableArray *values = [NSMutableArray array];
void (^flushValues)() = ^{
// 暂时省略
};
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
// 暂时省略
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
flushValues();
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[timerDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -bufferWithTime: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
bufferWithTime:onScheduler:的实现和throttle:valuesPassingTest:的实现给出类似。开始有2个断言,2个都是判断scheduler的,第一个断言是判断scheduler是否为nil。第二个断言是判断scheduler的类型的,scheduler类型不能是immediateScheduler类型的,因为这个方法是要缓存一些信号的,所以不能是immediateScheduler类型的。
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (values) {
if (values.count == 0) {
timerDisposable.disposable = [scheduler afterDelay:interval schedule:flushValues];
}
[values addObject:x ?: RACTupleNil.tupleNil];
}
}
在subscribeNext中,当数组里面是没有存任何原信号的值,就会开启一个scheduler,延迟interval时间,执行flushValues闭包。如果里面有值了,就继续加到values的数组中。关键的也是闭包里面的内容,代码如下:
void (^flushValues)() = ^{
@synchronized (values) {
[timerDisposable.disposable dispose];
if (values.count == 0) return;
RACTuple *tuple = [RACTuple tupleWithObjectsFromArray:values];
[values removeAllObjects];
[subscriber sendNext:tuple];
}
};
flushValues( )闭包里面主要是把数组包装成一个元组,并且全部发送出来,原数组里面就全部清空了。这也是bufferWithTime:onScheduler:的作用,在interval时间内,把这个时间间隔内的原信号都缓存起来,并且在interval的那一刻,把这些缓存的信号打包成一个元组,发送出来。
和throttle:valuesPassingTest:方法一样,在原信号completed的时候,立即执行flushValues( )闭包,把里面存的值都发送出来。
4. delay:(延迟发送)
delay:函数的操作和上面几个套路都是一样的,实现方式也都是模板式的,唯一的不同都在subscribeNext中,和一个判断是否发送的闭包中。
- (RACSignal *)delay:(NSTimeInterval)interval {
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// We may never use this scheduler, but we need to set it up ahead of
// time so that our scheduled blocks are run serially if we do.
RACScheduler *scheduler = [RACScheduler scheduler];
void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
// 暂时省略
};
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
// 暂时省略
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
schedule(^{
[subscriber sendCompleted];
});
}];
[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -delay: %f", self.name, (double)interval];
}
在delay:的subscribeNext中,就单纯的执行了schedule的闭包。
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
schedule(^{
[subscriber sendNext:x];
});
}
void (^schedule)(dispatch_block_t) = ^(dispatch_block_t block) {
RACScheduler *delayScheduler = RACScheduler.currentScheduler ?: scheduler;
RACDisposable *schedulerDisposable = [delayScheduler afterDelay:interval schedule:block];
[disposable addDisposable:schedulerDisposable];
};
在schedule闭包中做的时间就是延迟interval的时间发送原信号的值。
5. interval:onScheduler:withLeeway:
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler withLeeway:(NSTimeInterval)leeway {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id subscriber) {
return [scheduler after:[NSDate dateWithTimeIntervalSinceNow:interval] repeatingEvery:interval withLeeway:leeway schedule:^{
[subscriber sendNext:[NSDate date]];
}];
}] setNameWithFormat:@"+interval: %f onScheduler: %@ withLeeway: %f", (double)interval, scheduler, (double)leeway];
}
在这个操作中,实现代码不难。先来看看2个断言,都是保护入参类型的,scheduler不能为空,且不能是immediateScheduler的类型,原因和上面是一样的,这里是延迟操作。
主要的实现就在after:repeatingEvery:withLeeway:schedule:上了。
这里的实现就是用GCD在self.queue上创建了一个Timer,时间间隔是interval,修正时间是leeway。
leeway这个参数是为dispatch source指定一个期望的定时器事件精度,让系统能够灵活地管理并唤醒内核。例如系统可以使用leeway值来提前或延迟触发定时器,使其更好地与其它系统事件结合。创建自己的定时器时,应该尽量指定一个leeway值。不过就算指定leeway值为0,也不能完完全全期望定时器能够按照精确的纳秒来触发事件。
这个定时器在interval执行sendNext操作,也就是发送原信号的值。
6. interval:onScheduler:
+ (RACSignal *)interval:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
return [[RACSignal interval:interval onScheduler:scheduler withLeeway:0.0] setNameWithFormat:@"+interval: %f onScheduler: %@", (double)interval, scheduler];
}
这个操作就是调用上一个方法interval:onScheduler:withLeeway:,只不过leeway = 0.0。具体实现上面已经分析过了,这里不再赘述。
过滤操作
1. filter: (选择符合条件数据)
这个filter:操作在any:的实现中用到过了。
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
if (block(value)) {
return [class return:value];
} else {
return class.empty;
}
}] setNameWithFormat:@"[%@] -filter:", self.name];
}

filter:中传入一个闭包,是用筛选的条件。如果满足筛选条件的即返回原信号的值,否则原信号的值被“吞”掉,返回空的信号。这个变换主要是用flattenMap的。
2. ignoreValues
- (RACSignal *)ignoreValues {
return [[self filter:^(id _) {
return NO;
}] setNameWithFormat:@"[%@] -ignoreValues", self.name];
}

由上面filter的实现,这里把筛选判断条件永远的传入NO,那么原信号的值都会被变换成empty信号,故变换之后的信号为空信号。
3. ignore:
- (instancetype)ignore:(id)value {
return [[self filter:^ BOOL (id innerValue) {
return innerValue != value && ![innerValue isEqual:value];
}] setNameWithFormat:@"[%@] -ignore: %@", self.name, [value rac_description]];
}

ignore:的实现还是由filter:实现的。传入的筛选判断条件是一个值,当原信号发送的值中是这个值的时候,就替换成空信号。
4. distinctUntilChanged(相邻数据相同则忽略)
- (instancetype)distinctUntilChanged {
Class class = self.class;
return [[self bind:^{
__block id lastValue = nil;
__block BOOL initial = YES;
return ^(id x, BOOL *stop) {
if (!initial && (lastValue == x || [x isEqual:lastValue])) return [class empty];
initial = NO;
lastValue = x;
return [class return:x];
};
}] setNameWithFormat:@"[%@] -distinctUntilChanged", self.name];
}

distinctUntilChanged的实现是用bind来完成的。每次变换中都记录一下原信号上一次发送过来的值,并与这一次进行比较,如果是相同的值,就“吞”掉,返回empty信号。只有和原信号上一次发送的值不同,变换后的新信号才把这个值发送出来。
关于distinctUntilChanged,这里关注的是两两信号之间的值是否不同,有时候我们可能需要一个类似于NSSet的信号集,distinctUntilChanged就无法满足了。在ReactiveCocoa 2.5的这个版本也并没有向我们提供distinct的变换函数。

我们可以自己实现类似的变换。实现思路也不难,可以把之前每次发送过来的信号都用数组存起来,新来的信号都去数组里面查找一遍,如果找不到,就把这个值发送出去,如果找到了,就返回empty信号。效果如上图。
5. take:
- (instancetype)take:(NSUInteger)count {
Class class = self.class;
if (count == 0) return class.empty;
return [[self bind:^{
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop) {
if (taken < count) {
++taken;
if (taken == count) *stop = YES;
return [class return:value];
} else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -take: %lu", self.name, (unsigned long)count];
}

take:实现也非常简单,借助bind函数来实现的。入参的count是原信号取值的个数。在bind的闭包中,taken计数从0开始取原信号的值,当taken取到count个数的时候,就停止取值。
在take:的基础上我们还可以继续改造出新的变换方式。比如说,想取原信号中执行的第几个值。类似于elementAt的操作。这个操作在ReactiveCocoa 2.5的这个版本也并没有直接向我们提供出来。

// 我自己增加实现的方法
- (instancetype)elementAt:(NSUInteger)index {
Class class = self.class;
return [[self bind:^{
__block NSUInteger taken = 0;
return ^ id (id value, BOOL *stop) {
if (index == 0) {
*stop = YES;
return [class return:value];
}
if (taken == index) {
*stop = YES;
return [class return:value];
} else if (taken < index){
taken ++;
return [class empty];
}else {
return nil;
}
};
}] setNameWithFormat:@"[%@] -elementAt: %lu", self.name, (unsigned long)index];
}
6. takeLast:
- (RACSignal *)takeLast:(NSUInteger)count {
return [[RACSignal createSignal:^(id subscriber) {
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x) {
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count) {
[valuesTaken removeObjectAtIndex:0];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
for (id value in valuesTaken) {
[subscriber sendNext:value == RACTupleNil.tupleNil ? nil : value];
}
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -takeLast: %lu", self.name, (unsigned long)count];
}

takeLast:的实现也是按照套路来。先创建一个新信号,return的时候订阅原信号。在函数内部用一个valuesTaken来保存原信号发送过来的值,原信号发多少,就存多少,直到个数溢出入参给定的count,就溢出数组第0位。这样能保证数组里面始终都装着最后count个原信号的值。
当原信号发送completed信号的时候,把数组里面存的值都sendNext出去。这里要注意的也是该变换发送信号的时机。如果原信号一直没有completed,那么takeLast:就一直没法发出任何信号来。
7. takeUntilBlock:(block满足条件停止发送)
- (instancetype)takeUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^{
return ^ id (id value, BOOL *stop) {
if (predicate(value)) return nil;
return [class return:value];
};
}] setNameWithFormat:@"[%@] -takeUntilBlock:", self.name];
}
takeUntilBlock:是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么新信号停止发送新信号,因为它被置为nil了。和函数名的意思是一样的,take原信号的值,Until直到闭包满足条件。
8. takeWhileBlock:(满足条件开始发送)
- (instancetype)takeWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
return [[self takeUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -takeWhileBlock:", self.name];
}
takeWhileBlock:的信号集是takeUntilBlock:的信号集的补集。全集是原信号。takeWhileBlock:底层还是调用takeUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。
9. takeUntil:(终止信号发出,停止发送)
- (RACSignal *)takeUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
void (^triggerCompletion)(void) = ^{
[disposable dispose];
[subscriber sendCompleted];
};
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:selfDisposable];
}
return disposable;
}] setNameWithFormat:@"[%@] -takeUntil: %@", self.name, signalTrigger];
}

takeUntil:的实现也是“经典套路”——return一个新信号,在新信号中订阅原信号。入参是一个信号signalTrigger,这个信号是一个Trigger。一旦signalTrigger发出第一个信号,就会触发triggerCompletion( )闭包,在这个闭包中,会调用triggerCompletion( )闭包。
一旦调用了triggerCompletion( )闭包,就会把原信号取消订阅,并给变换的新的信号订阅者sendCompleted。
如果入参signalTrigger一直没有sendNext,那么原信号就会一直sendNext:。
10. takeUntilReplacement:(信号发送替换)
- (RACSignal *)takeUntilReplacement:(RACSignal *)replacement {
return [RACSignal createSignal:^(id subscriber) {
RACSerialDisposable *selfDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *replacementDisposable = [replacement subscribeNext:^(id x) {
[selfDisposable dispose];
[subscriber sendNext:x];
} error:^(NSError *error) {
[selfDisposable dispose];
[subscriber sendError:error];
} completed:^{
[selfDisposable dispose];
[subscriber sendCompleted];
}];
if (!selfDisposable.disposed) {
selfDisposable.disposable = [[self
concat:[RACSignal never]]
subscribe:subscriber];
}
return [RACDisposable disposableWithBlock:^{
[selfDisposable dispose];
[replacementDisposable dispose];
}];
}];
}

- 原信号concat:了一个[RACSignal never]信号,这样原信号就一直不会disposed,会一直等待replacement信号的到来。
- 控制selfDisposable是否被dispose,控制权来自于入参的replacement信号,一旦replacement信号sendNext,那么原信号就会取消订阅,接下来的事情就会交给replacement信号了。
- 变换后的新信号sendNext,sendError,sendCompleted全部都由replacement信号来发送,最终新信号完成的时刻也是replacement信号完成的时刻。
11. skip:
- (instancetype)skip:(NSUInteger)skipCount {
Class class = self.class;
return [[self bind:^{
__block NSUInteger skipped = 0;
return ^(id value, BOOL *stop) {
if (skipped >= skipCount) return [class return:value];
skipped++;
return class.empty;
};
}] setNameWithFormat:@"[%@] -skip: %lu", self.name, (unsigned long)skipCount];
}

skip:信号集和take:信号集是补集关系,全集是原信号。take:是取原信号的前count个信号,而skip:是从原信号第count + 1位开始取信号。
skipped是一个游标,每次原信号发送一个值,就比较它和入参skipCount的大小。如果不比skipCount大,说明还需要跳过,所以就返回empty信号,否则就把原信号的值发送出来。
通过类比take系列方法,可以发现在ReactiveCocoa 2.5的这个版本也并没有向我们提供skipLast:的变换函数。这个变换函数的实现过程也不难,我们可以类比takeLast:来实现。
实现的思路也不难,原信号每次发送过来的值,都用一个数组存储起来。skipLast:是想去掉原信号最末尾的count个信号。
我们先来分析一下:假设原信号有n个信号,从0 – (n-1),去掉最后的count个,前面还剩n – count个信号。那么从 原信号的第 count + 1位的信号开始发送,到原信号结束,这样中间就正好是发送了 n – count 个信号。
分析清楚后,代码就很容易了:
// 我自己增加实现的方法
- (RACSignal *)skipLast:(NSUInteger)count {
return [[RACSignal createSignal:^(id subscriber) {
NSMutableArray *valuesTaken = [NSMutableArray arrayWithCapacity:count];
return [self subscribeNext:^(id x) {
[valuesTaken addObject:x ? : RACTupleNil.tupleNil];
while (valuesTaken.count > count) {
[subscriber sendNext:valuesTaken[0] == RACTupleNil.tupleNil ? nil : valuesTaken[0]];
[valuesTaken removeObjectAtIndex:0];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -skipLast: %lu", self.name, (unsigned long)count];
}

原信号每发送过来一个信号就存入数组,当数组里面的个数大于count的时候,就是需要我们发送信号的时候,这个时候每次都把数组里面第0位发送出去即可,数组维护了一个FIFO的队列。这样就实现了skipLast:的效果了。
12. skipUntilBlock:
- (instancetype)skipUntilBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
Class class = self.class;
return [[self bind:^{
__block BOOL skipping = YES;
return ^ id (id value, BOOL *stop) {
if (skipping) {
if (predicate(value)) {
skipping = NO;
} else {
return class.empty;
}
}
return [class return:value];
};
}] setNameWithFormat:@"[%@] -skipUntilBlock:", self.name];
}
skipUntilBlock: 的实现可以类比takeUntilBlock: 的实现。
skipUntilBlock: 是根据传入的predicate闭包作为筛选条件的。一旦predicate( )闭包满足条件,那么skipping = NO。skipping为NO,以后原信号发送的每个值都原封不动的发送出去。predicate( )闭包不满足条件的时候,即会一直skip原信号的值。和函数名的意思是一样的,skip原信号的值,Until直到闭包满足条件,就不再skip了。
13. skipWhileBlock:
- (instancetype)skipWhileBlock:(BOOL (^)(id x))predicate {
NSCParameterAssert(predicate != nil);
return [[self skipUntilBlock:^ BOOL (id x) {
return !predicate(x);
}] setNameWithFormat:@"[%@] -skipWhileBlock:", self.name];
}
skipWhileBlock:的信号集是skipUntilBlock:的信号集的补集。全集是原信号。skipWhileBlock:底层还是调用skipUntilBlock:,只不过判断条件的是不满足predicate( )闭包的集合。
到这里skip系列方法就结束了,对比take系列的方法,少了2个方法,在ReactiveCocoa 2.5的这个版本中 takeUntil: 和 takeUntilReplacement:这两个方法没有与之对应的skip方法。
// 我自己增加实现的方法
- (RACSignal *)skipUntil:(RACSignal *)signalTrigger {
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
__block BOOL sendTrigger = NO;
void (^triggerCompletion)(void) = ^{
sendTrigger = YES;
};
RACDisposable *triggerDisposable = [signalTrigger subscribeNext:^(id _) {
triggerCompletion();
} completed:^{
triggerCompletion();
}];
[disposable addDisposable:triggerDisposable];
if (!disposable.disposed) {
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
if (sendTrigger) {
[subscriber sendNext:x];
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:selfDisposable];
}
return disposable;
}] setNameWithFormat:@"[%@] -skipUntil: %@", self.name, signalTrigger];
}

skipUntil实现方法也很简单,当入参的signalTrigger开发发送信号的时候,就让原信号sendNext把值发送出来,否则就把原信号的值“吞”掉。
skipUntilReplacement:就没什么意义了,把原信号经过skipUntilReplacement:变换之后得到的新的信号就是Replacement信号。所以说这个操作也就没意义了。
14. groupBy:transform:(分组变换)
- (RACSignal *)groupBy:(id (^)(id object))keyBlock transform:(id (^)(id object))transformBlock {
NSCParameterAssert(keyBlock != NULL);
return [[RACSignal createSignal:^(id subscriber) {
NSMutableDictionary *groups = [NSMutableDictionary dictionary];
NSMutableArray *orderedGroups = [NSMutableArray array];
return [self subscribeNext:^(id x) {
id key = keyBlock(x);
RACGroupedSignal *groupSubject = nil;
@synchronized(groups) {
groupSubject = groups[key];
if (groupSubject == nil) {
groupSubject = [RACGroupedSignal signalWithKey:key];
groups[key] = groupSubject;
[orderedGroups addObject:groupSubject];
[subscriber sendNext:groupSubject];
}
}
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
} error:^(NSError *error) {
[subscriber sendError:error];
[orderedGroups makeObjectsPerformSelector:@selector(sendError:) withObject:error];
} completed:^{
[subscriber sendCompleted];
[orderedGroups makeObjectsPerformSelector:@selector(sendCompleted)];
}];
}] setNameWithFormat:@"[%@] -groupBy:transform:", self.name];
}
看groupBy:transform:的实现,依旧是老“套路”。return 一个新的RACSignal,在新的信号里面订阅原信号。
groupBy:transform:的重点就在subscribeNext中了。
- 首先解释一下两个入参。两个入参都是闭包,keyBlock返回值是要作为字典的key,transformBlock的返回值是对原信号发出来的值x进行变换。
- 先创建一个NSMutableDictionary字典groups,和NSMutableArray数组orderedGroups。
- 从字典里面取出key对应的value,这里的key对应着keyBlock返回值。value的值是一个RACGroupedSignal信号。如果找不到对应的key值,就新建一个RACGroupedSignal信号,并存入字典对应的key值,与之对应。
- 新变换之后的信号,订阅之后,RACGroupedSignal进行sendNext,这是一个信号,如果transformBlock不为空,就发送transformBlock变换之后的值。
- sendError和sendCompleted都要分别对数组orderedGroups里面每个RACGroupedSignal都要进行sendError或者sendCompleted。因为要对数组里面每个信号都执行一个操作,所以需要调用makeObjectsPerformSelector:withObject:方法。
经过groupBy:transform:变换之后,原信号会根据keyBlock进行分组。
写出测试代码,来看看平时应该怎么用。
RACSignal *signalA = [RACSignal createSignal:^RACDisposable *(id subscriber)
{
[subscriber sendNext:@1];
[subscriber sendNext:@2];
[subscriber sendNext:@3];
[subscriber sendNext:@4];
[subscriber sendNext:@5];
[subscriber sendCompleted];
return [RACDisposable disposableWithBlock:^{
NSLog(@"signal dispose");
}];
}];
RACSignal *signalGroup = [signalA groupBy:^id(NSNumber *object) {
return object.integerValue > 3 ? @"good" : @"bad";
} transform:^id(NSNumber * object) {
return @(object.integerValue * 10);
}];
[[[signalGroup filter:^BOOL(RACGroupedSignal *value) {
return [(NSString *)value.key isEqualToString:@"good"];
}] flatten]subscribeNext:^(id x) {
NSLog(@"subscribeNext: %@", x);
}];
假设原信号发送的1,2,3,4,5是代表的成绩的5个等级。当成绩大于3的都算“good”,小于3的都算“bad”。
signalGroup是原信号signalA经过groupBy:transform:得到的新的信号,这个信号是一个高阶的信号,因为它里面并不是直接装的是值,signalGroup这个信号里面装的还是信号。signalGroup里面有两个分组,分别是“good”分组和“bad”分组。
想从中取出这两个分组里面的值,需要进行一次filter:筛选。筛选之后得到对应分组的高阶信号。这时还要再进行一个flatten操作,把高阶信号变成低阶信号,再次订阅才能取到其中的值。
订阅新信号的值,输出如下:
subscribeNext: 40
subscribeNext: 50
15. groupBy:
- (RACSignal *)groupBy:(id (^)(id object))keyBlock {
return [[self groupBy:keyBlock transform:nil] setNameWithFormat:@"[%@] -groupBy:", self.name];
}
groupBy:操作就是groupBy:transform:的缩减版,transform传入的为nil。
关于groupBy:可以干的事情很多,可以进行很高级的分组操作。这里可以举一个例子:
// 简单算法题,分离数组中的相同的元素,如果元素个数大于2,则组成一个新的数组,结果得到多个包含相同元素的数组,
// 例如[1,2,3,1,2,3]分离成[1,1],[2,2],[3,3]
RACSignal *signal = @[@1, @2, @3, @4,@2,@3,@3,@4,@4,@4].rac_sequence.signal;
NSArray * array = [[[[signal groupBy:^NSString *(NSNumber *object) {
return [NSString stringWithFormat:@"%@",object];
}] map:^id(RACGroupedSignal *value) {
return [value sequence];
}] sequence] map:^id(RACSignalSequence * value) {
return value.array;
}].array;
for (NSNumber * num in array) {
NSLog(@"最后的数组%@",num);
}
// 最后输出 [1,2,3,4,2,3,3,4,4,4]变成[1],[2,2],[3,3,3],[4,4,4,4]
组合操作
1. startWith:
- (instancetype)startWith:(id)value {
return [[[self.class return:value]
concat:self]
setNameWithFormat:@"[%@] -startWith: %@", self.name, [value rac_description]];
}

startWith:的实现很简单,就是先构造一个只发送一个value的信号,然后这个信号发送完毕之后接上原信号。得到的新的信号就是在原信号前面新加了一个值。
2. concat:(对象方法)
这里说的concat:是在父类RACStream中定义的。
- (instancetype)concat:(RACStream *)stream {
return nil;
}
父类中定义的这个方法就返回一个nil,具体的实现还要子类去重写。
3. concat:(类方法)
+ (instancetype)concat:(id)streams {
RACStream *result = self.empty;
for (RACStream *stream in streams) {
result = [result concat:stream];
}
return [result setNameWithFormat:@"+concat: %@", streams];
}
这个concat:后面跟着一个数组,数组里面包含这很多信号,concat:依次把这些信号concat:连接串起来。
4. merge:(多个信号)
+ (RACSignal *)merge:(id)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}
return [[[RACSignal
createSignal:^ RACDisposable * (id subscriber) {
for (RACSignal *signal in copiedSignals) {
[subscriber sendNext:signal];
}
[subscriber sendCompleted];
return nil;
}]
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
}

merge:后面跟一个数组。先会新建一个数组copiedSignals,把传入的信号都装到数组里。然后依次发送数组里面的信号。由于新信号也是一个高阶信号,因为sendNext会把各个信号都依次发送出去,所以需要flatten操作把这个信号转换成值发送出去。
从上图上看,上下两个信号就像被拍扁了一样,就成了新信号的发送顺序。
5. merge:(单个信号)
- (RACSignal *)merge:(RACSignal *)signal {
return [[RACSignal
merge:@[ self, signal ]]
setNameWithFormat:@"[%@] -merge: %@", self.name, signal];
}
merge:后面参数也可以跟一个信号,那么merge:就是合并这两个信号。具体实现和merge:多个信号是一样的原理。
6. zip:
+ (instancetype)zip:(id)streams {
return [[self join:streams block:^(RACStream *left, RACStream *right) {
return [left zipWith:right];
}] setNameWithFormat:@"+zip: %@", streams];
}
zip:后面可以跟一个数组,数组里面装的是各种信号流。
它的实现是调用了join: block: 实现的。
+ (instancetype)join:(id)streams block:(RACStream * (^)(id, id))block {
RACStream *current = nil;
// 第一步
for (RACStream *stream in streams) {
if (current == nil) {
current = [stream map:^(id x) {
return RACTuplePack(x);
}];
continue;
}
current = block(current, stream);
}
// 第二步
if (current == nil) return [self empty];
return [current map:^(RACTuple *xs) {
NSMutableArray *values = [[NSMutableArray alloc] init];
// 第三步
while (xs != nil) {
[values insertObject:xs.last ?: RACTupleNil.tupleNil atIndex:0];
xs = (xs.count > 1 ? xs.first : nil);
}
// 第四步
return [RACTuple tupleWithObjectsFromArray:values];
}];
}
join: block: 的实现可以分为4步:
- 依次打包各个信号流,把每个信号流都打包成元组RACTuple。首先第一个信号流打包成一个元组,这个元组里面就一个信号。接着把第一个元组和第二个信号执行block( )闭包里面的操作。传入的block( )闭包执行的是zipWith:的操作。这个操作是把两个信号“压”在一起。得到第二个元组,里面装着是第一个元组和第二个信号。之后每次循环都执行类似的操作,再把第二个元组和第三个信号进行zipWith:操作,以此类推下去,直到所有的信号流都循环一遍。
- 经过第一步的循环操作之后,还是nil,那么肯定就是空信号了,就返回empty信号。
- 这一步是把之前第一步打包出来的结果,还原回原信号的过程。经过第一步的循环之后,current会是类似这个样子,(((1), 2), 3),第三步就是为了把这种多重元组解出来,每个信号流都依次按照顺序放在数组里。注意观察current的特点,最外层的元组,是一个值和一个元组,所以从最外层的元组开始,一层一层往里“剥”。while循环每次都取最外层元组的last,即那个单独的值,插入到数组的第0号位置,然后取出first即是里面一层的元组。然后依次循环。由于每次都插入到数组0号的位置,类似于链表的头插法,最终数组里面的顺序肯定也保证是原信号的顺序。
- 第四步就是把还原成原信号的顺序的数组包装成元组,返回给map操作的闭包。
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array {
return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
}
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {
RACTuple *tuple = [[self alloc] init];
if (convert) {
NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
for (id object in array) {
[newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
}
tuple.backingArray = newArray;
} else {
tuple.backingArray = [array copy];
}
return tuple;
}
在转换过程中,入参convertNullsToNils的含义是,是否把数组里面的NSNull转换成RACTupleNil。
这里转换传入的是NO,所以就是把数组原封不动的copy一份。
测试代码:
RACSignal *signalD = [RACSignal interval:3 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalO = [RACSignal interval:1 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalE = [RACSignal interval:4 onScheduler:[RACScheduler mainThreadScheduler] withLeeway:0];
RACSignal *signalB = [RACStream zip:@[signalD,signalO,signalE]];
[signalB subscribeNext:^(id x) {
NSLog(@"最后接收到的值 = %@",x);
}];
打印输出:
2016-11-29 13:07:57.349 最后接收到的值 = (
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:07:54 +0000",
"2016-11-29 05:07:57 +0000"
)
2016-11-29 13:08:01.350 最后接收到的值 = (
"2016-11-29 05:07:59 +0000",
"2016-11-29 05:07:55 +0000",
"2016-11-29 05:08:01 +0000"
)
2016-11-29 13:08:05.352 最后接收到的值 = (
"2016-11-29 05:08:02 +0000",
"2016-11-29 05:07:56 +0000",
"2016-11-29 05:08:05 +0000"
)
最后输出的信号以时间最长的为主,最后接到的信号是一个元组,里面依次包含zip:数组里每个信号在一次“压”缩周期里面的值。
7. zip: reduce:
+ (instancetype)zip:(id)streams reduce:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
RACStream *result = [self zip:streams];
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+zip: %@ reduce:", streams];
}
zip: reduce:是一个组合的方法。具体实现可以拆分成两部分,第一部分是先执行zip:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。zip:完成之后,紧接着进行reduceEach:操作。
这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
- (instancetype)reduceEach:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
__weak RACStream *stream __attribute__((unused)) = self;
return [[self map:^(RACTuple *t) {
NSCAssert([t isKindOfClass:RACTuple.class], @"Value from stream %@ is not a tuple: %@", stream, t);
return [RACBlockTrampoline invokeBlock:reduceBlock withArguments:t];
}] setNameWithFormat:@"[%@] -reduceEach:", self.name];
}
reduceEach:它会动态的构造闭包,对原信号每个元组,执行reduceBlock( )闭包里面的方法。一般用法如下:
[RACStream zip:@[ stringSignal, intSignal ] reduce:^(NSString *string, NSNumber *number) {
return [NSString stringWithFormat:@"%@: %@", string, number];
}];
8. combineLatestWith:
- (RACSignal *)combineLatestWith:(RACSignal *)signal {
NSCParameterAssert(signal != nil);
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
// 初始化第一个信号的一些标志变量
__block id lastSelfValue = nil;
__block BOOL selfCompleted = NO;
// 初始化第二个信号的一些标志变量
__block id lastOtherValue = nil;
__block BOOL otherCompleted = NO;
// 这里是一个判断是否sendNext的闭包
void (^sendNext)(void) = ^{ };
// 订阅第一个信号
RACDisposable *selfDisposable = [self subscribeNext:^(id x) { }];
[disposable addDisposable:selfDisposable];
// 订阅第二个信号
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) { }];
[disposable addDisposable:otherDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -combineLatestWith: %@", self.name, signal];
}
大体实现思路比较简单,在新信号里面分别订阅原信号和入参signal信号。
RACDisposable *selfDisposable = [self subscribeNext:^(id x) {
@synchronized (disposable) {
lastSelfValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
selfCompleted = YES;
if (otherCompleted) [subscriber sendCompleted];
}
}];
先来看看原信号订阅的具体实现:
在subscribeNext闭包中,记录下原信号最新发送的x值,并保存到lastSelfValue中。从此lastSelfValue变量每次都保存原信号发送过来的最新的值。然后再调用sendNext( )闭包。
在completed闭包中,selfCompleted中记录下原信号发送完成。这是还要判断otherCompleted是否完成,即入参信号signal是否发送完成,只有两者都发送完成了,组合的新信号才能算全部发送完成。
RACDisposable *otherDisposable = [signal subscribeNext:^(id x) {
@synchronized (disposable) {
lastOtherValue = x ?: RACTupleNil.tupleNil;
sendNext();
}
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (disposable) {
otherCompleted = YES;
if (selfCompleted) [subscriber sendCompleted];
}
}];
这是对入参信号signal的处理实现。和原信号的处理方式完全一致。现在重点就要看看sendNext( )闭包中都做了些什么。
void (^sendNext)(void) = ^{
@synchronized (disposable) {
if (lastSelfValue == nil || lastOtherValue == nil) return;
[subscriber sendNext:RACTuplePack(lastSelfValue, lastOtherValue)];
}
};
在sendNext( )闭包中,如果lastSelfValue 或者 lastOtherValue 其中之一有一个为nil,就return,因为这个时候无法结合在一起。当两个信号都有值,那么就把这两个信号的最新的值打包成元组发送出来。

可以看到,每个信号每发送出来一个新的值,都会去找另外一个信号上一个最新的值进行结合。
这里可以对比一下类似的zip:操作

zip:操作是会把新来的信号的值存起来,放在数组里,然后另外一个信号发送一个值过来就和数组第0位的值相互结合成新的元组信号发送出去,并分别移除数组里面第0位的两个值。zip:能保证每次结合的值都是唯一的,不会一个原信号的值被多次结合到新的元组信号中。但是combineLatestWith:是不能保证这一点的,在原信号或者另外一个信号新信号发送前,每次发送信号都会结合当前最新的信号,这里就会有反复结合的情况。
9. combineLatest:
+ (RACSignal *)combineLatest:(id)signals {
return [[self join:signals block:^(RACSignal *left, RACSignal *right) {
return [left combineLatestWith:right];
}] setNameWithFormat:@"+combineLatest: %@", signals];
}
combineLatest:的实现就是把入参数组里面的每个信号都调用一次join: block:方法。传入的闭包是把两个信号combineLatestWith:一下。combineLatest:的实现就是2个操作的组合。具体实现上面也都分析过,这里不再赘述。
10. combineLatest: reduce:
+ (RACSignal *)combineLatest:(id)signals reduce:(id (^)())reduceBlock {
NSCParameterAssert(reduceBlock != nil);
RACSignal *result = [self combineLatest:signals];
if (reduceBlock != nil) result = [result reduceEach:reduceBlock];
return [result setNameWithFormat:@"+combineLatest: %@ reduce:", signals];
}
combineLatest: reduce: 的实现可以类比zip: reduce:的实现。
具体实现可以拆分成两部分,第一部分是先执行combineLatest:,把数组里面的信号流依次都进行组合。这一过程的实现在上一个变换实现中分析过了。combineLatest:完成之后,紧接着进行reduceEach:操作。
这里有一个判断reduceBlock是否为nil的判断,这个判断是针对老版本的“历史遗留问题”。在ReactiveCocoa 2.5之前的版本,是允许reduceBlock传入nil,这里为了防止崩溃,所以加上了这个reduceBlock是否为nil的判断。
11. combinePreviousWithStart: reduce:
这个方法的实现也是多个变换操作组合在一起的。
- (instancetype)combinePreviousWithStart:(id)start reduce:(id (^)(id previous, id next))reduceBlock {
NSCParameterAssert(reduceBlock != NULL);
return [[[self
scanWithStart:RACTuplePack(start)
reduce:^(RACTuple *previousTuple, id next) {
id value = reduceBlock(previousTuple[0], next);
return RACTuplePack(next, value);
}]
map:^(RACTuple *tuple) {
return tuple[1];
}]
setNameWithFormat:@"[%@] -combinePreviousWithStart: %@ reduce:", self.name, [start rac_description]];
}
combinePreviousWithStart: reduce:的实现完全可以类比scanWithStart:reduce:的实现。举个例子来说明他们俩的不同。
RACSequence *numbers = @[ @1, @2, @3, @4 ].rac_sequence;
RACSignal *signalA = [numbers combinePreviousWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
return @(previous.integerValue + next.integerValue);
}].signal;
RACSignal *signalB = [numbers scanWithStart:@0 reduce:^(NSNumber *previous, NSNumber *next) {
return @(previous.integerValue + next.integerValue);
}].signal;
signalA输出如下:
1
3
5
7
signalB输出如下:
1
3
6
10
现在应该不同点应该很明显了。combinePreviousWithStart: reduce:实现的是两两之前的加和,而scanWithStart:reduce:实现的累加。
为什么会这样呢,具体看看combinePreviousWithStart: reduce:的实现。
虽然combinePreviousWithStart: reduce:也是调用了scanWithStart:reduce:,但是初始值是RACTuplePack(start)元组,聚合reduce的过程也有所不同:
id value = reduceBlock(previousTuple[0], next);
return RACTuplePack(next, value);
依次调用reduceBlock( )闭包,传入previousTuple[0], next,这里reduceBlock( )闭包是进行累加的操作,所以就是把前一个元组的第0位加上后面新来的信号的值。得到的值拼成新的元组,新的元组由next和value值构成。
如果打印出上述例子中combinePreviousWithStart: reduce:的加合过程中每个信号的值,如下:
(
1,
1
)
(
2,
3
)
(
3,
5
)
(
4,
7
)
由于这样拆成元组之后,下次再进行操作的时候,还可以拿到前一个信号的值,这样就不会形成累加的效果。
12. sample:
- (RACSignal *)sample:(RACSignal *)sampler {
NSCParameterAssert(sampler != nil);
return [[RACSignal createSignal:^(id subscriber) {
NSLock *lock = [[NSLock alloc] init];
__block id lastValue;
__block BOOL hasValue = NO;
RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) { // 暂时省略 }];
samplerDisposable.disposable = [sampler subscribeNext:^(id _) { // 暂时省略 }];
return [RACDisposable disposableWithBlock:^{
[samplerDisposable dispose];
[sourceDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -sample: %@", self.name, sampler];
}
sample:内部实现也是对原信号和入参信号sampler分别进行订阅。具体实现就是这两个信号订阅内部都干了些什么。
RACSerialDisposable *samplerDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[lock lock];
hasValue = YES;
lastValue = x;
[lock unlock];
} error:^(NSError *error) {
[samplerDisposable dispose];
[subscriber sendError:error];
} completed:^{
[samplerDisposable dispose];
[subscriber sendCompleted];
}];
这是对原信号的操作,原信号的操作在subscribeNext中就记录了两个变量的值,hasValue记录原信号有值,lastValue记录了原信号的最新的值。这里加了一层NSLock锁进行保护。
在发生error的时候,先把sampler信号取消订阅,然后再sendError:。当原信号完成的时候,同样是先把sampler信号取消订阅,然后再sendCompleted。
samplerDisposable.disposable = [sampler subscribeNext:^(id _) {
BOOL shouldSend = NO;
id value;
[lock lock];
shouldSend = hasValue;
value = lastValue;
[lock unlock];
if (shouldSend) {
[subscriber sendNext:value];
}
} error:^(NSError *error) {
[sourceDisposable dispose];
[subscriber sendError:error];
} completed:^{
[sourceDisposable dispose];
[subscriber sendCompleted];
}];
这是对入参信号sampler的操作。shouldSend默认值是NO,这个变量控制着是否sendNext:值。只有当原信号有值的时候,hasValue = YES,所以shouldSend = YES,这个时候才能发送原信号的值。这里我们并不关心入参信号sampler的值,从subscribeNext:^(id _)这里可以看出, _代表并不需要它的值。
在发生error的时候,先把原信号取消订阅,然后再sendError:。当sampler信号完成的时候,同样是先把原信号取消订阅,然后再sendCompleted。

经过sample:变换就会变成这个样子。只是把原信号的值都移动到了sampler信号发送信号的时刻,值还是和原信号的值一样。
高阶信号操作
1. flattenMap:
flattenMap:在整个RAC中具有很重要的地位,很多信号变换都是可以用flattenMap:来实现的。
map:,flatten,filter,sequenceMany:这4个操作都是用flattenMap:来实现的。然而其他变换操作实现里面用到map:,flatten,filter又有很多。
回顾一下map:的实现:
- (instancetype)map:(id (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^(id value) {
return [class return:block(value)];
}] setNameWithFormat:@"[%@] -map:", self.name];
}
map:的操作其实就是直接原信号进行的 flattenMap:的操作,变换出来的新的信号的值是block(value)。
flatten的实现接下去会具体分析,这里先略过。
filter的实现:
- (instancetype)filter:(BOOL (^)(id value))block {
NSCParameterAssert(block != nil);
Class class = self.class;
return [[self flattenMap:^ id (id value) {
block(value) ? return [class return:value] : return class.empty;
}] setNameWithFormat:@"[%@] -filter:", self.name];
}
filter的实现和map:有点类似,也是对原信号进行 flattenMap:的操作,只不过block(value)不是作为返回值,而是作为判断条件,满足这个闭包的条件,变换出来的新的信号返回值就是value,不满足的就返回empty信号。
接下去要分析的高阶操作里面,switchToLatest,try:,tryMap:的实现中也将会使用到flattenMap:。
flattenMap:的源码实现:
- (instancetype)flattenMap:(RACStream * (^)(id value))block {
Class class = self.class;
return [[self bind:^{
return ^(id value, BOOL *stop) {
id stream = block(value) ?: [class empty];
NSCAssert([stream isKindOfClass:RACStream.class], @"Value returned from -flattenMap: is not a stream: %@", stream);
return stream;
};
}] setNameWithFormat:@"[%@] -flattenMap:", self.name];
}
flattenMap:的实现是调用了bind函数,对原信号进行变换,并返回block(value)的新信号。
从flattenMap:的源码可以看到,它是可以支持类似Promise的串行异步操作的,并且flattenMap:是满足Monad中bind部分定义的。flattenMap:没法去实现takeUntil:和take:的操作。
然而,bind操作可以实现take:的操作,bind是完全满足Monad中bind部分定义的。
2. flatten
flatten的源码实现:
- (instancetype)flatten {
__weak RACStream *stream __attribute__((unused)) = self;
return [[self flattenMap:^(id value) {
return value;
}] setNameWithFormat:@"[%@] -flatten", self.name];
}
flatten操作必须是对高阶信号进行操作,如果信号里面不是信号,即不是高阶信号,那么就会崩溃。崩溃信息如下:
*** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: 'Value returned from -flattenMap: is not a stream
所以flatten是对高阶信号进行的降阶操作。高阶信号每发送一次信号,经过flatten变换,由于flattenMap:操作之后,返回的新的信号的每个值就是原信号中每个信号的值。

如果对信号A,信号B,信号C进行merge:操作,可以达到和flatten一样的效果。
[RACSignal merge:@[signalA,signalB,signalC]];
+ (RACSignal *)merge:(id)signals {
NSMutableArray *copiedSignals = [[NSMutableArray alloc] init];
for (RACSignal *signal in signals) {
[copiedSignals addObject:signal];
}
return [[[RACSignal
createSignal:^ RACDisposable * (id subscriber) {
for (RACSignal *signal in copiedSignals) {
[subscriber sendNext:signal];
}
[subscriber sendCompleted];
return nil;
}]
flatten]
setNameWithFormat:@"+merge: %@", copiedSignals];
}
现在在回来看这段代码,copiedSignals虽然是一个NSMutableArray,但是它近似合成了一个上图中的高阶信号。然后这些信号们每发送出来一个信号就发给订阅者。整个操作如flatten的字面意思一样,压平。
3. flatten:
flatten:操作也必须是对高阶信号进行操作,如果信号里面不是信号,即不是高阶信号,那么就会崩溃。
flatten:的实现比较复杂,一步步的来分析:
- (RACSignal *)flatten:(NSUInteger)maxConcurrent {
return [[RACSignal createSignal:^(id<RACSubscriber> subscriber) {
RACCompoundDisposable *compoundDisposable = [[RACCompoundDisposable alloc] init];
// Contains disposables for the currently active subscriptions.
//
// This should only be used while synchronized on `subscriber`.
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
// Whether the signal-of-signals has completed yet.
//
// This should only be used while synchronized on `subscriber`.
__block BOOL selfCompleted = NO;
// Subscribes to the given signal.
__block void (^subscribeToSignal)(RACSignal *);
// Weak reference to the above, to avoid a leak.
__weak __block void (^recur)(RACSignal *);
// Sends completed to the subscriber if all signals are finished.
//
// This should only be used while synchronized on `subscriber`.
void (^completeIfAllowed)(void) = ^{
if (selfCompleted && activeDisposables.count == 0) {
[subscriber sendCompleted];
// A strong reference is held to `subscribeToSignal` until completion,
// preventing it from deallocating early.
subscribeToSignal = nil;
}
};
// The signals waiting to be started.
//
// This array should only be used while synchronized on `subscriber`.
NSMutableArray *queuedSignals = [NSMutableArray array];
recur = subscribeToSignal = ^(RACSignal *signal) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
@synchronized (subscriber) {
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}
serialDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
__strong void (^subscribeToSignal)(RACSignal *) = recur;
RACSignal *nextSignal;
@synchronized (subscriber) {
[compoundDisposable removeDisposable:serialDisposable];
[activeDisposables removeObjectIdenticalTo:serialDisposable];
if (queuedSignals.count == 0) {
completeIfAllowed();
return;
}
nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
}
subscribeToSignal(nextSignal);
}];
};
[compoundDisposable addDisposable:[self subscribeNext:^(RACSignal *signal) {
if (signal == nil) return;
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
@synchronized (subscriber) {
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];
// If we need to wait, skip subscribing to this
// signal.
return;
}
}
subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (subscriber) {
selfCompleted = YES;
completeIfAllowed();
}
}]];
return compoundDisposable;
}] setNameWithFormat:@"[%@] -flatten: %lu", self.name, (unsigned long)maxConcurrent];
}
先来解释一些变量,数组的作用
activeDisposables里面装的是当前正在订阅的订阅者们的disposables信号。
queuedSignals里面装的是被暂时缓存起来的信号,它们等待被订阅。
selfCompleted表示高阶信号是否Completed。
subscribeToSignal闭包的作用是订阅所给的信号。这个闭包的入参参数就是一个信号,在闭包内部订阅这个信号,并进行一些操作。
recur是对subscribeToSignal闭包的一个弱引用,防止strong-weak循环引用,在下面会分析subscribeToSignal闭包,就会明白为什么recur要用weak修饰了。
completeIfAllowed的作用是在所有信号都发送完毕的时候,通知订阅者,给订阅者发送completed。
入参maxConcurrent的意思是最大可容纳同时被订阅的信号个数。
再来详细分析一下具体订阅的过程。
flatten:的内部,订阅高阶信号发出来的信号,这部分的代码比较简单:
[self subscribeNext:^(RACSignal *signal) {
if (signal == nil) return;
NSCAssert([signal isKindOfClass:RACSignal.class], @"Expected a RACSignal, got %@", signal);
@synchronized (subscriber) {
// 1
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent) {
[queuedSignals addObject:signal];
return;
}
}
// 2
subscribeToSignal(signal);
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
@synchronized (subscriber) {
selfCompleted = YES;
// 3
completeIfAllowed();
}
}]];
- 如果当前最大可容纳信号的个数 > 0 ,且,activeDisposables数组里面已经装满到最大可容纳信号的个数,不能再装新的信号了。那么就把当前的信号缓存到queuedSignals数组中。
- 直到activeDisposables数组里面有空的位子可以加入新的信号,那么就调用subscribeToSignal( )闭包,开始订阅这个新的信号。
- 最后完成的时候标记变量selfCompleted为YES,并且调用completeIfAllowed( )闭包。
void (^completeIfAllowed)(void) = ^{
if (selfCompleted && activeDisposables.count == 0) {
[subscriber sendCompleted];
subscribeToSignal = nil;
}
};
当selfCompleted = YES 并且activeDisposables数组里面的信号都发送完毕,没有可以发送的信号了,即activeDisposables.count = 0,那么就给订阅者sendCompleted。这里值得一提的是,还需要把subscribeToSignal手动置为nil。因为在subscribeToSignal闭包中强引用了completeIfAllowed闭包,防止completeIfAllowed闭包被提早的销毁掉了。所以在completeIfAllowed闭包执行完毕的时候,需要再把subscribeToSignal闭包置为nil。
那么接下来需要看的重点就是subscribeToSignal( )闭包。
recur = subscribeToSignal = ^(RACSignal *signal) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
// 1
@synchronized (subscriber) {
[compoundDisposable addDisposable:serialDisposable];
[activeDisposables addObject:serialDisposable];
}
serialDisposable.disposable = [signal subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
// 2
__strong void (^subscribeToSignal)(RACSignal *) = recur;
RACSignal *nextSignal;
// 3
@synchronized (subscriber) {
[compoundDisposable removeDisposable:serialDisposable];
[activeDisposables removeObjectIdenticalTo:serialDisposable];
// 4
if (queuedSignals.count == 0) {
completeIfAllowed();
return;
}
// 5
nextSignal = queuedSignals[0];
[queuedSignals removeObjectAtIndex:0];
}
// 6
subscribeToSignal(nextSignal);
}];
};
- activeDisposables先添加当前高阶信号发出来的信号的Disposable( 也就是入参信号的Disposable)
- 这里会对recur进行__strong,因为下面第6步会用到subscribeToSignal( )闭包,同样也是为了防止出现循环引用。
- 订阅入参信号,给订阅者发送信号。当发送完毕后,activeDisposables中移除它对应的Disposable。
- 如果当前缓存的queuedSignals数组里面没有缓存的信号,那么就调用completeIfAllowed( )闭包。
- 如果当前缓存的queuedSignals数组里面有缓存的信号,那么就取出第0个信号,并在queuedSignals数组移除它。
- 把第4步取出的信号继续订阅,继续调用subscribeToSignal( )闭包。
总结一下:高阶信号每发送一个信号值,判断activeDisposables数组装的个数是否已经超过了maxConcurrent。如果装不下了就缓存进queuedSignals数组中。如果还可以装的下就开始调用subscribeToSignal( )闭包,订阅当前信号。
每发送完一个信号就判断缓存数组queuedSignals的个数,如果缓存数组里面已经没有信号了,那么就结束原来高阶信号的发送。如果缓存数组里面还有信号就继续订阅。如此循环,直到原高阶信号所有的信号都发送完毕。
整个flatten:的执行流程都分析清楚了,最后,关于入参maxConcurrent进行更进一步的解读。
回看上面flatten:的实现中有这样一句话:
if (maxConcurrent > 0 && activeDisposables.count >= maxConcurrent)
那么maxConcurrent的值域就是最终决定flatten:表现行为。
如果maxConcurrent
NSMutableArray *activeDisposables = [[NSMutableArray alloc] initWithCapacity:maxConcurrent];
activeDisposables在初始化的时候会初始化一个大小为maxConcurrent的NSMutableArray。如果maxConcurrent
如果maxConcurrent = 0,会发生什么?那么flatten:就退化成flatten了。

如果maxConcurrent = 1,会发生什么?那么flatten:就退化成concat了。

如果maxConcurrent > 1,会发生什么?由于至今还没有遇到能用到maxConcurrent > 1的需求情况,所以这里暂时不展示图解了。maxConcurrent > 1之后,flatten的行为还依照高阶信号的个数和maxConcurrent的关系。如果高阶信号的个数maxConcurrent的值,那么多的信号就会进入queuedSignals缓存数组。
4. concat
这里的concat实现是在RACSignal里面定义的。
- (RACSignal *)concat {
return [[self flatten:1] setNameWithFormat:@"[%@] -concat", self.name];
}
一看源码就知道了,concat其实就是flatten:1。
当然在RACSignal中定义了concat:方法,这个方法在之前的文章已经分析过了,这里回顾对比一下:
- (RACSignal *)concat:(RACSignal *)signal {
return [[RACSignal createSignal:^(id subscriber) {
RACSerialDisposable *serialDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *sourceDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
RACDisposable *concattedDisposable = [signal subscribe:subscriber];
serialDisposable.disposable = concattedDisposable;
}];
serialDisposable.disposable = sourceDisposable;
return serialDisposable;
}] setNameWithFormat:@"[%@] -concat: %@", self.name, signal];
}

经过对比可以发现,虽然最终变换出来的结果类似,但是针对的信号的对象是不同的,concat是针对高阶信号进行降阶操作。concat:是把两个信号连接起来的操作。如果把高阶信号按照时间轴,从左往右,依次把每个信号都concat:连接起来,那么结果就是concat。
5. switchToLatest
- (RACSignal *)switchToLatest {
return [[RACSignal createSignal:^(id subscriber) {
RACMulticastConnection *connection = [self publish];
RACDisposable *subscriptionDisposable = [[connection.signal
flattenMap:^(RACSignal *x) {
NSCAssert(x == nil || [x isKindOfClass:RACSignal.class], @"-switchToLatest requires that the source signal (%@) send signals. Instead we got: %@", self, x);
return [x takeUntil:[connection.signal concat:[RACSignal never]]];
}]
subscribe:subscriber];
RACDisposable *connectionDisposable = [connection connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[connectionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -switchToLatest", self.name];
}
switchToLatest这个操作只能用在高阶信号上,如果原信号里面有不是信号的值,那么就会崩溃,崩溃信息如下:
***** Terminating app due to uncaught exception 'NSInternalInconsistencyException', reason: '-switchToLatest requires that the source signal ( name: ) send signals.
在switchToLatest操作中,先把原信号转换成热信号,connection.signal就是RACSubject类型的。对RACSubject进行flattenMap:变换。在flattenMap:变换中,connection.signal会先concat:一个never信号。这里concat:一个never信号的原因是为了内部的信号过早的结束而导致订阅者收到complete信号。
flattenMap:变换中x也是一个信号,对x进行takeUntil:变换,效果就是下一个信号到来之前,x会一直发送信号,一旦下一个信号到来,x就会被取消订阅,开始订阅新的信号。

一个高阶信号经过switchToLatest降阶操作之后,能得到上图中的信号。
6. switch: cases: default:
switch: cases: default:源码实现如下:
+ (RACSignal *)switch:(RACSignal *)signal cases:(NSDictionary *)cases default:(RACSignal *)defaultSignal {
NSCParameterAssert(signal != nil);
NSCParameterAssert(cases != nil);
for (id key in cases) {
id value __attribute__((unused)) = cases[key];
NSCAssert([value isKindOfClass:RACSignal.class], @"Expected all cases to be RACSignals, %@ isn't", value);
}
NSDictionary *copy = [cases copy];
return [[[signal
map:^(id key) {
if (key == nil) key = RACTupleNil.tupleNil;
RACSignal *signal = copy[key] ?: defaultSignal;
if (signal == nil) {
NSString *description = [NSString stringWithFormat:NSLocalizedString(@"No matching signal found for value %@", @""), key];
return [RACSignal error:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorNoMatchingCase userInfo:@{ NSLocalizedDescriptionKey: description }]];
}
return signal;
}]
switchToLatest]
setNameWithFormat:@"+switch: %@ cases: %@ default: %@", signal, cases, defaultSignal];
}
实现中有3个断言,全部都是针对入参的要求。入参signal信号和cases字典都不能是nil。其次,cases字典里面所有key对应的value必须是RACSignal类型的。注意,defaultSignal是可以为nil的。
接下来的实现比较简单,对入参传进来的signal信号进行map变换,这里的变换是升阶的变换。
signal每次发送出来的一个值,就把这个值当做key值去cases字典里面去查找对应的value。当然value对应的是一个信号。如果value对应的信号不为空,就把signal发送出来的这个值map成字典里面对应的信号。如果value对应为空,那么就把原signal发出来的值map成defaultSignal信号。
如果经过转换之后,得到的信号为nil,就会返回一个error信号。如果得到的信号不为nil,那么原信号完全转换完成就会变成一个高阶信号,这个高阶信号里面装的都是信号。最后再对这个高阶信号执行switchToLatest转换。
7. if: then: else:
if: then: else:源码实现如下:
+ (RACSignal *)if:(RACSignal *)boolSignal then:(RACSignal *)trueSignal else:(RACSignal *)falseSignal {
NSCParameterAssert(boolSignal != nil);
NSCParameterAssert(trueSignal != nil);
NSCParameterAssert(falseSignal != nil);
return [[[boolSignal
map:^(NSNumber *value) {
NSCAssert([value isKindOfClass:NSNumber.class], @"Expected %@ to send BOOLs, not %@", boolSignal, value);
return (value.boolValue ? trueSignal : falseSignal);
}]
switchToLatest]
setNameWithFormat:@"+if: %@ then: %@ else: %@", boolSignal, trueSignal, falseSignal];
}
入参boolSignal,trueSignal,falseSignal三个信号都不能为nil。
boolSignal里面都必须装的是NSNumber类型的值。
针对boolSignal进行map升阶操作,boolSignal信号里面的值如果是YES,那么就转换成trueSignal信号,如果为NO,就转换成falseSignal。升阶转换完成之后,boolSignal就是一个高阶信号,然后再进行switchToLatest操作。
8. catch:
catch:的实现如下:
- (RACSignal *)catch:(RACSignal * (^)(NSError *error))catchBlock {
NSCParameterAssert(catchBlock != NULL);
return [[RACSignal createSignal:^(id subscriber) {
RACSerialDisposable *catchDisposable = [[RACSerialDisposable alloc] init];
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
RACSignal *signal = catchBlock(error);
NSCAssert(signal != nil, @"Expected non-nil signal from catch block on %@", self);
catchDisposable.disposable = [signal subscribe:subscriber];
} completed:^{
[subscriber sendCompleted];
}];
return [RACDisposable disposableWithBlock:^{
[catchDisposable dispose];
[subscriptionDisposable dispose];
}];
}] setNameWithFormat:@"[%@] -catch:", self.name];
}
当对原信号进行订阅的时候,如果出现了错误,会去执行catchBlock( )闭包,入参为刚刚产生的error。catchBlock( )闭包产生的是一个新的RACSignal,并再次用订阅者订阅该信号。
这里之所以说是高阶操作,是因为这里原信号发生错误之后,错误会升阶成一个信号。
9. catchTo:
catchTo:的实现如下:
- (RACSignal *)catchTo:(RACSignal *)signal {
return [[self catch:^(NSError *error) {
return signal;
}] setNameWithFormat:@"[%@] -catchTo: %@", self.name, signal];
}
catchTo:的实现就是调用catch:方法,只不过原来catch:方法里面的catchBlock( )闭包,永远都只返回catchTo:的入参,signal信号。
10. try:
- (RACSignal *)try:(BOOL (^)(id value, NSError **errorPtr))tryBlock {
NSCParameterAssert(tryBlock != NULL);
return [[self flattenMap:^(id value) {
NSError *error = nil;
BOOL passed = tryBlock(value, &error);
return (passed ? [RACSignal return:value] : [RACSignal error:error]);
}] setNameWithFormat:@"[%@] -try:", self.name];
}
try:也是一个高阶操作。对原信号进行flattenMap变换,对信号发出来的每个值都调用一遍tryBlock( )闭包,如果这个闭包的返回值是YES,那么就返回[RACSignal return:value],如果闭包的返回值是NO,那么就返回error。原信号中如果都是值,那么经过try:操作之后,每个值都会变成RACSignal,于是原信号也就变成了高阶信号了。
11. tryMap:
- (RACSignal *)tryMap:(id (^)(id value, NSError **errorPtr))mapBlock {
NSCParameterAssert(mapBlock != NULL);
return [[self flattenMap:^(id value) {
NSError *error = nil;
id mappedValue = mapBlock(value, &error);
return (mappedValue == nil ? [RACSignal error:error] : [RACSignal return:mappedValue]);
}] setNameWithFormat:@"[%@] -tryMap:", self.name];
}
tryMap:的实现和try:的实现基本一致,唯一不同的就是入参闭包的返回值不同。在tryMap:中调用mapBlock( )闭包,返回是一个对象,如果这个对象不为nil,就返回[RACSignal return:mappedValue]。如果返回的对象是nil,那么就变换成error信号。
12. timeout: onScheduler:
- (RACSignal *)timeout:(NSTimeInterval)interval onScheduler:(RACScheduler *)scheduler {
NSCParameterAssert(scheduler != nil);
NSCParameterAssert(scheduler != RACScheduler.immediateScheduler);
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *timeoutDisposable = [scheduler afterDelay:interval schedule:^{
[disposable dispose];
[subscriber sendError:[NSError errorWithDomain:RACSignalErrorDomain code:RACSignalErrorTimedOut userInfo:nil]];
}];
[disposable addDisposable:timeoutDisposable];
RACDisposable *subscriptionDisposable = [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[disposable dispose];
[subscriber sendError:error];
} completed:^{
[disposable dispose];
[subscriber sendCompleted];
}];
[disposable addDisposable:subscriptionDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -timeout: %f onScheduler: %@", self.name, (double)interval, scheduler];
}
timeout: onScheduler:的实现很简单,它比正常的信号订阅多了一个timeoutDisposable操作。它在信号订阅的内部开启了一个scheduler,经过interval的时间之后,就会停止订阅原信号,并对订阅者sendError。
这个操作的表意和方法名完全一致,经过interval的时间之后,就算timeout,那么就停止订阅原信号,并sendError。
总结一下ReactiveCocoa v2.5中高阶信号的升阶 / 降阶操作:
升阶操作:
- map( 把值map成一个信号)
- [RACSignal return:signal]
降阶操作:
- flatten(等效于flatten:0,+merge:)
- concat(等效于flatten:1)
- flatten:1
- switchToLatest
- flattenMap:
这5种操作能将高阶信号变为低阶信号,但是最终降阶之后的效果就只有3种:switchToLatest,flatten,concat。具体的图示见上面的分析。
同步操作
在ReactiveCocoa中还包含一些同步的操作,这些操作一般我们很少使用,除非真的很确定这样做了之后不会有什么问题,否则胡乱使用会导致线程死锁等一些严重的问题。
- (id)firstOrDefault:(id)defaultValue success:(BOOL *)success error:(NSError **)error {
NSCondition *condition = [[NSCondition alloc] init];
condition.name = [NSString stringWithFormat:@"[%@] -firstOrDefault: %@ success:error:", self.name, defaultValue];
__block id value = defaultValue;
__block BOOL done = NO;
// Ensures that we don't pass values across thread boundaries by reference.
__block NSError *localError;
__block BOOL localSuccess;
[[self take:1] subscribeNext:^(id x) {
// 加锁
[condition lock];
value = x;
localSuccess = YES;
done = YES;
[condition broadcast];
// 解锁
[condition unlock];
} error:^(NSError *e) {
// 加锁
[condition lock];
if (!done) {
localSuccess = NO;
localError = e;
done = YES;
[condition broadcast];
}
// 解锁
[condition unlock];
} completed:^{
// 加锁
[condition lock];
localSuccess = YES;
done = YES;
[condition broadcast];
// 解锁
[condition unlock];
}];
// 加锁
[condition lock];
while (!done) {
[condition wait];
}
if (success != NULL) *success = localSuccess;
if (error != NULL) *error = localError;
// 解锁
[condition unlock];
return value;
}
从源码上看,firstOrDefault: success: error:这种同步的方法很容易导致线程死锁。它在subscribeNext,error,completed的闭包里面都调用condition锁先lock再unlock。如果一个信号发送值过来,都没有执行subscribeNext,error,completed这3个操作里面的任意一个,那么就会执行[condition wait],等待。
由于对原信号进行了take:1操作,所以只会对第一个值进行操作。执行完subscribeNext,error,completed这3个操作里面的任意一个,又会加一次锁,对外部传进来的入参success和error进行赋值,已便外部可以拿到里面的状态。最终返回信号是原信号中第一个next里面的值,如果原信号第一个值没有,比如直接error或者completed,那么返回的是defaultValue。
done为YES表示已经成功执行了subscribeNext,error,completed这3个操作里面的任意一个。反之为NO。
localSuccess为YES表示成功发送值或者成功发送完了原信号的所有值,期间没有发生错误。
condition的broadcast操作是唤醒其他线程的操作,相当于操作系统里面互斥信号量的signal操作。
入参defaultValue是给内部变量value的一个初始值。当原信号发送出一个值之后,value的值时刻都会与原信号的值保持一致。
success和error是外部变量的地址,从外面可以监听到里面的状态。在函数内部赋值,在函数外面拿到它们的值。
2. firstOrDefault:
- (id)firstOrDefault:(id)defaultValue {
return [self firstOrDefault:defaultValue success:NULL error:NULL];
}
firstOrDefault:的实现就是调用了firstOrDefault: success: error:方法。只不过不需要传success和error,不关心内部的状态。最终返回信号是原信号中第一个next里面的值,如果原信号第一个值没有,比如直接error或者completed,那么返回的是defaultValue。
3. first
- (id)first {
return [self firstOrDefault:nil];
}
first方法就更加省略,连defaultValue也不传。最终返回信号是原信号中第一个next里面的值,如果原信号第一个值没有,比如直接error或者completed,那么返回的是nil。
4. waitUntilCompleted:
- (BOOL)waitUntilCompleted:(NSError **)error {
BOOL success = NO;
[[[self
ignoreValues]
setNameWithFormat:@"[%@] -waitUntilCompleted:", self.name]
firstOrDefault:nil success:&success error:error];
return success;
}
waitUntilCompleted:里面还是调用firstOrDefault: success: error:方法。返回值是success。只要原信号正常的发送完信号,success应该为YES,但是如果发送过程中出现了error,success就为NO。success作为返回值,外部就可以监听到是否发送成功。
虽然这个方法可以监听到发送结束的状态,但是也尽量不要使用,因为它的实现调用了firstOrDefault: success: error:方法,这个方法里面有大量的锁的操作,一不留神就会导致死锁。
5. toArray
- (NSArray *)toArray {
return [[[self collect] first] copy];
}
经过collect之后,原信号所有的值都会被加到一个数组里面,取出信号的第一个值就是一个数组。所以执行完first之后第一个值就是原信号所有值的数组。
副作用操作
1. doNext:
- (RACSignal *)doNext:(void (^)(id x))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id subscriber) {
return [self subscribeNext:^(id x) {
block(x);
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doNext:", self.name];
}
doNext:能让我们在原信号sendNext之前,能执行一个block闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。
2. doError:
- (RACSignal *)doError:(void (^)(NSError *error))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
block(error);
[subscriber sendError:error];
} completed:^{
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doError:", self.name];
}
doError:能让我们在原信号sendError之前,能执行一个block闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。
3. doCompleted:
- (RACSignal *)doCompleted:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id subscriber) {
return [self subscribeNext:^(id x) {
[subscriber sendNext:x];
} error:^(NSError *error) {
[subscriber sendError:error];
} completed:^{
block();
[subscriber sendCompleted];
}];
}] setNameWithFormat:@"[%@] -doCompleted:", self.name];
}
doCompleted:能让我们在原信号sendCompleted之前,能执行一个block闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。
4. initially:
- (RACSignal *)initially:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal defer:^{
block();
return self;
}] setNameWithFormat:@"[%@] -initially:", self.name];
}
initially:能让我们在原信号发送之前,先调用了defer:操作,在return self之前先执行了一个闭包,在这个闭包中我们可以执行我们想要执行的副作用操作。
5. finally:
- (RACSignal *)finally:(void (^)(void))block {
NSCParameterAssert(block != NULL);
return [[[self
doError:^(NSError *error) {
block();
}]
doCompleted:^{
block();
}]
setNameWithFormat:@"[%@] -finally:", self.name];
}
finally:操作调用了doError:和doCompleted:操作,依次在sendError之前,sendCompleted之前,插入一个block( )闭包。这样当信号因为错误而要终止取消订阅,或者,发送结束之前,都能执行一段我们想要执行的副作用操作。
多线程操作
在RACSignal里面有3个关于多线程的操作。
1. deliverOn:
- (RACSignal *)deliverOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id subscriber) {
return [self subscribeNext:^(id x) {
[scheduler schedule:^{
[subscriber sendNext:x];
}];
} error:^(NSError *error) {
[scheduler schedule:^{
[subscriber sendError:error];
}];
} completed:^{
[scheduler schedule:^{
[subscriber sendCompleted];
}];
}];
}] setNameWithFormat:@"[%@] -deliverOn: %@", self.name, scheduler];
}
deliverOn:的入参是一个scheduler,当原信号subscribeNext,sendError,sendCompleted的时候,都去调用scheduler的schedule方法。
- (RACDisposable *)schedule:(void (^)(void))block {
NSCParameterAssert(block != NULL);
if (RACScheduler.currentScheduler == nil) return [self.backgroundScheduler schedule:block];
block();
return nil;
}
在schedule的方法里面会判断当前currentScheduler是否为nil,如果是nil就调用backgroundScheduler去执行block( )闭包,如果不为nil,当前currentScheduler直接执行block( )闭包。
+ (instancetype)currentScheduler {
RACScheduler *scheduler = NSThread.currentThread.threadDictionary[RACSchedulerCurrentSchedulerKey];
if (scheduler != nil) return scheduler;
if ([self.class isOnMainThread]) return RACScheduler.mainThreadScheduler;
return nil;
}
判断currentScheduler是否存在,看两点,一是当前线程的字典里面,是否存在RACSchedulerCurrentSchedulerKey( @”RACSchedulerCurrentSchedulerKey” ),如果存在对应的value,返回scheduler,二是看当前的类是不是在主线程,如果在主线程,返回mainThreadScheduler。如果两个条件都不存在,那么当前currentScheduler就不存在,返回nil。
deliverOn:操作的特点是原信号发送sendNext,sendError,sendCompleted所在线程是确定的。
2. subscribeOn:
- (RACSignal *)subscribeOn:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id subscriber) {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [scheduler schedule:^{
RACDisposable *subscriptionDisposable = [self subscribe:subscriber];
[disposable addDisposable:subscriptionDisposable];
}];
[disposable addDisposable:schedulingDisposable];
return disposable;
}] setNameWithFormat:@"[%@] -subscribeOn: %@", self.name, scheduler];
}
subscribeOn:操作就是在传入的scheduler的闭包内部订阅原信号的。它与deliverOn:操作就不同:
subscribeOn:操作能够保证didSubscribe block( )闭包在入参scheduler中执行,但是不能保证原信号subscribeNext,sendError,sendCompleted在哪个scheduler中执行。
deliverOn:与subscribeOn:正好反过来,能保证原信号subscribeNext,sendError,sendCompleted在哪个scheduler中执行,但是不能保证didSubscribe block( )闭包在哪个scheduler中执行。
3. deliverOnMainThread
- (RACSignal *)deliverOnMainThread {
return [[RACSignal createSignal:^(id subscriber) {
__block volatile int32_t queueLength = 0;
void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) { // 暂时省略};
return [self subscribeNext:^(id x) {
performOnMainThread(^{
[subscriber sendNext:x];
});
} error:^(NSError *error) {
performOnMainThread(^{
[subscriber sendError:error];
});
} completed:^{
performOnMainThread(^{
[subscriber sendCompleted];
});
}];
}] setNameWithFormat:@"[%@] -deliverOnMainThread", self.name];
}
对比deliverOn:的源码实现,发现两者比较相似,只不过这里deliverOnMainThread把sendNext,sendError,sendCompleted都包在了performOnMainThread闭包中执行。
__block volatile int32_t queueLength = 0;
void (^performOnMainThread)(dispatch_block_t) = ^(dispatch_block_t block) {
int32_t queued = OSAtomicIncrement32(&queueLength);
if (NSThread.isMainThread && queued == 1) {
block();
OSAtomicDecrement32(&queueLength);
} else {
dispatch_async(dispatch_get_main_queue(), ^{
block();
OSAtomicDecrement32(&queueLength);
});
}
};
performOnMainThread闭包内部保证了入参block( )闭包一定是在主线程中执行。
OSAtomicIncrement32 和 OSAtomicDecrement32是原子操作,分别代表+1和-1。下面的if-else判断里面,不管是满足哪一条,最终都还是在主线程中执行block( )闭包。
deliverOnMainThread能保证原信号subscribeNext,sendError,sendCompleted都在主线程MainThread中执行。
RACSequence和RACTuple分析
在OOP的世界里使用FRP的思想来编程,光有函数这种一等公民,还是无法满足我们一些需求的。因此还是需要引用变量来完成各式各样的类的操作行为。
在前几篇文章中详细的分析了RACStream中RACSignal的底层实现。RACStream还有另外一个子类,RACSequence,这个类是RAC专门为集合而设计的。
RACTuple底层实现分析
- RACTuple
@interface RACTuple : NSObject
@property (nonatomic, readonly) NSUInteger count;
@property (nonatomic, readonly) id first;
@property (nonatomic, readonly) id second;
@property (nonatomic, readonly) id third;
@property (nonatomic, readonly) id fourth;
@property (nonatomic, readonly) id fifth;
@property (nonatomic, readonly) id last;
@property (nonatomic, strong) NSArray *backingArray;
@property (nonatomic, copy, readonly) RACSequence *rac_sequence; // 这个是专门为sequence提供的一个扩展
@end
RACTuple的定义看上去很简单,底层实质就是一个NSArray,只不过封装了一些方法。RACTuple继承了NSCoding, NSCopying, NSFastEnumeration这三个协议。
- (id)initWithCoder:(NSCoder *)coder {
self = [self init];
if (self == nil) return nil;
self.backingArray = [coder decodeObjectForKey:@keypath(self.backingArray)];
return self;
}
- (void)encodeWithCoder:(NSCoder *)coder {
if (self.backingArray != nil) [coder encodeObject:self.backingArray forKey:@keypath(self.backingArray)];
}
这里是NSCoding协议。都是对内部的backingArray进行decodeObjectForKey:和encodeObject: 。
- (instancetype)copyWithZone:(NSZone *)zone {
// we're immutable, bitches!
上面这是NSCopying协议。由于内部是基于NSArray的,所以是immutable不可变的。
- (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(id __unsafe_unretained [])buffer count:(NSUInteger)len {
return [self.backingArray countByEnumeratingWithState:state objects:buffer count:len];
}
上面是NSFastEnumeration协议,快速枚举也都是针对NSArray进行的操作。
// 三个类方法
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array;
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert;
+ (instancetype)tupleWithObjects:(id)object, ... NS_REQUIRES_NIL_TERMINATION;
- (id)objectAtIndex:(NSUInteger)index;
- (NSArray *)allObjects;
- (instancetype)tupleByAddingObject:(id)obj;
RACTuple的方法也不多,总共就6个方法,3个类方法,3个实例方法。
先看类方法:
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array {
return [self tupleWithObjectsFromArray:array convertNullsToNils:NO];
}
+ (instancetype)tupleWithObjectsFromArray:(NSArray *)array convertNullsToNils:(BOOL)convert {
RACTuple *tuple = [[self alloc] init];
if (convert) {
NSMutableArray *newArray = [NSMutableArray arrayWithCapacity:array.count];
for (id object in array) {
[newArray addObject:(object == NSNull.null ? RACTupleNil.tupleNil : object)];
}
tuple.backingArray = newArray;
} else {
tuple.backingArray = [array copy];
}
return tuple;
}
先看这两个类方法,这两个类方法的区别在于是否把NSNull转换成RACTupleNil类型。根据入参array初始化RACTuple内部的NSArray。
RACTuplePack( ) 和 RACTuplePack_( )这两个宏的实现也是调用了tupleWithObjectsFromArray:方法
#define RACTuplePack(...) \
RACTuplePack_(__VA_ARGS__)
#define RACTuplePack_(...) \
([RACTuple tupleWithObjectsFromArray:@[ metamacro_foreach(RACTuplePack_object_or_ractuplenil,, __VA_ARGS__) ]])
这里需要注意的是RACTupleNil
+ (RACTupleNil *)tupleNil {
static dispatch_once_t onceToken;
static RACTupleNil *tupleNil = nil;
dispatch_once(&onceToken, ^{
tupleNil = [[self alloc] init];
});
return tupleNil;
}
RACTupleNil是一个单例。
重点需要解释的是另外一种类方法:
+ (instancetype)tupleWithObjects:(id)object, ... {
RACTuple *tuple = [[self alloc] init];
va_list args;
va_start(args, object);
NSUInteger count = 0;
for (id currentObject = object; currentObject != nil; currentObject = va_arg(args, id)) {
++count;
}
va_end(args);
if (count == 0) {
tuple.backingArray = @[];
return tuple;
}
NSMutableArray *objects = [[NSMutableArray alloc] initWithCapacity:count];
va_start(args, object);
for (id currentObject = object; currentObject != nil; currentObject = va_arg(args, id)) {
[objects addObject:currentObject];
}
va_end(args);
tuple.backingArray = objects;
return tuple;
}
这个类方法的参数是可变参数类型。由于用到了可变参数类型,所以就会用到va_list,va_start,va_arg,va_end。
#ifndef _VA_LIST_T
#define _VA_LIST_T
typedef __darwin_va_list va_list;
#endif /* _VA_LIST_T */
#ifndef _VA_LIST
typedef __builtin_va_list va_list;
#define _VA_LIST
#endif
#define va_start(ap, param) __builtin_va_start(ap, param)
#define va_end(ap) __builtin_va_end(ap)
#define va_arg(ap, type) __builtin_va_arg(ap, type)
- va_list用于声明一个变量,我们知道函数的可变参数列表其实就是一个字符串,所以va_list才被声明为字符型指针,这个类型用于声明一个指向参数列表的字符型指针变量,例如:va_list ap;//ap:arguement pointer
- va_start(ap,v),它的第一个参数是指向可变参数字符串的变量,第二个参数是可变参数函数的第一个参数,通常用于指定可变参数列表中参数的个数。
- va_arg(ap,t),它的第一个参数指向可变参数字符串的变量,第二个参数是可变参数的类型。
- va_end(ap) 用于将存放可变参数字符串的变量清空(赋值为NULL)。
剩下的3个实例方法都是对数组的操作,没有什么难度。
一般使用用两个宏,RACTupleUnpack( ) 用来解包,RACTuplePack( ) 用来装包。
RACTupleUnpack(NSString *string, NSNumber *num) = [RACTuple tupleWithObjects:@"foo", @5, nil];
RACTupleUnpack(NSString *string, NSNumber *num) = RACTuplePack(@"foo",@(5));
NSLog(@"string: %@", string);
NSLog(@"num: %@", num);
/* 上面的做法等价于下面的 */
RACTuple *t = [RACTuple tupleWithObjects:@"foo", @5, nil];
NSString *string = t[0];
NSNumber *num = t[1];
NSLog(@"string: %@", string);
NSLog(@"num: %@", num);
关于RACTuple还有2个相关的类,RACTupleUnpackingTrampoline,RACTupleSequence。
- RACTupleUnpackingTrampoline
@interface RACTupleUnpackingTrampoline : NSObject
+ (instancetype)trampoline;
- (void)setObject:(RACTuple *)tuple forKeyedSubscript:(NSArray *)variables;
@end
首先这个类是一个单例。
+ (instancetype)trampoline {
static dispatch_once_t onceToken;
static id trampoline = nil;
dispatch_once(&onceToken, ^{
trampoline = [[self alloc] init];
});
return trampoline;
}
RACTupleUnpackingTrampoline这个类也就只有一个作用,就是它对应的实例方法。
- (void)setObject:(RACTuple *)tuple forKeyedSubscript:(NSArray *)variables {
NSCParameterAssert(variables != nil);
[variables enumerateObjectsUsingBlock:^(NSValue *value, NSUInteger index, BOOL *stop) {
__strong id *ptr = (__strong id *)value.pointerValue;
*ptr = tuple[index];
}];
}
这个方法里面会遍历入参数组NSArray,然后依次取出数组里面每个value 的指针,用这个指针又赋值给了tuple[index]。
为了解释清楚这个方法的作用,写出测试代码:
RACTupleUnpackingTrampoline *tramp = [RACTupleUnpackingTrampoline trampoline];
NSString *string;
NSString *string1;
NSString *string2;
NSArray *array = [NSArray arrayWithObjects:[NSValue valueWithPointer:&string],[NSValue valueWithPointer:&string1],[NSValue valueWithPointer:&string2], nil];
NSLog(@"调用方法之前 string = %@,string1 = %@,string2 = %@",string,string1,string2);
[tramp setObject:[RACTuple tupleWithObjectsFromArray:@[(@"foo"),(@(10)),@"32323"]] forKeyedSubscript:array];
NSLog(@"调用方法之后 string = %@,string1 = %@,string2 = %@",string,string1,string2);
输出如下:
调用方法之前 string = (null),string1 = (null),string2 = (null)
调用方法之后 string = foo,string1 = 10,string2 = 32323
这个函数的作用也就一清二楚了。但是平时我们是很少用到[NSValue valueWithPointer:&string]这种写法的。究竟是什么地方会用到这个函数呢?全局搜索一下,找到了用到这个的地方。
在RACTuple 中两个非常有用的宏:RACTupleUnpack( ) 用来解包,RACTuplePack( ) 用来装包。RACTuplePack( )的实现在上面分析过了,实际是调用tupleWithObjectsFromArray:方法。那么RACTupleUnpack( ) 的宏是怎么实现的呢?这里就用到了RACTupleUnpackingTrampoline。
#define RACTupleUnpack_(...) \
metamacro_foreach(RACTupleUnpack_decl,, __VA_ARGS__) \
\
int RACTupleUnpack_state = 0; \
\
RACTupleUnpack_after: \
; \
metamacro_foreach(RACTupleUnpack_assign,, __VA_ARGS__) \
if (RACTupleUnpack_state != 0) RACTupleUnpack_state = 2; \
\
while (RACTupleUnpack_state != 2) \
if (RACTupleUnpack_state == 1) { \
goto RACTupleUnpack_after; \
} else \
for (; RACTupleUnpack_state != 1; RACTupleUnpack_state = 1) \
[RACTupleUnpackingTrampoline trampoline][ @[ metamacro_foreach(RACTupleUnpack_value,, __VA_ARGS__) ] ]
以上就是RACTupleUnpack( ) 具体的宏。看上去很复杂。还是写出测试代码分析分析。
RACTupleUnpack(NSString *string, NSNumber *num) = RACTuplePack(@"foo",@(10));
把上述的代码编译之后的代码贴出来:
__attribute__((objc_ownership(strong))) id RACTupleUnpack284_var0;
__attribute__((objc_ownership(strong))) id RACTupleUnpack284_var1;
int RACTupleUnpack_state284 = 0;
RACTupleUnpack_after284: ;
__attribute__((objc_ownership(strong))) NSString *string = RACTupleUnpack284_var0;
__attribute__((objc_ownership(strong))) NSNumber *num = RACTupleUnpack284_var1;
if (RACTupleUnpack_state284 != 0)
RACTupleUnpack_state284 = 2;
while (RACTupleUnpack_state284 != 2)
if (RACTupleUnpack_state284 == 1) {
goto RACTupleUnpack_after284;
} else for (; RACTupleUnpack_state284 != 1; RACTupleUnpack_state284 = 1)
[RACTupleUnpackingTrampoline trampoline][ @[ [NSValue valueWithPointer:&RACTupleUnpack284_var0], [NSValue valueWithPointer:&RACTupleUnpack284_var1], ] ] = ([RACTuple tupleWithObjectsFromArray:@[ (@"foo") ?: RACTupleNil.tupleNil, (@(10)) ?: RACTupleNil.tupleNil, ]]);
转换成这样就比较好理解了。RACTupleUnpack_after284: 是一个标号。RACTupleUnpack_state284初始值为0,在下面while里面有一个for循环,在这个循环里面会进行解包操作,也就是会调用setObject:forKeyedSubscript:函数。
在循环里面,
[RACTupleUnpackingTrampoline trampoline][ @[ [NSValue valueWithPointer:&RACTupleUnpack284_var0], [NSValue valueWithPointer:&RACTupleUnpack284_var1], ] ]
这里就是调用了[NSValue valueWithPointer:&string]的写法。
至此,RACTupleUnpackingTrampoline这个类的作用也已明了,它是被作用设计出来用来实现神奇的RACTupleUnpack( ) 这个宏。
当然RACTupleUnpackingTrampoline这个类的setObject:forKeyedSubscript:函数也可以使用,只不过要注意写法,注意指针的类型,在NSValue里面包裹的是valueWithPointer,(nullable const void *)pointer类型的。
- RACTupleSequence
这个类仅仅只是名字里面带有Tuple而已,它其实是继承自RACSequence。
需要分析这个类的原因是因为RACTuple里面有一个拓展的属性rac_sequence。
- (RACSequence *)rac_sequence {
return [RACTupleSequence sequenceWithTupleBackingArray:self.backingArray offset:0];
}
还是先看看RACTupleSequence的定义。
@interface RACTupleSequence : RACSequence
@property (nonatomic, strong, readonly) NSArray *tupleBackingArray;
@property (nonatomic, assign, readonly) NSUInteger offset;
+ (instancetype)sequenceWithTupleBackingArray:(NSArray *)backingArray offset:(NSUInteger)offset;
@end
这个类是继承自RACSequence,而且只有这一个类方法。
tupleBackingArray是来自于RACTuple里面的backingArray。
+ (instancetype)sequenceWithTupleBackingArray:(NSArray *)backingArray offset:(NSUInteger)offset {
NSCParameterAssert(offset <= backingArray.count);
if (offset == backingArray.count) return self.empty;
RACTupleSequence *seq = [[self alloc] init];
seq->_tupleBackingArray = backingArray;
seq->_offset = offset;
return seq;
}
RACTupleSequence这个类的目的就是把Tuple转换成Sequence。Sequence里面的数组就是Tuple内部的backingArray。offset从0开始。
RACSequence底层实现分析
@interface RACSequence : RACStream
@property (nonatomic, strong, readonly) id head;
@property (nonatomic, strong, readonly) RACSequence *tail;
@property (nonatomic, copy, readonly) NSArray *array;
@property (nonatomic, copy, readonly) NSEnumerator *objectEnumerator;
@property (nonatomic, copy, readonly) RACSequence *eagerSequence;
@property (nonatomic, copy, readonly) RACSequence *lazySequence;
@end
RACSequence是RACStream的子类,主要是ReactiveCocoa里面的集合类。
先来说说关于RACSequence的一些概念。
RACSequence有两个很重要的属性就是head和tail。head是一个id,而tail又是一个RACSequence,这个定义有点递归的意味。
RACSequence *sequence = [RACSequence sequenceWithHeadBlock:^id{
return @(1);
} tailBlock:^RACSequence *{
return @[@2,@3,@4].rac_sequence;
}];
NSLog(@"sequence.head = %@ , sequence.tail = %@",sequence.head ,sequence.tail);
输出:
sequence.head = 1 , sequence.tail = { name = , array = (
2,
3,
4
) }
这段测试代码就道出了head和tail的定义。更加详细的描述见下图:

上述代码里面用到了RACSequence初始化的方法,具体的分析见后面。
objectEnumerator是一个快速枚举器。
@interface RACSequenceEnumerator : NSEnumerator
@property (nonatomic, strong) RACSequence *sequence;
@end
之所以需要实现这个,是为了更加方便的RACSequence进行遍历。
- (id)nextObject {
id object = nil;
@synchronized (self) {
object = self.sequence.head;
self.sequence = self.sequence.tail;
}
return object;
}
有了这个NSEnumerator,就可以从RACSequence的head一直遍历到tail。
- (NSEnumerator *)objectEnumerator {
RACSequenceEnumerator *enumerator = [[RACSequenceEnumerator alloc] init];
enumerator.sequence = self;
return enumerator;
}
回到RACSequence的定义里面的objectEnumerator,这里就是取出内部的RACSequenceEnumerator。
- (NSArray *)array {
NSMutableArray *array = [NSMutableArray array];
for (id obj in self) {
[array addObject:obj];
}
return [array copy];
}
RACSequence的定义里面还有一个array,这个数组就是返回一个NSArray,这个数组里面装满了RACSequence里面所有的对象。这里之所以能用for-in,是因为实现了NSFastEnumeration协议。至于for-in的效率,完全就看重写NSFastEnumeration协议里面countByEnumeratingWithState: objects: count: 方法里面的执行效率了。
在分析RACSequence的for-in执行效率之前,先回顾一下NSFastEnumerationState的定义,这里的属性在接下来的实现中会被大量使用。
typedef struct {
unsigned long state; //可以被自定义成任何有意义的变量
id __unsafe_unretained _Nullable * _Nullable itemsPtr; //返回对象数组的首地址
unsigned long * _Nullable mutationsPtr; //指向会随着集合变动而变化的一个值
unsigned long extra[5]; //可以被自定义成任何有意义的数组
} NSFastEnumerationState;
接下来要分析的这个函数的入参,stackbuf是为for-in提供的对象数组,len是该数组的长度。
- (NSUInteger)countByEnumeratingWithState:(NSFastEnumerationState *)state objects:(__unsafe_unretained id *)stackbuf count:(NSUInteger)len {
if (state->state == ULONG_MAX) {
// Enumeration has completed.
return 0;
}
// We need to traverse the sequence itself on repeated calls to this
// method, so use the 'state' field to track the current head.
RACSequence *(^getSequence)(void) = ^{
return (__bridge RACSequence *)(void *)state->state;
};
void (^setSequence)(RACSequence *) = ^(RACSequence *sequence) {
// Release the old sequence and retain the new one.
CFBridgingRelease((void *)state->state);
state->state = (unsigned long)CFBridgingRetain(sequence);
};
void (^complete)(void) = ^{
// Release any stored sequence.
setSequence(nil);
state->state = ULONG_MAX;
};
if (state->state == 0) {
// Since a sequence doesn't mutate, this just needs to be set to
// something non-NULL.
state->mutationsPtr = state->extra;
setSequence(self);
}
state->itemsPtr = stackbuf;
NSUInteger enumeratedCount = 0;
while (enumeratedCount < len) {
RACSequence *seq = getSequence();
// Because the objects in a sequence may be generated lazily, we want to
// prevent them from being released until the enumerator's used them.
__autoreleasing id obj = seq.head;
if (obj == nil) {
complete();
break;
}
stackbuf[enumeratedCount++] = obj;
if (seq.tail == nil) {
complete();
break;
}
setSequence(seq.tail);
}
return enumeratedCount;
}
整个遍历的过程类似递归的过程,从头到尾依次遍历一遍。
再来研究研究RACSequence的初始化:
+ (RACSequence *)sequenceWithHeadBlock:(id (^)(void))headBlock tailBlock:(RACSequence *(^)(void))tailBlock;
+ (RACSequence *)sequenceWithHeadBlock:(id (^)(void))headBlock tailBlock:(RACSequence *(^)(void))tailBlock {
return [[RACDynamicSequence sequenceWithHeadBlock:headBlock tailBlock:tailBlock] setNameWithFormat:@"+sequenceWithHeadBlock:tailBlock:"];
}
初始化RACSequence,会调用RACDynamicSequence。这里有点类比RACSignal的RACDynamicSignal。
再来看看RACDynamicSequence的定义。
@interface RACDynamicSequence () {
id _head;
RACSequence *_tail;
id _dependency;
}
@property (nonatomic, strong) id headBlock;
@property (nonatomic, strong) id tailBlock;
@property (nonatomic, assign) BOOL hasDependency;
@property (nonatomic, strong) id (^dependencyBlock)(void);
@end
这里需要说明的是此处的headBlock,tailBlock,dependencyBlock的修饰符都是用了strong,而不是copy。这里是一个很奇怪的bug导致的。在https://github.com/ReactiveCocoa/ReactiveCocoa/issues/505中详细记录了用copy关键字会导致内存泄露的bug。具体代码如下:
[[[@[@1,@2,@3,@4,@5] rac_sequence] filter:^BOOL(id value) {
return [value intValue] > 1;
}] array];
最终发现这个问题的人把copy改成strong就神奇的修复了这个bug。最终整个ReactiveCocoa库里面就只有这里把block的关键字从copy改成了strong,而不是所有的地方都改成strong。
所以日常我们写block的时候,没有特殊情况,依旧需要继续用copy进行修饰。
+ (RACSequence *)sequenceWithHeadBlock:(id (^)(void))headBlock tailBlock:(RACSequence *(^)(void))tailBlock {
NSCParameterAssert(headBlock != nil);
RACDynamicSequence *seq = [[RACDynamicSequence alloc] init];
seq.headBlock = [headBlock copy];
seq.tailBlock = [tailBlock copy];
seq.hasDependency = NO;
return seq;
}
hasDependency这个变量是代表是否有dependencyBlock。这个函数里面就只把headBlock和tailBlock保存起来了。
+ (RACSequence *)sequenceWithLazyDependency:(id (^)(void))dependencyBlock headBlock:(id (^)(id dependency))headBlock tailBlock:(RACSequence *(^)(id dependency))tailBlock {
NSCParameterAssert(dependencyBlock != nil);
NSCParameterAssert(headBlock != nil);
RACDynamicSequence *seq = [[RACDynamicSequence alloc] init];
seq.headBlock = [headBlock copy];
seq.tailBlock = [tailBlock copy];
seq.dependencyBlock = [dependencyBlock copy];
seq.hasDependency = YES;
return seq;
}
另外一个类方法sequenceWithLazyDependency: headBlock: tailBlock:是带有dependencyBlock的,这个方法里面会保存headBlock,tailBlock,dependencyBlock这3个block。
从RACSequence这两个唯一的初始化方法之间就引出了RACSequence两大核心问题之一,积极运算 和 惰性求值。
- 积极运算 和 惰性求值
在RACSequence的定义中还有两个RACSequence —— eagerSequence 和 lazySequence。这两个RACSequence就是分别对应着积极运算的RACSequence和惰性求值的RACSequence。
如果日常我们遇到了这种问题,就很浪费内存空间了。比如在内存里面开了一个100W大小的数组,结果实际只使用到100个数值。这个时候就需要用到惰性运算了。
在RACSequence里面这两种方式都支持,我们来看看底层源码是如何实现的。
先来看看平时我们很熟悉的情况——积极运算。
在RACSequence中积极运算的代表是RACSequence的一个子类RACArraySequence的子类——RACEagerSequence。它的积极运算表现在其bind函数上。
- (instancetype)bind:(RACStreamBindBlock (^)(void))block {
NSCParameterAssert(block != nil);
RACStreamBindBlock bindBlock = block();
NSArray *currentArray = self.array;
NSMutableArray *resultArray = [NSMutableArray arrayWithCapacity:currentArray.count];
for (id value in currentArray) {
BOOL stop = NO;
RACSequence *boundValue = (id)bindBlock(value, &stop);
if (boundValue == nil) break;
for (id x in boundValue) {
[resultArray addObject:x];
}
if (stop) break;
}
return [[self.class sequenceWithArray:resultArray offset:0] setNameWithFormat:@"[%@] -bind:", self.name];
}
从上述代码中能看到主要是进行了2层循环,最外层循环遍历的自己RACSequence中的值,然后拿到这个值传入闭包bindBlock( )中,返回一个RACSequence,最后用一个NSMutableArray依次把每个RACSequence里面的值都装起来。
第二个for-in循环是在遍历RACSequence,之所以可以用for-in的方式遍历就是因为实现了NSFastEnumeration协议,实现了countByEnumeratingWithState: objects: count: 方法,这个方法在上面详细分析过了,这里不再赘述。
这里就是一个积极运算的例子,在每次循环中都会把闭包block( )的值计算出来。值得说明的是,最后返回的RACSequence的类型是self.class类型的,即还是RACEagerSequence类型的。
再来看看RACSequence中的惰性求值是怎么实现的。
在RACSequence中,bind函数是下面这个样子:
- (instancetype)bind:(RACStreamBindBlock (^)(void))block {
RACStreamBindBlock bindBlock = block();
return [[self bind:bindBlock passingThroughValuesFromSequence:nil] setNameWithFormat:@"[%@] -bind:", self.name];
}
实际上调用了bind: passingThroughValuesFromSequence:方法,第二个入参传入nil。
- (instancetype)bind:(RACStreamBindBlock)bindBlock passingThroughValuesFromSequence:(RACSequence *)passthroughSequence {
__block RACSequence *valuesSeq = self;
__block RACSequence *current = passthroughSequence;
__block BOOL stop = NO;
RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
// 暂时省略
} headBlock:^(id _) {
return current.head;
} tailBlock:^ id (id _) {
if (stop) return nil;
return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
}];
sequence.name = self.name;
return sequence;
}
在bind: passingThroughValuesFromSequence:方法的实现中,就是用sequenceWithLazyDependency: headBlock: tailBlock:方法生成了一个RACSequence,并返回。在sequenceWithLazyDependency: headBlock: tailBlock:上面分析过源码,主要目的是为了保存3个闭包,headBlock,tailBlock,dependencyBlock。
通过调用RACSequence里面的bind操作,并没有执行3个闭包里面的值,只是保存起来了。这里就是惰性求值的表现——等到要用的时候才会计算。
通过上述源码的分析,可以写出如下的测试代码加深理解。
NSArray *array = @[@1,@2,@3,@4,@5];
RACSequence *lazySequence = [array.rac_sequence map:^id(id value) {
NSLog(@"lazySequence");
return @(101);
}];
RACSequence *eagerSequence = [array.rac_sequence.eagerSequence map:^id(id value) {
NSLog(@"eagerSequence");
return @(100);
}];
上述代码运行之后,会输出如下信息:
eagerSequence
eagerSequence
eagerSequence
eagerSequence
eagerSequence
只输出了5遍eagerSequence,lazySequence并没有输出。原因是因为bind闭包只在eagerSequence中真正被调用执行了,而在lazySequence中bind闭包仅仅只是被copy了。
那如何让lazySequence执行bind闭包呢?
[lazySequence array];
通过执行上述代码,就可以输出5遍“lazySequence”了。因为bind闭包再次会被调用执行。
积极运算 和 惰性求值在这里就区分出来了。在RACSequence中,除去RACEagerSequence只积极运算,其他的Sequence都是惰性求值的。
接下来再继续分析RACSequence是如何实现惰性求值的。
RACSequence *sequence = [RACDynamicSequence sequenceWithLazyDependency:^ id {
while (current.head == nil) {
if (stop) return nil;
// 遍历当前sequence,取出下一个值
id value = valuesSeq.head;
if (value == nil) {
// 遍历完sequence所有的值
stop = YES;
return nil;
}
current = (id)bindBlock(value, &stop);
if (current == nil) {
stop = YES;
return nil;
}
valuesSeq = valuesSeq.tail;
}
NSCAssert([current isKindOfClass:RACSequence.class], @"-bind: block returned an object that is not a sequence: %@", current);
return nil;
} headBlock:^(id _) {
return current.head;
} tailBlock:^ id (id _) {
if (stop) return nil;
return [valuesSeq bind:bindBlock passingThroughValuesFromSequence:current.tail];
}];
在bind操作中创建了这样一个lazySequence,3个block闭包保存了如何创建一个lazySequence的做法。
headBlock是入参为id,返回值也是一个id。在创建lazySequence的head的时候,并不关心入参,直接返回passthroughSequence的head。
tailBlock是入参为id,返回值为RACSequence。由于RACSequence的定义类似递归定义的,所以tailBlock会再次递归调用bind:passingThroughValuesFromSequence:产生一个RACSequence作为新的sequence的tail。
dependencyBlock的返回值是作为headBlock和tailBlock的入参。不过现在headBlock和tailBlock都不关心这个入参。那么dependencyBlock就是成为了headBlock和tailBlock闭包执行之前要执行的闭包。
dependencyBlock的目的是为了把原来的sequence里面的值,都进行一次变换。current是入参passthroughSequence,valuesSeq就是原sequence的引用。每次循环一次就取出原sequence的头,直到取不到为止,就是遍历完成。
取出valuesSeq的head,传入bindBlock( )闭包进行变换,返回值是一个current 的sequence。在每次headBlock和tailBlock之前都会调用这个dependencyBlock,变换后新的sequence的head就是current的head,新的sequence的tail就是递归调用传入的current.tail。
RACDynamicSequence创建的lazyDependency的过程就是保存了3个block的过程。那这些闭包什么时候会被调用呢?
- (id)head {
@synchronized (self) {
id untypedHeadBlock = self.headBlock;
if (untypedHeadBlock == nil) return _head;
if (self.hasDependency) {
if (self.dependencyBlock != nil) {
_dependency = self.dependencyBlock();
self.dependencyBlock = nil;
}
id (^headBlock)(id) = untypedHeadBlock;
_head = headBlock(_dependency);
} else {
id (^headBlock)(void) = untypedHeadBlock;
_head = headBlock();
}
self.headBlock = nil;
return _head;
}
}
上面的源码就是获取RACDynamicSequence中head的实现。当要取出sequence的head的时候,就会调用headBlock( )。如果保存了dependencyBlock闭包,在执行headBlock( )之前会先执行dependencyBlock( )进行一次变换。
- (RACSequence *)tail {
@synchronized (self) {
id untypedTailBlock = self.tailBlock;
if (untypedTailBlock == nil) return _tail;
if (self.hasDependency) {
if (self.dependencyBlock != nil) {
_dependency = self.dependencyBlock();
self.dependencyBlock = nil;
}
RACSequence * (^tailBlock)(id) = untypedTailBlock;
_tail = tailBlock(_dependency);
} else {
RACSequence * (^tailBlock)(void) = untypedTailBlock;
_tail = tailBlock();
}
if (_tail.name == nil) _tail.name = self.name;
self.tailBlock = nil;
return _tail;
}
}
获取RACDynamicSequence中tail的时候,和获取head是一样的,当需要取出tail的时候才会调用tailBlock( )。当有dependencyBlock闭包,会先执行dependencyBlock闭包,再调用tailBlock( )。
总结一下:
- RACSequence的惰性求值,除去RACEagerSequence的bind函数以外,其他所有的Sequence都是基于惰性求值的。只有到取出来运算之前才会去把相应的闭包执行一遍。
- 在RACSequence所有函数中,只有bind函数会传入dependencyBlock( )闭包,(RACEagerSequence会重写这个bind函数),所以看到dependencyBlock( )闭包一定可以推断出是RACSequence做了变换操作了。
- Pull-driver 和 Push-driver
在RACSequence中有一个方法可以让RACSequence和RACSignal进行关联上。
- (RACSignal *)signal {
return [[self signalWithScheduler:[RACScheduler scheduler]] setNameWithFormat:@"[%@] -signal", self.name];
}
- (RACSignal *)signalWithScheduler:(RACScheduler *)scheduler {
return [[RACSignal createSignal:^(id subscriber) {
__block RACSequence *sequence = self;
return [scheduler scheduleRecursiveBlock:^(void (^reschedule)(void)) {
if (sequence.head == nil) {
[subscriber sendCompleted];
return;
}
[subscriber sendNext:sequence.head];
sequence = sequence.tail;
reschedule();
}];
}] setNameWithFormat:@"[%@] -signalWithScheduler: %@", self.name, scheduler];
}
RACSequence中的signal方法会调用signalWithScheduler:方法。在signalWithScheduler:方法中会创建一个新的信号。这个新的信号的RACDisposable信号由scheduleRecursiveBlock:产生。
- (RACDisposable *)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock {
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
[self scheduleRecursiveBlock:[recursiveBlock copy] addingToDisposable:disposable];
return disposable;
}
- (void)scheduleRecursiveBlock:(RACSchedulerRecursiveBlock)recursiveBlock addingToDisposable:(RACCompoundDisposable *)disposable {
@autoreleasepool {
RACCompoundDisposable *selfDisposable = [RACCompoundDisposable compoundDisposable];
[disposable addDisposable:selfDisposable];
__weak RACDisposable *weakSelfDisposable = selfDisposable;
RACDisposable *schedulingDisposable = [self schedule:^{
@autoreleasepool {
// At this point, we've been invoked, so our disposable is now useless.
[disposable removeDisposable:weakSelfDisposable];
}
if (disposable.disposed) return;
void (^reallyReschedule)(void) = ^{
if (disposable.disposed) return;
[self scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable];
};
// Protects the variables below.
//
// This doesn't actually need to be __block qualified, but Clang
// complains otherwise. :C
__block NSLock *lock = [[NSLock alloc] init];
lock.name = [NSString stringWithFormat:@"%@ %s", self, sel_getName(_cmd)];
__block NSUInteger rescheduleCount = 0;
// Set to YES once synchronous execution has finished. Further
// rescheduling should occur immediately (rather than being
// flattened).
__block BOOL rescheduleImmediately = NO;
@autoreleasepool {
recursiveBlock(^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];
if (immediate) reallyReschedule();
});
}
[lock lock];
NSUInteger synchronousCount = rescheduleCount;
rescheduleImmediately = YES;
[lock unlock];
for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
}];
[selfDisposable addDisposable:schedulingDisposable];
}
}
这段代码虽然长,但是拆分分析一下:
__block NSUInteger rescheduleCount = 0;
// 一旦同步操作执行完成,rescheduleImmediately就应该被设为YES
__block BOOL rescheduleImmediately = NO;
rescheduleCount 是递归次数计数。rescheduleImmediately这个BOOL是决定是否立即执行reallyReschedule( )闭包。
recursiveBlock是入参,它实际是下面这段闭包代码:
{
if (sequence.head == nil) {
[subscriber sendCompleted];
return;
}
[subscriber sendNext:sequence.head];
sequence = sequence.tail;
reschedule();
}
recursiveBlock的入参是reschedule( )。执行完上面的代码之后开始执行入参reschedule( )的代码,入参reschedule( 闭包的代码是如下:
^{
[lock lock];
BOOL immediate = rescheduleImmediately;
if (!immediate) ++rescheduleCount;
[lock unlock];
if (immediate) reallyReschedule();
}
在这段block中会统计rescheduleCount,如果rescheduleImmediately为YES还会继续开始执行递归操作reallyReschedule( )。
for (NSUInteger i = 0; i < synchronousCount; i++) {
reallyReschedule();
}
最终会在这个循环里面递归调用reallyReschedule( )闭包。reallyReschedule( )闭包执行的操作就是再次执行scheduleRecursiveBlock:recursiveBlock addingToDisposable:disposable方法。
每次执行一次递归就会取出sequence的head值发送出来,直到sequence.head = = nil发送完成信号。
既然RACSequence也可以转换成RACSignal,那么就需要总结一下两者的异同点。
RACSequence 和 RACSignal 异同点对比:
- RACSequence除去RACEagerSequence,其他所有的都是基于惰性计算的,这和RACSignal是一样的。
- RACSequence是在时间上是连续的,一旦把RACSequence变成signal,再订阅,会立即把所有的值一口气都发送出来。RACSignal是在时间上是离散的,当有事件到来的时候,才会发送出数据流。
- RACSequence是Pull-driver,由订阅者来决定是否发送值,只要订阅者订阅了,就会发送数据流。RACSignal是Push-driver,它发送数据流是不由订阅者决定的,不管有没有订阅者,它有离散事件产生了,就会发送数据流。
- RACSequence发送的全是数据,RACSignal发送的全是事件。事件不仅仅包括数据,还包括事件的状态,比如说事件是否出错,事件是否完成。
RACSequence操作实现分析
RACSequence还有以下几个操作。
- (id)foldLeftWithStart:(id)start reduce:(id (^)(id accumulator, id value))reduce;
- (id)foldRightWithStart:(id)start reduce:(id (^)(id first, RACSequence *rest))reduce;
- (BOOL)any:(BOOL (^)(id value))block;
- (BOOL)all:(BOOL (^)(id value))block;
- (id)objectPassingTest:(BOOL (^)(id value))block;
1. foldLeftWithStart: reduce:
- (id)foldLeftWithStart:(id)start reduce:(id (^)(id, id))reduce {
NSCParameterAssert(reduce != NULL);
if (self.head == nil) return start;
for (id value in self) {
start = reduce(start, value);
}
return start;
}
这个函数传入了一个初始值start,然后依次循环执行reduce( ),循环之后,最终的值作为返回值返回。这个函数就是折叠函数,从左边折叠到右边。
2. foldRightWithStart: reduce:
- (id)foldRightWithStart:(id)start reduce:(id (^)(id, RACSequence *))reduce {
NSCParameterAssert(reduce != NULL);
if (self.head == nil) return start;
RACSequence *rest = [RACSequence sequenceWithHeadBlock:^{
return [self.tail foldRightWithStart:start reduce:reduce];
} tailBlock:nil];
return reduce(self.head, rest);
}
这个函数和上一个foldLeftWithStart: reduce:是一样的,只不过方向是从右往左。
3. objectPassingTest:
- (id)objectPassingTest:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
return [self filter:block].head;
}
objectPassingTest:里面会调用RACStream中的filter:函数,这个函数在前几篇文章分析过了。如果block(value)为YES,就代表通过了Test,那么就会返回value的sequence。取出head返回。
4. any:
- (BOOL)any:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
return [self objectPassingTest:block] != nil;
}
any:会调用objectPassingTest:函数,如果不为nil就代表有value值通过了Test,有通过了value的就返回YES,反之返回NO。
5. all:
- (BOOL)all:(BOOL (^)(id))block {
NSCParameterAssert(block != NULL);
NSNumber *result = [self foldLeftWithStart:@YES reduce:^(NSNumber *accumulator, id value) {
return @(accumulator.boolValue && block(value));
}];
return result.boolValue;
}
all:会从左往右依次对每个值进行block( ) Test,然后每个值依次进行&&操作。
6. concat:
- (instancetype)concat:(RACStream *)stream {
NSCParameterAssert(stream != nil);
return [[[RACArraySequence sequenceWithArray:@[ self, stream ] offset:0]
flatten]
setNameWithFormat:@"[%@] -concat: %@", self.name, stream];
}
concat:的操作和RACSignal的作用是一样的。它会把原sequence和入参stream连接到一起,组合成一个高阶sequence,最后调用flatten“拍扁”。关于flatten的实现见前几篇RACStream里面的flatten实现分析。
7. zipWith:
- (instancetype)zipWith:(RACSequence *)sequence {
NSCParameterAssert(sequence != nil);
return [[RACSequence
sequenceWithHeadBlock:^ id {
if (self.head == nil || sequence.head == nil) return nil;
return RACTuplePack(self.head, sequence.head);
} tailBlock:^ id {
if (self.tail == nil || [[RACSequence empty] isEqual:self.tail]) return nil;
if (sequence.tail == nil || [[RACSequence empty] isEqual:sequence.tail]) return nil;
return [self.tail zipWith:sequence.tail];
}]
setNameWithFormat:@"[%@] -zipWith: %@", self.name, sequence];
}
由于sequence的定义是递归形式的,所以zipWith:也是递归来进行的。zipWith:新的sequence的head是原来2个sequence的head组合成RACTuplePack。新的sequence的tail是原来2个sequence的tail递归调用zipWith:。
RACSequence的一些扩展

关于RACSequence有以下9个子类,其中RACEagerSequence是继承自RACArraySequence。这些子类看名字就知道sequence里面装的是什么类型的数据。RACUnarySequence里面装的是单元sequence。它只有head值,没有tail值。

RACSequenceAdditions 总共有7个Category。这7个Category分别对iOS 里面的集合类进行了RACSequence的扩展,使我们能更加方便的使用RACSequence。
1. NSArray+RACSequenceAdditions
@interface NSArray (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@end
这个Category能把任意一个NSArray数组转换成RACSequence。
- (RACSequence *)rac_sequence {
return [RACArraySequence sequenceWithArray:self offset:0];
}
根据NSArray创建一个RACArraySequence并返回。
2. NSDictionary+RACSequenceAdditions
@interface NSDictionary (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@property (nonatomic, copy, readonly) RACSequence *rac_keySequence;
@property (nonatomic, copy, readonly) RACSequence *rac_valueSequence;
@end
这个Category能把任意一个NSDictionary字典转换成RACSequence。
- (RACSequence *)rac_sequence {
NSDictionary *immutableDict = [self copy];
return [immutableDict.allKeys.rac_sequence map:^(id key) {
id value = immutableDict[key];
return RACTuplePack(key, value);
}];
}
- (RACSequence *)rac_keySequence {
return self.allKeys.rac_sequence;
}
- (RACSequence *)rac_valueSequence {
return self.allValues.rac_sequence;
}
rac_sequence会把字典都转化为一个装满RACTuplePack的RACSequence,在这个RACSequence中,第一个位置是key,第二个位置是value。
rac_keySequence是装满所有key的RACSequence。
rac_valueSequence是装满所有value的RACSequence。
3. NSEnumerator+RACSequenceAdditions
@interface NSEnumerator (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@end
这个Category能把任意一个NSEnumerator转换成RACSequence。
- (RACSequence *)rac_sequence {
return [RACSequence sequenceWithHeadBlock:^{
return [self nextObject];
} tailBlock:^{
return self.rac_sequence;
}];
}
返回的RACSequence的head是当前的sequence的head,tail就是当前的sequence。
4. NSIndexSet+RACSequenceAdditions
@interface NSIndexSet (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@end
这个Category能把任意一个NSIndexSet转换成RACSequence。
- (RACSequence *)rac_sequence {
return [RACIndexSetSequence sequenceWithIndexSet:self];
}
+ (instancetype)sequenceWithIndexSet:(NSIndexSet *)indexSet {
NSUInteger count = indexSet.count;
if (count == 0) return self.empty;
NSUInteger sizeInBytes = sizeof(NSUInteger) * count;
NSMutableData *data = [[NSMutableData alloc] initWithCapacity:sizeInBytes];
[indexSet getIndexes:data.mutableBytes maxCount:count inIndexRange:NULL];
RACIndexSetSequence *seq = [[self alloc] init];
seq->_data = data;
seq->_indexes = data.bytes;
seq->_count = count;
return seq;
}
返回RACIndexSetSequence,在这个IndexSetSequence中,data里面装的NSData,indexes里面装的NSUInteger,count里面装的是index的总数。
5. NSOrderedSet+RACSequenceAdditions
@interface NSOrderedSet (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@end
这个Category能把任意一个NSOrderedSet转换成RACSequence。
- (RACSequence *)rac_sequence {
return self.array.rac_sequence;
}
返回的NSOrderedSet中的数组转换成sequence。
6. NSSet+RACSequenceAdditions
@interface NSSet (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@end
这个Category能把任意一个NSSet转换成RACSequence。
- (RACSequence *)rac_sequence {
return self.allObjects.rac_sequence;
}
根据NSSet的allObjects数组创建一个RACArraySequence并返回。
7. NSString+RACSequenceAdditions
@interface NSString (RACSequenceAdditions)
@property (nonatomic, copy, readonly) RACSequence *rac_sequence;
@end
这个Category能把任意一个NSString转换成RACSequence。
- (RACSequence *)rac_sequence {
return [RACStringSequence sequenceWithString:self offset:0];
}
返回的是一个装满string字符的数组对应的sequence。
冷信号和热信号
关于ReactiveCocoa v2.5中冷信号和热信号的文章中,最著名的就是美团的臧成威老师写的3篇冷热信号的文章:
细说ReactiveCocoa的冷信号与热信号(二):为什么要区分冷热信号
细说ReactiveCocoa的冷信号与热信号(三):怎么处理冷信号与热信号
由于最近在写关于RACSignal底层实现分析的文章,当然也逃不了关于冷热信号操作的分析。这篇文章打算分析分析如何从冷信号转成热信号的底层实现。
关于冷信号和热信号的概念
冷热信号的概念是源自于源于.NET框架Reactive Extensions(RX)中的Hot Observable和Cold Observable,
Hot Observable是主动的,尽管你并没有订阅事件,但是它会时刻推送,就像鼠标移动;而Cold Observable是被动的,只有当你订阅的时候,它才会发布消息。
Hot Observable可以有多个订阅者,是一对多,集合可以与订阅者共享信息;而Cold Observable只能一对一,当有不同的订阅者,消息是重新完整发送。
在这篇文章细说ReactiveCocoa的冷信号与热信号(一)详细分析了冷热信号的特点:
热信号是主动的,即使你没有订阅事件,它仍然会时刻推送。而冷信号是被动的,只有当你订阅的时候,它才会发送消息。
热信号可以有多个订阅者,是一对多,信号可以与订阅者共享信息。而冷信号只能一对一,当有不同的订阅者,消息会从新完整发送。
RACSignal热信号
RACSignal家族中符合热信号的特点的信号有以下几个。
1.RACSubject
@interface RACSubject : RACSignal
@property (nonatomic, strong, readonly) NSMutableArray *subscribers;
@property (nonatomic, strong, readonly) RACCompoundDisposable *disposable;
- (void)enumerateSubscribersUsingBlock:(void (^)(id subscriber))block;
+ (instancetype)subject;
@end
首先来看看RACSubject的定义。
RACSubject是继承自RACSignal,并且它还遵守RACSubscriber协议。这就意味着它既能订阅信号,也能发送信号。
在RACSubject里面有一个NSMutableArray数组,里面装着该信号的所有订阅者。其次还有一个RACCompoundDisposable信号,里面装着该信号所有订阅者的RACDisposable。
RACSubject之所以能称之为热信号,那么它肯定是符合上述热信号的定义的。让我们从它的实现来看看它是如何符合的。
- (RACDisposable *)subscribe:(id)subscriber {
NSCParameterAssert(subscriber != nil);
RACCompoundDisposable *disposable = [RACCompoundDisposable compoundDisposable];
subscriber = [[RACPassthroughSubscriber alloc] initWithSubscriber:subscriber signal:self disposable:disposable];
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
return [RACDisposable disposableWithBlock:^{
@synchronized (subscribers) {
NSUInteger index = [subscribers indexOfObjectWithOptions:NSEnumerationReverse passingTest:^ BOOL (id obj, NSUInteger index, BOOL *stop) {
return obj == subscriber;
}];
if (index != NSNotFound) [subscribers removeObjectAtIndex:index];
}
}];
}
上面是RACSubject的实现,它和RACSignal最大的不同在这两行
NSMutableArray *subscribers = self.subscribers;
@synchronized (subscribers) {
[subscribers addObject:subscriber];
}
RACSubject 把它的所有订阅者全部都保存到了NSMutableArray的数组里。既然保存了所有的订阅者,那么sendNext,sendError,sendCompleted就需要发生改变。
- (void)sendNext:(id)value {
[self enumerateSubscribersUsingBlock:^(id subscriber) {
[subscriber sendNext:value];
}];
}
- (void)sendError:(NSError *)error {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id subscriber) {
[subscriber sendError:error];
}];
}
- (void)sendCompleted {
[self.disposable dispose];
[self enumerateSubscribersUsingBlock:^(id subscriber) {
[subscriber sendCompleted];
}];
}
从源码可以看到,RACSubject中的sendNext,sendError,sendCompleted都会执行enumerateSubscribersUsingBlock:方法。
- (void)enumerateSubscribersUsingBlock:(void (^)(id subscriber))block {
NSArray *subscribers;
@synchronized (self.subscribers) {
subscribers = [self.subscribers copy];
}
for (id subscriber in subscribers) {
block(subscriber);
}
}
enumerateSubscribersUsingBlock:方法会取出所有RACSubject的订阅者,依次调用入参的block( )方法。
关于RACSubject的订阅和发送的流程可以参考第一篇文章,大体一致,其他的不同就是会依次对自己的订阅者发送信号。
RACSubject就满足了热信号的特点,它即使没有订阅者,因为自己继承了RACSubscriber协议,所以自己本身就可以发送信号。冷信号只能被订阅了才能发送信号。
RACSubject可以有很多订阅者,它也会把这些订阅者都保存到自己的数组里。RACSubject之后再发送信号,订阅者就如同一起看电视,播放过的节目就看不到了,发送过的信号也接收不到了。接收信号。而RACSignal发送信号,订阅者接收信号都只能从头开始接受,如同看点播节目,每次看都从头开始看。
2. RACGroupedSignal
@interface RACGroupedSignal : RACSubject
@property (nonatomic, readonly, copy) id key;
+ (instancetype)signalWithKey:(id)key;
@end
先看看RACGroupedSignal的定义。
RACGroupedSignal是在RACsignal这个方法里面被用到的。
- (RACSignal *)groupBy:(id (^)(id object))keyBlock transform:(id (^)(id object))transformBlock
在这个方法里面,sendNext里面最后里面是由RACGroupedSignal发送信号。
[groupSubject sendNext:transformBlock != NULL ? transformBlock(x) : x];
3. RACBehaviorSubject
@interface RACBehaviorSubject : RACSubject
@property (nonatomic, strong) id currentValue;
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value;
@end
这个信号里面存储了一个对象currentValue,这里存储着这个信号的最新的值。
当然也可以调用类方法behaviorSubjectWithDefaultValue
+ (instancetype)behaviorSubjectWithDefaultValue:(id)value {
RACBehaviorSubject *subject = [self subject];
subject.currentValue = value;
return subject;
}
在这个方法里面存储默认的值,如果RACBehaviorSubject没有接受到任何值,那么这个信号就会发送这个默认的值。
当RACBehaviorSubject被订阅:
- (RACDisposable *)subscribe:(id)subscriber {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
[subscriber sendNext:self.currentValue];
}
}];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
[schedulingDisposable dispose];
}];
}
sendNext里面会始终发送存储的currentValue值。调用sendNext会调用RACSubject里面的sendNext,也会依次发送信号值给订阅数组里面每个订阅者。
当RACBehaviorSubject向订阅者sendNext的时候:
- (void)sendNext:(id)value {
@synchronized (self) {
self.currentValue = value;
[super sendNext:value];
}
}
RACBehaviorSubject会把发送的值更新到currentValue里面。下次发送值就会发送最后更新的值。
4. RACReplaySubject
const NSUInteger RACReplaySubjectUnlimitedCapacity = NSUIntegerMax;
@interface RACReplaySubject : RACSubject
@property (nonatomic, assign, readonly) NSUInteger capacity;
@property (nonatomic, strong, readonly) NSMutableArray *valuesReceived;
@property (nonatomic, assign) BOOL hasCompleted;
@property (nonatomic, assign) BOOL hasError;
@property (nonatomic, strong) NSError *error;
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity;
@end
RACReplaySubject中会存储RACReplaySubjectUnlimitedCapacity大小的历史值。
+ (instancetype)replaySubjectWithCapacity:(NSUInteger)capacity {
return [(RACReplaySubject *)[self alloc] initWithCapacity:capacity];
}
- (instancetype)init {
return [self initWithCapacity:RACReplaySubjectUnlimitedCapacity];
}
- (instancetype)initWithCapacity:(NSUInteger)capacity {
self = [super init];
if (self == nil) return nil;
_capacity = capacity;
_valuesReceived = (capacity == RACReplaySubjectUnlimitedCapacity ? [NSMutableArray array] : [NSMutableArray arrayWithCapacity:capacity]);
return self;
}
在RACReplaySubject初始化中会初始化一个capacity大小的数组。
- (RACDisposable *)subscribe:(id)subscriber {
RACCompoundDisposable *compoundDisposable = [RACCompoundDisposable compoundDisposable];
RACDisposable *schedulingDisposable = [RACScheduler.subscriptionScheduler schedule:^{
@synchronized (self) {
for (id value in self.valuesReceived) {
if (compoundDisposable.disposed) return;
[subscriber sendNext:(value == RACTupleNil.tupleNil ? nil : value)];
}
if (compoundDisposable.disposed) return;
if (self.hasCompleted) {
[subscriber sendCompleted];
} else if (self.hasError) {
[subscriber sendError:self.error];
} else {
RACDisposable *subscriptionDisposable = [super subscribe:subscriber];
[compoundDisposable addDisposable:subscriptionDisposable];
}
}
}];
[compoundDisposable addDisposable:schedulingDisposable];
return compoundDisposable;
}
当RACReplaySubject被订阅的时候,会把valuesReceived数组里面的值都发送出去。
- (void)sendNext:(id)value {
@synchronized (self) {
[self.valuesReceived addObject:value ?: RACTupleNil.tupleNil];
[super sendNext:value];
if (self.capacity != RACReplaySubjectUnlimitedCapacity && self.valuesReceived.count > self.capacity) {
[self.valuesReceived removeObjectsInRange:NSMakeRange(0, self.valuesReceived.count - self.capacity)];
}
}
}
在sendNext中,valuesReceived会保存每次接收到的值。调用super的sendNext,会依次把值都发送到每个订阅者中。
这里还会判断数组里面存储了多少个值。如果存储的值的个数大于了capacity,那么要移除掉数组里面从0开始的前几个值,保证数组里面只装capacity个数的值。
RACReplaySubject 和 RACSubject 的区别在于,RACReplaySubject还会把历史的信号值都存储起来发送给订阅者。这一点,RACReplaySubject更像是RACSingnal 和 RACSubject 的合体版。RACSignal是冷信号,一旦被订阅就会向订阅者发送所有的值,这一点RACReplaySubject和RACSignal是一样的。但是RACReplaySubject又有着RACSubject的特性,会把所有的值发送给多个订阅者。当RACReplaySubject发送完之前存储的历史值之后,之后再发送信号的行为就和RACSubject完全一致了。
RACSignal冷信号
1.RACEmptySignal
@interface RACEmptySignal : RACSignal
+ (RACSignal *)empty;
@end
这个信号只有一个empty方法。
+ (RACSignal *)empty {
#ifdef DEBUG
return [[[self alloc] init] setNameWithFormat:@"+empty"];
#else
static id singleton;
static dispatch_once_t pred;
dispatch_once(&pred, ^{
singleton = [[self alloc] init];
});
return singleton;
#endif
}
在debug模式下,返回一个名字叫empty的信号。在release模式下,返回一个单例的empty信号。
- (RACDisposable *)subscribe:(id)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendCompleted];
}];
}
RACEmptySignal信号一旦被订阅就会发送sendCompleted。
2. RACReturnSignal
@interface RACReturnSignal : RACSignal
@property (nonatomic, strong, readonly) id value;
+ (RACSignal *)return:(id)value;
@end
RACReturnSignal信号的定义也很简单,直接根据value的值返回一个RACSignal。
+ (RACSignal *)return:(id)value {
#ifndef DEBUG
if (value == RACUnit.defaultUnit) {
static RACReturnSignal *unitSingleton;
static dispatch_once_t unitPred;
dispatch_once(&unitPred, ^{
unitSingleton = [[self alloc] init];
unitSingleton->_value = RACUnit.defaultUnit;
});
return unitSingleton;
} else if (value == nil) {
static RACReturnSignal *nilSingleton;
static dispatch_once_t nilPred;
dispatch_once(&nilPred, ^{
nilSingleton = [[self alloc] init];
nilSingleton->_value = nil;
});
return nilSingleton;
}
#endif
RACReturnSignal *signal = [[self alloc] init];
signal->_value = value;
#ifdef DEBUG
[signal setNameWithFormat:@"+return: %@", value];
#endif
return signal;
}
在debug模式下直接新建一个RACReturnSignal信号里面的值存储的是入参value。在release模式下,会依照value的值是否是空,来新建对应的单例RACReturnSignal。
- (RACDisposable *)subscribe:(id)subscriber {
NSCParameterAssert(subscriber != nil);
return [RACScheduler.subscriptionScheduler schedule:^{
[subscriber sendNext:self.value];
[subscriber sendCompleted];
}];
}
RACReturnSignal在被订阅的时候,就只会发送一个value值的信号,发送完毕之后就sendCompleted。
3. RACDynamicSignal
这个信号是创建RACSignal createSignal:的真身。
4. RACErrorSignal
@interface RACErrorSignal : RACSignal
@property (nonatomic, strong, readonly) NSError *error;
+ (RACSignal *)error:(NSError *)error;
@end
RACErrorSignal信号里面就存储了一个NSError。
+ (RACSignal *)error:(NSError *)error {
RACErrorSignal *signal = [[self alloc] init];
signal->_error = error;
#ifdef DEBUG
[signal setNameWithFormat:@"+error: %@", error];
#else
signal.name = @"+error:";
#endif
return signal;
}
RACErrorSignal初始化的时候把外界传进来的Error保存起来。当被订阅的时候就发送这个Error出去。
5. RACChannelTerminal
@interface RACChannelTerminal : RACSignal
- (id)init __attribute__((unavailable("Instantiate a RACChannel instead")));
@property (nonatomic, strong, readonly) RACSignal *values;
@property (nonatomic, strong, readonly) id otherTerminal;
- (id)initWithValues:(RACSignal *)values otherTerminal:(id)otherTerminal;
@end
RACChannelTerminal在RAC日常开发中,用来双向绑定的。它和RACSubject一样,既继承自RACSignal,同样又遵守RACSubscriber协议。虽然具有RACSubject的发送和接收信号的特性,但是它依旧是冷信号,因为它无法一对多,它发送信号还是只能一对一。
RACChannelTerminal无法手动初始化,需要靠RACChannel去初始化。
- (id)init {
self = [super init];
if (self == nil) return nil;
RACReplaySubject *leadingSubject = [[RACReplaySubject replaySubjectWithCapacity:0] setNameWithFormat:@"leadingSubject"];
RACReplaySubject *followingSubject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"followingSubject"];
[[leadingSubject ignoreValues] subscribe:followingSubject];
[[followingSubject ignoreValues] subscribe:leadingSubject];
_leadingTerminal = [[[RACChannelTerminal alloc] initWithValues:leadingSubject otherTerminal:followingSubject] setNameWithFormat:@"leadingTerminal"];
_followingTerminal = [[[RACChannelTerminal alloc] initWithValues:followingSubject otherTerminal:leadingSubject] setNameWithFormat:@"followingTerminal"];
return self;
}
在RACChannel的初始化中会调用RACChannelTerminal的initWithValues:方法,这里的入参都是RACReplaySubject类型的。所以订阅RACChannelTerminal过程的时候:
- (RACDisposable *)subscribe:(id)subscriber {
return [self.values subscribe:subscriber];
}
self.values其实就是一个RACReplaySubject,就相当于订阅RACReplaySubject。订阅过程同上面RACReplaySubject的订阅过程。
- (void)sendNext:(id)value {
[self.otherTerminal sendNext:value];
}
- (void)sendError:(NSError *)error {
[self.otherTerminal sendError:error];
}
- (void)sendCompleted {
[self.otherTerminal sendCompleted];
}
self.otherTerminal也是RACReplaySubject类型的,RACChannelTerminal管道两边都是RACReplaySubject类型的信号。当RACChannelTerminal开始sendNext,sendError,sendCompleted是调用的管道另外一个的RACReplaySubject进行这些对应的操作的。
平时使用RACChannelTerminal的地方在View和ViewModel的双向绑定上面。
例如在登录界面,输入密码文本框TextField和ViewModel的Password双向绑定
RACChannelTerminal *passwordTerminal = [_passwordTextField rac_newTextChannel];
RACChannelTerminal *viewModelPasswordTerminal = RACChannelTo(_viewModel, password);
[viewModelPasswordTerminal subscribe:passwordTerminal];
[passwordTerminal subscribe:viewModelPasswordTerminal];
双向绑定的两个信号都会因为对方的改变而收到新的信号。
至此所有的RACSignal的分类就都理顺了,按照冷信号和热信号的分类也分好了。
冷信号是如何转换成热信号的
在ReactiveCocoa v2.5中,冷信号转换成热信号需要用到RACMulticastConnection 这个类。
@interface RACMulticastConnection : NSObject
@property (nonatomic, strong, readonly) RACSignal *signal;
- (RACDisposable *)connect;
- (RACSignal *)autoconnect;
@end
@interface RACMulticastConnection () {
RACSubject *_signal;
int32_t volatile _hasConnected;
}
@property (nonatomic, readonly, strong) RACSignal *sourceSignal;
@property (strong) RACSerialDisposable *serialDisposable;
@end
看看RACMulticastConnection类的定义。最主要的是保存了两个信号,一个是RACSubject,一个是sourceSignal(RACSignal类型)。在.h中暴露给外面的是RACSignal,在.m中实际使用的是RACSubject。看它的定义就能猜到接下去它会做什么:用sourceSignal去发送信号,内部再用RACSubject去订阅sourceSignal,然后RACSubject会把sourceSignal的信号值依次发给它的订阅者们。
在看看RACMulticastConnection的初始化
- (id)initWithSourceSignal:(RACSignal *)source subject:(RACSubject *)subject {
NSCParameterAssert(source != nil);
NSCParameterAssert(subject != nil);
self = [super init];
if (self == nil) return nil;
_sourceSignal = source;
_serialDisposable = [[RACSerialDisposable alloc] init];
_signal = subject;
return self;
}
初始化方法就是把外界传进来的RACSignal保存成sourceSignal,把外界传进来的RACSubject保存成自己的signal属性。
RACMulticastConnection有两个连接方法。
- (RACDisposable *)connect {
BOOL shouldConnect = OSAtomicCompareAndSwap32Barrier(0, 1, &_hasConnected);
if (shouldConnect) {
self.serialDisposable.disposable = [self.sourceSignal subscribe:_signal];
}
return self.serialDisposable;
}
这里出现了一个不多见的函数OSAtomicCompareAndSwap32Barrier,它是原子运算的操作符,主要用于Compare and swap,原型如下:
bool OSAtomicCompareAndSwap32Barrier( int32_t __oldValue, int32_t __newValue, volatile int32_t *__theValue );
关键字volatile只确保每次获取volatile变量时都是从内存加载变量,而不是使用寄存器里面的值,但是它不保证代码访问变量是正确的。
如果用伪代码去实现这个函数:
f (*__theValue == __oldValue) {
*__theValue = __newValue;
return 1;
} else {
return 0;
}
如果_hasConnected为0,意味着没有连接,OSAtomicCompareAndSwap32Barrier返回1,shouldConnect就应该连接。如果_hasConnected为1,意味着已经连接过了,OSAtomicCompareAndSwap32Barrier返回0,shouldConnect不会再次连接。
所谓连接的过程就是RACMulticastConnection内部用RACSubject订阅self.sourceSignal。sourceSignal是RACSignal,会把订阅者RACSubject保存到RACPassthroughSubscriber中,sendNext的时候就会调用RACSubject sendNext,这时就会把sourceSignal的信号都发送给各个订阅者了。
- (RACSignal *)autoconnect {
__block volatile int32_t subscriberCount = 0;
return [[RACSignal
createSignal:^(id subscriber) {
OSAtomicIncrement32Barrier(&subscriberCount);
RACDisposable *subscriptionDisposable = [self.signal subscribe:subscriber];
RACDisposable *connectionDisposable = [self connect];
return [RACDisposable disposableWithBlock:^{
[subscriptionDisposable dispose];
if (OSAtomicDecrement32Barrier(&subscriberCount) == 0) {
[connectionDisposable dispose];
}
}];
}]
setNameWithFormat:@"[%@] -autoconnect", self.signal.name];
}
OSAtomicIncrement32Barrier 和 OSAtomicDecrement32Barrier也是原子运算的操作符,分别是+1和-1操作。在autoconnect为了保证线程安全,用到了一个subscriberCount的类似信号量的volatile变量,保证第一个订阅者能连接上。返回的新的信号的订阅者订阅RACSubject,RACSubject也会去订阅内部的sourceSignal。
把冷信号转换成热信号用以下5种方式,5种方法都会用到RACMulticastConnection。接下来一一分析它们的具体实现。
1. multicast:
- (RACMulticastConnection *)multicast:(RACSubject *)subject {
[subject setNameWithFormat:@"[%@] -multicast: %@", self.name, subject.name];
RACMulticastConnection *connection = [[RACMulticastConnection alloc] initWithSourceSignal:self subject:subject];
return connection;
}
multicast:的操作就是初始化一个RACMulticastConnection对象,SourceSignal是self,内部的RACSubject是入参subject。
RACMulticastConnection *connection = [signal multicast:[RACSubject subject]];
[connection.signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[connection connect];
调用 multicast:把冷信号转换成热信号有一个点不方便的是,需要自己手动connect。注意转换完之后的热信号在RACMulticastConnection的signal属性中,所以需要订阅的是connection.signal。
2. publish
- (RACMulticastConnection *)publish {
RACSubject *subject = [[RACSubject subject] setNameWithFormat:@"[%@] -publish", self.name];
RACMulticastConnection *connection = [self multicast:subject];
return connection;
}
publish方法只不过是去调用了multicast:方法,publish内部会新建好一个RACSubject,并把它当成入参传递给RACMulticastConnection。
RACMulticastConnection *connection = [signal publish];
[connection.signal subscribeNext:^(id x) {
NSLog(@"%@",x);
}];
[connection connect];
同样publish方法也需要手动的调用connect方法。
3. replay
- (RACSignal *)replay {
RACReplaySubject *subject = [[RACReplaySubject subject] setNameWithFormat:@"[%@] -replay", self.name];
RACMulticastConnection *connection = [self multicast:subject];
[connection connect];
return connection.signal;
}
replay方法会把RACReplaySubject当成RACMulticastConnection的RACSubject传递进去,初始化好了RACMulticastConnection,再自动调用connect方法,返回的信号就是转换好的热信号,即RACMulticastConnection里面的RACSubject信号。
这里必须是RACReplaySubject,因为在replay方法里面先connect了。如果用RACSubject,那信号在connect之后就会通过RACSubject把原信号发送给各个订阅者了。用RACReplaySubject把信号保存起来,即使replay方法里面先connect,订阅者后订阅也是可以拿到之前的信号值的。
4. replayLast
- (RACSignal *)replayLast {
RACReplaySubject *subject = [[RACReplaySubject replaySubjectWithCapacity:1] setNameWithFormat:@"[%@] -replayLast", self.name];
RACMulticastConnection *connection = [self multicast:subject];
[connection connect];
return connection.signal;
}
replayLast 和 replay的实现基本一样,唯一的不同就是传入的RACReplaySubject的Capacity是1,意味着只能保存最新的值。所以使用replayLast,订阅之后就只能拿到原信号最新的值。
5. replayLazily
- (RACSignal *)replayLazily {
RACMulticastConnection *connection = [self multicast:[RACReplaySubject subject]];
return [[RACSignal
defer:^{
[connection connect];
return connection.signal;
}]
setNameWithFormat:@"[%@] -replayLazily", self.name];
}
replayLazily 的实现也和 replayLast、replay实现很相似。只不过把connect放到了defer的操作里面去了。
defer操作的实现如下:
+ (RACSignal *)defer:(RACSignal * (^)(void))block {
NSCParameterAssert(block != NULL);
return [[RACSignal createSignal:^(id subscriber) {
return [block() subscribe:subscriber];
}] setNameWithFormat:@"+defer:"];
}
defer 单词的字面意思是延迟的。也和这个函数实现的效果是一致的。只有当defer返回的新信号被订阅的时候,才会执行入参block( )闭包。订阅者会订阅这个block( )闭包的返回值RACSignal。
block( )闭包被延迟创建RACSignal了,这就是defer。如果block( )闭包含有和时间有关的操作,或者副作用,想要延迟执行,就可以用defer。
还有一个类似的操作,then
- (RACSignal *)then:(RACSignal * (^)(void))block {
NSCParameterAssert(block != nil);
return [[[self
ignoreValues]
concat:[RACSignal defer:block]]
setNameWithFormat:@"[%@] -then:", self.name];
}
then的操作也是延迟,只不过它是把block( )闭包延迟到原信号发送complete之后。通过then信号变化得到的新的信号,在原信号发送值的期间的时间内,都不会发送任何值,因为ignoreValues了,一旦原信号sendComplete之后,就紧接着block( )闭包产生的信号。
回到replayLazily操作上来,作用同样是把冷信号转换成热信号,只不过sourceSignal是在返回的新信号第一次被订阅的时候才被订阅。原因就是defer延迟了block( )闭包的执行了。
RACCommand底层实现分析
RACCommand的定义
首先说说RACCommand的作用。 RACCommand 在ReactiveCocoa 中是对一个动作的触发条件以及它产生的触发事件的封装。
-
触发条件:初始化RACCommand的入参enabledSignal就决定了RACCommand是否能开始执行。入参enabledSignal就是触发条件。举个例子,一个按钮是否能点击,是否能触发点击事情,就由入参enabledSignal决定。
-
触发事件:初始化RACCommand的另外一个入参(RACSignal * (^)(id input))signalBlock就是对触发事件的封装。RACCommand每次执行都会调用一次signalBlock闭包。
RACCommand最常见的例子就是在注册登录的时候,点击获取验证码的按钮,这个按钮的点击事件和触发条件就可以用RACCommand来封装,触发条件是一个信号,它可以是验证手机号,验证邮箱,验证身份证等一些验证条件产生的enabledSignal。触发事件就是按钮点击之后执行的事件,可以是发送验证码的网络请求。
RACCommand在ReactiveCocoa中算是很特别的一种存在,因为它的实现并不是FRP实现的,是OOP实现的。RACCommand的本质就是一个对象,在这个对象里面封装了4个信号。
关于RACCommand的定义如下:
@interface RACCommand : NSObject
@property (nonatomic, strong, readonly) RACSignal *executionSignals;
@property (nonatomic, strong, readonly) RACSignal *executing;
@property (nonatomic, strong, readonly) RACSignal *enabled;
@property (nonatomic, strong, readonly) RACSignal *errors;
@property (atomic, assign) BOOL allowsConcurrentExecution;
volatile uint32_t _allowsConcurrentExecution;
@property (atomic, copy, readonly) NSArray *activeExecutionSignals;
NSMutableArray *_activeExecutionSignals;
@property (nonatomic, strong, readonly) RACSignal *immediateEnabled;
@property (nonatomic, copy, readonly) RACSignal * (^signalBlock)(id input);
@end
RACCommand中4个最重要的信号就是定义开头的那4个信号,executionSignals,executing,enabled,errors。需要注意的是,这4个信号基本都是(并不是完全是)在主线程上执行的。
- RACSignal *executionSignals
executionSignals是一个高阶信号,所以在使用的时候需要进行降阶操作,降价操作在前面分析过了,在ReactiveCocoa v2.5中只支持3种降阶方式,flatten,switchToLatest,concat。降阶的方式就根据需求来选取。
还有选择原则是,如果在不允许Concurrent并发的RACCommand中一般使用switchToLatest。如果在允许Concurrent并发的RACCommand中一般使用flatten。
- RACSignal *executing
executing这个信号就表示了当前RACCommand是否在执行,信号里面的值都是BOOL类型的。YES表示的是RACCommand正在执行过程中,命名也说明的是正在进行时ing。NO表示的是RACCommand没有被执行或者已经执行结束。
- RACSignal *enabled
enabled信号就是一个开关,RACCommand是否可用。这个信号除去以下2种情况会返回NO:
RACCommand 初始化传入的enabledSignal信号,如果返回NO,那么enabled信号就返回NO。
RACCommand开始执行中,allowsConcurrentExecution为NO,那么enabled信号就返回NO。
除去以上2种情况以外,enabled信号基本都是返回YES。
- RACSignal *errors
errors信号就是RACCommand执行过程中产生的错误信号。这里特别需要注意的是:在对RACCommand进行错误处理的时候,我们不应该使用subscribeError:对RACCommand的executionSignals 进行错误的订阅,因为executionSignals这个信号是不会发送error事件的,那当RACCommand包裹的信号发送error事件时,我们要怎样去订阅到它呢?应该用subscribeNext:去订阅错误信号。
[commandSignal.errors subscribeNext:^(NSError *x) {
NSLog(@"ERROR! --> %@",x);
}];
- BOOL allowsConcurrentExecution
allowsConcurrentExecution是一个BOOL变量,它是用来表示当前RACCommand是否允许并发执行。默认值是NO。
如果allowsConcurrentExecution为NO,那么RACCommand在执行过程中,enabled信号就一定都返回NO,不允许并发执行。如果allowsConcurrentExecution为YES,允许并发执行。
如果是允许并发执行的话,就会出现多个信号就会出现一起发送值的情况。那么这种情况产生的高阶信号一般可以采取flatten(等效于flatten:0,+merge:)的方式进行降阶。
这个变量在具体实现中是用的volatile原子的操作,在实现中重写了它的get和set方法。
// 重写 get方法
- (BOOL)allowsConcurrentExecution {
return _allowsConcurrentExecution != 0;
}
// 重写 set方法
- (void)setAllowsConcurrentExecution:(BOOL)allowed {
[self willChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
if (allowed) {
OSAtomicOr32Barrier(1, &_allowsConcurrentExecution);
} else {
OSAtomicAnd32Barrier(0, &_allowsConcurrentExecution);
}
[self didChangeValueForKey:@keypath(self.allowsConcurrentExecution)];
}
OSAtomicOr32Barrier是原子运算,它的意义是进行逻辑的“或”运算。通过原子性操作访问被volatile修饰的_allowsConcurrentExecution对象即可保障函数只执行一次。相应的OSAtomicAnd32Barrier也是原子运算,它的意义是进行逻辑的“与”运算。
- NSArray *activeExecutionSignals
这个NSArray数组里面装了一个个有序排列的,执行中的信号。NSArray的数组是可以被KVO监听的。
- (NSArray *)activeExecutionSignals {
@synchronized (self) {
return [_activeExecutionSignals copy];
}
}
当然内部还有一个NSMutableArray的版本,NSArray数组是它的copy版本,使用它的时候需要加上线程锁,进行线程安全的保护。
在RACCommand内部,是对NSMutableArray数组进行操作的,在这里可变数组里面进行增加和删除的操作。
- (void)addActiveExecutionSignal:(RACSignal *)signal {
NSCParameterAssert([signal isKindOfClass:RACSignal.class]);
@synchronized (self) {
NSIndexSet *indexes = [NSIndexSet indexSetWithIndex:_activeExecutionSignals.count];
[self willChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
[_activeExecutionSignals addObject:signal];
[self didChange:NSKeyValueChangeInsertion valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
}
}
在往数组里面添加数据的时候是满足KVO的,这里对index进行了NSKeyValueChangeInsertion监听。
- (void)removeActiveExecutionSignal:(RACSignal *)signal {
NSCParameterAssert([signal isKindOfClass:RACSignal.class]);
@synchronized (self) {
NSIndexSet *indexes = [_activeExecutionSignals indexesOfObjectsPassingTest:^ BOOL (RACSignal *obj, NSUInteger index, BOOL *stop) {
return obj == signal;
}];
if (indexes.count == 0) return;
[self willChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
[_activeExecutionSignals removeObjectsAtIndexes:indexes];
[self didChange:NSKeyValueChangeRemoval valuesAtIndexes:indexes forKey:@keypath(self.activeExecutionSignals)];
}
}
在移除数组里面也是依照indexes来进行移除的。注意,增加和删除的操作都必须包在@synchronized (self)中保证线程安全。
+ (BOOL)automaticallyNotifiesObserversForKey:(NSString *)key {
return NO;
}
从上面增加和删除的操作中我们可以看见了RAC的作者在手动发送change notification,手动调用willChange: 和 didChange:方法。作者的目的在于防止一些不必要的swizzling可能会影响到增加和删除的操作,所以这里选择的手动发送通知的方式。
美团博客上这篇ReactiveCocoa核心元素与信号流文章里面对activeExecutionSignals的变化引起的一些变化画了一张数据流图:

除去没有影响到enabled信号,activeExecutionSignals的变化会影响到其他三个信号。
- RACSignal *immediateEnabled
这个信号也是一个enabled信号,但是和之前的enabled信号不同的是,它并不能保证在main thread主线程上,它可以在任意一个线程上。
- RACSignal * (^signalBlock)(id input)
这个闭包返回值是一个信号,这个闭包是在初始化RACCommand的时候会用到,下面分析源码的时候会出现。
- (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock;
- (id)initWithEnabled:(RACSignal *)enabledSignal signalBlock:(RACSignal * (^)(id input))signalBlock;
- (RACSignal *)execute:(id)input;
RACCommand 暴露出来的就3个方法,2个初始化方法和1个execute:的方法,接下来就来分析一下这些方法的底层实现。
initWithEnabled: signalBlock: 底层实现分析
首先先来看看比较短的那个初始化方法。
- (id)initWithSignalBlock:(RACSignal * (^)(id input))signalBlock {
return [self initWithEnabled:nil signalBlock:signalBlock];
}
initWithSignalBlock:方法实际就是调用了initWithEnabled: signalBlock:方法。
initWithSignalBlock:方法相当于第一个参数传的是nil的initWithEnabled: signalBlock:方法。第一个参数是enabledSignal,第二个参数是signalBlock的闭包。enabledSignal如果传的是nil,那么就相当于是传进了[RACSignal return:@YES]。
接下来详细分析一下initWithEnabled: signalBlock:方法的实现。
这个方法的实现非常长,需要分段来分析。RACCommand的初始化就是对自己的4个信号,executionSignals,executing,enabled,errors的初始化。
- executionSignals信号的初始化
RACSignal *newActiveExecutionSignals = [[[[[self rac_valuesAndChangesForKeyPath:@keypath(self.activeExecutionSignals) options:NSKeyValueObservingOptionNew observer:nil]
reduceEach:^(id _, NSDictionary *change) {
NSArray *signals = change[NSKeyValueChangeNewKey];
if (signals == nil) return [RACSignal empty];
return [signals.rac_sequence signalWithScheduler:RACScheduler.immediateScheduler];
}]
concat]
publish]
autoconnect];
通过rac_valuesAndChangesForKeyPath: options: observer: 方法监听self.activeExecutionSignals数组里面是否有增加新的信号。rac_valuesAndChangesForKeyPath: options: observer: 方法的返回时是一个RACTuple,它的定义是这样的:RACTuplePack(value, change)。
只要每次数组里面加入了新的信号,那么rac_valuesAndChangesForKeyPath: options: observer: 方法就会把新加的值和change字典包装成RACTuple返回。再对这个信号进行一次reduceEach:操作。
举个例子,change字典可能是如下的样子:
{
indexes = "[number of indexes: 1 (in 1 ranges), indexes: (0)]";
kind = 2;
new = (
" name: "
);
}
取出change[NSKeyValueChangeNewKey]就能取出每次变化新增的信号数组,然后把这个数组通过signalWithScheduler:转换成信号。
把原信号中每个值是里面装满RACTuple的信号通过变换,变换成了装满RACSingnal的三阶信号,通过concat进行降阶操作,降阶成了二阶信号。最后通过publish和autoconnect操作,把冷信号转换成热信号。
newActiveExecutionSignals最终是一个二阶热信号。
接下来再看看executionSignals是如何变换而来的。
_executionSignals = [[[newActiveExecutionSignals
map:^(RACSignal *signal) {
return [signal catchTo:[RACSignal empty]];
}]
deliverOn:RACScheduler.mainThreadScheduler]
setNameWithFormat:@"%@ -executionSignals", self];
executionSignals把newActiveExecutionSignals中错误信号都换成空信号。经过map变换之后,executionSignals是newActiveExecutionSignals的无错误信号的版本。由于map只是变换并没有降阶,所以executionSignals还是一个二阶的高阶冷信号。
注意最后加上了deliverOn,executionSignals信号每个值都是在主线程中发送的。
- errors信号的初始化
在RACCommand中会搜集其所有的error信号,都装进自己的errors的信号中。这也是RACCommand的特点之一,能把错误统一处理。
RACMulticastConnection *errorsConnection = [[[newActiveExecutionSignals
flattenMap:^(RACSignal *signal) {
return [[signal ignoreValues]
catch:^(NSError *error) {
return [RACSignal return:error];
}];
}]
deliverOn:RACScheduler.mainThreadScheduler]
publish];
从上面分析中,我们知道,newActiveExecutionSignals最终是一个二阶热信号。这里在errorsConnection的变换中,我们对这个二阶的热信号进行flattenMap:降阶操作,只留下所有的错误信号,最后把所有的错误信号都装在一个低阶的信号中,这个信号中每个值都是一个error。同样,变换中也追加了deliverOn:操作,回到主线程中去操作。最后把这个冷信号转换成热信号,但是注意,还没有connect。
_errors = [errorsConnection.signal setNameWithFormat:@"%@ -errors", self];
[errorsConnection connect];
假设某个订阅者在RACCommand中的信号已经开始执行之后才订阅的,如果错误信号是一个冷信号,那么订阅之前的错误就接收不到了。所以错误应该是一个热信号,不管什么时候订阅都可以接收到所有的错误。
error信号就是热信号errorsConnection传出来的一个热信号。error信号每个值都是在主线程上发送的。
- executing信号的初始化
executing这个信号表示了当前RACCommand是否在执行,信号里面的值都是BOOL类型的。那么如何拿到这样一个BOOL信号呢?
RACSignal *immediateExecuting = [RACObserve(self, activeExecutionSignals) map:^(NSArray *activeSignals) {
return @(activeSignals.count > 0);
}];
由于self.activeExecutionSignals是可以被KVO的,所以每当activeExecutionSignals变化的时候,判断当前数组里面是否还有信号,如果数组里面有值,就代表了当前有在执行中的信号。
_executing = [[[[[immediateExecuting
deliverOn:RACScheduler.mainThreadScheduler]
startWith:@NO]
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -executing", self];
immediateExecuting信号表示当前是否有信号在执行。初始值为NO,一旦immediateExecuting不为NO的时候就会发出信号。最后通过replayLast转换成永远只保存最新的一个值的热信号。
executing信号除去第一个默认值NO,其他的每个值也是在主线程中发送的。
- enabled信号的初始化
RACSignal *moreExecutionsAllowed = [RACSignal
if:RACObserve(self, allowsConcurrentExecution)
then:[RACSignal return:@YES]
else:[immediateExecuting not]];
先监听self.allowsConcurrentExecution变量是否有变化,allowsConcurrentExecution默认值为NO。如果有变化,allowsConcurrentExecution为YES,就说明允许并发执行,那么就返回YES的RACSignal,allowsConcurrentExecution为NO,就说明不允许并发执行,那么就要看当前是否有正在执行的信号。immediateExecuting就是代表当前是否有在执行的信号,对这个信号取非,就是是否允许执行下一个信号的BOOL值。这就是moreExecutionsAllowed的信号。
if (enabledSignal == nil) {
enabledSignal = [RACSignal return:@YES];
} else {
enabledSignal = [[[enabledSignal
startWith:@YES]
takeUntil:self.rac_willDeallocSignal]
replayLast];
}
这里的代码就说明了,如果第一个参数传的是nil,那么就相当于传进来了一个[RACSignal return:@YES]信号。
如果enabledSignal不为nil,就在enabledSignal信号前面插入一个YES的信号,目的是为了防止传入的enabledSignal虽然不为nil,但是里面是没有信号的,比如[RACSignal never],[RACSignal empty],这些信号传进来也相当于是没用的,所以在开头加一个YES的初始值信号。
最后同样通过replayLast操作转换成只保存最新的一个值的热信号。
_immediateEnabled = [[RACSignal
combineLatest:@[ enabledSignal, moreExecutionsAllowed ]]
and];
这里涉及到了combineLatest:的变换操作,这个操作在之前的文章里面分析过了,这里不再详细分析源码实现。combineLatest:的作用就是把后面数组里面传入的每个信号,不管是谁发送出来一个信号,都会把数组里面所有信号的最新的值组合到一个RACTuple里面。immediateEnabled会把每个RACTuple里面的元素都进行逻辑and运算,这样immediateEnabled信号里面装的也都是BOOL值了。
immediateEnabled信号的意义就是每时每刻监听RACCommand是否可以enabled。它是由2个信号进行and操作得来的。每当allowsConcurrentExecution变化的时候就会产生一个信号,此时再加上enabledSignal信号,就能判断这一刻RACCommand是否能够enabled。每当enabledSignal变化的时候也会产生一个信号,再加上allowsConcurrentExecution是否允许并发,也能判断这一刻RACCommand是否能够enabled。所以immediateEnabled是由这两个信号combineLatest:之后再进行and操作得来的。
_enabled = [[[[[self.immediateEnabled
take:1]
concat:[[self.immediateEnabled skip:1] deliverOn:RACScheduler.mainThreadScheduler]]
distinctUntilChanged]
replayLast]
setNameWithFormat:@"%@ -enabled", self];
由上面源码可以知道,self.immediateEnabled是由enabledSignal, moreExecutionsAllowed组合而成的。根据源码,enabledSignal的第一个信号值一定是[RACSignal return:@YES],moreExecutionsAllowed是RACObserve(self, allowsConcurrentExecution)产生的,由于allowsConcurrentExecution默认值是NO,所以moreExecutionsAllowed的第一个值是[immediateExecuting not]。
这里比较奇怪的地方是为何要用一次concat操作,把第一个信号值和后面的连接起来。如果直接写[self.immediateEnabled deliverOn:RACScheduler.mainThreadScheduler],那么整个self.immediateEnabled就都在主线程上了。作者既然没有这么写,肯定是有原因的。
通过查看文档,明白了作者的意图,作者的目的是为了让第一个值以后的每个值都发送在主线程上,所以这里skip:1之后接着deliverOn:RACScheduler.mainThreadScheduler。那第一个值呢?第一个值在一订阅的时候就发送出去了,同订阅者所在线程一致。
distinctUntilChanged保证enabled信号每次状态变化的时候只取到一个状态值。最后调用replayLast转换成只保存最新值的热信号。
从源码上看,enabled信号除去第一个值以外的每个值也都是在主线程上发送的。
execute:底层实现分析
- (RACSignal *)execute:(id)input {
// 1
BOOL enabled = [[self.immediateEnabled first] boolValue];
if (!enabled) {
NSError *error = [NSError errorWithDomain:RACCommandErrorDomain code:RACCommandErrorNotEnabled userInfo:@{
NSLocalizedDescriptionKey: NSLocalizedString(@"The command is disabled and cannot be executed", nil),RACUnderlyingCommandErrorKey: self }];
return [RACSignal error:error];
}
// 2
RACSignal *signal = self.signalBlock(input);
NSCAssert(signal != nil, @"nil signal returned from signal block for value: %@", input);
// 3
RACMulticastConnection *connection = [[signal subscribeOn:RACScheduler.mainThreadScheduler] multicast:[RACReplaySubject subject]];
@weakify(self);
// 4
[self addActiveExecutionSignal:connection.signal];
[connection.signal subscribeError:^(NSError *error) {
@strongify(self);
// 5
[self removeActiveExecutionSignal:connection.signal];
} completed:^{
@strongify(self);
// 5
[self removeActiveExecutionSignal:connection.signal];
}];
[connection connect];
// 6
return [connection.signal setNameWithFormat:@"%@ -execute: %@", self, [input rac_description]];
}
把上述代码分成6步来分析:
- self.immediateEnabled为了保证第一个值能正常的发送给订阅者,所以这里用了同步的first的方法,也是可以接受的。调用了first方法之后,根据这第一个值来判断RACCommand是否可以开始执行。如果不能执行就返回一个错误信号。
- 这里就是RACCommand开始执行的地方。self.signalBlock是RACCommand在初始化的时候传入的一个参数,RACSignal * (^signalBlock)(id input)这个闭包的入参是一个id input,返回值是一个信号。这里正好把execute的入参input传进来。
- 把RACCommand执行之后的信号先调用subscribeOn:保证didSubscribe block( )闭包在主线程中执行,再转换成RACMulticastConnection,准备转换成热信号。
- 在最终的信号被订阅者订阅之前,我们需要优先更新RACCommand里面的executing和enabled信号,所以这里要先把connection.signal加入到self.activeExecutionSignals数组里面。
- 订阅最终结果信号,出现错误或者完成,都要更新self.activeExecutionSignals数组。 这里想说明的是,最终的execute:返回的信号,和executionSignals是一样的。
Category
RACCommand在日常iOS开发过程中,很适合上下拉刷新,按钮点击等操作,所以ReactiveCocoa就帮我们在这些UI控件上封装了一个RACCommand属性——rac_command。
- UIBarButtonItem+RACCommandSupport
一旦UIBarButtonItem被点击,RACCommand就会执行。
- (RACCommand *)rac_command {
return objc_getAssociatedObject(self, UIControlRACCommandKey);
}
- (void)setRac_command:(RACCommand *)command {
objc_setAssociatedObject(self, UIControlRACCommandKey, command, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
// 检查已经存储过的信号,移除老的,添加一个新的
RACDisposable *disposable = objc_getAssociatedObject(self, UIControlEnabledDisposableKey);
[disposable dispose];
if (command == nil) return;
disposable = [command.enabled setKeyPath:@keypath(self.enabled) onObject:self];
objc_setAssociatedObject(self, UIControlEnabledDisposableKey, disposable, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
[self rac_hijackActionAndTargetIfNeeded];
}
给UIBarButtonItem添加rac_command属性用到了runtime里面的AssociatedObject关联对象。这里给UIBarButtonItem类新增了2个关联对象,key分别是UIControlRACCommandKey,UIControlEnabledDisposableKey。UIControlRACCommandKey对应的是绑定的command,UIControlEnabledDisposableKey对应的是command.enabled的disposable信号。
set方法里面最后会调用rac_hijackActionAndTargetIfNeeded,这个方法需要特别注意:
- (void)rac_hijackActionAndTargetIfNeeded {
SEL hijackSelector = @selector(rac_commandPerformAction:);
if (self.target == self && self.action == hijackSelector) return;
if (self.target != nil) NSLog(@"WARNING: UIBarButtonItem.rac_command hijacks the control's existing target and action.");
self.target = self;
self.action = hijackSelector;
}
- (void)rac_commandPerformAction:(id)sender {
[self.rac_command execute:sender];
}
rac_hijackActionAndTargetIfNeeded方法是对当前UIBarButtonItem的target和action进行检查。
如果当前UIBarButtonItem的target = self,并且action = @selector(rac_commandPerformAction:),那么就算检查通过符合执行RACCommand的前提条件了,直接return。
如果上述条件不符合,就强制改变UIBarButtonItem的target = self,并且action = @selector(rac_commandPerformAction:),所以这里需要注意的就是,UIBarButtonItem调用rac_command,会被强制改变它的target和action。
- UIButton+RACCommandSupport
一旦UIButton被点击,RACCommand就会执行。
- (RACCommand *)rac_command {
return objc_getAssociatedObject(self, UIButtonRACCommandKey);
}
- (void)setRac_command:(RACCommand *)command {
objc_setAssociatedObject(self, UIButtonRACCommandKey, command, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
RACDisposable *disposable = objc_getAssociatedObject(self, UIButtonEnabledDisposableKey);
[disposable dispose];
if (command == nil) return;
disposable = [command.enabled setKeyPath:@keypath(self.enabled) onObject:self];
objc_setAssociatedObject(self, UIButtonEnabledDisposableKey, disposable, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
[self rac_hijackActionAndTargetIfNeeded];
}
这里给UIButton添加绑定2个属性同样也用到了runtime里面的AssociatedObject关联对象。代码和UIBarButtonItem的实现基本一样。同样是给UIButton类新增了2个关联对象,key分别是UIButtonRACCommandKey,UIButtonEnabledDisposableKey。UIButtonRACCommandKey对应的是绑定的command,UIButtonEnabledDisposableKey对应的是command.enabled的disposable信号。
- (void)rac_hijackActionAndTargetIfNeeded {
SEL hijackSelector = @selector(rac_commandPerformAction:);
for (NSString *selector in [self actionsForTarget:self forControlEvent:UIControlEventTouchUpInside]) {
if (hijackSelector == NSSelectorFromString(selector)) {
return;
}
}
[self addTarget:self action:hijackSelector forControlEvents:UIControlEventTouchUpInside];
}
- (void)rac_commandPerformAction:(id)sender {
[self.rac_command execute:sender];
}
rac_hijackActionAndTargetIfNeeded函数的意思和之前的一样,也是检查UIButton的target和action。最终结果的UIButton的target = self,action = @selector(rac_commandPerformAction:)
- UIRefreshControl+RACCommandSupport
- (RACCommand *)rac_command {
return objc_getAssociatedObject(self, UIRefreshControlRACCommandKey);
}
- (void)setRac_command:(RACCommand *)command {
objc_setAssociatedObject(self, UIRefreshControlRACCommandKey, command, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
[objc_getAssociatedObject(self, UIRefreshControlDisposableKey) dispose];
if (command == nil) return;
RACDisposable *enabledDisposable = [command.enabled setKeyPath:@keypath(self.enabled) onObject:self];
RACDisposable *executionDisposable = [[[[self
rac_signalForControlEvents:UIControlEventValueChanged]
map:^(UIRefreshControl *x) {
return [[[command
execute:x]
catchTo:[RACSignal empty]]
then:^{
return [RACSignal return:x];
}];
}]
concat]
subscribeNext:^(UIRefreshControl *x) {
[x endRefreshing];
}];
RACDisposable *commandDisposable = [RACCompoundDisposable compoundDisposableWithDisposables:@[ enabledDisposable, executionDisposable ]];
objc_setAssociatedObject(self, UIRefreshControlDisposableKey, commandDisposable, OBJC_ASSOCIATION_RETAIN_NONATOMIC);
}
这里给UIRefreshControl添加绑定2个属性同样也用到了runtime里面的AssociatedObject关联对象。代码和UIBarButtonItem的实现基本一样。同样是给UIButton类新增了2个关联对象,key分别是UIRefreshControlRACCommandKey,UIRefreshControlDisposableKey。UIRefreshControlRACCommandKey对应的是绑定的command,UIRefreshControlDisposableKey对应的是command.enabled的disposable信号。
这里多了一个executionDisposable信号,这个信号是用来结束刷新操作的。
[[[command execute:x] catchTo:[RACSignal empty]] then:^{ return [RACSignal return:x]; }];
这个信号变换先把RACCommand执行,执行之后得到的结果信号剔除掉所有的错误。then操作就是忽略掉所有值,在最后添加一个返回UIRefreshControl对象的信号。
[self rac_signalForControlEvents:UIControlEventValueChanged]之后再map升阶为高阶信号,所以最后用concat降阶。最后订阅这个信号,订阅只会收到一个值,command执行完毕之后的信号发送完所有的值的时候,即收到这个值的时刻就是最终刷新结束的时刻。
所以最终的disposable信号还要加上executionDisposable。