Skip to content

Commit

Permalink
source-intercom-native: hydrate contacts with all tags
Browse files Browse the repository at this point in the history
Contacts contain multiple sub-resources, like tags. There can be a large
number of sub-resources attached to one contact, but only a subset are
included when fetching a contact from Intercom's API. For example, only
the first 10 tags are included in a contact. To get all sub-resources,
the connector would need to make an additional request to fetch them
then hydrate the contact with the full set of sub-resources.

This commit adds this hydration functionality for tags within a contact.
There are other possible sub-resources that we could hydrate (notes,
companies, opted_out_subscription_types, etc.), but users are only
asking about tags right now. So I'm only adding functionality for tags.
Adding support for hydrating other sub-resources should be fairly
straightforward if we need to do so later.
  • Loading branch information
Alex-Bair committed Feb 28, 2025
1 parent 9337fe2 commit d6cd897
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 7 deletions.
34 changes: 33 additions & 1 deletion source-intercom-native/source_intercom_native/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
from .models import (
IntercomResource,
TimestampedResource,
Contact,
NestedTag,
ContactTagsResponse,
ContactsSearchResponse,
TicketsSearchResponse,
ConversationsSearchResponse,
Expand Down Expand Up @@ -184,6 +187,33 @@ def _is_large_date_window(start: int, end: int) -> bool:
return delta > timedelta(hours=1)


async def _hydrate_contact(
http: HTTPSession,
contact: Contact,
log: Logger,
) -> Contact:
if contact.tags.has_more:
url = f"{API}/contacts/{contact.id}/tags"
response = ContactTagsResponse.model_validate_json(
await http.request(log, url)
)

# Tags and nested tags have different shapes. We have to transform tags into nested tags
# to ensure the tags in a Contact are always the same shape.
nested_tags = []
for tag in response.data:
nested_tag = NestedTag.model_validate({
"id": tag.id,
"type": tag.type,
"url": f"/tags/{tag.id}"
})
nested_tags.append(nested_tag)

contact.tags.data = nested_tags
contact.tags.has_more = False
return contact


async def fetch_contacts(
http: HTTPSession,
window_size: int,
Expand Down Expand Up @@ -234,7 +264,9 @@ async def fetch_contacts(
if updated_at > last_seen_ts:
last_seen_ts = updated_at
if updated_at > start:
yield contact
# Nested subresources within a contact are capped at 10 elements, even if more exist.
# We hydrate the contact with the additional subresources if they aren't all present.
yield await _hydrate_contact(http, contact, log)

if pagination_ended_early or response.pages.next is None:
break
Expand Down
41 changes: 39 additions & 2 deletions source-intercom-native/source_intercom_native/models.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
from datetime import datetime, UTC, timedelta
from typing import Annotated, AsyncGenerator, Callable, Literal, TYPE_CHECKING, Optional
from typing import (
Annotated,
AsyncGenerator,
Callable,
Generic,
Optional,
TYPE_CHECKING,
TypeVar,
)

from estuary_cdk.capture.common import (
AccessToken,
Expand Down Expand Up @@ -112,8 +120,37 @@ class Next(BaseModel, extra="forbid"):
pages: Pagination


_Subresource = TypeVar("_Subresource", bound=BaseModel)


class NestedTag(BaseModel):
id: str
type: str
url: str


class Contact(TimestampedResource):
class Subresources(BaseModel, Generic[_Subresource]):
data: list[_Subresource]
has_more: bool
total_count: int
type: str
url: str

tags: Subresources[NestedTag]


class ContactTagsResponse(BaseModel, extra="allow"):
class Tag(BaseModel, extra="forbid"):
id: str
type: str
name: str

data: list[Tag]


class ContactsSearchResponse(SearchResponse):
data: list[TimestampedResource]
data: list[Contact]


class TicketsSearchResponse(SearchResponse):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,87 @@
"type": "list"
},
"tags": {
"data": [],
"data": [
{
"id": "10718342",
"type": "tag",
"url": "/tags/10718342"
},
{
"id": "10718351",
"type": "tag",
"url": "/tags/10718351"
},
{
"id": "10718352",
"type": "tag",
"url": "/tags/10718352"
},
{
"id": "10718349",
"type": "tag",
"url": "/tags/10718349"
},
{
"id": "10718354",
"type": "tag",
"url": "/tags/10718354"
},
{
"id": "10718353",
"type": "tag",
"url": "/tags/10718353"
},
{
"id": "10718347",
"type": "tag",
"url": "/tags/10718347"
},
{
"id": "10718350",
"type": "tag",
"url": "/tags/10718350"
},
{
"id": "10718345",
"type": "tag",
"url": "/tags/10718345"
},
{
"id": "10718343",
"type": "tag",
"url": "/tags/10718343"
},
{
"id": "10718348",
"type": "tag",
"url": "/tags/10718348"
},
{
"id": "10718344",
"type": "tag",
"url": "/tags/10718344"
},
{
"id": "10718355",
"type": "tag",
"url": "/tags/10718355"
},
{
"id": "10718346",
"type": "tag",
"url": "/tags/10718346"
}
],
"has_more": false,
"total_count": 0,
"total_count": 14,
"type": "list",
"url": "/contacts/67570a878b6e26fe4ce6a894/tags"
},
"type": "contact",
"unsubscribed_from_emails": false,
"unsubscribed_from_sms": false,
"updated_at": 1734097734,
"updated_at": 1740758913,
"utm_campaign": null,
"utm_content": null,
"utm_medium": null,
Expand Down
2 changes: 1 addition & 1 deletion source-intercom-native/tests/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_capture(request, snapshot):
"--sessions",
"1",
"--delay",
"10s",
"30s",
],
stdout=subprocess.PIPE,
text=True,
Expand Down

0 comments on commit d6cd897

Please sign in to comment.