Skip to content

Commit

Permalink
[SPARK-50948][ML][PYTHON][CONNECT] Add support for StringIndexer/PCA …
Browse files Browse the repository at this point in the history
…on Connect

### What changes were proposed in this pull request?
This PR adds support for PCA and StringIndexer on Connect:

### Why are the changes needed?

for feature parity

### Does this PR introduce _any_ user-facing change?
yes, new algorithms supported on connect

### How was this patch tested?
The newly added tests pass

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #49600 from wbo4958/string_indexer_pca.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
  • Loading branch information
wbo4958 authored and zhengruifeng committed Jan 22, 2025
1 parent 454463b commit 6658846
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,5 @@ org.apache.spark.ml.feature.StandardScaler
org.apache.spark.ml.feature.MaxAbsScaler
org.apache.spark.ml.feature.MinMaxScaler
org.apache.spark.ml.feature.RobustScaler
org.apache.spark.ml.feature.StringIndexer
org.apache.spark.ml.feature.PCA
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,5 @@ org.apache.spark.ml.feature.StandardScalerModel
org.apache.spark.ml.feature.MaxAbsScalerModel
org.apache.spark.ml.feature.MinMaxScalerModel
org.apache.spark.ml.feature.RobustScalerModel
org.apache.spark.ml.feature.StringIndexerModel
org.apache.spark.ml.feature.PCAModel
5 changes: 5 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,11 @@ class PCAModel private[ml] (

import PCAModel._

// For ml connect only
@Since("4.0.0")
private[ml] def this() = this(Identifiable.randomUID("pca"),
DenseMatrix.zeros(1, 1), Vectors.empty.asInstanceOf[DenseVector])

/** @group setParam */
@Since("1.5.0")
def setInputCol(value: String): this.type = set(inputCol, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ class StringIndexerModel (
@Since("3.0.0")
def this(labelsArray: Array[Array[String]]) = this(Identifiable.randomUID("strIdx"), labelsArray)

// For ml connect only
@Since("4.0.0")
private[ml] def this() = this(labels = Array.empty)

@deprecated("`labels` is deprecated and will be removed in 3.1.0. Use `labelsArray` " +
"instead.", "3.0.0")
@Since("1.5.0")
Expand Down
109 changes: 109 additions & 0 deletions python/pyspark/ml/tests/test_feature.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import tempfile
import unittest
from typing import List, Tuple, Any

import numpy as np

Expand All @@ -43,6 +44,8 @@
TargetEncoder,
VectorSizeHint,
VectorAssembler,
PCA,
PCAModel,
)
from pyspark.ml.linalg import DenseVector, SparseVector, Vectors
from pyspark.sql import Row
Expand All @@ -51,6 +54,112 @@


class FeatureTestsMixin:
def test_string_indexer(self):
df = (
self.spark.createDataFrame(
[
(1, "a", "e"),
(2, "b", "f"),
(3, "c", "e"),
(4, "a", "f"),
(5, "a", "f"),
(6, "c", "f"),
],
["id", "label1", "label2"],
)
.coalesce(1)
.sortWithinPartitions("id")
)
# single input
si = StringIndexer(inputCol="label1", outputCol="index1")
model = si.fit(df.select("label1"))

# read/write
with tempfile.TemporaryDirectory(prefix="read_write") as tmp_dir:
si.write().overwrite().save(tmp_dir)
si2 = StringIndexer.load(tmp_dir)
self.assertEqual(str(si), str(si2))
self.assertEqual(si.getInputCol(), "label1")
self.assertEqual(si2.getInputCol(), "label1")

model.write().overwrite().save(tmp_dir)
model2 = StringIndexerModel.load(tmp_dir)
self.assertEqual(str(model), str(model2))
self.assertEqual(model.getInputCol(), "label1")
self.assertEqual(model.getOutputCol(), "index1")
self.assertEqual(model2.getInputCol(), "label1")

indexed_df = model.transform(df.select("label1"))
self.assertEqual(sorted(indexed_df.schema.names), sorted(["label1", "index1"]))

def check_a_b(result: List[Tuple[Any, Any]]) -> None:
self.assertTrue(result[0][0] == "a" and result[1][0] == "b" and result[2][0] == "c")
sorted_value = sorted([v for _, v in result])
self.assertEqual(sorted_value, [0.0, 1.0, 2.0])

check_a_b(sorted(set([(i[0], i[1]) for i in indexed_df.collect()]), key=lambda x: x[0]))

# multiple inputs
input_cols = ["label1", "label2"]
output_cols = ["index1", "index2"]
si = StringIndexer(inputCols=input_cols, outputCols=output_cols)
model = si.fit(df.select(*input_cols))
self.assertEqual(model.getInputCols(), input_cols)
self.assertEqual(model.getOutputCols(), output_cols)

indexed_df = model.transform(df.select(*input_cols))
self.assertEqual(
sorted(indexed_df.schema.names), sorted(["label1", "index1", "label2", "index2"])
)

rows = indexed_df.collect()
check_a_b(sorted(set([(i[0], i[2]) for i in rows]), key=lambda x: x[0]))

# check e f
result = sorted(set([(i[1], i[3]) for i in rows]), key=lambda x: x[0])
self.assertTrue(result[0][0] == "e" and result[1][0] == "f")
sorted_value = sorted([v for _, v in result])
self.assertEqual(sorted_value, [0.0, 1.0])

def test_pca(self):
df = self.spark.createDataFrame(
[
(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
(Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
(Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),),
],
["features"],
)
pca = PCA(k=2, inputCol="features", outputCol="pca_features")

model = pca.fit(df)
self.assertEqual(model.getK(), 2)
self.assertTrue(
np.allclose(model.explainedVariance.toArray(), [0.79439, 0.20560], atol=1e-4)
)
model.setOutputCol("output")
# Transform the data using the PCA model
transformed_df = model.transform(df)
self.assertTrue(
np.allclose(
transformed_df.collect()[0].output.toArray(), [1.64857, -4.013282], atol=1e-4
)
)

# read/write
with tempfile.TemporaryDirectory(prefix="read_write") as tmp_dir:
pca.write().overwrite().save(tmp_dir)
pca2 = PCA.load(tmp_dir)
self.assertEqual(str(pca), str(pca2))
self.assertEqual(pca.getInputCol(), "features")
self.assertEqual(pca2.getInputCol(), "features")

model.write().overwrite().save(tmp_dir)
model2 = PCAModel.load(tmp_dir)
self.assertEqual(str(model), str(model2))
self.assertEqual(model.getInputCol(), "features")
self.assertEqual(model2.getInputCol(), "features")

def test_vector_assembler(self):
# Create a DataFrame
df = (
Expand Down

0 comments on commit 6658846

Please sign in to comment.