✨ [source-hubspot] Implement RFR for contacts_form_submissions, contact_list_memberships, contact_merged_audit #38049
base: master
Conversation
Live tests ✅ against a connection: https://github.com/airbytehq/airbyte/actions/runs/9024913362/job/24799667426 Also tested in my own personal workspace for the streams mentioned.
I'll check the PR tomorrow but note that
from .request_builders.streams import ContactsStreamRequestBuilder

@freezegun.freeze_time("2024-05-04T00:00:00Z")
It's better to use a dynamic end_date for the freeze, like pendulum.parse(<config.start_date>).subtract(days=<x_days_to_subtract>), keeping the single start_date as the anchor, instead of hardcoding the value.
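For illustration, the dynamic-anchor idea can be sketched with the standard library (the connector itself uses pendulum; the start_date value and day offset here are made-up placeholders, not values from the PR):

```python
from datetime import datetime, timedelta

# Hypothetical stand-in for the connector config's start_date.
CONFIG_START_DATE = "2024-05-01T00:00:00+00:00"

def dynamic_freeze_date(start_date: str, days_ahead: int) -> datetime:
    # Derive the freezegun anchor from the single start_date instead of
    # hardcoding a second, unrelated timestamp in each test.
    return datetime.fromisoformat(start_date) + timedelta(days=days_ahead)

print(dynamic_freeze_date(CONFIG_START_DATE, 3).isoformat())
# 2024-05-04T00:00:00+00:00
```

With pendulum, the equivalent would be `pendulum.parse(start_date).add(days=3)` (or `.subtract()` to anchor a date before the start).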
Also, these tests are similar except for the self.mock_response and assert output.state_messages assertions, etc., which makes me think we could simplify them into one executable test method with a bunch of parametrized inputs/outputs. Do you think it's worth doing?
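The parametrization idea might look something like this (a hedged sketch: the case payloads and the run_read helper are purely illustrative stand-ins, not the PR's actual tests or the source's real read path):

```python
import pytest

def run_read(mock_response):
    # Illustrative stand-in for invoking the source against a mocked
    # HubSpot response; the real tests would run the connector's read.
    return mock_response.get("contacts", [])

CASES = [
    # (mock_response, expected_record_count)
    ({"contacts": [{"vid": 1}, {"vid": 2}], "has-more": False}, 2),
    ({"contacts": [], "has-more": False}, 0),
]

@pytest.mark.parametrize("mock_response, expected_records", CASES)
def test_read_contacts_stream(mock_response, expected_records):
    records = run_read(mock_response)
    assert len(records) == expected_records
```

Each near-duplicate test then collapses to one row in `CASES`, and per-case differences (mocked response, expected state messages) become extra parameters.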
My first thoughts reading the PR description are that I'm not sure why we are focusing on those streams. They seem pretty fine if we check the source side of things:
Based on the stream failure tab in the same dashboard, it seems like it's mostly one workspace having issues.
Shouldn't we focus on destination not breaking instead? Or do we think they break because of source related issues?
Also note that for the small percentage of source failures we see, those were mostly not read issues (see this analysis and this comment).
I'll still review the PR, but business-wise I'm not sure this is the best thing we can work on, and (without having read the code, so this might be a misplaced fear) I fear the complexity/risk of data loss this might induce.
In the end, I have two concerns:
- Duplication of the logic: do you think we can do something about this?
- What happens with contacts_list_memberships, which will be semi-incremental?
I also want to give a big thumbs up for the mock server tests which allowed me NOT to do manual testing (the only manual testing I did was to test if a vid was deleted and the API supports it very well). This is really appreciated!
next_page_token = stream_slice
try:
    properties = self._property_wrapper
I fear this part, as it sounds like it's the same logic as Stream.read_records, and if one changes, we now need to remember to change the other. Could we keep the logic centralized in Stream, have the Stream._state, and only implement the state getter and setter in ContactsAllBase? Even then, I think we will need another class, as this is in progress and will rely on ContactsAllBase, but I don't think it'll be RFR compatible as-is with both client-side filtering incremental and RFR incremental...
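A rough sketch of the split being suggested (the class and attribute names mirror the PR, but the bodies are illustrative only, not the connector's actual code):

```python
from typing import Any, Mapping

class Stream:
    # Backing state lives on the base class, alongside the single,
    # centralized read_records/pagination logic.
    _state: Mapping[str, Any] = {}

    def read_records(self):
        # ...one shared pagination loop would live here...
        yield from ()

class ContactsAllBase(Stream):
    # The subclass only exposes the state property over the base's _state.
    @property
    def state(self) -> Mapping[str, Any]:
        return self._state

    @state.setter
    def state(self, value: Mapping[str, Any]) -> None:
        self._state = value or {}

s = ContactsAllBase()
s.state = {"vidOffset": 5678}
print(s.state)  # {'vidOffset': 5678}
```

That way a future change to the reading logic happens once in Stream instead of having to be mirrored in ContactsAllBase.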
Oh! Maybe another concern:
- Can we move most of the logic to the CDK? In theory, as long as next_page_token is exposed, we should be fine. I think the problem in the HubSpot case is that it re-implements read_records, but as long as it calls _read_page, we should be able to just have the change in the CDK. The reasoning is that it feels like this would build more leverage, but maybe that's just an impression that needs to be confirmed.
@@ -131,3 +132,28 @@ def read_from_stream(
    cls, cfg, stream: str, sync_mode: SyncMode, state: Optional[List[AirbyteStateMessage]] = None, expecting_exception: bool = False
) -> EntrypointOutput:
    return read(SourceHubspot(), cfg, cls.catalog(stream, sync_mode), state, expecting_exception)

@freezegun.freeze_time("2024-05-05T00:00:00Z")
Should this only be on the actual test implementation?
@@ -1374,6 +1377,62 @@ class ContactsAllBase(Stream):
    records_field = None
    filter_field = None
    filter_value = None
    _state = {}
    limit_field = "count"
Nice catch. It was using limit before. The default value on the API side is 100, so this shouldn't change the behavior.
@@ -2,28 +2,31 @@
 [[package]]
 name = "airbyte-cdk"
-version = "0.78.6"
+version = "0.87.0"
Actually, testing manually makes me believe that upping the CDK will cause issues with ClientSideIncrementalStream, as the state will be set and this will fail with an index error. I think it comes from a change here, where the condition before was if stream_state and "state" in dir(stream_instance) and <...>, but we've removed the first part.
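The old guard being described could be reconstructed roughly like this (an illustration only: the helper name and dummy classes are hypothetical, not the CDK's actual code; only the quoted condition itself comes from the comment):

```python
from typing import Any, Mapping, Optional

def maybe_apply_state(stream_instance: Any, stream_state: Optional[Mapping[str, Any]]) -> bool:
    # Push state into the stream only when there IS non-empty state AND the
    # stream actually exposes a `state` attribute -- the "first part" the
    # comment says was removed is the `stream_state and` check.
    if stream_state and "state" in dir(stream_instance):
        stream_instance.state = stream_state
        return True
    return False

class StatefulStream:
    state: Mapping[str, Any] = {}

class StatelessStream:
    pass

print(maybe_apply_state(StatefulStream(), {"updatedAt": "2024-05-04"}))  # True
print(maybe_apply_state(StatefulStream(), {}))                           # False
```

Dropping the `stream_state and` half means an empty state gets assigned anyway, which is what trips setters that index into the state dict.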
if value:
    self._cursor_value = value[self.cursor_field]
else:
    self._cursor_value = ""
You can also collapse this into a one-liner:

self._cursor_value = value[self.cursor_field] if value else ""
@@ -861,7 +873,10 @@ def state(self) -> Mapping[str, Any]:

 @state.setter
 def state(self, value: Mapping[str, Any]):
-    self._cursor_value = value[self.cursor_field]
+    if value:
Actually, testing manually makes me believe that upping the CDK will cause issues with ClientSideIncrementalStream as the state will be set and this will fail with an index error. I think it comes from a change here where the condition before was if stream_state and "state" in dir(stream_instance) and <...> but we've removed the first part
@maxi297 good comment about this in the poetry.lock file regarding CDK compatibility! I had adjusted this condition because there's actually a rather nasty bug in the CDK that could cause data loss for the first slice: the availability strategy can accidentally update state. See this comment in a previous PR.

Because of that change, I adjusted this condition. I think HubSpot configures setters a bit differently from other connectors, and with RFR we can't assume that having state setters/getters implies incremental or that a cursor_field will always be present. Basically, to account for the case you mentioned, I updated our state setter to be more resilient to empty state, which is what we should probably be doing regardless.
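The resilient setter described here could look roughly like this (an illustrative reconstruction; the class name and cursor field value are stand-ins, not the connector's actual code):

```python
from typing import Any, Mapping, Optional

class ContactsStream:
    cursor_field = "timestamp"  # hypothetical cursor field for illustration

    def __init__(self) -> None:
        self._cursor_value: str = ""

    @property
    def state(self) -> Mapping[str, Any]:
        return {self.cursor_field: self._cursor_value}

    @state.setter
    def state(self, value: Optional[Mapping[str, Any]]) -> None:
        # Tolerate empty/None state: RFR streams may be handed no state at
        # all, so don't assume cursor_field is always present.
        self._cursor_value = value[self.cursor_field] if value else ""

s = ContactsStream()
s.state = {}  # previously this would raise a KeyError; now it's a no-op
s.state = {"timestamp": "2024-05-04T00:00:00Z"}
print(s.state)  # {'timestamp': '2024-05-04T00:00:00Z'}
```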
Closes https://github.com/airbytehq/airbyte-internal-issues/issues/7047
What

There are 5 HubSpot streams that only support full refresh. I've implemented RFR for 3 of them: contacts_form_submissions, contact_list_memberships, and contact_merged_audit.

I did not implement the others because:
- email_subscriptions: this does not support any form of pagination and is therefore not RFR compatible.
- marketing_emails: as per the docs, this supports filtering on updated or created and should just be converted to incremental, which is better.

How
Bumped the version to 0.87.0, which is the first version to have the RFR Python interfaces.

To support RFR as per the design, the streams in question need to implement a version of read_records() which reads the prior page from the stream_slice, reads one page at a time, and updates state with the next_page_token.

The HubSpot streams being implemented use a form of cursor pagination where the response contains a pointer to the next contact (specified by the vid), and that vid is passed to the next request using the vidOffset parameter.

All of the full refresh streams derive from the same concept of fetching Contacts records, extracting a specific attribute list, and returning them as records, so I consolidated the implementation in the ContactsAllBase class.

I also added some mock server tests for each.
Review guide
source_hubspot/streams.py
User Impact
Theoretically none. I did refactor contact_merge_audit to use the same flow as the others, which was just cleaner, but now we should start seeing checkpoint messages emitted per page during syncs.

Can this PR be safely reverted and rolled back?
No db or config migrations