From 25f60ce05a71fcb26fc90e6c1f871da54f4dfe1c Mon Sep 17 00:00:00 2001 From: Alex Bair Date: Fri, 28 Feb 2025 11:27:26 -0500 Subject: [PATCH] source-intercom-native: hydrate contacts with all tags Contacts contain multiple sub-resources, like tags. There can be a large number of sub-resources attached to one contact, but only a subset is included when fetching a contact from Intercom's API. For example, only the first 10 tags are included in a contact. To get all sub-resources, the connector needs to make an additional request to fetch them and then hydrate the contact with the full set of sub-resources. This commit adds this hydration functionality for tags within a contact. There are other possible sub-resources that we could hydrate (notes, companies, opted_out_subscription_types, etc.), but users are only asking about tags right now, so I'm only adding functionality for tags. Adding support for hydrating other sub-resources should be fairly straightforward if we need to do so later. 
--- .../source_intercom_native/api.py | 34 +++++++- .../source_intercom_native/models.py | 41 +++++++++- .../snapshots__capture__capture.stdout.json | 77 ++++++++++++++++++- .../tests/test_snapshots.py | 2 +- 4 files changed, 147 insertions(+), 7 deletions(-) diff --git a/source-intercom-native/source_intercom_native/api.py b/source-intercom-native/source_intercom_native/api.py index 925ba308a0..57d1e3f979 100644 --- a/source-intercom-native/source_intercom_native/api.py +++ b/source-intercom-native/source_intercom_native/api.py @@ -12,6 +12,9 @@ from .models import ( IntercomResource, TimestampedResource, + Contact, + NestedTag, + ContactTagsResponse, ContactsSearchResponse, TicketsSearchResponse, ConversationsSearchResponse, @@ -184,6 +187,33 @@ def _is_large_date_window(start: int, end: int) -> bool: return delta > timedelta(hours=1) +async def _hydrate_contact( + http: HTTPSession, + contact: Contact, + log: Logger, +) -> Contact: + if contact.tags.has_more: + url = f"{API}/contacts/{contact.id}/tags" + response = ContactTagsResponse.model_validate_json( + await http.request(log, url) + ) + + # Tags and nested tags have different shapes. We have to transform tags into nested tags + # to ensure the tags in a Contact are always the same shape. + nested_tags = [] + for tag in response.data: + nested_tag = NestedTag.model_validate({ + "id": tag.id, + "type": tag.type, + "url": f"/tags/{tag.id}" + }) + nested_tags.append(nested_tag) + + contact.tags.data = nested_tags + contact.tags.has_more = False + return contact + + async def fetch_contacts( http: HTTPSession, window_size: int, @@ -234,7 +264,9 @@ async def fetch_contacts( if updated_at > last_seen_ts: last_seen_ts = updated_at if updated_at > start: - yield contact + # Nested subresources within a contact are capped at 10 elements, even if more exist. + # We hydrate the contact with the additional subresources if they aren't all present. 
+ yield await _hydrate_contact(http, contact, log) if pagination_ended_early or response.pages.next is None: break diff --git a/source-intercom-native/source_intercom_native/models.py b/source-intercom-native/source_intercom_native/models.py index 0785c39f20..e587643bb0 100644 --- a/source-intercom-native/source_intercom_native/models.py +++ b/source-intercom-native/source_intercom_native/models.py @@ -1,5 +1,13 @@ from datetime import datetime, UTC, timedelta -from typing import Annotated, AsyncGenerator, Callable, Literal, TYPE_CHECKING, Optional +from typing import ( + Annotated, + AsyncGenerator, + Callable, + Generic, + Optional, + TYPE_CHECKING, + TypeVar, +) from estuary_cdk.capture.common import ( AccessToken, @@ -112,8 +120,37 @@ class Next(BaseModel, extra="forbid"): pages: Pagination +_Subresource = TypeVar("_Subresource", bound=BaseModel) + + +class NestedTag(BaseModel): + id: str + type: str + url: str + + +class Contact(TimestampedResource): + class Subresources(BaseModel, Generic[_Subresource]): + data: list[_Subresource] + has_more: bool + total_count: int + type: str + url: str + + tags: Subresources[NestedTag] + + +class ContactTagsResponse(BaseModel, extra="allow"): + class Tag(BaseModel, extra="forbid"): + id: str + type: str + name: str + + data: list[Tag] + + class ContactsSearchResponse(SearchResponse): - data: list[TimestampedResource] + data: list[Contact] class TicketsSearchResponse(SearchResponse): diff --git a/source-intercom-native/tests/snapshots/snapshots__capture__capture.stdout.json b/source-intercom-native/tests/snapshots/snapshots__capture__capture.stdout.json index ebb4238fad..3b150ce713 100644 --- a/source-intercom-native/tests/snapshots/snapshots__capture__capture.stdout.json +++ b/source-intercom-native/tests/snapshots/snapshots__capture__capture.stdout.json @@ -187,16 +187,87 @@ "type": "list" }, "tags": { - "data": [], + "data": [ + { + "id": "10718342", + "type": "tag", + "url": "/tags/10718342" + }, + { + "id": "10718351", 
+ "type": "tag", + "url": "/tags/10718351" + }, + { + "id": "10718352", + "type": "tag", + "url": "/tags/10718352" + }, + { + "id": "10718349", + "type": "tag", + "url": "/tags/10718349" + }, + { + "id": "10718354", + "type": "tag", + "url": "/tags/10718354" + }, + { + "id": "10718353", + "type": "tag", + "url": "/tags/10718353" + }, + { + "id": "10718347", + "type": "tag", + "url": "/tags/10718347" + }, + { + "id": "10718350", + "type": "tag", + "url": "/tags/10718350" + }, + { + "id": "10718345", + "type": "tag", + "url": "/tags/10718345" + }, + { + "id": "10718343", + "type": "tag", + "url": "/tags/10718343" + }, + { + "id": "10718348", + "type": "tag", + "url": "/tags/10718348" + }, + { + "id": "10718344", + "type": "tag", + "url": "/tags/10718344" + }, + { + "id": "10718355", + "type": "tag", + "url": "/tags/10718355" + }, + { + "id": "10718346", + "type": "tag", + "url": "/tags/10718346" + } + ], "has_more": false, - "total_count": 0, + "total_count": 14, "type": "list", "url": "/contacts/67570a878b6e26fe4ce6a894/tags" }, "type": "contact", "unsubscribed_from_emails": false, "unsubscribed_from_sms": false, - "updated_at": 1734097734, + "updated_at": 1740758913, "utm_campaign": null, "utm_content": null, "utm_medium": null, diff --git a/source-intercom-native/tests/test_snapshots.py b/source-intercom-native/tests/test_snapshots.py index 6326719eb0..ed14fd3dba 100644 --- a/source-intercom-native/tests/test_snapshots.py +++ b/source-intercom-native/tests/test_snapshots.py @@ -12,7 +12,7 @@ def test_capture(request, snapshot): "--sessions", "1", "--delay", - "10s", + "30s", ], stdout=subprocess.PIPE, text=True,