Skip to content

Commit

Permalink
Extend the SnowflakeConfig class to support environment variable loading
Browse files Browse the repository at this point in the history
  • Loading branch information
Francisco-Montanez committed Nov 18, 2024
1 parent a840e3c commit c199286
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 6 deletions.
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ readme = "README.md"
[tool.poetry.dependencies]
python = ">=3.9,<3.12"
snowflake-snowpark-python = "^1.24.0"
python-dotenv = "^1.0.1"


[build-system]
Expand Down
107 changes: 102 additions & 5 deletions src/forge.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

import logging
import os
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Sequence, Union
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import snowflake.connector
from dotenv import load_dotenv
from snowflake.connector import SnowflakeConnection
from snowflake.connector.errors import Error as SnowflakeError

Expand All @@ -22,17 +25,111 @@

@dataclass
class SnowflakeConfig:
"""Configuration for Snowflake connection."""
"""
Configuration for Snowflake connection.
Required Parameters:
account: Snowflake account identifier
user: Snowflake username
password: Snowflake password
Optional Parameters:
warehouse: Snowflake warehouse name
database: Snowflake database name
schema: Snowflake schema name
role: Snowflake role name
session_parameters: Additional session parameters
"""

account: str
user: str
password: str
warehouse: str
database: str
schema: str
warehouse: Optional[str] = None
database: Optional[str] = None
schema: Optional[str] = None
role: Optional[str] = None
session_parameters: Dict[str, Any] = field(default_factory=dict)

@classmethod
def from_env(
cls,
env_path: Optional[Union[str, Path]] = None,
env_prefix: str = "SNOWFLAKE_",
raise_if_missing: bool = True,
) -> SnowflakeConfig:
"""
Create a SnowflakeConfig instance from environment variables.
Args:
env_path: Optional path to .env file. If None, looks for .env in current directory
env_prefix: Prefix for environment variables (default: "SNOWFLAKE_")
raise_if_missing: Whether to raise an error if required variables are missing
Environment variables:
Required:
SNOWFLAKE_ACCOUNT: Snowflake account identifier
SNOWFLAKE_USER: Snowflake username
SNOWFLAKE_PASSWORD: Snowflake password
Optional:
SNOWFLAKE_WAREHOUSE: Snowflake warehouse name
SNOWFLAKE_DATABASE: Snowflake database name
SNOWFLAKE_SCHEMA: Snowflake schema name
SNOWFLAKE_ROLE: Snowflake role name
SNOWFLAKE_SESSION_PARAMETERS: JSON string of session parameters
Returns:
SnowflakeConfig: Configuration instance
Raises:
ValueError: If required environment variables are missing and raise_if_missing is True
"""
if env_path:
load_dotenv(env_path)
else:
load_dotenv()

required_vars = ["ACCOUNT", "USER", "PASSWORD"]
optional_vars = ["WAREHOUSE", "DATABASE", "SCHEMA", "ROLE"]

config_dict = {}
missing_vars = []

# Check required variables
for var in required_vars:
env_var = f"{env_prefix}{var}"
value = os.getenv(env_var)
if value is None:
missing_vars.append(env_var)
config_dict[var.lower()] = value

if missing_vars and raise_if_missing:
raise ValueError(
f"Missing required environment variables: {', '.join(missing_vars)}"
)

# Add optional variables
for var in optional_vars:
env_var = f"{env_prefix}{var}"
value = os.getenv(env_var)
if value is not None:
config_dict[var.lower()] = value

# Session parameters (optional JSON string)
session_params = os.getenv(f"{env_prefix}SESSION_PARAMETERS", "{}")
if session_params:
try:
import json

config_dict["session_parameters"] = json.loads(session_params)
except json.JSONDecodeError as e:
logger.warning(
f"Failed to parse {env_prefix}SESSION_PARAMETERS as JSON: {e}"
)
config_dict["session_parameters"] = {}

return cls(**config_dict)


class Forge:
"""
Expand Down

0 comments on commit c199286

Please sign in to comment.