mongodb: Allow projection and mongoarrow schema #592
base: master
Conversation
This looks really good! And thanks for writing tests.

Your questions:

1 - 2: answered in the review.

`pymongoarrow_schema` is not yet committed, so I'll guess the answer to 3:

- I think you can reuse the validation for projection to also validate columns in the mongo schema.
- This is really a power-user feature, so we do not need to do deep validation.
- It is OK to fail in the extract phase, but we always try to provide useful exception messages for the most common cases. If you write the tests properly you'll have those cases there, i.e. the schema has some crazy type mismatch or refers to unknown columns - just wrap those in 1-2 meaningful exceptions (`Out of buffer` is not really helpful).

Yeah, please upgrade pymongoarrow - AFAIK the newest arrow handles UUIDs as a type, so maybe we do not need to do this crazy duckdb conversion from the other ticket.
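The "wrap low-level failures in meaningful exceptions" advice can be sketched as follows. All names here (`MongoArrowSchemaError`, `load_batch`, `bad_convert`) are hypothetical illustrations, not part of the PR or of pymongoarrow:

```python
# Hedged sketch: wrap opaque extract-phase errors (e.g. arrow's "Out of
# buffer") in one meaningful exception. All names here are illustrative.
class MongoArrowSchemaError(Exception):
    """Raised when pymongoarrow_schema does not match the collection data."""

def load_batch(convert):
    """Run a conversion callable, translating low-level errors."""
    try:
        return convert()
    except (OverflowError, ValueError) as exc:
        raise MongoArrowSchemaError(
            "pymongoarrow_schema does not match the collection: check for "
            "type mismatches or columns that do not exist"
        ) from exc

def bad_convert():
    raise ValueError("Out of buffer")  # stand-in for arrow's opaque error

try:
    load_batch(bad_convert)
except MongoArrowSchemaError as e:
    print(f"{e} (caused by: {e.__cause__})")
```

Chaining with `raise ... from exc` keeps the original low-level error visible in the traceback while the top-level message points the user at the schema.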
sources/mongodb/__init__.py
```diff
@@ -90,6 +91,7 @@ def mongodb_collection(
     chunk_size: Optional[int] = 10000,
     data_item_format: Optional[TDataItemFormat] = "object",
     filter_: Optional[Dict[str, Any]] = None,
+    projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
```
- You should add `projection` to `MongoDbCollectionResourceConfiguration` so it becomes part of the configuration. For this particular resource we have an explicit config (not derived from the signature). This also allows the user to configure projection per collection, even if they use the `mongodb` source.
- We are using the legacy way to create resources with dynamic names. Could you try

  ```python
  @dlt.resource(
      name=lambda args: args["collection"], standalone=True, spec=MongoDbCollectionResourceConfiguration
  )
  def ...
  ```

- What is the reason for not mixing exclude and include projections? Is it a limitation of pymongo? If not, then IMO it makes sense to replace `projection` with `include_projection` and `exclude_projection`, both lists, and let the user specify both.
> What is the reason for not mixing exclude and include projections?

The `projection` argument is implemented the same as in `pymongo` (so users can copy-paste from their existing code). AFAIK, the query engine won't let you specify "include" and "exclude" clauses at once. It's either/or, so allowing two kwargs `include_projection` and `exclude_projection` would make it ambiguous which one is used.
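As a hedged sketch of that either/or rule (the helper `validate_projection` and its error message are illustrative, not pymongo's API): MongoDB rejects a projection document that mixes inclusion (`1`) and exclusion (`0`) clauses, with `_id` as the only field exempt from the rule.

```python
from typing import Any, Dict

def validate_projection(projection: Dict[str, Any]) -> None:
    """Raise if the projection mixes include (1) and exclude (0) clauses.

    Hypothetical helper mimicking MongoDB's rule; "_id" is the one field
    that may be excluded from an otherwise-inclusion projection.
    """
    modes = {bool(value) for field, value in projection.items() if field != "_id"}
    if len(modes) > 1:
        raise ValueError(
            "Cannot mix include and exclude fields in a projection (except '_id')"
        )

validate_projection({"name": 1, "_id": 0})   # OK: "_id" may always be excluded
validate_projection({"ssn": 0, "notes": 0})  # OK: pure exclusion projection
try:
    validate_projection({"name": 1, "ssn": 0})  # mixes include and exclude
except ValueError as e:
    print("rejected:", e)
```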
sources/mongodb/helpers.py
```python
projection_dict = dict(_fields_list_to_dict(projection, "projection"))

# NOTE we can still filter on primary_key if it's excluded from projection
```
Mongo has an `_id` column which is a kind of primary key. Currently, when we create a resource for a collection, we set the primary_key to `_id`. If that field is removed, dlt will later complain that the primary key is not assuming any value, so I'd add it back with a warning (like you do below). That we can do way earlier (when we create the resource).

In the very rare cases when someone sets the primary key to something else (on resource / incremental), your code below is fine. Adding back the primary key and warning is good.

Cool that you thought about this edge case!

And in 99% of cases this is what you'll find in the incremental. Now the question is if
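A minimal sketch of the "add it back with a warning" behavior discussed above. The helper `ensure_primary_key` and its messages are hypothetical, not the PR's actual code, and it assumes dict-form projections (`1` = include, `0` = exclude):

```python
import logging
from typing import Any, Dict

logger = logging.getLogger("mongodb_source")

def ensure_primary_key(projection: Dict[str, Any], primary_key: str = "_id") -> Dict[str, Any]:
    """Return a copy of the projection that never drops the primary key (sketch)."""
    fixed = dict(projection)
    if fixed.get(primary_key) == 0:
        # Explicitly excluded: drop the exclusion and warn.
        fixed.pop(primary_key)
        logger.warning("Primary key `%s` was removed from exclusion projection", primary_key)
    elif primary_key not in fixed and any(v == 1 for v in fixed.values()):
        # Inclusion projection without the primary key: add it back and warn.
        fixed[primary_key] = 1
        logger.warning("Primary key `%s` was added to inclusion projection", primary_key)
    return fixed

print(ensure_primary_key({"ssn": 0, "_id": 0}))  # {'ssn': 0}
print(ensure_primary_key({"name": 1}))           # {'name': 1, '_id': 1}
```

Pure exclusion projections need no fix-up: MongoDB keeps `_id` by default unless it is excluded explicitly.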
Branch force-pushed from 2742c83 to 33c8063.
Update and PR summary

Original issue

This follows this issue, which is specific to the MongoDB source with pyarrow data loading, which uses

TODOs

This PR

Questions

Please validate the changes I made to the user-facing

Other notes
```python
) -> Optional[Dict[str, Any]]:
    """Build a projection operator.

    A tuple of fields to include or a dict specifying fields to include or exclude.
```
Suggested change:

```diff
-    A tuple of fields to include or a dict specifying fields to include or exclude.
+    Args:
+        projection (Optional[Union[Mapping[str, Any], Iterable[str]]]): A tuple of fields to include or a dict specifying fields to include or exclude.
```
```python
    pass  # primary_key was properly not included in exclusion projection
else:
    dlt.common.logger.warn(
        f"Primary key `{self.incremental.primary_key} was removed from exclusion projection"
```
Looks like the backtick isn't closed 👀 in the warning message.

Wondering if it will be understandable for the user if we just give the primary key name of the incremental.
```python
) -> Iterator[TDataItem]:
    """Construct the query and load the documents from the collection.

    Args:
        filter_ (Dict[str, Any]): The filter to apply to the collection.
        limit (Optional[int]): The number of documents to load.
        projection: selection of fields to create Cursor
```
Maybe we should include the typing in brackets for consistency.
```python
    self,
    filter_: Dict[str, Any],
    projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
) -> TCursor:
    """Get a reading cursor for the collection.

    Args:
```
Argument specification missing here
```python
    self,
    filter_: Dict[str, Any],
    limit: Optional[int] = None,
    projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
) -> Iterator[TDataItem]:
    """Load all documents from the collection in parallel batches.
```
Also here
```python
) -> Iterator[Any]:
    """
    Load documents from the collection in Apache Arrow format.

    Args:
        filter_ (Dict[str, Any]): The filter to apply to the collection.
        limit (Optional[int]): The number of documents to load.
        projection: selection of fields to create Cursor
        pymongoarrow_schema: mapping of field types to convert BSON to Arrow
```
Typing here maybe 👀
```python
    Args:
        filter_ (Dict[str, Any]): The filter to apply to the collection.
        limit (Optional[int]): The number of documents to load.
        projection: selection of fields to create Cursor
```
typing 👀
I will not repeat this comment anymore.
```diff
@@ -348,6 +490,13 @@ def collection_documents(
         Supported formats:
             object - Python objects (dicts, lists).
             arrow - Apache Arrow tables.
+        projection: (Optional[Union[Mapping[str, Any], Iterable[str]]]): The projection to select fields
```
Should this be moved above, just to match the order of arguments? Probably not too important.
```python
    """MongoDB to dlt type conversion when using Python loaders.

    Notes:
        The method `ObjectId.__str__()` creates an hexstring using `binascii.hexlify(__id).decode()`
```
Suggested change:

```diff
-        The method `ObjectId.__str__()` creates an hexstring using `binascii.hexlify(__id).decode()`
+        The method `ObjectId.__str__()` creates a hex string using `binascii.hexlify(__id).decode()`
```
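For illustration, a stdlib-only sketch of what that call does. Plain bytes stand in for the ObjectId's internal 12-byte buffer (`bson` is deliberately not imported, and the byte value is hypothetical):

```python
import binascii

# Stand-in for an ObjectId's internal 12-byte buffer (hypothetical value).
raw_id = bytes(range(12))

# As the docstring notes: hexlify the 12 bytes and decode,
# yielding the familiar 24-character hex string.
hex_id = binascii.hexlify(raw_id).decode()
print(hex_id)       # 000102030405060708090a0b
print(len(hex_id))  # 24
```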
```python
    pipeline_name="mongodb_test",
    destination=destination_name,
    dataset_name="mongodb_test_data",
    full_refresh=True,
```
I think `full_refresh` is deprecated; it needs to be replaced with `dev_mode=True` in all tests.
Short description

This implements two sets of features:

- Support the `objectId` type and add `pymongoarrow_schema` for a user-facing type-casting API for `mongodb -> pyarrow`
- `projection` kwarg: allows selecting the fields to include/exclude when pulling the data from MongoDB. This reduces egress volume and helps with regulatory compliance (e.g., don't pull sensitive information)

Notes

- The column selection in `pymongoarrow_schema` can intersect with the column selection in `projection`. The `pymongoarrow_schema` overrides the projection kwarg.
- `pymongoarrow_schema` will "fail silently" if you cast to an invalid type (e.g., `ObjectId` to `int`): Avoid Type Mismatch Conversion to NaN mongodb-labs/mongo-arrow#246
- `ObjectId` is represented as a `binary[12]` type in Arrow using `pymongoarrow`. There's no vectorized operation to convert this to a string since the buffer contains ASCII characters.
- `pymongoarrow_schema` will make `pipeline.run()` fail with a hard-to-trace error `Out of buffer` coming from arrow (e.g., `_id: pyarrow.string()`).
- `pymongoarrow.schema.Schema` validates that its type annotations are valid at `__init__()`

Future work

- Once `pymongoarrow` is updated, we can raise on failed type casting.
- `ObjectId` to string.
- If we convert `ObjectId` to `string`, then we can't efficiently handle nested arrays and structs without looping through each

Open questions

- `projection` was added to the `mongodb_collection` resource. Should it be added to the `mongodb` source? Then the projection would be applied to all the generated resources.
- `projection` + incremental loading: should we raise an exception if the `primary_key` is excluded, or force its inclusion and log a warning (current implementation)?
- `pymongoarrow_schema` is applied at extract time and it's hard to surface schema issues to the user. How much validation needs to happen within the dlt source code vs. user code?
- Should we upgrade from `pymongoarrow == 1.4.0` to `== 1.6.0` given their better type support?

Related Issues