在RxSwift中,我们可以使用hasObserver
检查*Subject
是否有任何观察者,如何在例如PassthroughSubject
Combine中做到这一点?
在RxSwift中,我们可以使用hasObserver
检查*Subject
是否有任何观察者,如何在例如PassthroughSubject
Combine中做到这一点?
No one time needed this... Apple does not provide this by API, and, actually, I do not recommend such thing, because it is like manually checking value of retainCount
in pre-ARC Objective-C for some decision in code.
(不需要任何时间... Apple并没有通过API提供此功能,实际上,我不推荐这样的事情,因为这就像在ARC Objective-C之前手动检查retainCount
值以决定代码。)
Anyway it is possible.
(无论如何都是可能的。)
Let's consider it as a lab exercise.(让我们将其视为实验室练习。)
Hope someone find this helpful.(希望有人觉得这有帮助。)
Disclaimer: below code was not tested with all Publisher(s) and not safe as for some real-world project.
(免责声明:以下代码并未在所有发布服务器上进行测试,因此对于某些实际项目并不安全。)
It is just approach demo.(这只是方法演示。)
So, as there are many kind of publishers and all of them are final and private and, moreover there might be come via type-eraser, we needed generic thing applying to any publisher, thus operator
(因此,由于存在多种类型的发布者,并且所有发布者都是最终的和私有的,而且可能通过类型擦除器来实现,因此我们需要将通用的东西应用于任何发布者,从而使运算符)
extension Publisher {
public func countingSubscribers(_ callback: ((Int) -> Void)? = nil)
-> Publishers.SubscribersCounter<Self> {
return Publishers.SubscribersCounter<Self>(upstream: self, callback: callback)
}
}
Operator gives us possibility to inject in any place of of publishers chain and provide interesting value via callback.
(运算符使我们有可能注入发布者链的任何位置,并通过回调提供有趣的价值。)
Interesting value in our case will be count of subscribers.(在我们的案例中,有趣的价值将是订阅者的数量。)
As operator is injected in both Upstream & Downstream we need bidirectional custom pipe implementation, ie.
(由于在上游和下游都注入了操作员,因此我们需要双向自定义管道实施。)
custom publisher, custom subscriber, custom subscription.(自定义发布者,自定义订阅者,自定义订阅。)
In our case they must be transparent, as we don't need to modify streams... actually it will be Combine-proxy.(在我们的情况下,它们必须是透明的,因为我们不需要修改流……实际上,它将是Combine-proxy。)
Posible usage:
(可能的用法:)
1) when SubscribersCounter
publisher is last in chain, the numberOfSubscribers
property can be used directly
(1)当SubscribersCounter
发布者位于链中的最后一位时,可以直接使用numberOfSubscribers
属性)
let publisher = NotificationCenter.default
.publisher(for: UIApplication.didBecomeActiveNotification)
.countingSubscribers()
...
publisher.numberOfSubscribers
2) when it somewhere in the middle of the chain, then receive callback about changed subscribers count
(2)当它位于链的中间时,则接收有关已更改订户数的回调)
let publisher = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.google.com")!)
.countingSubscribers({ count in print("Observers: (count)") })
.receive(on: DispatchQueue.main)
.map { _ in "Data received" }
.replaceError(with: "An error occurred")
Here is implementation:
(这是实现:)
import Combine
extension Publishers {
public class SubscribersCounter<Upstream> : Publisher where Upstream : Publisher {
private(set) var numberOfSubscribers = 0
public typealias Output = Upstream.Output
public typealias Failure = Upstream.Failure
public let upstream: Upstream
public let callback: ((Int) -> Void)?
public init(upstream: Upstream, callback: ((Int) -> Void)?) {
self.upstream = upstream
self.callback = callback
}
public func receive<S>(subscriber: S) where S : Subscriber,
Upstream.Failure == S.Failure, Upstream.Output == S.Input {
self.increase()
upstream.receive(subscriber: SubscribersCounterSubscriber<S>(counter: self, subscriber: subscriber))
}
fileprivate func increase() {
numberOfSubscribers += 1
self.callback?(numberOfSubscribers)
}
fileprivate func decrease() {
numberOfSubscribers -= 1
self.callback?(numberOfSubscribers)
}
// own subscriber is needed to intercept upstream/downstream events
private class SubscribersCounterSubscriber<S> : Subscriber where S: Subscriber {
let counter: SubscribersCounter<Upstream>
let subscriber: S
init (counter: SubscribersCounter<Upstream>, subscriber: S) {
self.counter = counter
self.subscriber = subscriber
}
deinit {
Swift.print(">> Subscriber deinit")
}
func receive(subscription: Subscription) {
subscriber.receive(subscription: SubscribersCounterSubscription<Upstream>(counter: counter, subscription: subscription))
}
func receive(_ input: S.Input) -> Subscribers.Demand {
return subscriber.receive(input)
}
func receive(completion: Subscribers.Completion<S.Failure>) {
subscriber.receive(completion: completion)
}
typealias Input = S.Input
typealias Failure = S.Failure
}
// own subcription is needed to handle cancel and decrease
private class SubscribersCounterSubscription<Upstream>: Subscription where Upstream: Publisher {
let counter: SubscribersCounter<Upstream>
let wrapped: Subscription
private var cancelled = false
init(counter: SubscribersCounter<Upstream>, subscription: Subscription) {
self.counter = counter
self.wrapped = subscription
}
deinit {
Swift.print(">> Subscription deinit")
if !cancelled {
counter.decrease()
}
}
func request(_ demand: Subscribers.Demand) {
wrapped.request(demand)
}
func cancel() {
wrapped.cancel()
if !cancelled {
cancelled = true
counter.decrease()
}
}
}
}
}