From 4fbc0cd6911512157c96db4e33402c0b83ab7f88 Mon Sep 17 00:00:00 2001
From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com>
Date: Sun, 2 Mar 2025 16:16:26 +0100
Subject: [PATCH] Add local-python execution and dlt integration to opendbt

---
Notes (commentary only: text between the "---" separator above and the
first "diff --git" line is not recorded in the commit).

What this patch does:

  * opendbt/dbt/__init__.py
      Adds an import of OpenDbtBaseAdapter from
      opendbt.dbt.shared.adapters.impl directly after the existing
      sqlfluff imports.

  * opendbt/dbt/shared/adapters/__init__.py
      New, empty file.

  * opendbt/dbt/shared/adapters/impl.py
      New class OpenDbtBaseAdapter(BaseAdapter), decorated with
      @PatchClass(module_name="dbt.adapters.base",
      target_name="BaseAdapter"). It provides:
        - _execute_python_model(model_name, compiled_code, **kwargs):
          writes compiled_code (left-stripped, UTF-8) to a temporary .py
          file, loads it as a module registered in sys.modules under
          model_name, builds module.dbtObj(None) and calls
          module.model(dbt=..., **kwargs).
        - submit_local_python_job (@available): resolves the connection
          and calls the helper with session=connection.handle.
        - submit_local_dlt_job (@available): resolves the connection,
          builds a dlt.pipeline on a duckdb destination and calls the
          helper with pipeline=<that pipeline>.

  * opendbt/examples.py
      Removes submit_local_python_job and submit_local_dlt_job from
      DuckDBAdapterV2Custom (its body becomes "pass") together with the
      imports importlib, sys, tempfile, typing.Dict and
      dbt.adapters.base.available. The temp-file/import/exec sequence
      that both removed methods repeated inline is what
      _execute_python_model now holds once.

Differences between the removed code and the added helper:

  * the local previously named "dbt" is named "dbt_obj";
  * "model_name = parsed_model['name']" became the model_name parameter;
  * the "# load and execute python code!" comment was dropped.

NOTE(review), observations on the added impl.py, left unchanged here:

  * "import importlib" is followed by importlib.util.* calls.
    importlib.util is a submodule; presumably this works because another
    import has already loaded it. Consider "import importlib.util".
  * The temporary file is opened a second time by name (through
    spec_from_file_location / exec_module) while the NamedTemporaryFile
    handle is still open. The tempfile documentation describes this as
    platform dependent; confirm the supported platforms.
  * model_file.close() is the last statement inside the "with" block
    that already closes the file.
  * The "passing down duckdb session" comment now sits in the shared
    helper, where kwargs carries a dlt pipeline for the dlt job.
  * f'.py' is an f-string without placeholders.
  * sys.modules[model_name] is assigned and not removed afterwards.

NOTE(review), provenance of this copy:

  * Line breaks and indentation were restored from a copy in which they
    had been collapsed. The restored line counts agree with the hunk
    headers and the diffstat. Indentation of the lines in the
    opendbt/dbt/__init__.py hunk is assumed to be 4 spaces, and the
    continuation line of submit_local_python_job is assumed to be
    aligned under "model_name="; verify both against the target files
    before applying.

 opendbt/dbt/__init__.py                 |  1 +
 opendbt/dbt/shared/adapters/__init__.py |  0
 opendbt/dbt/shared/adapters/impl.py     | 54 +++++++++++++++++++++
 opendbt/examples.py                     | 64 +------------------------
 4 files changed, 56 insertions(+), 63 deletions(-)
 create mode 100644 opendbt/dbt/shared/adapters/__init__.py
 create mode 100644 opendbt/dbt/shared/adapters/impl.py

diff --git a/opendbt/dbt/__init__.py b/opendbt/dbt/__init__.py
index fb9286b..ca6a6fb 100644
--- a/opendbt/dbt/__init__.py
+++ b/opendbt/dbt/__init__.py
@@ -26,6 +26,7 @@
     from opendbt.dbt.shared.cli.main import sqlfluff
     from opendbt.dbt.shared.cli.main import sqlfluff_lint
     from opendbt.dbt.shared.cli.main import sqlfluff_fix
+    from opendbt.dbt.shared.adapters.impl import OpenDbtBaseAdapter
 
     # dbt imports
     from dbt.cli.main import dbtRunner as DbtCliRunner
