Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clear-missing-dags failuring #136

Open
CarlosSilva1994 opened this issue Jun 12, 2023 · 7 comments
Open

clear-missing-dags failuring #136

CarlosSilva1994 opened this issue Jun 12, 2023 · 7 comments

Comments

@CarlosSilva1994
Copy link

AssertionError: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7fc6bee71d90>'
[2023-06-12, 00:02:25 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 24386106 for task clear_missing_dags (This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7fc6bee71d90>' (Background on this error at: https://sqlalche.me/e/14/7s2a); 7688)
[2023-06-12, 00:02:25 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-06-12, 00:02:25 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check

@waardd
Copy link

waardd commented Jun 21, 2023

I think i have the same issue.

[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:103} INFO - Process will be Deleting the DAG(s) from the DB:
[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:105} INFO - 	Entry: <DAG: Sophis_NoXCOM>
[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:106} INFO - Process will be Deleting 1 DAG(s)
[2023-06-21, 10:12:50 CEST] {clear_missing_dags.py:111} INFO - Performing Delete...
[2023-06-21, 10:12:50 CEST] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/appl/mca/airflow/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/appl/mca/airflow/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/appl/mca/airflow/dags/clear_missing_dags.py", line 114, in clear_missing_dags_fn
    session.commit()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 1451, in commit
    self._transaction.commit(_to_root=self.future)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 829, in commit
    self._prepare_impl()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 808, in _prepare_impl
    self.session.flush()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 3444, in flush
    self._flush(objects)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 3584, in _flush
    transaction.rollback(_capture_exception=True)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
    raise exception
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/session.py", line 3544, in _flush
    flush_context.execute()
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 453, in execute
    n.execute_aggregate(self, set_)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/unitofwork.py", line 717, in execute_aggregate
    dependency_processor.process_deletes(uow, states)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/dependency.py", line 552, in process_deletes
    self._synchronize(
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/dependency.py", line 610, in _synchronize
    sync.clear(dest, self.mapper, self.prop.synchronize_pairs)
  File "/appl/mca/airflow/lib64/python3.8/site-packages/sqlalchemy/orm/sync.py", line 86, in clear
    raise AssertionError(
AssertionError: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7f2166763520>'
[2023-06-21, 10:12:50 CEST] {standard_task_runner.py:100} ERROR - Failed to execute job 106133 for task clear_missing_dags (This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: Dependency rule tried to blank-out primary key column 'serialized_dag.dag_id' on instance '<SerializedDagModel at 0x7f2166763520>' (Background on this error at: https://sqlalche.me/e/14/7s2a); 290548)
[2023-06-21, 10:12:50 CEST] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-06-21, 10:12:50 CEST] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check

@CarlosSilva1994
Copy link
Author

@waardd did you find a way to solve this issue?

@waardd
Copy link

waardd commented Jul 5, 2023

No i did not.
I have four environments and 3 out of them it worked perfect.
Only my prod environment it gives the error when i delete (or move) the DAG's i no longer use.
I have no clue how to solve this.
Maybe there is a sql statement that does the same thing to solve this particulair issue?

@Robert-Zacchigna
Copy link

Robert-Zacchigna commented Jul 13, 2023

I believe this is a dupe of this: #131

Try this as a solution: #131 (comment)

@CarlosSilva1994
Copy link
Author

@Robert-Zacchigna Thank you for your support!
I tried what you suggested and another error has occurred:

[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:110} INFO - Entry: <DAG: freshchat_source_agent_performance_v2>
[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:110} INFO - Entry: <DAG: freshchat_source_classic_v2>
[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:111} INFO - Process will be Deleting 2 DAG(s)
[2023-07-27, 19:02:04 UTC] {airflow-clear-missing-dags.py:116} INFO - Performing Delete...
[2023-07-27, 19:02:04 UTC] {taskinstance.py:1768} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
psycopg2.errors.NotNullViolation: null value in column "dag_id" violates not-null constraint
DETAIL: Failing row contains (null, t, f, f, 2023-06-07 19:54:16.350838+00, null, null, null, null, /opt/*/dags/repo/dags/freshchat-source-v2/source.py, dataeng-team, null, grid, "0 8 * * ", null, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, 12, f, 1, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, f, At 08:00, /opt//dags/2b34c8bc3162c80e0fc303802a31153fcf484744/dags).

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 192, in execute_callable
return self.python_callable(self.op_args, self.op_kwargs)
File "/opt/airflow/dags/repo/dags/maintenance/airflow-clear-missing-dags.py", line 119, in clear_missing_dags_fn
session.query(DagModel).filter(DagModel.dag_id == entry.serialized_dag.dag_id).update(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 3306, in update
result = self.session.execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/session.py", line 1714, in execute
result = conn._execute_20(statement, params or {}, execution_options)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1705, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1572, in _execute_clauseelement
ret = self._execute_context(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1943, in _execute_context
self.handle_dbapi_exception(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 2124, in handle_dbapi_exception
util.raise
(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 211, in raise

raise exception
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1900, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.NotNullViolation) null value in column "dag_id" violates not-null constraint
DETAIL: Failing row contains (null, t, f, f, 2023-06-07 19:54:16.350838+00, null, null, null, null, /opt/
/dags/repo/dags/freshchat-source-v2/source.py, dataeng-team, null, grid, "0 8 * * ", null, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, 12, f, 1, 2022-03-01 08:00:00+00, 2022-03-02 08:00:00+00, f, At 08:00, /opt/**/dags/2b34c8bc3162c80e0fc303802a31153fcf484744/dags).

[SQL: UPDATE dag SET dag_id = dag_id=%(param_1)s WHERE dag.dag_id = %(dag_id_1)s]
[parameters: {'param_1': None, 'dag_id_1': 'freshchat_source_agent_performance_v2'}]
(Background on this error at: https://sqlalche.me/e/14/gkpj)
[2023-07-27, 19:02:04 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 26799212 for task clear_missing_dags ((psycopg2.errors.InFailedSqlTransaction) current transaction is aborted, commands ignored until end of transaction block

[SQL: SELECT task_instance.task_id AS task_instance_task_id, task_instance.dag_id AS task_instance_dag_id, task_instance.run_id AS task_instance_run_id, task_instance.map_index AS task_instance_map_index, task_instance.start_date AS task_instance_start_date, task_instance.end_date AS task_instance_end_date, task_instance.duration AS task_instance_duration, task_instance.state AS task_instance_state, task_instance.try_number AS task_instance_try_number, task_instance.max_tries AS task_instance_max_tries, task_instance.hostname AS task_instance_hostname, task_instance.unixname AS task_instance_unixname, task_instance.job_id AS task_instance_job_id, task_instance.pool AS task_instance_pool, task_instance.pool_slots AS task_instance_pool_slots, task_instance.queue AS task_instance_queue, task_instance.priority_weight AS task_instance_priority_weight, task_instance.operator AS task_instance_operator, task_instance.queued_dttm AS task_instance_queued_dttm, task_instance.queued_by_job_id AS task_instance_queued_by_job_id, task_instance.pid AS task_instance_pid, task_instance.executor_config AS task_instance_executor_config, task_instance.updated_at AS task_instance_updated_at, task_instance.external_executor_id AS task_instance_external_executor_id, task_instance.trigger_id AS task_instance_trigger_id, task_instance.trigger_timeout AS task_instance_trigger_timeout, task_instance.next_method AS task_instance_next_method, task_instance.next_kwargs AS task_instance_next_kwargs
FROM task_instance
WHERE task_instance.dag_id = %(dag_id_1)s AND task_instance.task_id = %(task_id_1)s AND task_instance.run_id = %(run_id_1)s AND task_instance.map_index = %(map_index_1)s]
[parameters: {'dag_id_1': 'airflow-clear-missing-dags', 'task_id_1': 'clear_missing_dags', 'run_id_1': 'manual__2023-07-27T19:01:56.177391+00:00', 'map_index_1': -1}]
(Background on this error at: https://sqlalche.me/e/14/2j85); 8978)
[2023-07-27, 19:02:04 UTC] {local_task_job.py:208} INFO - Task exited with return code 1
[2023-07-27, 19:02:04 UTC] {taskinstance.py:2578} INFO - 0 downstream tasks scheduled from follow-on schedule check

@Robert-Zacchigna
Copy link

hmm i think this is an issue with your airflow DB, its saying that some records in the "dag_id" table are null which is violating the not-null constraint:

psycopg2.errors.NotNullViolation: null value in column "dag_id" violates not-null constraint

As a result, the process cant complete. My assumption is that the function is first performing some kind of integrity check on the data (in this case the table row) before checking if it needs to be cleared/removed.

  • Have you done a DB migration in the past/recently when upgrading airflow versions?
    • And have you cleared out tables marked for archival after the migrations?
      • A banner would show up in the UI specifying which tables are archived and can be dropped.
  • Have you made forced deletions on the DB that might have caused the DB to have a null record for a (presumably now deleted) DAG?

Beyond the above, I'd need to see your script (if you modified it in anyway) and/or your DB (don't share the DB, that is not safe from a data protection perspective, I'm just saying where you need to look for problem resolution).

You could also try checking the status/health of your DB using the airflow CLI, see here: https://airflow.apache.org/docs/apache-airflow/stable/howto/usage-cli.html#cli-db-clean

MAKE A BACKUP OF THE DB FIRST BEFORE MODIFYING THE DB

@waardd
Copy link

waardd commented Jan 29, 2024

This solved it for me:

I believe this is a dupe of this: #131

Try this as a solution: #131 (comment)

tnx @Robert-Zacchigna Robert-Zacchigna

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants