Skip to content

Commit

Permalink
Merge pull request #275 from leontedev/master
Browse files Browse the repository at this point in the history
Adds latest parameter for throttle
  • Loading branch information
srdanrasic authored Oct 25, 2021
2 parents 9821826 + 3ba9556 commit 0feaff7
Showing 1 changed file with 58 additions and 31 deletions.
89 changes: 58 additions & 31 deletions Sources/SignalProtocol+Filtering.swift
Original file line number Diff line number Diff line change
Expand Up @@ -374,44 +374,71 @@ extension SignalProtocol {
}
}

/// Throttle the signal to emit at most one element per given `seconds` interval. Signal will emit latest element from each interval.
/// Throttle the signal to emit at most one element per given `seconds` interval. Emits either the most-recent or first element in the specified time interval.
///
/// - parameter seconds: The interval at which to find and emit either the most recent or the first element.
/// - parameter latest: Defaults to `true`. A Bool value that indicates whether to publish the most recent element. If `false`, it emits the first element received during the interval. If `true` Signal will emit latest element from each interval.
///
/// Check out interactive example at [https://rxmarbles.com/#throttle](https://rxmarbles.com/#throttle)
public func throttle(for seconds: Double, queue: DispatchQueue = DispatchQueue(label: "com.reactive_kit.signal.throttle")) -> Signal<Element, Error> {
return Signal { observer in
var isInitialElement = true
var throttledDisposable: Disposable? = nil
var lastElement: Element? = nil
var isFinished: Bool = false
return self.observe { event in
queue.async {
switch event {
case .next(let element):
if isInitialElement {
isInitialElement = false
observer.receive(element)
} else {
lastElement = element
}
guard throttledDisposable == nil else { return }
throttledDisposable = queue.disposableAfter(when: seconds) {
if let element = lastElement {
public func throttle(
for seconds: Double,
queue: DispatchQueue = DispatchQueue(label: "com.reactive_kit.signal.throttle"),
latest: Bool = true
) -> Signal<Element, Error> {
if latest {
return Signal { observer in
var isInitialElement = true
var throttledDisposable: Disposable? = nil
var lastElement: Element? = nil
var isFinished: Bool = false
return self.observe { event in
queue.async {
switch event {
case .next(let element):
if isInitialElement {
isInitialElement = false
observer.receive(element)
lastElement = nil
} else {
lastElement = element
}
if isFinished {
observer.receive(completion: .finished)
guard throttledDisposable == nil else { return }
throttledDisposable = queue.disposableAfter(when: seconds) {
if let element = lastElement {
observer.receive(element)
lastElement = nil
}
if isFinished {
observer.receive(completion: .finished)
}
throttledDisposable = nil
}
throttledDisposable = nil
case .failed(let error):
observer.receive(completion: .failure(error))
case .completed:
guard throttledDisposable == nil else {
isFinished = true
return
}
observer.receive(completion: .finished)
}
case .failed(let error):
observer.receive(completion: .failure(error))
case .completed:
guard throttledDisposable == nil else {
isFinished = true
return
}
}
}
} else {
return Signal { observer in
let lock = NSRecursiveLock(name: "com.reactive_kit.signal.throttle")
var _lastEventTime: DispatchTime?
return self.observe { event in
switch event {
case .next(let element):
lock.lock(); defer { lock.unlock() }
let now = DispatchTime.now()
if _lastEventTime == nil || now.rawValue > (_lastEventTime! + seconds).rawValue {
_lastEventTime = now
observer.receive(element)
}
observer.receive(completion: .finished)
default:
observer.on(event)
}
}
}
Expand Down

0 comments on commit 0feaff7

Please sign in to comment.