From fa62d3eaf6160e6b972516c6168dc070d3780e6e Mon Sep 17 00:00:00 2001 From: Greg Medding Date: Mon, 12 Aug 2024 18:42:52 -0700 Subject: [PATCH] Removing all modes other than pub/sub As of today, the Zenoh query system does not support all required uProtocol use cases, such as capture and playback. In order to prioritize feature completeness over optimizations, we have decided to switch to using only the pub/sub mechanism in Zenoh for now. closes #109 --- .../up-transport-zenoh-cpp/ZenohUTransport.h | 19 -- src/ZenohUTransport.cpp | 189 +----------------- 2 files changed, 3 insertions(+), 205 deletions(-) diff --git a/include/up-transport-zenoh-cpp/ZenohUTransport.h b/include/up-transport-zenoh-cpp/ZenohUTransport.h index 2cc7dbb..38074a4 100644 --- a/include/up-transport-zenoh-cpp/ZenohUTransport.h +++ b/include/up-transport-zenoh-cpp/ZenohUTransport.h @@ -113,35 +113,16 @@ struct ZenohUTransport : public UTransport { static v1::UMessage sampleToUMessage(const zenoh::Sample& sample); static v1::UMessage queryToUMessage(const zenoh::Query& query); - v1::UStatus registerRequestListener_(const std::string& zenoh_key, - CallableConn listener); - - v1::UStatus registerResponseListener_(const std::string& zenoh_key, - CallableConn listener); - v1::UStatus registerPublishNotificationListener_( const std::string& zenoh_key, CallableConn listener); - v1::UStatus sendRequest_(const std::string& zenoh_key, - const std::string& payload, - const v1::UAttributes& attributes); - - v1::UStatus sendResponse_(const std::string& payload, - const v1::UAttributes& attributes); - v1::UStatus sendPublishNotification_(const std::string& zenoh_key, const std::string& payload, const v1::UAttributes& attributes); zenoh::Session session_; - ThreadSafeMap rpc_callback_map_; - ThreadSafeMap> subscriber_map_; - - ThreadSafeMap> queryable_map_; - - ThreadSafeMap> query_map_; }; } // namespace uprotocol::transport diff --git a/src/ZenohUTransport.cpp b/src/ZenohUTransport.cpp index 932730e..b25f690 100644 --- a/src/ZenohUTransport.cpp +++ b/src/ZenohUTransport.cpp @@ -185,46 +185,6 @@ ZenohUTransport::ZenohUTransport(const v1::UUri& defaultUri, spdlog::info("ZenohUTransport init"); } -v1::UStatus ZenohUTransport::registerRequestListener_( - const std::string& zenoh_key, CallableConn listener) { - spdlog::info("registerRequestListener_: {}", zenoh_key); - - // NOTE: listener is captured by copy here so that it does not go out - // of scope when this function returns. - auto on_query = [this, listener](const zenoh::Query& query) mutable { - auto attributes = attachmentToUAttributes(query.get_attachment()); - auto id_str = - datamodel::serializer::uuid::AsString().serialize(attributes.id()); - - // TODO(sashacmc): Replace this workaround with `query.clone()` - // after zenohcpp 1.0.0-rc6 release - auto cloned_query = std::make_shared(nullptr); - z_query_clone(zenoh::detail::as_owned_c_ptr(*cloned_query), - zenoh::detail::loan(query)); - - query_map_.emplace(std::move(id_str), std::move(cloned_query)); - listener(queryToUMessage(query)); - }; - - auto on_drop = []() {}; - - auto queryable = session_.declare_queryable(zenoh_key, std::move(on_query), - std::move(on_drop)); - - queryable_map_.emplace(listener, std::move(queryable)); - - return v1::UStatus(); -} - -v1::UStatus ZenohUTransport::registerResponseListener_( - const std::string& zenoh_key, CallableConn listener) { - spdlog::info("registerResponseListener_: {}", zenoh_key); - - rpc_callback_map_.emplace(zenoh_key, listener); - - return v1::UStatus(); -} - v1::UStatus ZenohUTransport::registerPublishNotificationListener_( const std::string& zenoh_key, CallableConn listener) { spdlog::info("registerPublishNotificationListener_: {}", zenoh_key); @@ -243,93 +203,6 @@ v1::UStatus ZenohUTransport::registerPublishNotificationListener_( return v1::UStatus(); } -v1::UStatus ZenohUTransport::sendRequest_(const std::string& zenoh_key, - const std::string& payload, - const v1::UAttributes& attributes) { - spdlog::debug("sendRequest_: {}: {}", zenoh_key, payload); - zenoh::KeyExpr ke(zenoh_key); - auto ke_search = [&](const std::pair& pair) { - return zenoh::KeyExpr(pair.first).intersects(ke); - }; - - CallableConn resp_callback; - - if (auto resp_callback_opt = rpc_callback_map_.find_if(ke_search); - resp_callback_opt) { - spdlog::debug("sendRequest_: found callback for '{}'", zenoh_key); - resp_callback = *resp_callback_opt; - } else { - spdlog::error("sendRequest_: failed to find response callback for '{}'", - zenoh_key); - return uError(v1::UCode::UNAVAILABLE, - "failed to find response callback"); - } - auto on_reply = [=](const zenoh::Reply& reply) mutable { - spdlog::debug("on_reply for {}", zenoh_key); - if (reply.is_ok()) { - const auto& sample = reply.get_ok(); - spdlog::debug("resp_callback: {}", - sample.get_payload().deserialize()); - resp_callback(sampleToUMessage(sample)); - spdlog::debug("resp_callback: done"); - } else { - spdlog::error( - "on_reply got en error: {}", - reply.get_err().get_payload().deserialize()); - // TODO: error report - } - }; - - auto attachment = uattributesToAttachment(attributes); - - auto on_done = []() {}; - - try { - // -Wpedantic disallows named member initialization until C++20, - // so GetOptions needs to be explicitly created and passed with - // std::move() - zenoh::Session::GetOptions options; - options.target = Z_QUERY_TARGET_BEST_MATCHING; - options.consolidation = - zenoh::QueryConsolidation(Z_CONSOLIDATION_MODE_NONE); - options.payload = zenoh::Bytes::serialize(payload); - options.attachment = zenoh::Bytes::serialize(attachment); - session_.get(zenoh_key, "", std::move(on_reply), std::move(on_done), - std::move(options)); - } catch (const zenoh::ZException& e) { - return uError(v1::UCode::INTERNAL, e.what()); - } - - return v1::UStatus(); -} - -v1::UStatus ZenohUTransport::sendResponse_(const std::string& payload, - const v1::UAttributes& attributes) { - auto reqid_str = - datamodel::serializer::uuid::AsString().serialize(attributes.reqid()); - spdlog::debug("sendResponse_: {}: {}", reqid_str, payload); - std::shared_ptr query(nullptr); - if (auto query_opt = query_map_.find(reqid_str); query_opt) { - query = *query_opt; - } else { - spdlog::error("sendResponse_: query doesn't exist"); - return uError(v1::UCode::INTERNAL, "query doesn't exist"); - } - - spdlog::debug("sendResponse_ to query: {}", - query->get_keyexpr().as_string_view()); - auto attachment = uattributesToAttachment(attributes); - // -Wpedantic disallows named member initialization until C++20, - // so PutOptions needs to be explicitly created and passed with - // std::move() - zenoh::Query::ReplyOptions options = - zenoh::Query::ReplyOptions::create_default(); - options.attachment = zenoh::Bytes::serialize(attachment); - query->reply(query->get_keyexpr(), payload, std::move(options)); - - return v1::UStatus(); -} - v1::UStatus ZenohUTransport::sendPublishNotification_( const std::string& zenoh_key, const std::string& payload, const v1::UAttributes& attributes) { @@ -371,38 +244,7 @@ v1::UStatus ZenohUTransport::sendImpl(const v1::UMessage& message) { attributes.source(), attributes.sink()); } - switch (attributes.type()) { - case v1::UMessageType::UMESSAGE_TYPE_PUBLISH: { - return sendPublishNotification_(zenoh_key, payload, attributes); - } - case v1::UMessageType::UMESSAGE_TYPE_NOTIFICATION: { - return sendPublishNotification_(zenoh_key, payload, attributes); - } - case v1::UMessageType::UMESSAGE_TYPE_REQUEST: { - return sendRequest_(zenoh_key, payload, attributes); - } - case v1::UMessageType::UMESSAGE_TYPE_RESPONSE: { - return sendResponse_(payload, attributes); - } - // These sentinel values come from the protobuf compiler. - // They are illegal for the enum, but cause linting problems. - // In order to suppress the linting error, they need to - // be included in the switch-case statement. - // It is deemed acceptable to use an exception here because - // it is in the sending code. An exception would not be - // acceptable in receiving code. The correct strategy wopuld be - // to drop the message. - case v1::UMessageType::UMessageType_INT_MIN_SENTINEL_DO_NOT_USE_: - case v1::UMessageType::UMessageType_INT_MAX_SENTINEL_DO_NOT_USE_: - throw std::runtime_error( - "Sentinel values detected in attribute type switch-case"); - case v1::UMessageType::UMESSAGE_TYPE_UNSPECIFIED: - default: { - return uError(v1::UCode::INVALID_ARGUMENT, - "Wrong Message type in v1::UAttributes"); - } - } - return v1::UStatus(); + return sendPublishNotification_(zenoh_key, payload, attributes); } v1::UStatus ZenohUTransport::registerListenerImpl( @@ -410,37 +252,12 @@ v1::UStatus ZenohUTransport::registerListenerImpl( std::optional&& sink_filter) { std::string zenoh_key = toZenohKeyString(getEntityUri().authority_name(), source_filter, sink_filter); - if (!sink_filter) { - // When only a single filter is provided, this signals that the - // listener is for a pub/sub-like communication mode where then - // messages are expected to only have a source address. - registerPublishNotificationListener_(zenoh_key, listener); - } else { - // Otherwise, the filters could be for any communication mode. - // We can't use the UUri validators to determine what mode they - // are for because a) there is overlap in allowed values between - // modes and b) any filter is allowed to have wildcards present. - registerRequestListener_(zenoh_key, listener); - registerPublishNotificationListener_(zenoh_key, listener); - - if (sink_filter.has_value()) { - // zenoh_key for response listener should be in revert order - std::string zenoh_response_key = toZenohKeyString( - getEntityUri().authority_name(), *sink_filter, source_filter); - registerResponseListener_(zenoh_response_key, listener); - } - } - v1::UStatus status; - status.set_code(v1::UCode::OK); - return status; + return registerPublishNotificationListener_(zenoh_key, listener); } void ZenohUTransport::cleanupListener(CallableConn listener) { - if (subscriber_map_.erase(listener) > 0) { - return; - } - queryable_map_.erase(listener); + subscriber_map_.erase(listener); } } // namespace uprotocol::transport