Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


I'm writing a Windows Service that will kick off multiple worker threads that will listen to Amazon SQS queues and process messages. There will be about 20 threads listening to 10 queues.

The threads will have to be always running, which is why I'm leaning towards using dedicated threads for the worker loops rather than thread pool threads.

Here is a top-level implementation. The Windows service will kick off multiple worker threads, and each will listen to its queue and process messages.

protected override void OnStart(string[] args)
{
  for (int i = 0; i < _workers; i++)
  {
    new Thread(RunWorker).Start();
  }
}

Here is the implementation of the worker:

public async void RunWorker()
{
  while(true)
  {
    // .. get message from amazon sqs sync.. about 20ms
    var message = sqsClient.ReceiveMessage();

    try
    {
       await PerformWebRequestAsync(message);
       await InsertIntoDbAsync(message);
    }
    catch(SomeException)
    {
       // ... log
       //continue to retry
       continue;
    }
    sqsClient.DeleteMessage();
  }
}

I know I can perform the same operation with Task.Run and execute it on a thread pool thread rather than starting an individual thread, but I don't see a reason for that since each thread will always be running.

Do you see any problems with this implementation? How reliable would it be to leave threads always running in this fashion and what can I do to make sure that each thread is always running?


1 Answer

One problem with your existing solution is that you call your RunWorker in a fire-and-forget manner, albeit on a new thread (i.e., new Thread(RunWorker).Start()).

RunWorker is an async method, so it returns to the caller when execution reaches the first await on an incomplete task (i.e. await PerformWebRequestAsync(message)). If PerformWebRequestAsync returns a pending task, RunWorker returns and the new thread you just started terminates. The rest of the loop then continues on thread pool threads anyway.
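To see this for yourself, here is a minimal sketch. The class name AsyncVoidDemo and the 500 ms Task.Delay are my own stand-ins (assumptions) for the real worker and for PerformWebRequestAsync; the point is only that Thread.Join returns while the awaited work is still pending.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Signalled when the code after the await has run.
    public static readonly ManualResetEventSlim WorkFinished = new ManualResetEventSlim(false);

    // Same shape as RunWorker in the question: async void with an await inside.
    public static async void RunWorker()
    {
        await Task.Delay(500);   // stand-in for PerformWebRequestAsync(message)
        WorkFinished.Set();      // runs later, on a thread pool thread
    }

    // Returns true if the dedicated thread exited before the awaited work completed.
    public static bool ThreadExitsBeforeWorkCompletes()
    {
        WorkFinished.Reset();
        var thread = new Thread(RunWorker);
        thread.Start();
        thread.Join();                          // returns once RunWorker reaches the await
        bool exitedEarly = !WorkFinished.IsSet; // the work is still pending here
        WorkFinished.Wait();                    // the continuation still completes eventually
        return exitedEarly;
    }
}
```

So the dedicated thread buys you nothing here: it lives only until the first await.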

I don't think you need a new thread here at all: just use AmazonSQSClient.ReceiveMessageAsync and await its result. Also, you shouldn't use async void methods unless you really don't care about tracking the state of the asynchronous operation. Use async Task instead.
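As a rough illustration of why async Task is preferable: the returned task captures any exception, so the caller can observe it. With async void there is no task to inspect, and without a synchronization context an unhandled exception is rethrown on the thread pool, which normally brings the process down. The names AsyncTaskDemo and FailingWorkerAsync below are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncTaskDemo
{
    // Hypothetical worker that fails after its first await.
    public static async Task FailingWorkerAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("worker failed");
    }

    // Returns the message of the exception captured in the worker's task.
    public static string ObserveFailure()
    {
        Task worker = FailingWorkerAsync();
        try
        {
            // Blocking is acceptable in this small demo; GetResult rethrows the original exception.
            worker.GetAwaiter().GetResult();
            return "no failure";
        }
        catch (InvalidOperationException ex)
        {
            return ex.Message;
        }
    }
}
```

In the service you would keep the tasks in a list and inspect or await them, rather than blocking like this.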

Your code might look like this:

List<Task> _workers = new List<Task>();
CancellationTokenSource _cts = new CancellationTokenSource();

protected override void OnStart(string[] args)
{
  for (int i = 0; i < _MAX_WORKERS; i++)
  {
    _workers.Add(RunWorkerAsync(_cts.Token)); 
  }
}

public async Task RunWorkerAsync(CancellationToken token)
{
  while(true)
  {
    token.ThrowIfCancellationRequested();

    // get the next message from Amazon SQS asynchronously
    var message = await sqsClient.ReceiveMessageAsync().ConfigureAwait(false);

    try
    {
       await PerformWebRequestAsync(message);
       await InsertIntoDbAsync(message);
    }
    catch(SomeException)
    {
       // ... log
       //continue to retry
       continue;
    }
    sqsClient.DeleteMessage();
  }
}

Now, to stop all pending workers, you could simply do this (e.g. from OnStop on the main service thread):

_cts.Cancel();
try
{
    Task.WaitAll(_workers.ToArray()); 
}
catch (AggregateException ex) 
{
    ex.Handle(inner => inner is OperationCanceledException);
}
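Here is a self-contained sketch of the same start/cancel/wait pattern that you can run outside a service. It rests on a few assumptions of mine: Task.Delay stands in for the SQS receive and the processing calls, the token is also passed to the delay so a worker stops promptly rather than at the next loop iteration, and the wait is capped at 10 seconds. WorkerShutdownDemo and StartAndStop are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class WorkerShutdownDemo
{
    // Stand-in for RunWorkerAsync: Task.Delay replaces the receive and processing calls.
    public static async Task RunWorkerAsync(CancellationToken token)
    {
        while (true)
        {
            token.ThrowIfCancellationRequested();
            await Task.Delay(50, token).ConfigureAwait(false); // throws once the token is cancelled
        }
    }

    // Starts the workers, cancels them, and returns how many ended in the Canceled state.
    public static int StartAndStop(int workerCount)
    {
        var cts = new CancellationTokenSource();
        var workers = new List<Task>();
        for (int i = 0; i < workerCount; i++)
        {
            workers.Add(RunWorkerAsync(cts.Token));
        }

        cts.Cancel();
        try
        {
            // Bounded wait, so a stuck worker cannot block shutdown forever.
            Task.WaitAll(workers.ToArray(), TimeSpan.FromSeconds(10));
        }
        catch (AggregateException ex)
        {
            // Cancellation is expected; anything else is rethrown by Handle.
            ex.Handle(inner => inner is OperationCanceledException);
        }

        int canceled = 0;
        foreach (var worker in workers)
        {
            if (worker.IsCanceled) canceled++;
        }
        return canceled;
    }
}
```

Whether a timeout is appropriate, and how long it should be, depends on how long a single message may take to process.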

Note that ConfigureAwait(false) is optional in a Windows Service, because by default there is no synchronization context on the initial thread. However, I'd keep it to make the code independent of the execution environment (for cases where there is a synchronization context).
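If you want to see what ConfigureAwait(false) changes when a context is present, the sketch below installs a made-up context (CountingContext, purely for illustration) that counts how many continuations are posted back to it. With the default behaviour the continuation goes through the context; with ConfigureAwait(false) it does not.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical context that counts posted continuations and runs them on the thread pool.
public sealed class CountingContext : SynchronizationContext
{
    public int Posts;

    public override void Post(SendOrPostCallback d, object state)
    {
        Interlocked.Increment(ref Posts);
        ThreadPool.QueueUserWorkItem(_ => d(state));
    }
}

public static class ConfigureAwaitDemo
{
    private static async Task AwaitOnceAsync(bool captureContext)
    {
        await Task.Delay(20).ConfigureAwait(captureContext);
    }

    // Returns how many continuations were posted to the installed context.
    public static int CountPosts(bool captureContext)
    {
        var previous = SynchronizationContext.Current;
        var context = new CountingContext();
        SynchronizationContext.SetSynchronizationContext(context);
        Task pending;
        try
        {
            // The await inside captures (or ignores) the current context at this point.
            pending = AwaitOnceAsync(captureContext);
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
        pending.Wait();
        return context.Posts;
    }
}
```

In a plain Windows Service, SynchronizationContext.Current is normally null, so both variants behave the same there.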

Finally, if for some reason you cannot use ReceiveMessageAsync, or you need to call another blocking API, or you simply need to do a piece of CPU-intensive work at the beginning of RunWorkerAsync, just wrap that call with Task.Run (as opposed to wrapping the whole RunWorkerAsync):

var message = await Task.Run(
    () => sqsClient.ReceiveMessage()).ConfigureAwait(false);