diff --git a/opendbt/dbt/shared/adapters/__init__.py b/opendbt/dbt/shared/adapters/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/opendbt/dbt/shared/adapters/impl.py b/opendbt/dbt/shared/adapters/impl.py
new file mode 100644
index 0000000..4b22204
--- /dev/null
+++ b/opendbt/dbt/shared/adapters/impl.py
@@ -0,0 +1,54 @@
+import importlib
+import sys
+import tempfile
+from typing import Dict
+
+from dbt.adapters.base import available, BaseAdapter
+
+from opendbt.runtime_patcher import PatchClass
+
+
+@PatchClass(module_name="dbt.adapters.base", target_name="BaseAdapter")
+class OpenDbtBaseAdapter(BaseAdapter):
+
+    def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
+        with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
+            model_file.write(compiled_code.lstrip().encode('utf-8'))
+            model_file.flush()
+            print(f"Created temp py file {model_file.name}")
+            # Load the module spec
+            spec = importlib.util.spec_from_file_location(model_name, model_file.name)
+            # Create a module object
+            module = importlib.util.module_from_spec(spec)
+            # Load the module
+            sys.modules[model_name] = module
+            spec.loader.exec_module(module)
+            dbt_obj = module.dbtObj(None)
+            # Access and call `model` function of the model!
+            # IMPORTANT: here we are passing down duckdb session from the adapter to the model
+            module.model(dbt=dbt_obj, **kwargs)
+            model_file.close()
+
+    @available
+    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
+        connection = self.connections.get_if_exists()
+        if not connection:
+            connection = self.connections.get_thread_connection()
+        self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code,
+                                   session=connection.handle)
+
+    @available
+    def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
+        connection = self.connections.get_if_exists()
+        if not connection:
+            connection = self.connections.get_thread_connection()
+
+        import dlt
+        # IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
+        _pipeline = dlt.pipeline(
+            pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
+            destination=dlt.destinations.duckdb(connection.handle._env.conn),
+            dataset_name=parsed_model['schema'],
+            dev_mode=False,
+        )
+        self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code, pipeline=_pipeline)
diff --git a/opendbt/examples.py b/opendbt/examples.py
index 539a2bf..0b009c6 100644
--- a/opendbt/examples.py
+++ b/opendbt/examples.py
@@ -1,73 +1,11 @@
-import importlib
 import logging
-import sys
-import tempfile
 from multiprocessing.context import SpawnContext
-from typing import Dict
 
-from dbt.adapters.base import available
 from dbt.adapters.duckdb import DuckDBAdapter
 
 
 class DuckDBAdapterV2Custom(DuckDBAdapter):
-
-    @available
-    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
-        connection = self.connections.get_if_exists()
-        if not connection:
-            connection = self.connections.get_thread_connection()
-
-        with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
-            model_file.write(compiled_code.lstrip().encode('utf-8'))
-            model_file.flush()
-            print(f"Created temp py file {model_file.name}")
-            # load and execute python code!
-            model_name = parsed_model['name']
-            # Load the module spec
-            spec = importlib.util.spec_from_file_location(model_name, model_file.name)
-            # Create a module object
-            module = importlib.util.module_from_spec(spec)
-            # Load the module
-            sys.modules[model_name] = module
-            spec.loader.exec_module(module)
-            dbt = module.dbtObj(None)
-            # Access and call `model` function of the model!
-            # IMPORTANT: here we are passing down duckdb session from the adapter to the model
-            module.model(dbt=dbt, session=connection.handle)
-            model_file.close()
-
-    @available
-    def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
-        connection = self.connections.get_if_exists()
-        if not connection:
-            connection = self.connections.get_thread_connection()
-
-        import dlt
-        # IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
-        _pipeline = dlt.pipeline(
-            pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
-            destination=dlt.destinations.duckdb(connection.handle._env.conn),
-            dataset_name=parsed_model['schema'],
-            dev_mode=False,
-        )
-
-        with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
-            model_file.write(compiled_code.lstrip().encode('utf-8'))
-            model_file.flush()
-            print(f"Created temp py file {model_file.name}")
-            # load and execute python code!
-            model_name = parsed_model['name']
-            # Load the module spec
-            spec = importlib.util.spec_from_file_location(model_name, model_file.name)
-            # Create a module object
-            module = importlib.util.module_from_spec(spec)
-            # Load the module
-            sys.modules[model_name] = module
-            spec.loader.exec_module(module)
-            dbt = module.dbtObj(None)
-            # IMPORTANT: here we are passing down duckdb session from the adapter to the model
-            module.model(dbt=dbt, pipeline=_pipeline)
-            model_file.close()
+    pass
 
 # NOTE! used for testing
 class DuckDBAdapterTestingOnlyDbt17(DuckDBAdapter):