Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I am trying to figure out what the best way of working with a queue will be. I have a process that returns a DataTable. Each DataTable, in turn, is merged with the previous DataTable. There is one problem, too many records to hold until the final BulkCopy (OutOfMemory).

So, I have determined that I should process each incoming DataTable immediately. Thinking about the ConcurrentQueue<T>...but I don't see how the WriteQueuedData() method would know to dequeue a table and write it to the database.

For instance:

public class TableTransporter
{
    private ConcurrentQueue<DataTable> tableQueue = new ConcurrentQueue<DataTable>();

    public TableTransporter()
    {
        tableQueue.OnItemQueued += new EventHandler(WriteQueuedData);   // no events available
    }

    public void ExtractData()
    {
        DataTable table;

        // perform data extraction
        tableQueue.Enqueue(table);
    }

    private void WriteQueuedData(object sender, EventArgs e)
    {
        BulkCopy(e.Table);
    }
}

My first question is, aside from the fact that I don't actually have any events to subscribe to, if I call ExtractData() asynchronously will this be all that I need? Second, is there something I'm missing about the way ConcurrentQueue<T> functions and needing some form of trigger to work asynchronously with the queued objects?

Update I have just derived a class from ConcurrentQueue<T> that has an OnItemQueued event handler. Then:

new public void Enqueue (DataTable Table)
{
    base.Enqueue(Table);
    OnTableQueued(new TableQueuedEventArgs(Table));
}

public void OnTableQueued(TableQueuedEventArgs table)
{
    EventHandler<TableQueuedEventArgs> handler = TableQueued;

    if (handler != null)
    {
        handler(this, table);
    }
}

Any concerns about this implementation?

See Question&Answers more detail:os

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
297 views
Welcome To Ask or Share your Answers For Others

1 Answer

From my understanding of the problem, you are missing a few things.

The concurrent queue is a data structure designed to accept multiple threads reading and writing to the queue without you needing to explicitly lock the data structure. (All that jazz is taken care of behind the scenes, or the collection is implemented in such a way that it doesn't need to take a lock.)

With that in mind, it looks like the pattern you are trying to use is the "Produce/Consumer". First, you have some tasks producing work (and adding items to the queue). And second you have a second task Consuming things from the queue (and dequeing items).

So really you want two threads: one adding items and a second removing items. Because you are using a concurrent collection, you can have multiple threads adding items and multiple threads removing items. But obviously the more contention you have on the concurrent queue the quicker that will become the bottleneck.


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...