Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[QUESTION] Coroutine interface and blocking calls #616

Open
aligirayhanozbay opened this issue Jan 24, 2025 · 3 comments
Open

[QUESTION] Coroutine interface and blocking calls #616

aligirayhanozbay opened this issue Jan 24, 2025 · 3 comments

Comments

@aligirayhanozbay
Copy link

aligirayhanozbay commented Jan 24, 2025

Describe the problem
I'm trying to use the coroutine interface of redis++ to listen to incoming events on a consumer group using XREADGROUP using the block keyword (blocking until a message is received). When I receive an event, I need to kick off a timer to call XCLAIM on the messages I receive on the stream until my application is done with the item.

I noticed that the blocking XREADGROUP call causes my application to be able to do 1 thing at a time. Once I receive something from the XREADGROUP call, I can successfully co_await further commands (such as an XCLAIM), but once another blocking XREADGROUP call goes out, I can't do anything else until the XREADGROUP finishes. Hence, my application cannot submit the further XCLAIM commands. To better illustrate things, this is the scenario:

#include "xclaimresult.hpp"
#include "parse_args.hpp"

boost::cobalt::task<void> startXreadGroup(sw::redis::CoRedisCluster* redis, std::string stream, std::string consumerGroup, std::string consumerName) {
  std::vector<std::string> cmd {"XREADGROUP", "block", "0", "group", consumerGroup, consumerName, "streams", stream,
         ">"};
  /* prepare command */
  auto result = co_await redis->command(cmd.begin(), cmd.end());
}

boost::cobalt::task<XClaimResult> xclaim(XclaimArgs);

boost::cobalt::task<void> printXclaimResult(XclaimArgs a) {
   auto result = co_await xclaim(a);
   std::cout << result << "\n";
};

int main(int argc, char** argv) {
 auto [redis, stream, cg, consumer, xclaimArgs] = parseArgs(argc, argv);
 
  boost::asio::io_context ioc;

  boost::cobalt::spawn(ioc, startXreadGroup(redis, stream, cg, consumer), boost::asio::detached);
  
  boost::cobalt::spawn(ioc, printXclaimResult(xclaimArgs), boost::asio::detached);
  
  ioc.run();

}

The xclaim never happens, because xreadgroup is seemingly blocking further commands from being sent.

Are there any settings I can use to change this behaviour? Is this a limitation of the coroutine interface? Would the regular async interface be better for this purpose? Or do I need multiple client objects?

Environment:

  • OS: Ubuntu 22.04
  • Compiler: GCC 13
  • hiredis version: v1.2.0
  • redis-plus-plus version: 1.3.13

Additional context
I'm interacting with an Amazon MemoryDB cluster so I'm using sw::redis::CoRedisCluster. I tried creating the CoRedisCluster with a connection pool of size greater than 1, but the XREADGROUP call still completely blocks everything else.

@aligirayhanozbay aligirayhanozbay changed the title [QUESTION] [QUESTION] Coroutine interface and blocking calls Jan 24, 2025
@sewenew
Copy link
Owner

sewenew commented Jan 29, 2025

Sorry, but the coroutine feature is still experimental. And I have to say, I'm not quite familiar with C++ coroutine. I'd suggest you use the async interface, which is much stabler.

Regards

@aligirayhanozbay
Copy link
Author

aligirayhanozbay commented Jan 29, 2025 via email

@sewenew
Copy link
Owner

sewenew commented Jan 31, 2025

I checked Redis source code, once you send a blocking command (e.g. blpop), and the data is not ready (e.g. the list is empty), Redis blocks the connection. Any new commands on this connection are accumulated but not processed until the blocking operation finishes, either data is ready, or timeout reaches.

redis-plus-plus maintains a connection pool, by default the pool size is 1. When you send a command, redis-plus-plus randomly picks a connection from the pool. If the connection is in blocking state, i.e. some blocking command runs on this connection, any new commands will not be processed by Redis.

So if you want to send blocking commands, I'd suggest you use the sync API in some threads to avoid blocking the main thread.

Regards

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants