diff --git a/Sources/SignalProtocol+Filtering.swift b/Sources/SignalProtocol+Filtering.swift index f8a0d3b..6dae1c0 100644 --- a/Sources/SignalProtocol+Filtering.swift +++ b/Sources/SignalProtocol+Filtering.swift @@ -374,44 +374,71 @@ extension SignalProtocol { } } - /// Throttle the signal to emit at most one element per given `seconds` interval. Signal will emit latest element from each interval. + /// Throttle the signal to emit at most one element per given `seconds` interval. Emits either the most-recent or first element in the specified time interval. + /// + /// - parameter seconds: The interval at which to find and emit either the most recent or the first element. + /// - parameter latest: Defaults to `true`. A Bool value that indicates whether to publish the most recent element. If `false`, it emits the first element received during the interval. If `true` Signal will emit latest element from each interval. /// /// Check out interactive example at [https://rxmarbles.com/#throttle](https://rxmarbles.com/#throttle) - public func throttle(for seconds: Double, queue: DispatchQueue = DispatchQueue(label: "com.reactive_kit.signal.throttle")) -> Signal { - return Signal { observer in - var isInitialElement = true - var throttledDisposable: Disposable? = nil - var lastElement: Element? = nil - var isFinished: Bool = false - return self.observe { event in - queue.async { - switch event { - case .next(let element): - if isInitialElement { - isInitialElement = false - observer.receive(element) - } else { - lastElement = element - } - guard throttledDisposable == nil else { return } - throttledDisposable = queue.disposableAfter(when: seconds) { - if let element = lastElement { + public func throttle( + for seconds: Double, + queue: DispatchQueue = DispatchQueue(label: "com.reactive_kit.signal.throttle"), + latest: Bool = true + ) -> Signal { + if latest { + return Signal { observer in + var isInitialElement = true + var throttledDisposable: Disposable? = nil + var lastElement: Element? = nil + var isFinished: Bool = false + return self.observe { event in + queue.async { + switch event { + case .next(let element): + if isInitialElement { + isInitialElement = false observer.receive(element) - lastElement = nil + } else { + lastElement = element } - if isFinished { - observer.receive(completion: .finished) + guard throttledDisposable == nil else { return } + throttledDisposable = queue.disposableAfter(when: seconds) { + if let element = lastElement { + observer.receive(element) + lastElement = nil + } + if isFinished { + observer.receive(completion: .finished) + } + throttledDisposable = nil } - throttledDisposable = nil + case .failed(let error): + observer.receive(completion: .failure(error)) + case .completed: + guard throttledDisposable == nil else { + isFinished = true + return + } + observer.receive(completion: .finished) } - case .failed(let error): - observer.receive(completion: .failure(error)) - case .completed: - guard throttledDisposable == nil else { - isFinished = true - return + } + } + } + } else { + return Signal { observer in + let lock = NSRecursiveLock(name: "com.reactive_kit.signal.throttle") + var _lastEventTime: DispatchTime? + return self.observe { event in + switch event { + case .next(let element): + lock.lock(); defer { lock.unlock() } + let now = DispatchTime.now() + if _lastEventTime == nil || now.rawValue > (_lastEventTime! + seconds).rawValue { + _lastEventTime = now + observer.receive(element) } - observer.receive(completion: .finished) + default: + observer.on(event) } } }