
Join remains Pending until both streams yield an item #12

Open
zeenix opened this issue Dec 27, 2022 · 15 comments · Fixed by #13

Comments

zeenix (Collaborator) commented Dec 27, 2022

This code hangs:

// Imports added for context (not in the original report); `Message` is a
// test type with a `serial` field.
use futures_util::stream::{once, pending};
use ordered_stream::{join, FromStream, OrderedStreamExt};

// stream1 never yields an item; stream2 yields exactly one message.
let stream1 = FromStream::with_ordering(pending::<Message>(), |m| m.serial);
let stream2 = FromStream::with_ordering(
    once(Box::pin(async { Message { serial: 1 } })),
    |m| m.serial,
);
let mut joined = join(stream1, stream2);
// Hangs here: Join stays Pending until *both* streams have yielded.
let msg = joined.next().await.unwrap();
assert_eq!(msg.serial, 1);
zeenix changed the title from "Join gets stuck in Pending state until both streams yield an item" to "Join remains Pending until both streams yield an item" Dec 27, 2022
zeenix (Collaborator, Author) commented Dec 27, 2022

@danieldg Is this intentional? 🤔

zeenix added a commit to zeenix/ordered-stream that referenced this issue Dec 27, 2022
If one of the streams is ready and the other is pending, we should
just return the item from the ready stream. Otherwise, join will never
resolve until both streams are ready (which could be never).

Fixes danieldg#12.
zeenix added a commit to zeenix/ordered-stream that referenced this issue Dec 29, 2022
danieldg (Owner) commented Feb 5, 2023

@danieldg Is this intentional?

Yes, this was intentional. The fix (and therefore the 0.1.3 and 0.1.4 releases) will produce a stream with items out of order if one of the producing streams is being processed by another task or thread.

danieldg reopened this Feb 5, 2023
zeenix (Collaborator, Author) commented Feb 5, 2023

Yes, this was intentional. The fix (and therefore the 0.1.3 and 0.1.4 releases) will produce a stream with items out of order if one of the producing streams is being processed by another task or thread.

But this breaks zbus. I want items in order, but that doesn't mean I want things to hang forever to make it happen. I could add a timeout on the client side, but that's a poor workaround.

zeenix reopened this Feb 5, 2023
zeenix (Collaborator, Author) commented Feb 5, 2023

Also, now that you're not the only one maintaining/developing this, please don't push directly to master; go through PRs only, giving me a chance to have a look.

zeenix (Collaborator, Author) commented Feb 5, 2023

Yes, this was intentional. The fix (and therefore the 0.1.3 and 0.1.4 releases) will produce a stream with items out of order if one of the producing streams is being processed by another task or thread.

I don't understand something: how will one of the streams be polled by another task if Join has taken over all its streams and does the polling?

danieldg (Owner) commented Feb 5, 2023

Example:
Task 1: zbus socket reader, produces Message events on a broadcast channel B
Task 2: reads from B, filters, and sends messages to a new mpsc channel C
Task 3: filters B to produce D, then runs join(C, D)

Assume task 1 produces two messages M2 and M3: M3 is accepted by the filter in task 3, and M2 is accepted by the filter in task 2. If task 3 runs prior to task 2, then the join will observe D is ready (M3 is present) but C is Pending as task 2 has not yet run.

In this instance, Join must wait until C either produces an item or reports NoneBefore: once task 2 runs, M2 will be available in C, but C stays Pending until that happens. This is the main difference between Pending and NoneBefore - only once C has returned NoneBefore is it safe to return M3.
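The decision described above can be condensed into a small, self-contained sketch. This is a simplified model with illustrative names, not the crate's actual API (the real crate drives this through the `OrderedStream` trait and its poll result type): given an item with ordering key `serial` on one side, Join may emit it only if the other side's report proves no earlier item can still appear.

```rust
/// Simplified model of what the *other* joined stream reported
/// (hypothetical names, for illustration only).
#[derive(Debug, PartialEq)]
enum OtherSide {
    /// The other stream has an item with this ordering key ready.
    Item(u64),
    /// The other stream promises no future items ordered before this key.
    NoneBefore(u64),
    /// The other stream has not resolved; an earlier item may still
    /// appear (e.g. task 2 from the example has not run yet).
    Pending,
}

/// May Join emit an item with ordering key `serial` from one stream,
/// given what the other stream reported?
fn may_emit(serial: u64, other: &OtherSide) -> bool {
    match other {
        // Emit the smaller of the two ready items.
        OtherSide::Item(o) => serial <= *o,
        // Safe: the other side guarantees nothing earlier is coming.
        OtherSide::NoneBefore(n) => serial <= *n,
        // Must wait: returning now could reorder the merged stream.
        OtherSide::Pending => false,
    }
}

fn main() {
    // M3 (serial 3) is ready in D, but C is Pending: Join must wait,
    // because M2 (serial 2) may still arrive in C.
    assert!(!may_emit(3, &OtherSide::Pending));
    // Once C reports NoneBefore(4), M3 can safely be returned.
    assert!(may_emit(3, &OtherSide::NoneBefore(4)));
    // With items ready on both sides, the smaller key goes first.
    assert!(may_emit(2, &OtherSide::Item(3)));
    println!("ok");
}
```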

zeenix (Collaborator, Author) commented Feb 5, 2023

Thanks for the example. I thought about it on the tram and realized that the issue is multiple producers. In the case of zbus this doesn't happen: we have a single producer. I guess this means that within zbus I don't even need to use UnorderedStream.

danieldg (Owner) commented Feb 5, 2023

I think zbus shouldn't run into this as a visible problem, since any stream it provides should be correctly returning NoneBefore on any message serial numbers that have been sent.

zeenix (Collaborator, Author) commented Feb 5, 2023

Well, you can run cargo test in zbus against git master of ordered-stream and see at least 3 tests failing. Did you read my comments on the related PR?

zeenix (Collaborator, Author) commented Feb 5, 2023

Here is the problematic code.

danieldg (Owner) commented Feb 6, 2023

I've found the problem: because of how Subscriptions now works in zbus, the MessageStream is not able to update its last_seq field to match the last message considered. As a result, it never returns NoneBefore and so ordered-stream is correct in continuing to wait.

Fixing this is a bit trickier because you need to communicate new sequence numbers to all interested listeners, not just those whose rule matches. I'll take a shot at doing that.

zeenix (Collaborator, Author) commented Feb 6, 2023

Fixing this is a bit trickier because you need to communicate new sequence numbers to all interested listeners, not just those whose rule matches.

That would be very unfortunate and would defeat one of the main points of the new match-rule-based API. As I wrote before, I'm not even sure we need this in zbus, since we only have one producer running in a single task.


guicostaarantes commented Mar 18, 2023

Thanks for the discussion @danieldg and @zeenix. My use case for this crate is that I'm dealing with an event-driven system and I need to consume messages (the payloads of the events) from multiple Kafka topics in order to generate a database whose state is the accumulation of all events. But they need to be consumed in order based on the Kafka timestamp rather than in a race, otherwise I risk ending up with an incorrect state.

With the current behavior, I can only consume an event once all streams have a message ready. This is good because if one stream has a connection issue, the others will wait, and I can be sure events are always consumed in the right order. The not-so-good news is that I'll never reach a state where all my events are consumed. If one topic has 1,000 messages ready and the other one doesn't receive a message for a week, none of those 1,000 events will make it into the database during that week.

Both behaviors have advantages, but based on the fact that this crate is oriented towards guaranteeing the correct order of items, my understanding is that waiting for all to complete is what most users will expect and like. I'm trying to fix my scenario by making the event streams return Option<KafkaMessage> instead of KafkaMessage, where receiving None indicates the connection is healthy but there are no new messages available.
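The workaround described above can be sketched as follows. All names here are illustrative (not a real Kafka client API, and the assumed `KafkaMessage` shape is hypothetical): the key idea is that an idle topic still advances a liveness watermark, so a message from another topic can safely be consumed once its timestamp is at or below every other topic's watermark.

```rust
/// Hypothetical message type for illustration.
#[derive(Debug, Clone, PartialEq)]
struct KafkaMessage {
    timestamp: u64,
    payload: &'static str,
}

/// What one topic's stream yields per poll: a real message, or proof
/// of liveness up to `now` (the `None` case from the comment above,
/// carrying the time it was observed).
enum TopicPoll {
    Msg(KafkaMessage),
    Idle { now: u64 },
}

/// The earliest timestamp this topic could still produce.
fn watermark(p: &TopicPoll) -> u64 {
    match p {
        TopicPoll::Msg(m) => m.timestamp,
        TopicPoll::Idle { now } => *now,
    }
}

fn main() {
    // Topic A has a message at t=3; topic B is idle but reports
    // liveness at t=9, so A's message is safe to consume (3 <= 9).
    let a = TopicPoll::Msg(KafkaMessage { timestamp: 3, payload: "evt" });
    let b = TopicPoll::Idle { now: 9 };
    assert!(watermark(&a) <= watermark(&b));
    println!("consume t={}", watermark(&a));
}
```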

danieldg (Owner) commented Mar 19, 2023

@guicostaarantes that is actually one of the use cases I considered when making this crate. My best idea for solving the "long-idle stream" problem is to have each stream produce periodic (hourly, or whatever fits) heartbeat messages that show the stream is alive but has nothing ready to send. If the original stream is adapted using FromStream and heartbeat messages are filtered out by OrderedStreamExt::filter, this produces the behavior you describe without making the heartbeats visible in the combined stream.
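A minimal, crate-free illustration of the heartbeat idea (in the real crate this would correspond to adapting each source with FromStream and dropping heartbeats with OrderedStreamExt::filter; the types and merge function below are invented for the sketch): each source interleaves heartbeat markers with real events, so a timestamp-ordered merge can always make progress past an idle source, and the heartbeats are filtered out afterwards.

```rust
/// An event from one source: a heartbeat carrying only a timestamp
/// ("alive, nothing before this"), or real data.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Heartbeat(u64),
    Data(u64, &'static str),
}

fn ts(e: &Event) -> u64 {
    match e {
        Event::Heartbeat(t) | Event::Data(t, _) => *t,
    }
}

/// Merge two already-sorted event lists by timestamp, then drop the
/// heartbeats so they never appear in the combined output.
fn merge_filtered(a: &[Event], b: &[Event]) -> Vec<Event> {
    let (mut i, mut j, mut out) = (0, 0, Vec::new());
    while i < a.len() && j < b.len() {
        if ts(&a[i]) <= ts(&b[j]) {
            out.push(a[i].clone());
            i += 1;
        } else {
            out.push(b[j].clone());
            j += 1;
        }
    }
    out.extend_from_slice(&a[i..]);
    out.extend_from_slice(&b[j..]);
    out.retain(|e| matches!(e, Event::Data(..)));
    out
}

fn main() {
    // Topic A has real messages; topic B is idle but heartbeats at
    // t=5 and t=10, proving nothing earlier is still coming from it.
    let a = vec![Event::Data(1, "m1"), Event::Data(7, "m2")];
    let b = vec![Event::Heartbeat(5), Event::Heartbeat(10)];
    let merged = merge_filtered(&a, &b);
    assert_eq!(merged, vec![Event::Data(1, "m1"), Event::Data(7, "m2")]);
    println!("{:?}", merged);
}
```

Without B's heartbeats, the merge could never prove that "m1" is safe to emit; with them, the combined stream stays ordered yet keeps flowing while B is idle.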
