Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I have a sequence of events that happen every 10-1000 ms. I subscribe to this source of events, but want to handle them at a fixed (or minimum) interval of 500ms. I also want to process ONE event at a time, not in batches (like Buffer(x > 1)).

Something like this in pseudo code:

observable.MinimumInterval(TimeSpan.FromMiliseconds(500)).Subscribe(v=>...);

Tried e.g.:

observable.Buffer(1).Delay(TimeSpan.FromMiliseconds(500).Subscribe(v=>...);

and a lot of other potential solutions. No luck so far.

Any ideas?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
274 views
Welcome To Ask or Share your Answers For Others

1 Answer

I answered this very question on my blog here.

Reproducing (in case of link rot!) with the addition of presenting as an extension method:

Constraining a stream of events in Rx to a maximum rate

Sometimes, you want to limit the rate at which events arrive from an Rx stream.

The Throttle operator will suppress an event if another arrives within a specified interval. This is very useful in many instances, but it does have two important side-effects – even an unsuppressed event will be delayed by the interval, and events will get dropped altogether if they arrive too quickly.

I came across a situation where both of these were unacceptable. In this particular case, the desired behaviour was as follows: The events should be output at a maximum rate specified by a TimeSpan, but otherwise as soon as possible.

One solution works like this. Imagine our input stream is a bunch of people arriving at a railway station. For our output, we want people leave the station at a maximum rate. We set the maximum rate by having each person stand at the front of a flatbed railroad truck and sending that truck out of the station at a fixed speed. Because there is only one track, and all trucks travel at the same speed and have the same length, people will leave the station at a maximum rate when trucks are departing back-to-back. However, if the track is clear, the next person will be able to depart immediately.

So how do we translate this metaphor into Rx?

We will use the Concat operator’s ability to accept a stream of streams and merge them together back-to-back – just like sending railroad trucks down the track.

To get the equivalent of each person onto a railroad truck, we will use a Select to project each event (person) to an observable sequence (railroad truck) that starts with a single OnNext event (the person) and ends with an OnComplete exactly the defined interval later.

Lets assume the input events are an IObservable in the variable input. Here’s the code:

var paced = input.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

As an extension method this becomes:

public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
    return source.Select(i => Observable.Empty<T>()
                                        .Delay(interval)
                                        .StartWith(i)).Concat();

}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

548k questions

547k answers

4 comments

86.3k users

...