Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share

I want to ensure that, if my Event Hubs client (currently a console application) crashes, it only picks up events it has not yet taken from the event hub when it restarts. One way to achieve this is to exploit offsets. However, to my understanding, this requires the client to store the latest offset itself (besides, events do not necessarily seem to hit the foreach loop of the ProcessEventsAsync method ordered by SequenceNumber).

An alternative is to use checkpoints. I think they are persisted via the server (Event Hubs) using the provided storage account credentials. Is this correct?

This is some preliminary code I am currently using:

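using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using Microsoft.ServiceBus.Messaging; // IEventProcessor, PartitionContext, EventData, CloseReason

public class SimpleEventProcessor : IEventProcessor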
{
    private Stopwatch _checkpointStopWatch;

    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition '{0}', Reason: '{1}'.", context.Lease.PartitionId, reason);
        if (reason == CloseReason.Shutdown)
        {
            await context.CheckpointAsync();
        }
    }

    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized.  Partition: '{0}', Offset: '{1}'", context.Lease.PartitionId, context.Lease.Offset);
        _checkpointStopWatch = new Stopwatch();
        _checkpointStopWatch.Start();
        return Task.FromResult<object>(null);
    }

    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (var eventData in messages)
        {
            // do something                    
        }

        //Call checkpoint every 5 minutes, so that worker can resume processing from 5 minutes back if it restarts.
        if (_checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(5))
        {
            await context.CheckpointAsync();
            _checkpointStopWatch.Restart();
        }
    }
}

I believe it sends a checkpoint to the server every 5 minutes. How does the server know which client has submitted the checkpoint (via the context)? Also, how can I prevent events from being processed again if the client restarts? Furthermore, there could still be a window of up to 5 minutes in which events are processed again. Perhaps I should use a queue/topic instead, given my requirement?

PS:

This seems to be sufficient:

async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
    foreach (var eventData in messages)
    {
        // do something
    }
    await context.CheckpointAsync();
}

1 Answer

Let me put forward some basic terminology before answering:

Event Hubs is a high-throughput, durable event ingestion pipeline. Simply put, it's a reliable stream of events in the cloud.

The Offset on an EventData (one event in the stream) is literally a cursor on the stream. Having this cursor enables operations like restarting the read from that position (the offset), either inclusive or exclusive.
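As a rough illustration of inclusive versus exclusive restarts, here is a small self-contained sketch. It models the stream as an in-memory list and the offset as a plain long. `StreamCursor` and `ReadFrom` are hypothetical names made up for this example, not SDK calls (in the SDK the offset on EventData is a string).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical model: each event is an (offset, body) pair in an ordered stream.
public static class StreamCursor
{
    // Returns the events a reader would see when restarting at 'offset'.
    // inclusive = true  -> the event at 'offset' is delivered again
    // inclusive = false -> reading starts with the first event after 'offset'
    public static List<KeyValuePair<long, string>> ReadFrom(
        IEnumerable<KeyValuePair<long, string>> stream, long offset, bool inclusive)
    {
        return stream
            .Where(e => inclusive ? e.Key >= offset : e.Key > offset)
            .ToList();
    }
}
```

When resuming after a checkpoint you would normally want the exclusive form, since the event at the checkpointed offset has already been handled.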

The EventProcessor library is a framework that the Event Hubs team built on top of the Service Bus SDK to make receiving from an event hub easier. Roughly, EventProcessorHost (EPH) is to Event Hubs what ZooKeeper is to Kafka. It makes sure that when the process running an EventProcessor on a specific partition dies or crashes, processing resumes from the last checkpointed offset in another available EventProcessorHost instance.

Checkpoint: as of today, Event Hubs only supports client-side checkpointing. When you call Checkpoint from your client code:

await context.CheckpointAsync();

it translates to a Storage call (made directly from the client), which stores the current offset in the storage account you provided. The Event Hubs service does not talk to Storage for checkpointing.

THE ANSWER

The EventProcessor framework is meant to achieve exactly what you are looking for.

Checkpoints are not persisted via the server (the Event Hubs service). It's purely client-side: you are talking to Azure Storage. That's the reason the EventProcessor library brings in an additional dependency, AzureStorageClient. You can connect to the storage account and the container the checkpoints are written to. There we maintain the ownership information: which EPH instance (by name) owns which partitions of the event hub, and up to which checkpoint each partition has been read/processed.
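To make that ownership bookkeeping concrete, here is a minimal in-memory sketch of the kind of record involved. `PartitionLease` and `InMemoryLeaseStore` are hypothetical names for illustration only; the real library keeps this information in your storage container, and its exact layout is not shown here. The point is that the checkpoint is stored per partition, not per client, so whichever host owns the partition next picks up from it.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the per-partition record kept in storage.
public sealed class PartitionLease
{
    public string PartitionId { get; set; }
    public string Owner { get; set; }         // name of the host instance that owns the partition
    public string Offset { get; set; }        // last checkpointed offset (null = none yet)
    public long SequenceNumber { get; set; }  // last checkpointed sequence number
}

// Hypothetical in-memory store used only to illustrate the idea.
public sealed class InMemoryLeaseStore
{
    private readonly Dictionary<string, PartitionLease> _leases =
        new Dictionary<string, PartitionLease>();

    // A host takes (or re-takes) a partition; the stored checkpoint survives the owner change.
    public PartitionLease Acquire(string partitionId, string hostName)
    {
        PartitionLease lease;
        if (!_leases.TryGetValue(partitionId, out lease))
        {
            lease = new PartitionLease { PartitionId = partitionId, SequenceNumber = -1 };
            _leases[partitionId] = lease;
        }
        lease.Owner = hostName;
        return lease;
    }

    // Only the current owner may move the checkpoint forward (an assumption of this sketch).
    public void Checkpoint(string partitionId, string hostName, string offset, long sequenceNumber)
    {
        PartitionLease lease;
        if (!_leases.TryGetValue(partitionId, out lease) || lease.Owner != hostName)
        {
            throw new InvalidOperationException("Host does not own partition " + partitionId);
        }
        lease.Offset = offset;
        lease.SequenceNumber = sequenceNumber;
    }
}
```

If "host-A" checkpoints partition "0" and then crashes, a later `Acquire("0", "host-B")` returns the same offset and sequence number, which is where host-B would resume.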

With the timer-based checkpointing pattern you originally had, if the process goes down you will redo the events in the last 5-minute window. This is a healthy pattern because:

  1. the fundamental assumption is that faults are rare events, so you will rarely have to deal with duplicate events
  2. you will end up making fewer calls to the Storage service (which you could easily overwhelm by checkpointing too frequently). I would go one step further and actually fire the checkpoint call asynchronously. ProcessEventsAsync need not fail if a checkpoint fails!
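One way to sketch the "checkpoint on a timer, but never let a failed checkpoint break processing" idea is below. `ThrottledCheckpointer` is a hypothetical helper, not part of the SDK. It takes the checkpoint call as a delegate (for example `() => context.CheckpointAsync()`), so the sketch itself has no Event Hubs dependency. Note that this variant still awaits the call and swallows the failure; a true fire-and-forget version would drop the await, at the cost of possibly overlapping checkpoint calls.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

// Hypothetical helper: one instance per processor (and therefore per partition).
public sealed class ThrottledCheckpointer
{
    private readonly Func<Task> _checkpoint;  // e.g. () => context.CheckpointAsync()
    private readonly TimeSpan _interval;
    private readonly Stopwatch _sinceLast = Stopwatch.StartNew();

    public int Failures { get; private set; }

    public ThrottledCheckpointer(Func<Task> checkpoint, TimeSpan interval)
    {
        _checkpoint = checkpoint;
        _interval = interval;
    }

    // Returns true only if a checkpoint was actually written.
    // A failed checkpoint is logged and counted, never rethrown.
    public async Task<bool> TryCheckpointAsync()
    {
        if (_sinceLast.Elapsed < _interval)
        {
            return false; // too soon since the last successful checkpoint
        }

        try
        {
            await _checkpoint();
            _sinceLast.Restart();
            return true;
        }
        catch (Exception ex)
        {
            Failures++;
            Console.WriteLine("Checkpoint failed, will retry on a later batch: " + ex.Message);
            return false;
        }
    }
}
```

Inside ProcessEventsAsync you would call `await checkpointer.TryCheckpointAsync();` after the foreach loop. Because the timer is only reset on success, a failed checkpoint is retried on the next batch.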

If you want absolutely no events to repeat, you will need to build de-duplication logic into the downstream pipeline.

  • every time the EventProcessorImpl starts, query your downstream for the last sequence number it received, and keep discarding events until you are past that sequence number.
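A minimal sketch of that discard logic follows. `SequenceNumberFilter` is a hypothetical class, and how you obtain the last sequence number from your downstream is up to you. The sketch assumes sequence numbers only increase within a partition and that you keep one filter per partition (that is, one per processor instance).

```csharp
using System;

// Hypothetical helper: tracks the highest sequence number already handled for ONE partition.
public sealed class SequenceNumberFilter
{
    private long _lastProcessed;

    // Seed with the last sequence number your downstream reports having stored.
    public SequenceNumberFilter(long lastProcessedByDownstream)
    {
        _lastProcessed = lastProcessedByDownstream;
    }

    // Returns true if the event is new (and records it); false if it is a replay to discard.
    public bool ShouldProcess(long sequenceNumber)
    {
        if (sequenceNumber <= _lastProcessed)
        {
            return false;
        }
        _lastProcessed = sequenceNumber;
        return true;
    }
}
```

In the processor you would create the filter in OpenAsync and wrap the body of the foreach loop in `if (filter.ShouldProcess(eventData.SequenceNumber)) { ... }`.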

here's more general reading on Event Hubs...

