I'm building a straightforward processing pipeline: an item is fetched from a provider, operated on by multiple processors sequentially, and finally output. The image below describes the overall architecture:
The way it currently works: the pipeline fetches items from the provider as quickly as it can. As soon as an item is fetched, it is passed to the processors. Once an item has been processed, the output is notified. While each individual item is processed sequentially, multiple items may be processed in parallel (depending on how fast they are fetched from the provider).
The IObservable<T> created and returned from the pipeline looks like this:
return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }
    observer.OnCompleted(); // Complete the sequence once the provider is exhausted.
})
.SelectMany(item => Observable.FromAsync(() =>
    _processors.Aggregate(
        seed: Task.FromResult(item),
        func: (current, processor) => current
            .ContinueWith(previous => processor.ProcessAsync(previous.Result)) // Append continuations.
            .Unwrap()))); // Unwrap the Task<T> from the Task<Task<T>>.
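As an aside, the ContinueWith/Unwrap chain can be written more simply with async/await. This is only an equivalent sketch of the SelectMany projection, assuming the same _processors collection, not a change in behavior:

```csharp
// Equivalent sketch: run the processors sequentially per item
// with async/await instead of chained continuations.
.SelectMany(item => Observable.FromAsync(async () =>
{
    T current = item;
    foreach (var processor in _processors)
        current = await processor.ProcessAsync(current); // One processor at a time.
    return current;
}));
```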
The missing part: I need a control mechanism that limits how many items (at most) can be in the pipeline at any given time.
For example, if the maximum number of items processed in parallel is 3, that would result in the following workflow:
- Item 1 is fetched and passed to the processors.
- Item 2 is fetched and passed to the processors.
- Item 3 is fetched and passed to the processors.
- Item 1 completed processing.
- Item 4 is fetched and passed to the processors.
- Item 3 completed processing.
- Item 5 is fetched and passed to the processors.
- Etc...
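One way the workflow above could be achieved is to gate the fetch loop with a SemaphoreSlim that is released when an item finishes processing, so fetching suspends while the maximum number of items is in flight. This is a sketch, not the original code: the semaphore and maxConcurrent are my additions; _provider and _processors are the names from the question.

```csharp
// Sketch: at most maxConcurrent items are in the pipeline at once.
const int maxConcurrent = 3;
var slots = new SemaphoreSlim(maxConcurrent, maxConcurrent);

return Observable.Create<T>(async observer =>
{
    while (_provider.HasNext)
    {
        await slots.WaitAsync();                 // Suspends fetching while 3 items are in flight.
        T item = await _provider.GetNextAsync();
        observer.OnNext(item);
    }
    observer.OnCompleted();
})
.SelectMany(item => Observable.FromAsync(async () =>
{
    try
    {
        foreach (var processor in _processors)
            item = await processor.ProcessAsync(item); // Sequential per-item processing.
        return item;
    }
    finally
    {
        slots.Release();                         // Frees a slot so the next fetch can proceed.
    }
}));
```

An alternative is Select(item => Observable.FromAsync(...)) followed by Merge(maxConcurrent), which caps concurrent processing; note however that the Observable.Create loop would still drain the provider eagerly, so the semaphore approach matches the fetch-gating workflow above more closely.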