
✨ [source-hubspot] Implement RFR for contacts_form_submissions, contact_list_memberships, contact_merged_audit #38049

Open · brianjlai wants to merge 4 commits into master
Conversation

brianjlai (Contributor) commented May 7, 2024

Closes https://github.com/airbytehq/airbyte-internal-issues/issues/7047

What

There are 5 Hubspot streams that only support full refresh. I've implemented RFR for 3 of them:

  • contacts_form_submissions
  • contact_list_memberships
  • contact_merged_audit

I did not implement the others because:

  • email_subscriptions: This does not support any form of pagination and is therefore not RFR compatible.
  • marketing_emails: Per the docs, this supports filtering on updated or created timestamps, so it should instead be converted to incremental, which is the better option.

How

Bumped the CDK version to 0.87.0, which is the first version to include the RFR Python interfaces.

To support RFR as per the design, the streams in question need to implement a version of read_records() that reads the prior page token from the stream_slice, fetches one page at a time, and updates state with the next_page_token.

The Hubspot streams being implemented use a form of cursor pagination where the response contains a pointer to the next contact (specified by its vid), and that vid is passed to the next request via the vidOffset parameter.
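
Roughly, the pagination looks like this (a simplified sketch, assuming the response exposes has-more and vid-offset fields; the real connector code may differ):

    from typing import Any, Mapping, Optional

    import requests


    def next_page_token(response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Sketch: the contacts response carries a "has-more" flag and a "vid-offset"
        # pointing at the next contact (vid).
        body = response.json()
        if body.get("has-more"):
            # The vid is passed back to the API as the vidOffset query parameter.
            return {"vidOffset": body["vid-offset"]}
        return None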

All of the full refresh streams derive from the same concept of fetching Contacts records, extracting a specific attribute list, and returning those as records, so I consolidated the implementation in the ContactsAllBase class.
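
Here's a simplified sketch of that consolidated flow (not the actual implementation; _fetch_page, the record field names, and the state shape are placeholders):

    from typing import Any, Iterable, Mapping, MutableMapping, Optional


    class ContactsAllBaseSketch:
        records_field = "form-submissions"  # placeholder: the attribute list extracted per contact

        def __init__(self) -> None:
            self._state: MutableMapping[str, Any] = {}

        def _fetch_page(self, vid_offset: Optional[int]) -> Mapping[str, Any]:
            # Placeholder for the HTTP call; the real streams pass vidOffset=vid_offset.
            raise NotImplementedError

        def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
            # Resume from the page recorded in the incoming slice, if any.
            response = self._fetch_page((stream_slice or {}).get("vidOffset"))
            for contact in response.get("contacts", []):
                # Each record is the specific attribute list pulled off the contact.
                yield from contact.get(self.records_field, [])
            # Store the pointer to the next page so a checkpoint is emitted per page
            # and a failed sync can resume from here.
            if response.get("has-more"):
                self._state = {"vidOffset": response["vid-offset"]}
            else:
                self._state = {}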

I also added some mock server tests for each.

Review guide

  1. source_hubspot/streams.py
  2. honestly the rest is just tests

User Impact

Theoretically none. I did refactor contact_merged_audit to use the same flow as the others, which was just cleaner. But we should now start seeing checkpoint messages emitted per page during syncs.

Can this PR be safely reverted and rolled back?

No db or config migrations

  • YES 💚
  • NO ❌


vercel bot commented May 7, 2024

1 Ignored Deployment: airbyte-docs (Ignored; updated May 20, 2024 5:16am UTC)

@brianjlai brianjlai requested a review from a team May 9, 2024 23:38
@brianjlai brianjlai marked this pull request as ready for review May 9, 2024 23:38
@brianjlai brianjlai requested a review from a team as a code owner May 9, 2024 23:38

brianjlai (Contributor, Author) commented May 10, 2024

Live tests ✅ against a connection: https://github.com/airbytehq/airbyte/actions/runs/9024913362/job/24799667426

Also tested in my own personal workspace for the streams mentioned

maxi297 (Contributor) commented May 10, 2024

I'll check the PR tomorrow, but note that contact_list_memberships will be moved to semi-incremental next week.

from .request_builders.streams import ContactsStreamRequestBuilder


@freezegun.freeze_time("2024-05-04T00:00:00Z")

A collaborator commented:

It's better to use a dynamic freeze time, e.g. pendulum.parse(<config.start_date>).subtract(days=<x_days_to_subtract>), so the tests have a single start_date anchor instead of a hardcoded value.
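
For example, something like this (the start date value and the day offset here are just placeholders):

    import freezegun
    import pendulum

    # Placeholder anchor; in the real tests this would come from the config's start_date.
    CONFIG_START_DATE = "2024-05-14T00:00:00Z"
    FROZEN_TIME = pendulum.parse(CONFIG_START_DATE).subtract(days=10)


    @freezegun.freeze_time(FROZEN_TIME.to_iso8601_string())
    def test_read_full_refresh_stream():
        ...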

A collaborator commented:

Also, these tests are similar except for self.mock_response and the assert output.state_messages, etc. assertions, which makes me think we could simplify them into one executable test method driven by a bunch of parametrized inputs/outputs. Do you think it's worth doing?
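
Roughly this shape (the fixture names and expected values are placeholders, just to illustrate):

    import pytest


    @pytest.mark.parametrize(
        "stream_name, response_fixture, expected_state",
        [
            ("contacts_form_submissions", "contacts_form_submissions_page.json", {"vidOffset": 101}),
            ("contact_list_memberships", "contact_list_memberships_page.json", {"vidOffset": 101}),
            ("contact_merged_audit", "contact_merged_audit_page.json", {"vidOffset": 101}),
        ],
    )
    def test_full_refresh_stream(stream_name, response_fixture, expected_state):
        # Placeholder body: register the mocked response for stream_name, run a read,
        # then assert on the emitted records and output.state_messages.
        ...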

maxi297 (Contributor) left a comment:

My first thoughts reading the PR description are that I'm not sure why we are focusing on those streams. They seem pretty fine if we check the source side of things:

[Screenshot, 2024-05-10: source-side dashboard for these streams]

Based on the stream failure tab in the same dashboard, it seems like it's mostly one workspace having issues.
[Screenshot: stream failure tab]

Shouldn't we focus on destination not breaking instead? Or do we think they break because of source related issues?

Also note that for the small percentage of source failures we see, those were mostly not read issues (see this analysis and this comment).

I'll still review the PR, but business-wise I'm not sure this is the best thing we can work on, and (without having read the code, so this might be a misplaced fear) I worry about the complexity and the risk of data loss this might introduce.

maxi297 (Contributor) left a comment:

In the end, I have two concerns:

  • Duplication of the logic: do you think we can do something about this?
  • What happens with contact_list_memberships, which will be moved to semi-incremental?

I also want to give a big thumbs up for the mock server tests, which allowed me NOT to do manual testing (the only manual test I did was checking what happens when a vid is deleted, and the API handles it very well). This is really appreciated!


next_page_token = stream_slice
try:
    properties = self._property_wrapper

A contributor commented:

I fear this part, as it sounds like it's the same logic as Stream.read_records, and if one changes we now need to remember to change the other. Could we keep the logic centralized in Stream, have Stream._state, and only implement the state getter and setter in ContactsAllBase? Even then, I think we will need another class, as this is in progress and will rely on ContactsAllBase, but I don't think it will be RFR compatible as-is with both client-side-filtering incremental and RFR incremental...
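
Something along these lines, maybe (class and attribute names simplified; not the actual connector code):

    from typing import Any, Mapping, MutableMapping


    class ContactsAllBaseStateSketch:
        """Only the state plumbing lives here; the page-reading loop stays in Stream."""

        def __init__(self) -> None:
            self._state: MutableMapping[str, Any] = {}

        @property
        def state(self) -> Mapping[str, Any]:
            return self._state

        @state.setter
        def state(self, value: Mapping[str, Any]) -> None:
            # Tolerate empty/missing state so both RFR and plain full refresh work.
            self._state = dict(value) if value else {}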

maxi297 (Contributor) left a comment:

Oh! Maybe another concern:

  • Can we move most of the logic to the CDK? In theory, as long as next_page_token is exposed, we should be fine. I think the problem in the HubSpot case is that it re-implements read_records, but as long as it calls _read_page we should be able to make the change in the CDK alone. The reasoning is that it feels like this would build more leverage, but maybe that is just an impression that needs to be confirmed (rough sketch of the shape below).
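
A very rough sketch of the shape I have in mind (none of these names are the real CDK API; this only illustrates where the leverage would come from):

    from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple


    class ResumablePagedStreamSketch:
        """Hypothetical base-class shape, not the actual airbyte-cdk interface."""

        def __init__(self) -> None:
            self._state: MutableMapping[str, Any] = {}

        def _read_page(
            self, page_token: Optional[Mapping[str, Any]]
        ) -> Tuple[List[Mapping[str, Any]], Optional[Mapping[str, Any]]]:
            # One HTTP round trip; returns (records, next_page_token).
            raise NotImplementedError

        def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
            records, next_page_token = self._read_page(stream_slice or None)
            yield from records
            # Checkpointing lives in one place instead of in every connector override.
            self._state = dict(next_page_token) if next_page_token else {}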

@@ -131,3 +132,28 @@ def read_from_stream(
        cls, cfg, stream: str, sync_mode: SyncMode, state: Optional[List[AirbyteStateMessage]] = None, expecting_exception: bool = False
    ) -> EntrypointOutput:
        return read(SourceHubspot(), cfg, cls.catalog(stream, sync_mode), state, expecting_exception)


@freezegun.freeze_time("2024-05-05T00:00:00Z")

A contributor commented:

Should this only be on the actual test implementation?

@@ -1374,6 +1377,62 @@ class ContactsAllBase(Stream):
    records_field = None
    filter_field = None
    filter_value = None
    _state = {}
    limit_field = "count"

A contributor commented:

Nice catch. It was using limit before. The default value on the API side is 100 so this shouldn't change the behavior

@@ -2,28 +2,31 @@

[[package]]
name = "airbyte-cdk"
-version = "0.78.6"
+version = "0.87.0"

maxi297 (Contributor) commented:

Actually, testing manually makes me believe that upping the CDK will cause issues with ClientSideIncrementalStream as the state will be set and this will fail with an index error. I think it comes from a change here where the condition before was if stream_state and "state" in dir(stream_instance) and <...> but we've removed the first part
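
For reference, a paraphrased sketch of the guard being discussed (not the exact CDK source; the clause elided in the condition above is omitted here too, and the helper name is hypothetical):

    def maybe_set_state(stream_instance, stream_state):
        # Before: the assignment only happened when the incoming state was non-empty.
        if stream_state and "state" in dir(stream_instance):
            stream_instance.state = stream_state
        # After the change, the stream_state check was dropped, so an empty state can
        # reach a setter that does value[self.cursor_field] and raise.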

Comment on lines +876 to +879
if value:
    self._cursor_value = value[self.cursor_field]
else:
    self._cursor_value = ""

A contributor commented:

You can also:

Suggested change:
-if value:
-    self._cursor_value = value[self.cursor_field]
-else:
-    self._cursor_value = ""
+self._cursor_value = value[self.cursor_field] if value else ""

@@ -861,7 +873,10 @@ def state(self) -> Mapping[str, Any]:

    @state.setter
    def state(self, value: Mapping[str, Any]):
-        self._cursor_value = value[self.cursor_field]
+        if value:

brianjlai (Contributor, Author) replied, quoting the comment above:

Actually, testing manually makes me believe that upping the CDK will cause issues with ClientSideIncrementalStream as the state will be set and this will fail with an index error. I think it comes from a change here where the condition before was if stream_state and "state" in dir(stream_instance) and <...> but we've removed the first part

@maxi297 good comment on the poetry.lock file about CDK compatibility! I had adjusted that CDK condition because there's actually a pretty nasty bug where the availability strategy can accidentally update state, which could cause data loss for the first slice. See this comment in a previous PR.

And because of that CDK change, I adjusted this condition here. I think Hubspot configures its state setters a bit uncharacteristically compared to other connectors, and with RFR we can't assume that having state setters/getters implies incremental or that a cursor_field will always exist. Basically, to account for the case you mentioned, I updated our state setter to be more resilient to empty state, which is what we should probably be doing regardless.
