Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I need to construct TPL dataflow pipeline which will process a lot of messages. Because there are many messages I can not simply Post them into infinite queue of the BufferBlock or I will face memory issues. So I want to use BoundedCapacity = 1 option to disable the queue and use MaxDegreeOfParallelism to use parallel task processing since my TransformBlocks could take some time for each message. I also use PropagateCompletion to make all completion and fail to propagate down the pipeline.

But I'm facing the issue with error handling when error happened just right after the first message: calling await SendAsync simply switch my app into infinite waiting.

I've simplified my case to sample console app:

var data_buffer = new BufferBlock<int>(new DataflowBlockOptions
{
    BoundedCapacity = 1
});

var process_block = new ActionBlock<int>(x =>
{
    throw new InvalidOperationException();
}, new ExecutionDataflowBlockOptions
{
    MaxDegreeOfParallelism = 2,
    BoundedCapacity = 1
});

data_buffer.LinkTo(process_block,
    new DataflowLinkOptions { PropagateCompletion = true });

for (var k = 1; k <= 5; k++)
{
    await data_buffer.SendAsync(k);
    Console.WriteLine("Send: {0}", k);
}

data_buffer.Complete();

await process_block.Completion;
See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
644 views
Welcome To Ask or Share your Answers For Others

1 Answer

This is expected behavior. If there's a fault "downstream", the error does not propagate "backwards" up the mesh. The mesh is expecting you to detect that fault (e.g., via process_block.Completion) and resolve it.

If you want to propagate errors backwards, you could have an await or continuation on process_block.Completion that faults the upstream block(s) if the downstream block(s) fault.

Note that this is not the only possible solution; you may want to rebuild that part of the mesh or link the sources to an alternative target. The source block(s) have not faulted, so they can just continue processing with a repaired mesh.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...