Add local-python execution and dlt integration to opendbt #61

Merged: 1 commit, Mar 2, 2025
1 change: 1 addition & 0 deletions opendbt/dbt/__init__.py
@@ -26,6 +26,7 @@
from opendbt.dbt.shared.cli.main import sqlfluff
from opendbt.dbt.shared.cli.main import sqlfluff_lint
from opendbt.dbt.shared.cli.main import sqlfluff_fix
from opendbt.dbt.shared.adapters.impl import OpenDbtBaseAdapter

# dbt imports
from dbt.cli.main import dbtRunner as DbtCliRunner
Empty file.
54 changes: 54 additions & 0 deletions opendbt/dbt/shared/adapters/impl.py
@@ -0,0 +1,54 @@
import importlib.util
import sys
import tempfile
from typing import Dict

from dbt.adapters.base import available, BaseAdapter

from opendbt.runtime_patcher import PatchClass


@PatchClass(module_name="dbt.adapters.base", target_name="BaseAdapter")
class OpenDbtBaseAdapter(BaseAdapter):

def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
print(f"Created temp py file {model_file.name}")
# Load the module spec
spec = importlib.util.spec_from_file_location(model_name, model_file.name)
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt_obj = module.dbtObj(None)
# Access and call `model` function of the model!
# IMPORTANT: here we are passing down duckdb session from the adapter to the model
module.model(dbt=dbt_obj, **kwargs)
model_file.close()

@available
def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()
self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code,
session=connection.handle)

@available
def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()

import dlt
# IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
_pipeline = dlt.pipeline(
pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
destination=dlt.destinations.duckdb(connection.handle._env.conn),
dataset_name=parsed_model['schema'],
dev_mode=False,
)
self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code, pipeline=_pipeline)
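Both entry points end up invoking the user's Python model with one extra keyword argument: submit_local_python_job passes the adapter's live DuckDB connection as `session`, while submit_local_dlt_job passes the pre-configured dlt pipeline as `pipeline`; the return value of `model()` is discarded, so the model itself is responsible for writing its output. As a rough illustration (the model names, sample data, and the `executepython`/`executedlt` materialization names are assumptions, not taken from this diff), corresponding dbt Python models could look like:

# models/my_python_model.py -- hypothetical model executed via submit_local_python_job
import pandas as pd

def model(dbt, session):
    dbt.config(materialized="executepython")  # assumed materialization name
    # `session` is the DuckDB connection handed down by the adapter
    df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})
    session.execute("CREATE OR REPLACE TABLE my_python_model AS SELECT * FROM df")
    return df

# models/my_dlt_model.py -- hypothetical model executed via submit_local_dlt_job
def model(dbt, pipeline):
    dbt.config(materialized="executedlt")  # assumed materialization name
    # `pipeline` is already wired to the adapter's DuckDB connection and the model's schema
    rows = [{"id": 1, "name": "alice"}, {"id": 2, "name": "bob"}]
    pipeline.run(rows, table_name="my_dlt_model")
    return None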
64 changes: 1 addition & 63 deletions opendbt/examples.py
@@ -1,73 +1,11 @@
import importlib
import logging
import sys
import tempfile
from multiprocessing.context import SpawnContext
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter


class DuckDBAdapterV2Custom(DuckDBAdapter):

@available
def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()

with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
print(f"Created temp py file {model_file.name}")
# load and execute python code!
model_name = parsed_model['name']
# Load the module spec
spec = importlib.util.spec_from_file_location(model_name, model_file.name)
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt = module.dbtObj(None)
# Access and call `model` function of the model!
# IMPORTANT: here we are passing down duckdb session from the adapter to the model
module.model(dbt=dbt, session=connection.handle)
model_file.close()

@available
def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
connection = self.connections.get_if_exists()
if not connection:
connection = self.connections.get_thread_connection()

import dlt
# IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
_pipeline = dlt.pipeline(
pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
destination=dlt.destinations.duckdb(connection.handle._env.conn),
dataset_name=parsed_model['schema'],
dev_mode=False,
)

with tempfile.NamedTemporaryFile(suffix=f'.py', delete=True) as model_file:
model_file.write(compiled_code.lstrip().encode('utf-8'))
model_file.flush()
print(f"Created temp py file {model_file.name}")
# load and execute python code!
model_name = parsed_model['name']
# Load the module spec
spec = importlib.util.spec_from_file_location(model_name, model_file.name)
# Create a module object
module = importlib.util.module_from_spec(spec)
# Load the module
sys.modules[model_name] = module
spec.loader.exec_module(module)
dbt = module.dbtObj(None)
# IMPORTANT: here we are passing down duckdb session from the adapter to the model
module.model(dbt=dbt, pipeline=_pipeline)
model_file.close()
pass

# NOTE! used for testing
class DuckDBAdapterTestingOnlyDbt17(DuckDBAdapter):
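The adapter above is swapped in for dbt's stock BaseAdapter at import time by the PatchClass decorator from opendbt.runtime_patcher, whose implementation is not part of this diff. A minimal sketch of such a decorator, assuming it simply rebinds the target attribute on the named module, might look like this:

import importlib

class PatchClass:
    """Hypothetical sketch of a class-patching decorator; not the actual opendbt implementation."""

    def __init__(self, module_name: str, target_name: str):
        self.module_name = module_name
        self.target_name = target_name

    def __call__(self, cls):
        # Rebind the target name on the given module so that classes defined
        # afterwards (e.g. concrete dbt adapters subclassing BaseAdapter) pick up `cls`.
        module = importlib.import_module(self.module_name)
        setattr(module, self.target_name, cls)
        return cls

If the patching works along these lines, it also explains the one-line import added to opendbt/dbt/__init__.py above: the patch has to run before concrete adapter classes are defined, because any module that already bound the original BaseAdapter keeps that reference.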