Skip to content

Commit

Permalink
Add local-python execution and dlt integration to opendbt
Browse files Browse the repository at this point in the history
  • Loading branch information
ismailsimsek committed Mar 2, 2025
1 parent ac1733a commit 4fbc0cd
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 63 deletions.
1 change: 1 addition & 0 deletions opendbt/dbt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from opendbt.dbt.shared.cli.main import sqlfluff
from opendbt.dbt.shared.cli.main import sqlfluff_lint
from opendbt.dbt.shared.cli.main import sqlfluff_fix
from opendbt.dbt.shared.adapters.impl import OpenDbtBaseAdapter

# dbt imports
from dbt.cli.main import dbtRunner as DbtCliRunner
Expand Down
Empty file.
54 changes: 54 additions & 0 deletions opendbt/dbt/shared/adapters/impl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import importlib
import sys
import tempfile
from typing import Dict

from dbt.adapters.base import available, BaseAdapter

from opendbt.runtime_patcher import PatchClass


@PatchClass(module_name="dbt.adapters.base", target_name="BaseAdapter")
class OpenDbtBaseAdapter(BaseAdapter):
    """Patched dbt ``BaseAdapter`` that adds local execution of python models.

    Registered via ``PatchClass`` so it replaces ``dbt.adapters.base.BaseAdapter``
    at runtime, making ``submit_local_python_job`` / ``submit_local_dlt_job``
    available to every concrete adapter.
    """

    def _execute_python_model(self, model_name: str, compiled_code: str, **kwargs):
        """Materialize ``compiled_code`` as a temporary module and run its ``model``.

        The compiled model code is written to a temp ``.py`` file, imported under
        ``model_name``, and its ``model(dbt=..., **kwargs)`` entry point is called.
        Extra ``kwargs`` (e.g. ``session`` or ``pipeline``) are forwarded verbatim.

        :param model_name: name used to register the module in ``sys.modules``
        :param compiled_code: dbt-compiled python model source
        """
        # `importlib.util` is a submodule; import it explicitly instead of
        # relying on `import importlib` having eagerly loaded it.
        import importlib.util
        # NOTE(review): NamedTemporaryFile with delete=True cannot be reopened
        # while still open on Windows — assumed POSIX here; confirm if Windows
        # support is required.
        with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
            model_file.write(compiled_code.lstrip().encode('utf-8'))
            model_file.flush()
            print(f"Created temp py file {model_file.name}")
            # Load the module spec from the temp file
            spec = importlib.util.spec_from_file_location(model_name, model_file.name)
            # Create a module object
            module = importlib.util.module_from_spec(spec)
            # Register then execute the module
            sys.modules[model_name] = module
            spec.loader.exec_module(module)
            dbt_obj = module.dbtObj(None)
            # Access and call `model` function of the model!
            # IMPORTANT: here we are passing down duckdb session from the adapter to the model
            module.model(dbt=dbt_obj, **kwargs)
            # no explicit close(): the context manager closes and deletes the file

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        """Run a python model in-process, handing the adapter connection to it as ``session``."""
        connection = self.connections.get_if_exists()
        if not connection:
            connection = self.connections.get_thread_connection()
        self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code,
                                   session=connection.handle)

    @available
    def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
        """Run a python model in-process with a pre-configured dlt pipeline as ``pipeline``.

        The pipeline is pointed at the adapter's live duckdb connection and at the
        model's target schema, so the model only has to call ``pipeline.run(...)``.
        """
        connection = self.connections.get_if_exists()
        if not connection:
            connection = self.connections.get_thread_connection()

        import dlt
        # IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
        _pipeline = dlt.pipeline(
            # dlt pipeline names may not contain dots
            pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
            destination=dlt.destinations.duckdb(connection.handle._env.conn),
            dataset_name=parsed_model['schema'],
            dev_mode=False,
        )
        self._execute_python_model(model_name=parsed_model['name'], compiled_code=compiled_code, pipeline=_pipeline)
64 changes: 1 addition & 63 deletions opendbt/examples.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,11 @@
import importlib
import logging
import sys
import tempfile
from multiprocessing.context import SpawnContext
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter


class DuckDBAdapterV2Custom(DuckDBAdapter):
    """DuckDB adapter extension that executes dbt python models in-process."""

    def _run_python_model(self, parsed_model: Dict, compiled_code: str, **kwargs):
        """Write ``compiled_code`` to a temp module, import it, and call ``model``.

        Shared implementation for the two ``submit_local_*`` entry points below;
        ``kwargs`` (``session`` or ``pipeline``) are forwarded to the model.
        """
        # `importlib.util` is a submodule; import it explicitly instead of
        # relying on `import importlib` having eagerly loaded it.
        import importlib.util
        with tempfile.NamedTemporaryFile(suffix='.py', delete=True) as model_file:
            model_file.write(compiled_code.lstrip().encode('utf-8'))
            model_file.flush()
            print(f"Created temp py file {model_file.name}")
            # load and execute python code!
            model_name = parsed_model['name']
            # Load the module spec
            spec = importlib.util.spec_from_file_location(model_name, model_file.name)
            # Create a module object
            module = importlib.util.module_from_spec(spec)
            # Register then execute the module
            sys.modules[model_name] = module
            spec.loader.exec_module(module)
            dbt = module.dbtObj(None)
            # IMPORTANT: here we are passing down duckdb session from the adapter to the model
            module.model(dbt=dbt, **kwargs)
            # no explicit close(): the context manager closes and deletes the file

    def _get_connection(self):
        """Return the current connection, falling back to the thread connection."""
        connection = self.connections.get_if_exists()
        if not connection:
            connection = self.connections.get_thread_connection()
        return connection

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        """Run a python model locally, passing the duckdb handle to it as ``session``."""
        connection = self._get_connection()
        self._run_python_model(parsed_model, compiled_code, session=connection.handle)

    @available
    def submit_local_dlt_job(self, parsed_model: Dict, compiled_code: str):
        """Run a python model locally with a pre-configured dlt pipeline as ``pipeline``.

        The pipeline targets the adapter's live duckdb connection and the model's
        target schema, so the model only has to call ``pipeline.run(...)``.
        """
        connection = self._get_connection()

        import dlt
        # IMPORTANT: here we are pre-configuring and preparing dlt.pipeline for the model!
        _pipeline = dlt.pipeline(
            # dlt pipeline names may not contain dots
            pipeline_name=str(parsed_model['unique_id']).replace(".", "-"),
            destination=dlt.destinations.duckdb(connection.handle._env.conn),
            dataset_name=parsed_model['schema'],
            dev_mode=False,
        )
        self._run_python_model(parsed_model, compiled_code, pipeline=_pipeline)

# NOTE! used for testing
class DuckDBAdapterTestingOnlyDbt17(DuckDBAdapter):
Expand Down

0 comments on commit 4fbc0cd

Please sign in to comment.