From a29f707700dec49e869df830b4d595730fd7c29c Mon Sep 17 00:00:00 2001 From: Shakleen Ishfar Date: Sat, 5 Oct 2024 09:55:31 -0400 Subject: [PATCH] Removed pipelines folder --- src/pipelines/__init__.py | 0 src/pipelines/cyclic_encoder.py | 16 --- .../data_preparation_pipeline_xgboost.py | 111 ------------------ test/pipelines/__init__.py | 0 test/pipelines/cyclic_encoder_test.py | 22 ---- .../data_preparation_pipeline_xgboost_test.py | 50 -------- 6 files changed, 199 deletions(-) delete mode 100644 src/pipelines/__init__.py delete mode 100644 src/pipelines/cyclic_encoder.py delete mode 100644 src/pipelines/data_preparation_pipeline_xgboost.py delete mode 100644 test/pipelines/__init__.py delete mode 100644 test/pipelines/cyclic_encoder_test.py delete mode 100644 test/pipelines/data_preparation_pipeline_xgboost_test.py diff --git a/src/pipelines/__init__.py b/src/pipelines/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/pipelines/cyclic_encoder.py b/src/pipelines/cyclic_encoder.py deleted file mode 100644 index f1ad023..0000000 --- a/src/pipelines/cyclic_encoder.py +++ /dev/null @@ -1,16 +0,0 @@ -import numpy as np -from sklearn.base import BaseEstimator, TransformerMixin - - -class CyclicEncoder(BaseEstimator, TransformerMixin): - def __init__(self, period: int): - self.period = period - - def fit(self, X, y=None): - return self - - def transform(self, X): - X = np.asarray(X) - sin_transform = np.sin(2 * np.pi * X / self.period) - cos_transform = np.cos(2 * np.pi * X / self.period) - return np.concatenate([sin_transform, cos_transform], axis=1) diff --git a/src/pipelines/data_preparation_pipeline_xgboost.py b/src/pipelines/data_preparation_pipeline_xgboost.py deleted file mode 100644 index f0fa71a..0000000 --- a/src/pipelines/data_preparation_pipeline_xgboost.py +++ /dev/null @@ -1,111 +0,0 @@ -import os -import pandas as pd -import numpy as np -from dataclasses import dataclass -import pyspark -from pyspark.sql import SparkSession -from delta import configure_spark_with_delta_pip -from sklearn.compose import ColumnTransformer -from sklearn.impute import SimpleImputer -from sklearn.pipeline import Pipeline -from sklearn.preprocessing import OneHotEncoder, StandardScaler -from typing import List -from sklearn.model_selection import TimeSeriesSplit - -from src.utils import read_delta, save_as_pickle -from src.pipelines.cyclic_encoder import CyclicEncoder - - -@dataclass -class DataPreparationPipelineXGBoostConfig: - root_delta_path: str = os.path.join("Data", "delta") - gold_delta_path: str = os.path.join(root_delta_path, "silver") - data_artifact_path: str = os.path.join("artifacts", "data") - pipeline_artifact_path: str = os.path.join( - "artifacts", "pipelines", "xgboost_data_pipeline" - ) - label_column_names = ["bike_demand", "dock_demand"] - categorical_column_names = ["weekday", "month", "is_holiday"] - numerical_column_names = ["latitude", "longitude", "year"] - cyclic_column_periods = { - "hour": 24, - "dayofmonth": 31, - "weekofyear": 52, - "dayofyear": 366, - } - cv_folds: int = 10 - max_train_size: int = 1e7 - - -class DataPreparationPipelineXGboost: - def __init__(self, spark: SparkSession): - self.spark = spark - self.config = DataPreparationPipelineXGBoostConfig() - - def get_column_transformer(self) -> ColumnTransformer: - num_pipeline = Pipeline( - steps=[ - ("imputer", SimpleImputer(strategy="mean")), - ("scaler", StandardScaler()), - ], - ) - - cat_pipeline = Pipeline( - steps=[ - ("imputer", SimpleImputer(strategy="most_frequent")), - 
("one_hot_encoder", OneHotEncoder()), - ("scaler", StandardScaler(with_mean=False)), - ] - ) - - cyclic_transformers = [ - ( - f"{name}_pipeline", - Pipeline( - steps=[ - ("cyclic_encoder", CyclicEncoder(period)), - ("scaler", StandardScaler()), - ] - ), - [name], - ) - for name, period in self.config.cyclic_column_periods.items() - ] - - preprocessor = ColumnTransformer( - [ - ("num_pipeline", num_pipeline, self.config.numerical_column_names), - ("cat_pipeline", cat_pipeline, self.config.categorical_column_names), - ] - + cyclic_transformers - ) - - return preprocessor - - def transform(self): - dataframe = read_delta(self.spark, self.config.gold_delta_path) - features_df, label_df = self.split_features_and_labels(dataframe) - - features_df = self.transform_features(features_df) - data = np.c_[features_df, label_df.to_numpy()] - - tscv = TimeSeriesSplit( - n_splits=self.config.cv_folds, - max_train_size=self.config.max_train_size, - ) - # for i, (train_index, test_index) in enumerate(tscv.split(data)): - - def transform_features(self, features_df): - preprocessor = self.get_column_transformer() - features_df = preprocessor.fit_transform(features_df) - save_as_pickle(preprocessor, self.config.pipeline_artifact_path) - return features_df - - def split_features_and_labels(self, df): - features_df = df.drop(columns=self.config.label_column_names) - label_df = df.loc[:, self.config.label_column_names] - return features_df, label_df - - -if __name__ == "__main__": - pipeline = DataPreparationPipelineXGboost(None) diff --git a/test/pipelines/__init__.py b/test/pipelines/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/test/pipelines/cyclic_encoder_test.py b/test/pipelines/cyclic_encoder_test.py deleted file mode 100644 index 7989cec..0000000 --- a/test/pipelines/cyclic_encoder_test.py +++ /dev/null @@ -1,22 +0,0 @@ -import pytest -import numpy as np -from src.pipelines.cyclic_encoder import CyclicEncoder - - -@pytest.mark.parametrize("period", [24, 30, 365]) -def test_init(period: int): - encoder = CyclicEncoder(period) - assert encoder.period == period - - -def test_fit(): - encoder = CyclicEncoder(24) - assert encoder.fit(None, None) is encoder - - -@pytest.mark.parametrize("period", [24, 30, 365]) -def test_transform(period: int): - encoder = CyclicEncoder(period) - X = np.array(list(range(period))).reshape(1, -1) - output = encoder.transform(X) - assert output.shape == (1, X.shape[1] * 2) diff --git a/test/pipelines/data_preparation_pipeline_xgboost_test.py b/test/pipelines/data_preparation_pipeline_xgboost_test.py deleted file mode 100644 index f6d5325..0000000 --- a/test/pipelines/data_preparation_pipeline_xgboost_test.py +++ /dev/null @@ -1,50 +0,0 @@ -import pytest -from pyspark.sql import SparkSession -from sklearn.compose import ColumnTransformer - -from src.pipelines.data_preparation_pipeline_xgboost import ( - DataPreparationPipelineXGBoostConfig, DataPreparationPipelineXGboost, -) - -@pytest.fixture(scope="session") -def spark(): - spark = ( - SparkSession.builder.master("local[1]") - .appName("local-tests") - .config("spark.executor.cores", "1") - .config("spark.executor.instances", "1") - .config("spark.sql.shuffle.partitions", "1") - .config("spark.driver.bindAddress", "127.0.0.1") - .config("spark.sql.legacy.timeParserPolicy", "LEGACY") - .getOrCreate() - ) - yield spark - spark.stop() - -@pytest.fixture -def pipeline(spark: SparkSession): - return DataPreparationPipelineXGboost(spark) - - -def test_config(): - config = 
DataPreparationPipelineXGBoostConfig() - - assert hasattr(config, "root_delta_path") - assert hasattr(config, "gold_delta_path") - assert hasattr(config, "data_artifact_path") - assert hasattr(config, "pipeline_artifact_path") - assert hasattr(config, "categorical_column_names") - assert hasattr(config, "numerical_column_names") - assert hasattr(config, "cyclic_column_periods") - assert hasattr(config, "label_column_names") - assert hasattr(config, "cv_folds") - -def test_init(pipeline: DataPreparationPipelineXGboost): - assert hasattr(pipeline, "spark") - assert hasattr(pipeline, "config") - - -def test_get_column_transformer(pipeline: DataPreparationPipelineXGboost): - transformer = pipeline.get_column_transformer() - - assert isinstance(transformer, ColumnTransformer) \ No newline at end of file
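
Note on the removed code: CyclicEncoder implemented the standard sin/cos
encoding for cyclic features (hour-of-day, day-of-year, and so on), mapping
each value onto the unit circle so that the end of a period lands next to its
start (hour 23 is close to hour 0). Should the transform need to be
reintroduced, below is a minimal self-contained sketch of the same technique;
the name encode_cyclic is illustrative only and does not exist in the codebase:

    import numpy as np

    def encode_cyclic(values, period):
        # Convert each value to an angle on the unit circle; a full
        # `period` wraps back around to 0.
        values = np.asarray(values, dtype=float)
        angle = 2 * np.pi * values / period
        # Emit one (sin, cos) column pair per input, mirroring the deleted
        # CyclicEncoder.transform, which concatenated sin and cos outputs.
        return np.column_stack([np.sin(angle), np.cos(angle)])

    # Hours 23 and 0 end up close together, unlike the raw integers.
    encoded = encode_cyclic([0, 6, 12, 23], period=24)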