From 548379990af8ae3bf90c05fb1428a826b90c8f05 Mon Sep 17 00:00:00 2001 From: Marat Omarov Date: Sat, 20 Jul 2024 18:47:16 +0300 Subject: [PATCH] Change mode of session files, add logs for session_gc(), fix session_gc() --- .github/workflows/Build.yml | 4 +- docs/test_session_gc.php | 244 ++++++++++++++++++++++++++++++++++++ runtime/sessions.cpp | 100 ++++++++++++++- 3 files changed, 341 insertions(+), 7 deletions(-) create mode 100644 docs/test_session_gc.php diff --git a/.github/workflows/Build.yml b/.github/workflows/Build.yml index 1be4cffda0..b6bb66761e 100644 --- a/.github/workflows/Build.yml +++ b/.github/workflows/Build.yml @@ -11,7 +11,7 @@ env: kphp_root_dir: /home/kitten/kphp kphp_polyfills_dir: /home/kitten/kphp/kphp-polyfills kphp_build_dir: /home/kitten/kphp/build - session_php_script: /home/kitten/kphp/docs/session_test.php + session_php_script: /home/kitten/kphp/docs/test_session_gc.php kphp_snippets_dir: /home/kitten/kphp/kphp-snippets kphp_snippets_jobs_dir: /home/kitten/kphp/kphp-snippets/JobWorkers @@ -156,7 +156,7 @@ jobs: - name: Compile the session_test project run: docker exec kphp-build-container-${{matrix.os}} bash -c - "cd ${{env.kphp_snippets_dir}}/JobWorkers/ && ${{env.kphp_root_dir}}/objs/bin/kphp2cpp --cxx ${{matrix.compiler}} session_test.php -I .. -M server" + "cd ${{env.kphp_snippets_dir}}/JobWorkers/ && ${{env.kphp_root_dir}}/objs/bin/kphp2cpp --cxx ${{matrix.compiler}} -I .. -M server test_session_gc.php" - name: Start the server run: docker exec kphp-build-container-${{matrix.os}} bash -c diff --git a/docs/test_session_gc.php b/docs/test_session_gc.php new file mode 100644 index 0000000000..99fd4e8737 --- /dev/null +++ b/docs/test_session_gc.php @@ -0,0 +1,244 @@ + */ + public $predefined_consts; + + /** @var mixed */ + public $session_array; + + /** @var bool */ + public $must_sleep; + + /** @var bool */ + public $must_call_gc; + + /** + * @param array $predefined_consts + * @param mixed $session_array + * @param false|string $id + * @param bool $must_sleep + * @param bool $must_call_gc + */ + public function __construct($predefined_consts, $session_array, $id, $must_sleep, $must_call_gc) { + $this->session_id = $id; + $this->predefined_consts = $predefined_consts; + $this->session_array = $session_array; + $this->must_sleep = $must_sleep; + $this->must_call_gc = $must_call_gc; + } + + function handleRequest(): ?\KphpJobWorkerResponse { + $response = new MyResponse(); + session_id($this->session_id); + + $response->start_time = microtime(true); + session_start($this->predefined_consts); + $response->session_status = (session_status() == 2); + $response->end_time = microtime(true); + + $response->session_id = session_id(); + if (!$response->session_status) { + return $response; + } + + $session_array_before = unserialize(session_encode()); + session_decode(serialize(array_merge($session_array_before, $this->session_array))); + // or: $_SESSION = array_merge($this->session_array, $_SESSION); + $response->session_array = unserialize(session_encode()); + // or: $response->session_array = $_SESSION; + + if ($this->must_sleep) { + usleep(4 * 100000); + } + + if ($this->must_call_gc) { + session_gc(); + } + + session_commit(); + + return $response; + } +} + +class MyResponse implements \KphpJobWorkerResponse { + /** @var float */ + public $start_time; + + /** @var float */ + public $end_time; + + /** @var bool */ + public $session_status; + + /** @var string|false */ + public $session_id; + + /** @var mixed */ + public $session_array; +} + + +if (PHP_SAPI !== 'cli' && isset($_SERVER["JOB_ID"])) { + handleKphpJobWorkerRequest(); +} else { + handleHttpRequest(); +} + +function handleHttpRequest() { + if (!\JobWorkers\JobLauncher::isEnabled()) { + echo "JOB WORKERS DISABLED at server start, use -f 2 --job-workers-ratio 0.5", "\n"; + return; + } + + $timeout = 4.5; + $to_write = ["first_message" => "hello"]; + $session_params = ["gc_maxlifetime" => 0]; + + $main_request = new MyRequest($session_params, $to_write, false, false, false); + $main_job_id = \JobWorkers\JobLauncher::start($main_request, $timeout); + + $main_response = wait($main_job_id); + $session_id = false; + + $is_response = ($main_response instanceof MyResponse); + var_dump($is_response); + + if ($main_response instanceof MyResponse) { + echo "\nOpened session:\n"; + var_dump($main_response->session_status); + var_dump($main_response->session_id); + var_dump($main_response->session_array); + $session_id = $main_response->session_id; + + var_dump($main_response->session_status); + var_dump($main_response->session_array == $to_write); + } else { + return; + } + + $session_params["gc_maxlifetime"] = 2000; + $add_request = new MyRequest($session_params, ["add_message" => "welcome"], false, false, false); + $add_job_id = \JobWorkers\JobLauncher::start($add_request, $timeout); + + $session_params["gc_maxlifetime"] = 1; + $main_request = new MyRequest($session_params, ["second_message" => "world"], $session_id, true, false); + $main_job_id = \JobWorkers\JobLauncher::start($main_request, $timeout); + $main_2_request = new MyRequest($session_params, ["third_message" => "buy"], $session_id, false, false); + $main_2_job_id = \JobWorkers\JobLauncher::start($main_2_request, $timeout); + + $new_request = new MyRequest($session_params, ["new_message" => "hi"], false, false, true); + $new_job_id = \JobWorkers\JobLauncher::start($new_request, $timeout); + + $add_response = wait($add_job_id); + $main_response = wait($main_job_id); + $main_2_response = wait($main_2_job_id); + $new_response = wait($new_job_id); + + $s_files = scandir($session_params["save_path"]); + if ($main_response instanceof MyResponse) { + echo "\nOpened session:\n"; + var_dump($main_response->session_status); + var_dump($main_response->session_id); + var_dump($main_response->session_array); + + $to_write["second_message"] = "world"; + var_dump($main_response->session_status); + var_dump($main_response->session_id == $session_id); + var_dump($main_response->session_array == $to_write); + var_dump(!in_array($main_response->session_id, $s_files)); + } + + if ($main_2_response instanceof MyResponse) { + echo "\nOpened session:\n"; + var_dump($main_2_response->session_status); + var_dump($main_2_response->session_id); + var_dump($main_2_response->session_array); + + $to_write["third_message"] = "buy"; + var_dump($main_2_response->session_status); + var_dump($main_2_response->session_id == $session_id); + var_dump($main_2_response->session_array == $to_write); + var_dump(!in_array($main_2_response->session_id, $s_files)); + } + + if ($add_response instanceof MyResponse) { + echo "\nOpened session:\n"; + var_dump($add_response->session_status); + var_dump($add_response->session_id); + var_dump($add_response->session_array); + + var_dump($add_response->session_status); + var_dump($add_response->session_id != $session_id); + var_dump($add_response->session_array == ["add_message" => "welcome"]); + var_dump(in_array($add_response->session_id, $s_files)); + } + + if ($new_response instanceof MyResponse) { + echo "\nOpened session:\n"; + var_dump($new_response->session_status); + var_dump($new_response->session_id); + var_dump($new_response->session_array); + + var_dump($new_response->session_status); + var_dump($new_response->session_id != $session_id); + var_dump($new_response->session_array == ["new_message" => "hi"]); + var_dump(in_array($new_response->session_id, $s_files)); + } +} + +function handleKphpJobWorkerRequest() { + $kphp_job_request = kphp_job_worker_fetch_request(); + if (!$kphp_job_request) { + warning("Couldn't fetch a job worker request"); + return; + } + + if ($kphp_job_request instanceof \JobWorkers\JobWorkerSimple) { + // simple jobs: they start, finish, and return the result + $kphp_job_request->beforeHandle(); + $response = $kphp_job_request->handleRequest(); + if ($response === null) { + warning("Job request handler returned null for " . get_class($kphp_job_request)); + return; + } + kphp_job_worker_store_response($response); + + } else if ($kphp_job_request instanceof \JobWorkers\JobWorkerManualRespond) { + // more complicated jobs: they start, send a result in the middle (here get get it) — and continue working + $kphp_job_request->beforeHandle(); + $kphp_job_request->handleRequest(); + if (!$kphp_job_request->wasResponded()) { + warning("Job request handler didn't call respondAndContinueExecution() manually " . get_class($kphp_job_request)); + } + + } else if ($kphp_job_request instanceof \JobWorkers\JobWorkerNoReply) { + // background jobs: they start and never send any result, just continue in the background and finish somewhen + $kphp_job_request->beforeHandle(); + $kphp_job_request->handleRequest(); + + } else { + warning("Got unexpected job request class: " . get_class($kphp_job_request)); + } +} diff --git a/runtime/sessions.cpp b/runtime/sessions.cpp index f397e966dd..23e2c3c539 100644 --- a/runtime/sessions.cpp +++ b/runtime/sessions.cpp @@ -12,6 +12,8 @@ #include "common/wrappers/to_array.h" #include "runtime/url.h" #include "runtime/math_functions.h" +// #include "common/smart_ptrs/unique_ptr_with_delete_function.h" +// #include "runtime/exec.h" namespace sessions { @@ -211,7 +213,7 @@ static bool session_open() { bool is_new = (!f$file_exists(get_sparam(S_PATH).to_string())) ? 1 : 0; fprintf(stderr, "[%d]\tOpening the session file %s\n", getpid(), get_sparam(S_PATH).to_string().c_str()); - set_sparam(S_FD, open_safe(get_sparam(S_PATH).to_string().c_str(), O_RDWR | O_CREAT, 0777)); + set_sparam(S_FD, open_safe(get_sparam(S_PATH).to_string().c_str(), O_RDWR | O_CREAT, 0666)); if (get_sparam(S_FD).to_int() < 0) { php_warning("Failed to open the file %s", get_sparam(S_PATH).to_string().c_str()); @@ -242,6 +244,9 @@ static bool session_open() { int ret_gc_lifetime = get_tag(get_sparam(S_PATH).to_string().c_str(), S_LIFETIME, NULL, 0); if (is_new or ret_ctime < 0) { // add the creation data to metadata of file + int is_session = 1; + set_tag(get_sparam(S_PATH).to_string().c_str(), "is_session", &is_session, sizeof(int)); + int ctime = std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count(); set_tag(get_sparam(S_PATH).to_string().c_str(), S_CTIME, &ctime, sizeof(int)); } @@ -325,7 +330,7 @@ static bool session_write() { fprintf(stderr, "[%d]\t(rewinding) Closed the file\n", getpid()); fprintf(stderr, "[%d]\t(rewinding) Opening the file\n", getpid()); - set_sparam(S_FD, open_safe(get_sparam(S_PATH).to_string().c_str(), O_RDWR, 0777)); + set_sparam(S_FD, open_safe(get_sparam(S_PATH).to_string().c_str(), O_RDWR, 0666)); fprintf(stderr, "[%d]\t(rewinding) Opened the file\n", getpid()); lock.l_type = F_WRLCK; fprintf(stderr, "[%d]\t(rewinding) Locking the file\n", getpid()); @@ -398,6 +403,11 @@ static bool session_expired(const string &path) { return false; } +// for lsof tests +// static void pclose_wrapper(FILE *f) { +// pclose(f); +// } + static int session_gc(const bool &immediate = false) { double prob = f$lcg_value() * get_sparam(S_DIVISOR).to_float(); double s_prob = get_sparam(S_PROBABILITY).to_float(); @@ -411,6 +421,15 @@ static int session_gc(const bool &immediate = false) { return -1; } + // reset the fd before changing the session directory + close_safe(get_sparam(S_FD).to_int()); + + fprintf(stderr, "\n\tScan the session dir:\n\t"); + for (const auto &filename : s_list.as_array()) { + fprintf(stderr, "%s, ", filename.get_value().to_string().c_str()); + } + fprintf(stderr, "\n\n"); + struct flock lock; lock.l_type = F_UNLCK; lock.l_whence = SEEK_SET; @@ -421,29 +440,100 @@ static int session_gc(const bool &immediate = false) { int result = 0; for (auto s = s_list.as_array().begin(); s != s_list.as_array().end(); ++s) { string path = s.get_value().to_string(); - if (path == string(".") or path == string("..")) { + if (path[0] == '.') { continue; } + path = string(get_sparam(S_DIR).to_string()).append(path); if (path == get_sparam(S_PATH).to_string()) { continue; } + + { // filter session files from others + int is_session, ret_is_session = get_tag(path.c_str(), "is_session", &is_session, sizeof(int)); + if (ret_is_session < 0) { + continue; + } + } + + // { // lsof tests (non-working) + // // dl::CriticalSectionGuard heap_guard; + // // // string cmd = string("lsof +D ").append(get_sparam(S_DIR).to_string()); + // // string cmd = string("lsof"); + // // fprintf(stderr, "lsof cmd: %s\n", cmd.c_str()); + // // vk::unique_ptr_with_delete_function fp{popen(cmd.c_str(), "r")}; + // // if (fp != nullptr) { + // // std::array buff = {}; + // // string result; + // // const int fd = fileno(fp.get()); + // // std::size_t bytes_read = 0; + // // while ((bytes_read = read(fd, buff.data(), buff.size())) > 0) { + // // result.append(buff.data(), bytes_read); + // // } + + // // auto ret = pclose(fp.release()); + // // if (WIFEXITED(ret)) { + // // ret = WEXITSTATUS(ret); + // // } + + // // fprintf(stderr, "lsof result:\n%s\n", result.c_str()); + // // } + + // string cmd = string("lsof +D ").append(get_sparam(S_DIR).to_string()); + // // string cmd = string("lsof -t ").append(path).append(string(" 2>&1")); + // fprintf(stderr, "lsof cmd: %s\n", cmd.c_str()); + // mixed lsof_result; + // int64_t lsof_code; + // f$exec(cmd, lsof_result, lsof_code); + // fprintf(stderr, "lsof return code is %d\nlsof result:\n", static_cast(lsof_code)); + // if (lsof_result.is_array()) { + // for (const auto &it : lsof_result.as_array()) { + // fprintf(stderr, "%s\n", it.get_value().to_string().c_str()); + // } + // } + // } + fprintf(stderr, "\n\tOpening the session file %s\n", s.get_value().to_string().c_str()); int fd; - if ((fd = open(path.c_str(), O_RDWR, 0777)) < 0) { + if ((fd = open_safe(path.c_str(), O_RDWR, 0666)) < 0) { php_warning("Failed to open file on path: %s", path.c_str()); continue; } - + fprintf(stderr, "\tOpened the session file %s\n", s.get_value().to_string().c_str()); + + fprintf(stderr, "\tUnlocking the session file %s\n", s.get_value().to_string().c_str()); if (fcntl(fd, F_SETLK, &lock) < 0) { + fprintf(stderr, "\tsession file %s is opened, skip\n", s.get_value().to_string().c_str()); + close_safe(fd); continue; } + fprintf(stderr, "\tUnlocked the session file %s\n", s.get_value().to_string().c_str()); + close_safe(fd); if (session_expired(path)) { + fprintf(stderr, "\tThe session %s is expired, call unlink()\n\n", s.get_value().to_string().c_str()); f$unlink(path); ++result; } } + + lock.l_type = F_WRLCK; + lock.l_pid = getpid(); + set_sparam(S_FD, open_safe(get_sparam(S_PATH).to_string().c_str(), O_RDWR, 0666)); + if (get_sparam(S_FD).to_int() < 0) { + php_warning("Failed to reopen the file %s after session_gc()", get_sparam(S_PATH).to_string().c_str()); + session_abort(); + } else { + fcntl(get_sparam(S_FD).to_int(), F_SETLKW, &lock); + } + + fprintf(stderr, "\n\tScan the session dir after gc():\n\t"); + s_list = f$scandir(get_sparam(S_DIR).to_string()); + for (const auto &filename : s_list.as_array()) { + fprintf(stderr, "%s, ", filename.get_value().to_string().c_str()); + } + fprintf(stderr, "\n\n"); + return result; }