Handle db isolation for mapped operators and task groups #39259
if isinstance(self.task, MappedOperator):
    self.task = context["ti"].task
So, this is an interesting one, @potiuk. The way mapped operators are "expanded" or "unmapped"... it happens inside of MappedOperator.render_template_fields. It does so by replacing the task attr on the ti in the context dictionary, which in the non-db-isolation case also changes what self.task is here! But in the db isolation case, the context dict is created via RPC, so the pydantic TI in the context dict is not the same object as the pydantic TI that is running... it's quite complicated. Anyway, this is one way to ensure that the task gets properly unmapped: here we don't rely on mutating the TI in the context dict.
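To make the identity problem concrete, here is a minimal sketch. It does not use Airflow at all: FakeTask, FakeTI and unmap_via_context are hypothetical stand-ins, and a deep copy is only an approximation of a context dict rebuilt from an RPC payload.

```python
import copy


class FakeTask:
    """Hypothetical stand-in for an operator; is_mapped mimics mapped vs. unmapped."""

    def __init__(self, task_id, is_mapped):
        self.task_id = task_id
        self.is_mapped = is_mapped


class FakeTI:
    """Hypothetical stand-in for a task instance holding a reference to its task."""

    def __init__(self, task):
        self.task = task


def unmap_via_context(context):
    # Mimics the side effect described above: the unmapped task is written
    # onto whichever TI object happens to be stored in the context dict.
    context["ti"].task = FakeTask(context["ti"].task.task_id, is_mapped=False)


# Non-isolated case: the context holds the very TI that is running.
local_ti = FakeTI(FakeTask("t1", is_mapped=True))
unmap_via_context({"ti": local_ti})
local_sees_unmapped = not local_ti.task.is_mapped

# Isolated case (assumption): the context TI is a separate object, as if
# it had been rebuilt from a serialized RPC response.
running_ti = FakeTI(FakeTask("t1", is_mapped=True))
rpc_context = {"ti": copy.deepcopy(running_ti)}
unmap_via_context(rpc_context)
running_still_mapped = running_ti.task.is_mapped
```

In the second case the running TI never sees the unmapped task, which is why the diff copies it back explicitly from context["ti"].task.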
and here, @uranusjr?
# when taking task over RPC, we need to add the dag back
if isinstance(task, MappedOperator):
    if not task.dag:
        task.dag = dag
elif not task._dag:
    task._dag = dag
Not a fan of this… can we do this earlier in the stack, say when the task is created instead?
the issue, @uranusjr, is that this is already early in the stack when it's an RPC call. the only earlier place we could do it is in the decorator. WDYT? we could stick it in a private function, though, to get it out of the way and reuse it within the module.
when it's not an RPC call, this has no effect and is not needed.
if isinstance(self.task, MappedOperator):
    self.task = context["ti"].task
Does this not work with BaseOperator? The conditional makes this a lot weirder.
That’s right, because ti.task is only mutated when it is a MappedOperator. Otherwise ti.task is the result of an RPC call and, long story short, it can’t be used.
does that make sense, @uranusjr?
so with a normal task, self.task is the task that is created locally, and there is no need to override it with the one in the context dict. and if you did that, you'd take a task object that isn't quite complete, essentially because we don't have proper serialization of Task, since there's no real Task entity and no TaskPydantic. But generally it's not a problem, because most of the time we don't need to serialize a task object.
in the MappedOperator case though, as we saw last night, "unmapping" is achieved by mutating the ti in the context dict, and it relies on the assumption that the TI in the context dict is the same object as the one that is created locally and being run, which isn't true when the context comes from RPC.
if searching for alternatives, we could look at not relying on the context dict for this "unmapping". e.g. we could forward the "original" ti object to the thing doing the unmapping so we don't need to mutate what's in context.
another option would be, upon receiving a fresh context dict over RPC, we could replace the TIs in the context with the local TIPydantic object -- or something to this effect. then perhaps we could keep the context["ti"] mutation approach for unmapping.
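A minimal sketch of that second option, under stated assumptions: rebind_local_ti is a hypothetical helper, SimpleNamespace objects stand in for the pydantic TIs, and the key names "ti" and "task_instance" are assumed to be the context entries that refer to the task instance.

```python
from types import SimpleNamespace


def rebind_local_ti(context, local_ti, ti_keys=("ti", "task_instance")):
    """Return a copy of context whose TI entries point at the locally running TI."""
    rebound = dict(context)
    for key in ti_keys:
        if key in rebound:
            rebound[key] = local_ti
    return rebound


local_ti = SimpleNamespace(task="mapped")
remote_ti = SimpleNamespace(task="mapped")  # what came back over RPC

context = rebind_local_ti(
    {"ti": remote_ti, "task_instance": remote_ti, "ds": "2024-01-01"},
    local_ti,
)

# The existing "mutate context['ti']" style of unmapping now reaches the
# local object again, while the RPC copy is left alone.
context["ti"].task = "unmapped"
```

With the entries rebound, the mutation-based unmapping would not need a special case for MappedOperator at the call site.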
we could also look at changing the way we handle context over RPC. currently it's just a "working" approach but not optimal because there's no laziness. we could optimize by making each context object an accessor that is an RPC call (and we should do something like this). and something like that could help here too.
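The lazy-accessor idea could look roughly like this. LazyContext and fake_rpc_fetch are hypothetical names; the fetch callable stands in for a per-key RPC call and is not an existing Airflow API.

```python
from collections.abc import Mapping


class LazyContext(Mapping):
    """Read-only context whose values are fetched on first access, then cached."""

    def __init__(self, keys, fetch):
        self._keys = tuple(keys)
        self._fetch = fetch  # callable(key) -> value, e.g. one RPC per key
        self._cache = {}

    def __getitem__(self, key):
        if key not in self._keys:
            raise KeyError(key)
        if key not in self._cache:
            self._cache[key] = self._fetch(key)
        return self._cache[key]

    def __iter__(self):
        return iter(self._keys)

    def __len__(self):
        return len(self._keys)


calls = []


def fake_rpc_fetch(key):
    # Records each call so we can see how many "RPCs" were made.
    calls.append(key)
    return f"value-for-{key}"


context = LazyContext(["ti", "dag_run", "ds"], fake_rpc_fetch)
first = context["ds"]
second = context["ds"]  # served from the cache, no second fetch
```

Only the keys that are actually read trigger a fetch, and an accessor like this would also be a natural place to hand back the local TI for "ti" instead of a remote copy.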
It makes sense, but if isinstance(self.task, MappedOperator) is an awkward condition to check for the case.
upon receiving a fresh context dict over RPC, we could replace the TIs in the context with the local TIPydantic object
This sounds somewhat promising. Instead of just the ti, we could probably try to replace the entire relationship (including e.g. dag) so we can get rid of needing to pass in dag separately into _record_task_map_for_downstreams.
It makes sense, but if isinstance(self.task, MappedOperator) is an awkward condition to check for the case.
yeah, i see what you're saying. e.g. it would be better for the code to "tell us" when an unmap has happened.
like when we call original_task.render_template_fields(context, jinja_env), that could return a new task when it creates one. that would certainly make it more obvious what is going on too.
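A sketch of that shape, with hypothetical stand-in classes rather than the real operators: render_template_fields returns the unmapped task when it creates one and None otherwise, so the caller no longer needs an isinstance check or a look into the context dict.

```python
class FakeOperator:
    """Hypothetical stand-in for a regular, already-concrete operator."""

    def __init__(self, task_id):
        self.task_id = task_id

    def render_template_fields(self, context):
        # Renders in place; there is no replacement task to report.
        return None


class FakeMappedOperator(FakeOperator):
    """Hypothetical stand-in for a mapped operator."""

    def render_template_fields(self, context):
        # Unmapping creates a concrete task; return it instead of hiding
        # it inside the context dict.
        return FakeOperator(self.task_id)


def render_and_resolve(task, context):
    """Render templates and return the task that should actually be executed."""
    unmapped = task.render_template_fields(context)
    return unmapped if unmapped is not None else task


plain = FakeOperator("a")
mapped = FakeMappedOperator("b")
resolved_plain = render_and_resolve(plain, {})
resolved_mapped = render_and_resolve(mapped, {})
```

The caller would then assign the result unconditionally, e.g. self.task = render_and_resolve(self.task, context), whether or not the context came over RPC.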