From 339236875f610540a3b28e263bb604bc60394dd5 Mon Sep 17 00:00:00 2001 From: Simon Shillaker <554768+Shillaker@users.noreply.github.com> Date: Thu, 12 Aug 2021 16:35:19 +0200 Subject: [PATCH] Return error on failed endpoint request (#132) * Return an error on failed endpoint request * Hook up endpoint test * Endpoint test * Finish endpoint test * Fix endpoint API test --- include/faabric/endpoint/Endpoint.h | 6 +- .../faabric/endpoint/FaabricEndpointHandler.h | 4 +- include/faabric/scheduler/Scheduler.h | 2 - src/endpoint/Endpoint.cpp | 46 +++--- src/endpoint/FaabricEndpointHandler.cpp | 76 ++++++---- src/scheduler/Scheduler.cpp | 13 -- tests/test/endpoint/test_endpoint_api.cpp | 141 ++++++++++++++++++ tests/test/endpoint/test_handler.cpp | 57 +++++-- tests/utils/CMakeLists.txt | 1 + tests/utils/faabric_utils.h | 3 + tests/utils/http_utils.cpp | 62 ++++++++ 11 files changed, 333 insertions(+), 78 deletions(-) create mode 100644 tests/test/endpoint/test_endpoint_api.cpp create mode 100644 tests/utils/http_utils.cpp diff --git a/include/faabric/endpoint/Endpoint.h b/include/faabric/endpoint/Endpoint.h index 62448b13b..86679558b 100644 --- a/include/faabric/endpoint/Endpoint.h +++ b/include/faabric/endpoint/Endpoint.h @@ -13,12 +13,16 @@ class Endpoint Endpoint(int port, int threadCount); - void start(); + void start(bool awaitSignal = true); + + void stop(); virtual std::shared_ptr getHandler() = 0; private: int port = faabric::util::getSystemConfig().endpointPort; int threadCount = faabric::util::getSystemConfig().endpointNumThreads; + + Pistache::Http::Endpoint httpEndpoint; }; } diff --git a/include/faabric/endpoint/FaabricEndpointHandler.h b/include/faabric/endpoint/FaabricEndpointHandler.h index 02428a678..b7c25a1ce 100644 --- a/include/faabric/endpoint/FaabricEndpointHandler.h +++ b/include/faabric/endpoint/FaabricEndpointHandler.h @@ -15,9 +15,9 @@ class FaabricEndpointHandler : public Pistache::Http::Handler void onRequest(const Pistache::Http::Request& request, Pistache::Http::ResponseWriter response) override; - std::string handleFunction(const std::string& requestStr); + std::pair handleFunction(const std::string& requestStr); private: - std::string executeFunction(faabric::Message& msg); + std::pair executeFunction(faabric::Message& msg); }; } diff --git a/include/faabric/scheduler/Scheduler.h b/include/faabric/scheduler/Scheduler.h index 145fbfe7a..e0d1c9147 100644 --- a/include/faabric/scheduler/Scheduler.h +++ b/include/faabric/scheduler/Scheduler.h @@ -123,8 +123,6 @@ class Scheduler void flushLocally(); - std::string getMessageStatus(unsigned int messageId); - void setFunctionResult(faabric::Message& msg); faabric::Message getFunctionResult(unsigned int messageId, int timeout); diff --git a/src/endpoint/Endpoint.cpp b/src/endpoint/Endpoint.cpp index a84b0ce64..da8334164 100644 --- a/src/endpoint/Endpoint.cpp +++ b/src/endpoint/Endpoint.cpp @@ -9,47 +9,57 @@ namespace faabric::endpoint { Endpoint::Endpoint(int portIn, int threadCountIn) : port(portIn) , threadCount(threadCountIn) + , httpEndpoint(Pistache::Address(Pistache::Ipv4::any(), Pistache::Port(port))) {} -void Endpoint::start() +void Endpoint::start(bool awaitSignal) { - SPDLOG_INFO("Starting HTTP endpoint"); + SPDLOG_INFO("Starting HTTP endpoint on {}", port); // Set up signal handler sigset_t signals; - if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 || - sigaddset(&signals, SIGKILL) != 0 || sigaddset(&signals, SIGINT) != 0 || - sigaddset(&signals, SIGHUP) != 0 || sigaddset(&signals, SIGQUIT) != 0 || - pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) { + if (awaitSignal) { + if (sigemptyset(&signals) != 0 || sigaddset(&signals, SIGTERM) != 0 || + sigaddset(&signals, SIGKILL) != 0 || + sigaddset(&signals, SIGINT) != 0 || + sigaddset(&signals, SIGHUP) != 0 || + sigaddset(&signals, SIGQUIT) != 0 || + pthread_sigmask(SIG_BLOCK, &signals, nullptr) != 0) { - throw std::runtime_error("Install signal handler failed"); + throw std::runtime_error("Install signal handler failed"); + } } - Pistache::Address addr(Pistache::Ipv4::any(), Pistache::Port(this->port)); - // Configure endpoint auto opts = Pistache::Http::Endpoint::options() .threads(threadCount) .backlog(256) .flags(Pistache::Tcp::Options::ReuseAddr); - Pistache::Http::Endpoint httpEndpoint(addr); httpEndpoint.init(opts); // Configure and start endpoint httpEndpoint.setHandler(this->getHandler()); httpEndpoint.serveThreaded(); - // Wait for a signal - SPDLOG_INFO("Awaiting signal"); - int signal = 0; - int status = sigwait(&signals, &signal); - if (status == 0) { - SPDLOG_INFO("Received signal: {}", signal); - } else { - SPDLOG_INFO("Sigwait return value: {}", signal); + if (awaitSignal) { + // Wait for a signal + SPDLOG_INFO("Awaiting signal"); + int signal = 0; + int status = sigwait(&signals, &signal); + if (status == 0) { + SPDLOG_INFO("Received signal: {}", signal); + } else { + SPDLOG_INFO("Sigwait return value: {}", signal); + } + + httpEndpoint.shutdown(); } +} +void Endpoint::stop() +{ + SPDLOG_INFO("Shutting down endpoint on {}", port); httpEndpoint.shutdown(); } } diff --git a/src/endpoint/FaabricEndpointHandler.cpp b/src/endpoint/FaabricEndpointHandler.cpp index c765809bf..debc5314e 100644 --- a/src/endpoint/FaabricEndpointHandler.cpp +++ b/src/endpoint/FaabricEndpointHandler.cpp @@ -40,50 +40,70 @@ void FaabricEndpointHandler::onRequest(const Pistache::Http::Request& request, // Parse message from JSON in request const std::string requestStr = request.body(); - std::string responseStr = handleFunction(requestStr); + std::pair result = handleFunction(requestStr); PROF_END(endpointRoundTrip) - response.send(Pistache::Http::Code::Ok, responseStr); + Pistache::Http::Code responseCode = Pistache::Http::Code::Ok; + if (result.first > 0) { + responseCode = Pistache::Http::Code::Internal_Server_Error; + } + response.send(responseCode, result.second); } -std::string FaabricEndpointHandler::handleFunction( +std::pair FaabricEndpointHandler::handleFunction( const std::string& requestStr) { - std::string responseStr; + std::pair response; if (requestStr.empty()) { - responseStr = "Empty request"; + SPDLOG_ERROR("Faabric handler received empty request"); + response = std::make_pair(1, "Empty request"); } else { faabric::Message msg = faabric::util::jsonToMessage(requestStr); faabric::scheduler::Scheduler& sched = faabric::scheduler::getScheduler(); if (msg.isstatusrequest()) { - responseStr = sched.getMessageStatus(msg.id()); + SPDLOG_DEBUG("Processing status request"); + const faabric::Message result = + sched.getFunctionResult(msg.id(), 0); + if (result.type() == faabric::Message_MessageType_EMPTY) { + response = std::make_pair(0, "RUNNING"); + } else if (result.returnvalue() == 0) { + response = std::make_pair(0, "SUCCESS: " + result.outputdata()); + } else { + response = std::make_pair(1, "FAILED: " + result.outputdata()); + } } else if (msg.isexecgraphrequest()) { + SPDLOG_DEBUG("Processing execution graph request"); faabric::scheduler::ExecGraph execGraph = sched.getFunctionExecGraph(msg.id()); - responseStr = faabric::scheduler::execGraphToJson(execGraph); + response = + std::make_pair(0, faabric::scheduler::execGraphToJson(execGraph)); } else if (msg.type() == faabric::Message_MessageType_FLUSH) { SPDLOG_DEBUG("Broadcasting flush request"); sched.broadcastFlush(); + response = std::make_pair(0, "Flush sent"); } else { - responseStr = executeFunction(msg); + response = executeFunction(msg); } } - return responseStr; + return response; } -std::string FaabricEndpointHandler::executeFunction(faabric::Message& msg) +std::pair FaabricEndpointHandler::executeFunction( + faabric::Message& msg) { faabric::util::SystemConfig& conf = faabric::util::getSystemConfig(); if (msg.user().empty()) { - return "Empty user"; - } else if (msg.function().empty()) { - return "Empty function"; + return std::make_pair(1, "Empty user"); + } + + if (msg.function().empty()) { + return std::make_pair(1, "Empty function"); } // Set message ID and master host @@ -101,23 +121,25 @@ std::string FaabricEndpointHandler::executeFunction(faabric::Message& msg) // Await result on global bus (may have been executed on a different worker) if (msg.isasync()) { - return faabric::util::buildAsyncResponse(msg); - } else { - SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr); + return std::make_pair(0, faabric::util::buildAsyncResponse(msg)); + } - try { - const faabric::Message result = - sch.getFunctionResult(msg.id(), conf.globalMessageTimeout); - SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr); + SPDLOG_DEBUG("Worker thread {} awaiting {}", tid, funcStr); - if (result.sgxresult().empty()) { - return result.outputdata() + "\n"; - } else { - return faabric::util::getJsonOutput(result); - } - } catch (faabric::redis::RedisNoResponseException& ex) { - return "No response from function\n"; + try { + const faabric::Message result = + sch.getFunctionResult(msg.id(), conf.globalMessageTimeout); + SPDLOG_DEBUG("Worker thread {} result {}", tid, funcStr); + + if (result.sgxresult().empty()) { + return std::make_pair(result.returnvalue(), + result.outputdata() + "\n"); } + + return std::make_pair(result.returnvalue(), + faabric::util::getJsonOutput(result)); + } catch (faabric::redis::RedisNoResponseException& ex) { + return std::make_pair(1, "No response from function\n"); } } } diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index 4b13f1200..85934ea4f 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -784,19 +784,6 @@ faabric::Message Scheduler::getFunctionResult(unsigned int messageId, return msgResult; } -std::string Scheduler::getMessageStatus(unsigned int messageId) -{ - const faabric::Message result = getFunctionResult(messageId, 0); - - if (result.type() == faabric::Message_MessageType_EMPTY) { - return "RUNNING"; - } else if (result.returnvalue() == 0) { - return "SUCCESS: " + result.outputdata(); - } else { - return "FAILED: " + result.outputdata(); - } -} - faabric::HostResources Scheduler::getThisHostResources() { return thisHostResources; diff --git a/tests/test/endpoint/test_endpoint_api.cpp b/tests/test/endpoint/test_endpoint_api.cpp new file mode 100644 index 000000000..10f6a40cb --- /dev/null +++ b/tests/test/endpoint/test_endpoint_api.cpp @@ -0,0 +1,141 @@ +#include + +#include "faabric_utils.h" + +#include +#include +#include +#include +#include +#include + +using namespace Pistache; +using namespace faabric::scheduler; + +namespace tests { + +// This is a bit gnarly, we get "Address already in use" errors if we try to use +// the same port for each case, so we need to switch it every time. +static int port = 8080; + +class EndpointApiTestExecutor final : public Executor +{ + public: + EndpointApiTestExecutor(faabric::Message& msg) + : Executor(msg) + {} + + ~EndpointApiTestExecutor() {} + + int32_t executeTask( + int threadPoolIdx, + int msgIdx, + std::shared_ptr reqOrig) override + { + faabric::Message& msg = reqOrig->mutable_messages()->at(msgIdx); + + std::string funcStr = faabric::util::funcToString(msg, true); + + int returnVal = 0; + if (msg.function() == "valid") { + msg.set_outputdata( + fmt::format("Endpoint API test executed {}", msg.id())); + + } else if (msg.function() == "error") { + returnVal = 1; + msg.set_outputdata(fmt::format( + "Endpoint API returning {} for {}", returnVal, msg.id())); + } else { + throw std::runtime_error("Endpoint API error"); + } + + return returnVal; + } +}; + +class EndpointApiTestExecutorFactory : public ExecutorFactory +{ + protected: + std::shared_ptr createExecutor(faabric::Message& msg) override + { + return std::make_shared(msg); + } +}; + +class EndpointApiTestFixture : public SchedulerTestFixture +{ + public: + EndpointApiTestFixture() + { + executorFactory = std::make_shared(); + setExecutorFactory(executorFactory); + } + + ~EndpointApiTestFixture() {} + + protected: + std::shared_ptr executorFactory; +}; + +TEST_CASE_METHOD(EndpointApiTestFixture, + "Test requests to endpoint", + "[endpoint]") +{ + port++; + + faabric::endpoint::FaabricEndpoint endpoint(port, 2); + + std::thread serverThread([&endpoint]() { endpoint.start(false); }); + + // Wait for the server to start + SLEEP_MS(2000); + + std::string body; + int expectedReturnCode = 200; + std::string expectedResponseBody; + + SECTION("Empty request") + { + expectedReturnCode = 500; + expectedResponseBody = "Empty request"; + } + + SECTION("Valid request") + { + faabric::Message msg = faabric::util::messageFactory("foo", "valid"); + body = faabric::util::messageToJson(msg); + expectedReturnCode = 200; + expectedResponseBody = + fmt::format("Endpoint API test executed {}\n", msg.id()); + } + + SECTION("Error request") + { + faabric::Message msg = faabric::util::messageFactory("foo", "error"); + body = faabric::util::messageToJson(msg); + expectedReturnCode = 500; + expectedResponseBody = + fmt::format("Endpoint API returning 1 for {}\n", msg.id()); + } + + SECTION("Invalid function") + { + faabric::Message msg = faabric::util::messageFactory("foo", "junk"); + body = faabric::util::messageToJson(msg); + expectedReturnCode = 500; + expectedResponseBody = fmt::format( + "Task {} threw exception. What: Endpoint API error\n", msg.id()); + } + + std::pair result = + submitGetRequestToUrl(LOCALHOST, port, body); + REQUIRE(result.first == expectedReturnCode); + REQUIRE(result.second == expectedResponseBody); + + endpoint.stop(); + + if (serverThread.joinable()) { + serverThread.join(); + } +} +} diff --git a/tests/test/endpoint/test_handler.cpp b/tests/test/endpoint/test_handler.cpp index 5347c4557..964767a97 100644 --- a/tests/test/endpoint/test_handler.cpp +++ b/tests/test/endpoint/test_handler.cpp @@ -2,6 +2,9 @@ #include "faabric_utils.h" +#include +#include + #include #include #include @@ -9,10 +12,27 @@ using namespace Pistache; namespace tests { -TEST_CASE("Test valid calls to endpoint", "[endpoint]") + +class EndpointHandlerTestFixture : public SchedulerTestFixture { - cleanFaabric(); + public: + EndpointHandlerTestFixture() + { + executorFactory = + std::make_shared(); + setExecutorFactory(executorFactory); + } + + ~EndpointHandlerTestFixture() { executorFactory->reset(); } + + protected: + std::shared_ptr executorFactory; +}; +TEST_CASE_METHOD(EndpointHandlerTestFixture, + "Test valid calls to endpoint", + "[endpoint]") +{ // Note - must be async to avoid needing a result faabric::Message call = faabric::util::messageFactory("foo", "bar"); call.set_isasync(true); @@ -34,11 +54,13 @@ TEST_CASE("Test valid calls to endpoint", "[endpoint]") // Handle the function endpoint::FaabricEndpointHandler handler; - const std::string responseStr = handler.handleFunction(requestStr); + std::pair response = handler.handleFunction(requestStr); + + REQUIRE(response.first == 0); + std::string responseStr = response.second; // Check actual call has right details including the ID returned to the // caller - scheduler::Scheduler& sch = scheduler::getScheduler(); std::vector msgs = sch.getRecordedMessagesAll(); REQUIRE(msgs.size() == 1); faabric::Message actualCall = msgs.at(0); @@ -51,9 +73,10 @@ TEST_CASE("Test valid calls to endpoint", "[endpoint]") TEST_CASE("Test empty invocation", "[endpoint]") { endpoint::FaabricEndpointHandler handler; - std::string actual = handler.handleFunction(""); + std::pair actual = handler.handleFunction(""); - REQUIRE(actual == "Empty request"); + REQUIRE(actual.first == 1); + REQUIRE(actual.second == "Empty request"); } TEST_CASE("Test empty JSON invocation", "[endpoint]") @@ -77,21 +100,22 @@ TEST_CASE("Test empty JSON invocation", "[endpoint]") endpoint::FaabricEndpointHandler handler; const std::string& requestStr = faabric::util::messageToJson(call); - std::string actual = handler.handleFunction(requestStr); + std::pair actual = handler.handleFunction(requestStr); - REQUIRE(actual == expected); + REQUIRE(actual.first == 1); + REQUIRE(actual.second == expected); } -TEST_CASE("Check getting function status from endpoint", "[endpoint]") +TEST_CASE_METHOD(EndpointHandlerTestFixture, + "Check getting function status from endpoint", + "[endpoint]") { - cleanFaabric(); - - faabric::scheduler::Scheduler& sch = faabric::scheduler::getScheduler(); - // Create a message faabric::Message msg = faabric::util::messageFactory("demo", "echo"); + int expectedReturnCode = 0; std::string expectedOutput; + SECTION("Running") { expectedOutput = "RUNNING"; } SECTION("Failure") @@ -101,6 +125,8 @@ TEST_CASE("Check getting function status from endpoint", "[endpoint]") msg.set_returnvalue(1); sch.setFunctionResult(msg); + expectedReturnCode = 1; + expectedOutput = "FAILED: " + errorMsg; } @@ -118,8 +144,9 @@ TEST_CASE("Check getting function status from endpoint", "[endpoint]") endpoint::FaabricEndpointHandler handler; const std::string& requestStr = faabric::util::messageToJson(msg); - std::string actual = handler.handleFunction(requestStr); + std::pair actual = handler.handleFunction(requestStr); - REQUIRE(actual == expectedOutput); + REQUIRE(actual.first == expectedReturnCode); + REQUIRE(actual.second == expectedOutput); } } diff --git a/tests/utils/CMakeLists.txt b/tests/utils/CMakeLists.txt index 34e8fbe76..725700e64 100644 --- a/tests/utils/CMakeLists.txt +++ b/tests/utils/CMakeLists.txt @@ -6,6 +6,7 @@ set(LIB_FILES exec_graph_utils.cpp fixtures.h message_utils.cpp + http_utils.cpp system_utils.cpp faabric_utils.h ) diff --git a/tests/utils/faabric_utils.h b/tests/utils/faabric_utils.h index acc86cb73..b2a3f042d 100644 --- a/tests/utils/faabric_utils.h +++ b/tests/utils/faabric_utils.h @@ -71,4 +71,7 @@ void checkExecGraphNodeEquality(const scheduler::ExecGraphNode& nodeA, void checkExecGraphEquality(const scheduler::ExecGraph& graphA, const scheduler::ExecGraph& graphB); +std::pair submitGetRequestToUrl(const std::string& host, + int port, + const std::string& body); } diff --git a/tests/utils/http_utils.cpp b/tests/utils/http_utils.cpp new file mode 100644 index 000000000..b592e57b3 --- /dev/null +++ b/tests/utils/http_utils.cpp @@ -0,0 +1,62 @@ +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +using namespace Pistache; + +#define HTTP_REQ_TIMEOUT 5000 + +namespace tests { + +std::pair submitGetRequestToUrl(const std::string& host, + int port, + const std::string& body) + +{ + Http::Client client; + client.init(); + + std::string fullUrl = fmt::format("{}:{}", host, port); + SPDLOG_DEBUG("Making HTTP GET request to {}", fullUrl); + + // Set up the request and callbacks + auto requestBuilder = client.get(fullUrl); + if (!body.empty()) { + requestBuilder.body(body); + } + Async::Promise resp = + requestBuilder.timeout(std::chrono::milliseconds(HTTP_REQ_TIMEOUT)) + .send(); + + std::stringstream out; + Http::Code respCode; + resp.then( + [&](Http::Response response) { + respCode = response.code(); + out << response.body(); + }, + Async::Throw); + + // Make calls synchronous + Async::Barrier barrier(resp); + std::chrono::milliseconds timeout(HTTP_REQ_TIMEOUT); + barrier.wait_for(timeout); + + client.shutdown(); + + return std::make_pair((int)respCode, out.str()); +} +}