refactor(connect): internal refactoring to make connect code more organized & extensible #3680
Conversation
CodSpeed Performance Report: Merging #3680 will not alter performance.
Is this PR a combination of multiple other, smaller PRs? Seems like some of the changes I saw here were reflected in an earlier PR.
Codecov Report: Attention: Patch coverage is

Additional details and impacted files

```
@@            Coverage Diff             @@
##             main    #3680      +/-   ##
==========================================
+ Coverage   77.79%   77.89%   +0.10%
==========================================
  Files         729      718      -11
  Lines       90477    90391      -86
==========================================
+ Hits        70384    70411      +27
+ Misses      20093    19980     -113
```
@raunakab this should be ready for review now.
```rust
pub struct UnaryFunction(fn(ExprRef) -> ExprRef);
pub struct CountFunction;

impl SparkFunction for BinaryOpFunction {
```
Doesn't need to be implemented in this PR, but usually, if you have a fixed number of types implementing a trait, then opting for an enum seems to be better form.
So it's currently a fixed number, but this will expand quite a bit, and it will become impractical to contain all of those in a single enum, as Spark has a lot of functions.
Additionally, we'll need to support dynamically registered UDFs later down the road, so it makes sense to lay the groundwork now instead of needing another refactor later.
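For context, a minimal sketch of how the trait-object approach leaves room for that (the `Registry` type and its methods here are hypothetical, assuming the `SparkFunction` trait shown later in this thread, and are not the PR's actual API):

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical registry keyed by Spark function name. Because entries are
// trait objects, new functions (including UDFs registered at runtime) can be
// added without touching a central enum.
#[derive(Default)]
pub struct Registry {
    map: HashMap<String, Arc<dyn SparkFunction>>,
}

impl Registry {
    pub fn register(&mut self, name: &str, func: Arc<dyn SparkFunction>) {
        self.map.insert(name.to_string(), func);
    }

    pub fn get(&self, name: &str) -> Option<&dyn SparkFunction> {
        self.map.get(name).map(|f| f.as_ref())
    }
}
```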
```rust
    &self,
    args: &[Expression],
    analyzer: &SparkAnalyzer,
) -> eyre::Result<daft_dsl::ExprRef> {
```
Not the right PR for this, but I'm just curious: why are we using `eyre` instead of `thiserror` or `anyhow`?
Totally agree here. It's been on my todo list to change the error handling in daft-connect.
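For anyone reading along: `anyhow`/`eyre` give a single dynamic error type that's convenient in application code, while `thiserror` derives a concrete error enum that callers can match on. A sketch of what a `thiserror`-based error for this crate might look like (the variants are made up for illustration, not taken from daft-connect):

```rust
use thiserror::Error;

// Hypothetical error type; the real variants would depend on what
// daft-connect actually needs to report.
#[derive(Debug, Error)]
pub enum ConnectError {
    #[error("invalid argument: {0}")]
    InvalidArgument(String),

    #[error("unsupported function: {name}")]
    UnsupportedFunction { name: String },
}
```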
```rust
// Convert each Spark Connect expression into a Daft expression.
let args = args
    .iter()
    .map(|arg| analyzer.to_daft_expr(arg))
    .collect::<eyre::Result<Vec<_>>>()?;

// A binary op takes exactly two operands.
let [lhs, rhs] = args
    .try_into()
    .map_err(|args| eyre::eyre!("requires exactly two arguments; got {:?}", args))?;

Ok(binary_op(self.0, lhs, rhs))
```
Maybe we could try to keep the same pattern for `to_expr` as the other implementations? I.e., something like:

```rust
match args {
    [lhs, rhs] => ..,
    _ => return invalid_argument_err!("requires exactly two arguments; got {args:?}"),
}
```
```rust
pub trait SparkFunction: Send + Sync {
    fn to_expr(
        &self,
        args: &[Expression],
        analyzer: &SparkAnalyzer,
    ) -> eyre::Result<daft_dsl::ExprRef>;
}
```
Something like:

```rust
enum SparkFunction {
    BinaryOpFunction(BinaryOpFunction),
    UnaryFunction(UnaryFunction),
    CountFunction(CountFunction),
}

impl SparkFunction {
    fn to_expr(&self, args: &[Expression], analyzer: &SparkAnalyzer) -> eyre::Result<daft_dsl::ExprRef> {
        match self {
            Self::BinaryOpFunction(..) => ..,
            Self::UnaryFunction(..) => ..,
            Self::CountFunction(..) => ..,
        }
    }
}
```

This would allow us to avoid all the `dyn SparkFunction` stuff elsewhere.
So the main reasons behind using the trait impl instead of an enum are:
- it's (subjectively) a little easier to work with
- the performance overhead here is "relatively" small in comparison to the actual execution
- most importantly, it makes it possible to support UDFs later down the road (see the sketch below)
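To make the UDF point concrete: with trait objects, a function that only becomes known at runtime can still implement `SparkFunction` and be registered, which a closed enum can't express. A hypothetical sketch, continuing the made-up `Registry` from earlier in this thread:

```rust
// Hypothetical: a user-defined function received over the wire at runtime.
struct DynamicUdf {
    name: String,
}

impl SparkFunction for DynamicUdf {
    fn to_expr(
        &self,
        args: &[Expression],
        analyzer: &SparkAnalyzer,
    ) -> eyre::Result<daft_dsl::ExprRef> {
        // Resolving the UDF body is out of scope for this sketch.
        let _ = (args, analyzer);
        eyre::bail!("UDF {} not yet implemented", self.name)
    }
}

// No central enum needs to change:
// registry.register(&name, Arc::new(DynamicUdf { name }));
```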
```rust
[arg] => {
    let arg = analyzer.to_daft_expr(arg)?;

    let arg = if arg.as_literal().and_then(|lit| lit.as_i32()) == Some(1i32) {
```
Isn't `count(2)` also equivalent to `count(1)`, which is equivalent to `count(*)`? In fact, isn't `count(n)`, for any integer `n`, equivalent to `count(*)`?
In Spark, it is only ever sent over as `count(1)`, and it never allows any args. It can only ever be called via `df.count()`, which gets serialized via protobuf as `count(1)`. So this is really about special-casing what Spark Connect sends us vs. how we internally represent a `count(*)` / `count(n)`.
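A rough sketch of that special case end to end (hedged: the `col`, `count`, and `CountMode` usage here are assumptions about the surrounding code, not a quote of the PR):

```rust
match args {
    [arg] => {
        let arg = analyzer.to_daft_expr(arg)?;

        // Spark Connect serializes `df.count()` as `count(1)`, so rewrite the
        // literal 1 into a count over all columns.
        let arg = if arg.as_literal().and_then(|lit| lit.as_i32()) == Some(1i32) {
            daft_dsl::col("*")
        } else {
            arg
        };

        Ok(arg.count(CountMode::All))
    }
    _ => Err(eyre::eyre!("count requires exactly one argument; got {:?}", args)),
}
```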
Had some general questions, but looks good to me given my current understanding of this connect code.
There are 2 refactors that happen in this PR.

coalesce `SparkAnalyzer` impls

A lot of method impls for `SparkAnalyzer` weren't in a single file, making the code difficult to navigate. Most IDEs/editors have much better support for "goto symbol" or "symbol search" within a single buffer than project-wide symbol search. So coalescing all `impl SparkAnalyzer` blocks into a single file makes things much more navigable, without needing to use project-wide symbol search and without needing to jump between many files.

functions refactor for extensibility

Previously, all of the supported Spark functions were hardcoded and inlined inside a single function. This felt kind of unintuitive, and adding certain functionality (UDFs) becomes difficult, if not impossible, without a registry. So I refactored it to mirror our daft-sql function implementation. Now there is a function registry, and you just need to impl the trait and register it. Implementing a connect function should now feel very similar to implementing a SQL function.

ex:
you can register a single function
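(A hypothetical example; the registry and method names are assumed, mirroring daft-sql's style rather than quoting the PR:)

```rust
// Hypothetical: register one function under the name Spark Connect sends.
registry.register("count", Arc::new(CountFunction));
```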
or you can register an entire function module
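(Again hypothetical, assuming a module trait similar in spirit to daft-sql's `SQLModule`:)

```rust
// Hypothetical module trait, so a whole family of related functions
// can be registered in one call.
pub trait FunctionModule {
    fn register(registry: &mut Registry);
}

pub struct CoreFunctions;

impl FunctionModule for CoreFunctions {
    fn register(registry: &mut Registry) {
        registry.register("count", Arc::new(CountFunction));
        registry.register("+", Arc::new(BinaryOpFunction(Operator::Plus)));
        registry.register("-", Arc::new(BinaryOpFunction(Operator::Minus)));
    }
}
```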