If I specify the maxPublishers
parameter then source events after first maxPublishers events won't be flat mapped. While I want to limit only concurrency. That is to continue processing next events after some of the first maxPublishers flat map publishers have completed.
Publishers.Merge(
addImageRequestSubject
.flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
.compactMap { $0 }
.flatMap(maxPublishers: .max(3)) { self.addImage($0) },
addVideoRequestSubject
.flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
I've also tried to limit concurrency with help of OperationQueue. But maxConcurrentOperationCount
seems doesn't have an effect.
Publishers.Merge(
addImageRequestSubject
.receive(on: imageCompressionQueue)
.flatMap { self.compressImage($0) }
.compactMap { $0 }
.receive(on: mediaAddingQueue)
.flatMap { self.addImage($0) },
addVideoRequestSubject
.receive(on: mediaAddingQueue)
.flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)
private lazy var imageCompressionQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
private lazy var mediaAddingQueue: OperationQueue = {
var queue = OperationQueue()
queue.maxConcurrentOperationCount = 3
return queue
}()
Flat map publishers look this way:
func compressImage(_ image: UIImage) -> Future<Data?, Never> {
Future { promise in
DispatchQueue.global().async {
let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
promise(Result.success(result))
}
}
}
See Question&Answers more detail:os