diff --git a/docs/worker/log_url.rst b/docs/worker/log_url.rst index 856a275b..78bdef21 100644 --- a/docs/worker/log_url.rst +++ b/docs/worker/log_url.rst @@ -9,5 +9,5 @@ The configuration parameter is: SYNCMASTER__SERVER__LOG_URL_TEMPLATE=https://grafana.example.com?correlation_id={{ correlation_id }}&run_id={{ run.id }} -You can search for each run by either its correlation id ``CORRELATION_CELERY_HEADER_ID`` in http headers or the ``Run.Id``. +You can search for each run by either its correlation id ``x-request-id`` in http headers or the ``Run.Id``. diff --git a/syncmaster/worker/settings/__init__.py b/syncmaster/worker/settings/__init__.py index d73c0e42..abddf14f 100644 --- a/syncmaster/worker/settings/__init__.py +++ b/syncmaster/worker/settings/__init__.py @@ -20,13 +20,9 @@ class WorkerSettings(BaseSettings): .. code-block:: bash - SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY + SYNCMASTER__WORKER__CREATE_SPARK_SESSION_FUNCTION=custom_syncmaster.spark.get_worker_spark_session """ - CORRELATION_CELERY_HEADER_ID: str = Field( - "CORRELATION_ID", - description="Header ID for correlation in Celery", - ) CREATE_SPARK_SESSION_FUNCTION: ImportString = Field( "syncmaster.worker.spark.get_worker_spark_session", description="Function to create Spark session for worker", @@ -51,9 +47,6 @@ class WorkerAppSettings(BaseSettings): .. code-block:: bash - # Example of setting a CORRELATION_CELERY_HEADER_ID via environment variable - SYNCMASTER__WORKER__CORRELATION_CELERY_HEADER_ID=CORRELATION_ID_CELERY - # Example of setting a database URL via environment variable SYNCMASTER__DATABASE__URL=postgresql+asyncpg://user:password@localhost:5432/dbname diff --git a/syncmaster/worker/transfer.py b/syncmaster/worker/transfer.py index a5428ff1..c2677c8b 100644 --- a/syncmaster/worker/transfer.py +++ b/syncmaster/worker/transfer.py @@ -2,10 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 from datetime import datetime, timezone -from asgi_correlation_id import correlation_id from asgi_correlation_id.extensions.celery import load_correlation_ids from celery import Celery -from celery.signals import after_setup_task_logger, before_task_publish, task_prerun +from celery.signals import after_setup_task_logger from celery.utils.log import get_task_logger from sqlalchemy import select from sqlalchemy.orm import Session, selectinload @@ -22,7 +21,6 @@ load_correlation_ids() WORKER_SETTINGS = WorkerAppSettings() -CORRELATION_CELERY_HEADER_ID = WORKER_SETTINGS.worker.CORRELATION_CELERY_HEADER_ID @celery.task(name="run_transfer_task", bind=True, track_started=True) @@ -85,18 +83,3 @@ def run_transfer(session: Session, run_id: int, settings: WorkerAppSettings): @after_setup_task_logger.connect def setup_loggers(*args, **kwargs): setup_logging(WorkerAppSettings().logging.get_log_config_path()) - - -@before_task_publish.connect() -def transfer_correlation_id(headers, *args, **kwargs) -> None: - # This is called before task.delay() finishes - # Here we're able to transfer the correlation ID via the headers kept in our backend - headers[CORRELATION_CELERY_HEADER_ID] = correlation_id.get() - - -@task_prerun.connect() -def load_correlation_id(task, *args, **kwargs) -> None: - # This is called when the worker picks up the task - # Here we're able to load the correlation ID from the headers - id_value = task.request.get(CORRELATION_CELERY_HEADER_ID) - correlation_id.set(id_value)