
refactor(connect): internal refactoring to make connect code more organized & extensible #3680

Merged

Conversation

universalmind303
Contributor

@universalmind303 universalmind303 commented Jan 14, 2025

There are two refactors in this PR.

coalesce SparkAnalyzer impls

A lot of the method impls for SparkAnalyzer were spread across many files, making the code difficult to navigate. Most IDEs/editors have much better support for "goto symbol" or "symbol search" within a single buffer than project-wide. So coalescing all of the impl SparkAnalyzer blocks into a single file makes things much more navigable, without needing project-wide symbol search and without jumping between many files.
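
The shape of the change, as a sketch (only SparkAnalyzer and to_daft_expr are names from this PR; everything else here is illustrative):

// spark_analyzer.rs — all `impl SparkAnalyzer` blocks now live side by side
// in one buffer instead of being spread across files

pub struct SparkAnalyzer;

impl SparkAnalyzer {
    // expression translation
    pub fn to_daft_expr(&self, expr: &Expression) -> eyre::Result<daft_dsl::ExprRef> {
        todo!()
    }
}

impl SparkAnalyzer {
    // datatype/literal translation blocks follow in the same file
    // (signatures elided; see the spark_analyzer/datatype.rs and literal.rs submodules)
}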

functions refactor for extensibility

Previously, all of the supported Spark functions were hardcoded and inlined inside a single function. This felt unintuitive, and adding certain functionality (UDFs) becomes difficult, if not impossible, without a registry. So I refactored it to mirror our daft-sql function implementation: now there is a function registry, and you just need to impl the trait and register it. Implementing a connect function should now feel very similar to implementing a SQL function.

For example, you can register a single function:

pub struct CountFunction;

impl SparkFunction for CountFunction {
    fn to_expr(
        &self,
        args: &[Expression],
        analyzer: &SparkAnalyzer,
    ) -> eyre::Result<daft_dsl::ExprRef> {
        todo!()
    }
}

// functions.rs
let mut functions = SparkFunctions::new();
functions.add_fn("count", CountFunction);

Or you can register an entire function module:

// functions/core.rs
pub struct CoreFunctions;

impl FunctionModule for CoreFunctions {
    fn register(parent: &mut super::SparkFunctions) {
        parent.add_fn("count", CountFunction);
    }
}
// functions.rs
let mut functions = SparkFunctions::new();
functions.register::<core::CoreFunctions>();
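
Under the hood the registry is presumably just a name → trait-object map. A minimal sketch of that shape (the field name map and the get method are assumptions, not the PR's exact API):

use std::collections::HashMap;
use std::sync::Arc;

pub struct SparkFunctions {
    // name -> implementation; Arc<dyn ...> keeps registration open-ended
    map: HashMap<String, Arc<dyn SparkFunction>>,
}

impl SparkFunctions {
    pub fn new() -> Self {
        Self { map: HashMap::new() }
    }

    pub fn add_fn<F: SparkFunction + 'static>(&mut self, name: &str, func: F) {
        self.map.insert(name.to_string(), Arc::new(func));
    }

    // hypothetical lookup used by the analyzer when translating a call
    pub fn get(&self, name: &str) -> Option<&dyn SparkFunction> {
        self.map.get(name).map(|f| f.as_ref())
    }
}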


codspeed-hq bot commented Jan 14, 2025

CodSpeed Performance Report

Merging #3680 will not alter performance

Comparing universalmind303:connect-functions-refactor (3b5901a) with main (34d2036)

Summary

✅ 27 untouched benchmarks

@raunakab
Contributor

Is this PR a combination of multiple other, smaller PRs? Seems like some of the changes I saw here were reflected in an earlier PR.


codecov bot commented Jan 14, 2025

Codecov Report

Attention: Patch coverage is 69.44065% with 224 lines in your changes missing coverage. Please review.

Project coverage is 77.89%. Comparing base (34d2036) to head (3b5901a).
Report is 4 commits behind head on main.

Files with missing lines                           Patch %   Lines
src/daft-connect/src/spark_analyzer.rs             71.45%    167 Missing ⚠️
src/daft-connect/src/spark_analyzer/datatype.rs    11.11%    24 Missing ⚠️
src/daft-dsl/src/expr/mod.rs                       0.00%     21 Missing ⚠️
src/daft-connect/src/spark_analyzer/literal.rs     0.00%     8 Missing ⚠️
src/daft-connect/src/functions/core.rs             95.65%    3 Missing ⚠️
src/daft-connect/src/connect_service.rs            0.00%     1 Missing ⚠️
Additional details and impacted files

@@            Coverage Diff             @@
##             main    #3680      +/-   ##
==========================================
+ Coverage   77.79%   77.89%   +0.10%     
==========================================
  Files         729      718      -11     
  Lines       90477    90391      -86     
==========================================
+ Hits        70384    70411      +27     
+ Misses      20093    19980     -113     
Files with missing lines                           Coverage Δ
src/daft-connect/src/execute.rs                    84.78% <100.00%> (ø)
src/daft-connect/src/functions.rs                  100.00% <100.00%> (ø)
src/daft-connect/src/lib.rs                        92.53% <ø> (ø)
src/daft-connect/src/connect_service.rs            56.32% <0.00%> (ø)
src/daft-connect/src/functions/core.rs             95.65% <95.65%> (ø)
src/daft-connect/src/spark_analyzer/literal.rs     18.18% <0.00%> (ø)
src/daft-dsl/src/expr/mod.rs                       74.08% <0.00%> (-1.89%) ⬇️
src/daft-connect/src/spark_analyzer/datatype.rs    11.92% <11.11%> (ø)
src/daft-connect/src/spark_analyzer.rs             71.45% <71.45%> (ø)

... and 4 files with indirect coverage changes

@universalmind303
Contributor Author

> Is this PR a combination of multiple other, smaller PRs? Seems like some of the changes I saw here were reflected in an earlier PR.

Yes, it's an extension of #3677 and #3675.

@universalmind303
Contributor Author

@raunakab this should be ready for review now.

pub struct UnaryFunction(fn(ExprRef) -> ExprRef);
pub struct CountFunction;

impl SparkFunction for BinaryOpFunction {
Contributor

Doesn't need to be implemented in this PR, but usually, if you have a fixed number of types implementing a trait, then opting for an enum seems to be better form.

Contributor Author

So it's currently a fixed number, but this will expand quite a bit, and it will become impractical to contain all of those in a single enum, as Spark has a LOT of functions.

Additionally, we'll need to support dynamically registered UDFs later down the road, so it makes sense to lay the groundwork now instead of needing another refactor later.

    &self,
    args: &[Expression],
    analyzer: &SparkAnalyzer,
) -> eyre::Result<daft_dsl::ExprRef> {
Contributor

Not the right PR for this, but I'm just curious: Why are we using eyre instead of thiserror or anyhow?

Contributor Author

Totally agree here. It's been on my todo list to change the error handling in daft-connect.


Comment on lines +53 to +62
let args = args
    .iter()
    .map(|arg| analyzer.to_daft_expr(arg))
    .collect::<eyre::Result<Vec<_>>>()?;

let [lhs, rhs] = args
    .try_into()
    .map_err(|args| eyre::eyre!("requires exactly two arguments; got {:?}", args))?;

Ok(binary_op(self.0, lhs, rhs))
Contributor

Maybe we could try to keep the same pattern for to_expr as the other implementations? I.e., something like:

match args {
  [lhs, rhs] => ..,
  _ => return invalid_argument_err!("requires exactly two arguments; got {args:?}"),
}
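
Fleshed out inside BinaryOpFunction's to_expr, that suggestion might look like the following (a sketch, not a tested patch; it assumes invalid_argument_err! does an early return, as in the snippet above):

fn to_expr(
    &self,
    args: &[Expression],
    analyzer: &SparkAnalyzer,
) -> eyre::Result<daft_dsl::ExprRef> {
    // translate the spark expressions first, exactly as the current code does
    let args = args
        .iter()
        .map(|arg| analyzer.to_daft_expr(arg))
        .collect::<eyre::Result<Vec<_>>>()?;

    // then match on arity instead of try_into + map_err
    match args.as_slice() {
        [lhs, rhs] => Ok(binary_op(self.0, lhs.clone(), rhs.clone())),
        _ => return invalid_argument_err!("requires exactly two arguments; got {args:?}"),
    }
}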

Comment on lines +15 to +21
pub trait SparkFunction: Send + Sync {
    fn to_expr(
        &self,
        args: &[Expression],
        analyzer: &SparkAnalyzer,
    ) -> eyre::Result<daft_dsl::ExprRef>;
}
Contributor

Something like:

enum SparkFunction {
  BinaryOpFunction(BinaryOpFunction),
  UnaryFunction(UnaryFunction),
  CountFunction(CountFunction),
}

impl SparkFunction {
  fn to_expr(&self, args: &[Expression], analyzer: &SparkAnalyzer) -> eyre::Result<daft_dsl::ExprRef> {
    match self {
      Self::BinaryOpFunction(..) => ..,
      Self::UnaryFunction(..) => ..,
      Self::CountFunction(..) => ..,
    }
  }
}

This would allow us to avoid all the dyn SparkFunction stuff elsewhere.

Contributor Author

@universalmind303 universalmind303 Jan 15, 2025

So the main reasons behind using the trait impl instead of enums are:

  • it's (subjectively) a little easier to work with
  • the performance overhead here is relatively small compared to the actual execution
  • most importantly, it makes it possible to support UDFs later down the road (see the sketch below)
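
A hedged sketch of what that groundwork buys us — registering a user-defined function at runtime through the same trait-object registry (MyUdf and its body are hypothetical):

// hypothetical user-defined function; with a closed enum this would require
// editing the enum itself, but with trait objects it's just another impl
struct MyUdf;

impl SparkFunction for MyUdf {
    fn to_expr(
        &self,
        args: &[Expression],
        analyzer: &SparkAnalyzer,
    ) -> eyre::Result<daft_dsl::ExprRef> {
        todo!()
    }
}

// registered dynamically, exactly like the built-ins
functions.add_fn("my_udf", MyUdf);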

[arg] => {
    let arg = analyzer.to_daft_expr(arg)?;

    let arg = if arg.as_literal().and_then(|lit| lit.as_i32()) == Some(1i32) {
Contributor

Isn't count(2) also equivalent to count(1), which is equivalent to count(*)? In fact, isn't count(n), for any integer n, equivalent to count(*)?

Contributor Author

@universalmind303 universalmind303 Jan 15, 2025

In Spark, it is only ever sent over as count(1), and the DataFrame API never allows any args:

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.count.html

so it can only ever be called via

df.count()

which gets serialized via protobuf as count(1). So this is really about special-casing what Spark Connect sends us vs. how we internally represent a count(*) / count(n).
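
So the check in the hunk above boils down to something like this (a sketch; count_all_expr is a hypothetical stand-in for however daft represents count(*) internally):

// spark connect serializes df.count() as the function call count(1),
// so a literal 1 argument means "count rows", not "count the literal 1"
let arg = if arg.as_literal().and_then(|lit| lit.as_i32()) == Some(1i32) {
    count_all_expr() // hypothetical: daft's internal count(*) input
} else {
    arg
};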

Contributor

@raunakab raunakab left a comment

Had some general questions, but looks good to me given my current understanding of this connect code.

@universalmind303 universalmind303 merged commit beae462 into Eventual-Inc:main Jan 16, 2025
43 of 44 checks passed