利用 GCD 提供的 dispatch_async 方法在并发队列中执行任务时, 其任务的执行顺序以及并发数量不可控, 想要实现并发控制的解决方案其实有很多. 比如使用 NSOperationQueuemaxConcurrentOperationCount 属性可以控制 NSOperation 中同时执行任务的最大数量. 下面我们来看看再 GCD 中的解决方法

dispatch_semaphore

我们先来看看有关 dispatch_semaphore 的三个方法.

1
2
3
dispatch_semaphore_create => 创建一个信号量
dispatch_semaphore_signal => 发送一个信号
dispatch_semaphore_wait => 等待信号

我们可以像这样 dispatch_semaphore_t semaphore = dispatch_semaphore_create(0) 创建一个信号量, 方法需要传入一个 long 型的参数, 我们可以想象其为库存量.

dispatch_semaphore_wait 很直观可以判断其是一个阻塞当前线程的方法, 其等待逻辑是每运行一次, 响应的库存就会减少一, 当库存量为 0 时, 这个方法就会根据传入的等待时间, 来决定等待添加库存的时间. 如果设置成 DISPATCHTIMEFOREVER, 那么就是永久等待添加库存, 否则就永远不往下执行.

dispatch_semaphore_signal 就是添加库存的方法.

考虑下面的代码:

1
2
3
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
NSLog(@"等待 semaphore");

上面代码中 NSLog(@"等待 semaphore"); 永远不会执行, 因为初始化的信号量库存是 0, 且等待库存增加的时间为 DISPATCH_TIME_FOREVER, 也就是说, 除非有地方执行 dispatch_semaphore_signal 增加库存, 否则就永远等待.

我们来看一个实际应用的场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ThreadSafeDictionary *dict = [[ThreadSafeDictionary alloc] init];
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
dispatch_async(concurrentQueue, ^{
for (int i = 0; i < 1000; i++) {
[dict setObject:@(i) forKey:[NSString stringWithFormat:@"%d", i] block:^(ThreadSafeDictionary *dict, NSString *key, id object) {
}];
}
dispatch_semaphore_signal(semaphore);
});
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
NSLog(@"----End dict = %@", dict);

我们使用一个自定义线程安全的 Dictionary, 声明信号量库存为 0, 并执行一个 1000 次的写入数据的循环, 并选择在循环结束后,增加信号库存, 由于向字典写入数据的是异步在并发队列中的, 所以并不会阻塞当前线程, 会直接继续执行到dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);这行代码, 显而易见, 只有当 1000 次的写入数据结束, 库存量才会增加, 因此最后的 NSLog 会输出完整的字典.

当然我们也可以利用 dispatch_semaphore_t 控制并发数量. 下面是代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@implementation CustomOperationQueue
- (id)initWithConcurrentCount:(int)count {
self = [super init];
if (self) {
if (count < 1) count = 5;
semaphore = dispatch_semaphore_create(count);
queue = Dispatch_queue_create("com.custom.queue", DISPATCH_QUEUE_CONCURRENT);
}
}
-(id)init {
return [self initWithConcurrentCount:5];
}
- (void)addTask:(TaskBlock)block {
dispatch_async(queue, ^{
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
dispatch_async(dispatch_get_global_queue(0, 0), ^{
if (block) {
block();
}
dispatch_semaphore_signal(semaphore);
});
});
}

我们把重点放在 - (void)addTask:(TaskBlock)block 方法中.
首先我们创建了一个库存量为 5 的信号量.
在方法内部因为库存为 5, 所以每添加一个任务 dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER); 都会放行, 并减少一个库存, 而当我们完成一个任务, 也就是 block() 执行完毕后, 又会调用 dispatch_semaphore_signal(semaphore) 方法增加一个库存.
那么假如我们执行的任务耗时很长, 我们就会一直消耗库存, 如果添加到第 6 个任务时, 此时的库存数为 0,所以 dispatch_semaphore_wait 方法就会一直等待, 不执行第 6 个任务, 直到前面的某个任务完成,又增加了库存量, 才会放行执行第 6 个任务. 这样我们就实现了并发量的控制.

dispatch_group

检测并发任务是否全部执行完毕是 dispatch_group 常用的应用场景. 我们直接来看代码

1
2
3
4
5
dispatch_group_t serviceGroup = dispatch_group_create(); // 开始第一个请求 // 进入组 dispatch_group_enter(serviceGroup); [self.configService startWithCompletion:^(ConfigResponse *results, NSError* error){ configError = error; // 离开组 dispatch_group_leave(serviceGroup); }]; // 开始第二个请求 // 进入组 dispatch_group_enter(serviceGroup); [self.preferenceService startWithCompletion:^(PreferenceResponse *results, NSError* error){
// 离开组 preferenceError = error;
dispatch_group_leave(serviceGroup); }]; // 当组内任务清空后,通知主线程完成了所有的任务
dispatch_group_notify(serviceGroup, dispatch_get_main_queue(),^{
NSLog(@"两个请求都已经完成"); });