Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I have a sequence of stock ticks coming in and I want to take all the data in the last hour and do some processing on it. I am trying to achieve this with reactive extensions 2.0. I read on another post to use Interval but i think that is deprecated.

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
480 views
Welcome To Ask or Share your Answers For Others

1 Answer

Would this extension method solve your problem?

public static IObservable<T[]> RollingBuffer<T>(
    this IObservable<T> @this,
    TimeSpan buffering)
{
    return Observable.Create<T[]>(o =>
    {
        var list = new LinkedList<Timestamped<T>>();
        return @this.Timestamp().Subscribe(tx =>
        {
            list.AddLast(tx);
            while (list.First.Value.Timestamp < DateTime.Now.Subtract(buffering))
            {
                list.RemoveFirst();
            }
            o.OnNext(list.Select(tx2 => tx2.Value).ToArray());
        }, ex => o.OnError(ex), () => o.OnCompleted());
    });
}

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...