Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
menu search
person
Welcome To Ask or Share your Answers For Others

Categories

I have a thread pushing events to a PublishProcessor and then a piece of code that consumes the messages with backpressure:

    final PublishProcessor<String> publishProcessor = PublishProcessor.create();

    // a separate thread calls publishProcessor.onNext every 1s

    publishProcessor
        .onBackpressureBuffer(
            1,
            () -> System.out.println("Buffer full!"),
            BackpressureOverflowStrategy.DROP_OLDEST)
        .observeOn(Schedulers.newThread(), false, 1)
        .map(this::parseMessage)
        .subscribe(this::consume); // consume has a fixed delay of 5s

This works fine, however what I'd like is to be able to expose a function that would return a Flowable of already parsed messages. Like this:

final Flowable<String> parsedMessages = publishProcessor.map(this::parseMessage);

// and then somewhere else:

parsedMessages
    .onBackpressureBuffer(
        1,
        () -> System.out.println("Buffer full!"),
        BackpressureOverflowStrategy.DROP_OLDEST)
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe(this::consume);

the thing is, I don't want the messages to get parsed if there is no space left in the buffer. This is what happens in the first case when buffer is full:

> Publish msg
> Buffer full!

and this is the 2nd case:

> Publish msg
> Parse msg
> Buffer full!

is there a way to make backpressure work all the way through the flow, even on stages that are added before backpressure buffer is set up?


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
617 views
Welcome To Ask or Share your Answers For Others

1 Answer

等待大神答复

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
thumb_up_alt 0 like thumb_down_alt 0 dislike
Welcome to ShenZhenJia Knowledge Sharing Community for programmer and developer-Open, Learning and Share
...