
DB cleanup fails after upgrade to Python 3.9 and Airflow 2.2.2 #121

Open
hanizaidi110 opened this issue Dec 1, 2021 · 9 comments

@hanizaidi110

I recently upgraded to Python 3.9 and Airflow 2.2.2. Since the upgrade, the following error occurs repeatedly. I've only changed the parameters indicated in the repo README and am running everything else the same. Other library versions:
SQLAlchemy 1.4.1
Flask-SQLAlchemy 2.4.3
Could you please look into this?

Job 108936: Subtask cleanup_TaskInstance
Running <TaskInstance: admin_airflow_db_cleanup.cleanup_TaskInstance manual__2021-12-01T09:58:53.886445+00:00 [running]> on host 2e7d9eccb27e
Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=l9b0o0x6t8n3v7s3@vimcar.slack.com,team-bi@vimcar.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=admin_airflow_db_cleanup
AIRFLOW_CTX_TASK_ID=cleanup_TaskInstance
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T09:58:53.886445+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T09:58:53.886445+00:00
Retrieving max_execution_date from XCom
Configurations:
max_date:                 2021-11-01 09:58:59.085046+00:00
enable_delete:            True
session:                  <sqlalchemy.orm.session.Session object at 0x7f607aed7c40>
airflow_db_model:         <class 'airflow.models.taskinstance.TaskInstance'>
state:                    None
age_check_column:         ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
keep_last:                False
keep_last_filters:        None
keep_last_group_by:       None

Running Cleanup Process...
Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/vimcar-bi/airflow_dags/admin_airflow_db_cleanup.py", line 291, in cleanup_function
    query = session.query(airflow_db_model).options(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1619, in options
    return self._options(False, *args)
  File "<string>", line 2, in _options
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/base.py", line 227, in generate
    fn(self, *args[1:], **kw)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1638, in _options
    opt.process_query(self)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 176, in process_query
    self._process(query, True)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 677, in _process
    val._bind_loader(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 829, in _bind_loader
    raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
Marking task as UP_FOR_RETRY. dag_id=admin_airflow_db_cleanup, task_id=cleanup_TaskInstance, execution_date=20211201T095853, start_date=20211201T170532, end_date=20211201T170532
Failed to execute job 108936 for task cleanup_TaskInstance
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/vimcar-bi/airflow_dags/admin_airflow_db_cleanup.py", line 291, in cleanup_function
    query = session.query(airflow_db_model).options(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1619, in options
    return self._options(False, *args)
  File "<string>", line 2, in _options
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/base.py", line 227, in generate
    fn(self, *args[1:], **kw)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1638, in _options
    opt.process_query(self)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 176, in process_query
    self._process(query, True)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 677, in _process
    val._bind_loader(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 829, in _bind_loader
    raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
Task exited with return code 1
0 downstream tasks scheduled from follow-on schedule check
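For context on the max_date value in the configuration dump: in both logs in this thread it sits roughly 30 days before the run (2021-11-01 for the 2021-12-01 run here, 2022-05-21 for the 2022-06-20 run in the later comment). A minimal sketch of that cutoff arithmetic, assuming a 30-day retention setting and assuming rows are deleted when their age column is at or before max_date. MAX_ENTRY_AGE_IN_DAYS, compute_max_date and is_expired are hypothetical names, not the DAG's actual code:

```python
from datetime import datetime, timedelta, timezone

# Assumption: 30-day retention, inferred from the logged max_date values.
# The real DAG reads its own setting, which is not shown in this thread.
MAX_ENTRY_AGE_IN_DAYS = 30


def compute_max_date(now: datetime, max_age_days: int = MAX_ENTRY_AGE_IN_DAYS) -> datetime:
    """Return the cutoff: rows whose age column is <= this value count as old."""
    return now - timedelta(days=max_age_days)


def is_expired(age_value: datetime, max_date: datetime) -> bool:
    """Assumed rule: a row qualifies for cleanup when its age column is at or before the cutoff."""
    return age_value <= max_date


if __name__ == "__main__":
    now = datetime(2021, 12, 1, 9, 58, 59, 85046, tzinfo=timezone.utc)
    print(compute_max_date(now).isoformat())
```

The cutoff itself does not appear to be the problem in this issue; the failure seems to come from the column max_date is compared against (age_check_column, which the log shows is an association proxy rather than a table column).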

@LukeHong

LukeHong commented Dec 6, 2021

Seems to be a duplicate of #117

@Jaroslaw-Rachwalski

Jaroslaw-Rachwalski commented Jun 20, 2022

I think the issue is very similar, but I'm using Airflow 2.3.2 from Docker with Python 3.7. I get the same error, but only for the following tasks: cleanup_TaskInstance, cleanup_BaseXCom, cleanup_TaskReschedule, cleanup_RenderedTaskInstanceFields. The other tasks finish successfully.

[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:265} INFO - Retrieving max_execution_date from XCom
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:278} INFO - Configurations:
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:279} INFO - max_date:                 2022-05-21 17:18:41.939540+00:00
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:280} INFO - enable_delete:            True
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:281} INFO - session:                  <sqlalchemy.orm.session.Session object at 0x7f7102e0c750>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:282} INFO - airflow_db_model:         <class 'airflow.models.taskinstance.TaskInstance'>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:283} INFO - state:                    None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:284} INFO - age_check_column:         ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:285} INFO - keep_last:                False
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:286} INFO - keep_last_filters:        None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:287} INFO - keep_last_group_by:       None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:289} INFO - 
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:291} INFO - Running Cleanup Process...
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/airflow-db-cleanup.py", line 298, in cleanup_function
    logging.info("INITIAL QUERY : " + str(query))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2848, in __str__
    return str(statement.compile(bind))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 506, in compile
    return self._compiler(dialect, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 570, in _compiler
    return dialect.statement_compiler(dialect, self, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 766, in __init__
    Compiled.__init__(self, dialect, statement, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 455, in __init__
    self.string = self.process(self.statement, **compile_kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 490, in process
    return obj._compiler_dispatch(self, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 81, in _compiler_dispatch
    return meth(self, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2981, in visit_select
    select_stmt, self, **kwargs
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/base.py", line 501, in create_for_statement
    return klass.create_for_statement(statement, compiler, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/context.py", line 579, in create_for_statement
    opt.process_compile_state(self)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 185, in process_compile_state
    self._process(compile_state, not bool(compile_state.current_path))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 718, in _process
    raiseerr,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 870, in _bind_loader
    "mapper option expects " "string key or list of attributes"
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=airflow-db-cleanup, task_id=cleanup_TaskInstance, execution_date=20220619T000000, start_date=20220620T171951, end_date=20220620T171952
[2022-06-20, 17:19:52 UTC] {logging_mixin.py:115} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/models/param.py:62 DeprecationWarning: The use of non-json-serializable params is deprecated and will be removed in a future release

@e-compagno

e-compagno commented Jun 24, 2022

Tasks cleanup_TaskInstance and cleanup_TaskReschedule fail with Python 3.8.12 and Airflow 2.2.2 with the same error: sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes

This seems to be related to columns missing from the db tables that those two tasks clean: execution_date is not a column of those tables.

I fixed it by replacing execution_date with end_date in age_check_column for both cleanup_TaskInstance and cleanup_TaskReschedule. Let me know whether there is a more appropriate field to use.

I also suggest adding

# Extract the db table's column names
table_columns = [col.name for col in airflow_db_model.__table__.columns]
# str() of a plain mapped column gives "table.column"; an association proxy has
# no ".", so [-1] yields its whole repr (and fails the check) instead of an IndexError
age_check_col_name = str(age_check_column).split(".")[-1]

if age_check_col_name not in table_columns:
    raise ValueError(f"{age_check_col_name} field not in table {airflow_db_model.__table__.name}")

to cleanup_function, to check that the age_check_column configured for each model is actually a column of the table being cleaned.
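The same check can be sketched without a database, working only on the strings involved. This assumes str(age_check_column) renders as "table.column" for a plain mapped column (which the suggestion above also relies on) and, as in the logs, as a ColumnAssociationProxyInstance(...) repr for a proxy. check_age_column is a hypothetical helper:

```python
from typing import Collection


def check_age_column(table_name: str, table_columns: Collection[str],
                     age_check_column_str: str) -> str:
    """Return the column name behind str(age_check_column), or raise ValueError.

    Assumes a plain mapped column renders as "table.column"; anything else
    (e.g. an association proxy repr) or a column of another table is rejected.
    """
    table_part, sep, col_name = age_check_column_str.rpartition(".")
    if not sep or table_part != table_name or col_name not in table_columns:
        raise ValueError(
            f"{age_check_column_str!r} is not a column of table {table_name!r}"
        )
    return col_name


if __name__ == "__main__":
    columns = {"task_id", "dag_id", "run_id", "map_index", "end_date"}
    print(check_age_column("task_instance", columns, "task_instance.end_date"))
    proxy = "ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))"
    try:
        check_age_column("task_instance", columns, proxy)
    except ValueError as exc:
        print("rejected:", exc)
```

Using rpartition instead of split(".")[1] makes the proxy case raise the intended ValueError rather than an IndexError, and comparing the table prefix also rejects a column borrowed from another table, such as dag_run.execution_date.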

@Jaroslaw-Rachwalski

Jaroslaw-Rachwalski commented Jun 25, 2022

I read through #117, and it looks like the comment from @PhilippDB makes sense and could be the solution.

@e-compagno please be aware of what @tylerwmarrs wrote: using start_date or end_date may select records that are not tied to the corresponding records in the dag_run table. The tables have the following constraints in their DDL:

CREATE TABLE task_instance(
    task_id character varying(250) NOT NULL,
    dag_id character varying(250) NOT NULL,
    run_id character varying(250) NOT NULL,
[...]
    PRIMARY KEY(task_id,dag_id,run_id,map_index),
    CONSTRAINT task_instance_dag_run_fkey FOREIGN KEY(dag_id, run_id) REFERENCES dag_run(dag_id, run_id),
    CONSTRAINT task_instance_trigger_id_fkey FOREIGN KEY(trigger_id) REFERENCES trigger(id)
);
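To illustrate @tylerwmarrs's point with made-up data: a task instance of an old DAG run that was cleared and re-run recently has a recent end_date, so filtering on end_date and filtering on the owning DagRun's execution_date (joined through the (dag_id, run_id) key) can select different rows. A stdlib-only sketch; the sample rows and function names are hypothetical:

```python
from datetime import datetime, timezone

UTC = timezone.utc

# Hypothetical DagRuns: (dag_id, run_id) -> execution_date.
DAG_RUNS = {
    ("example_dag", "scheduled__2021-10-01"): datetime(2021, 10, 1, tzinfo=UTC),
    ("example_dag", "scheduled__2021-11-20"): datetime(2021, 11, 20, tzinfo=UTC),
}

# Hypothetical task instances, linked to DagRuns by (dag_id, run_id).
TASK_INSTANCES = [
    # Task of the old run that was cleared and re-run recently.
    {"dag_id": "example_dag", "run_id": "scheduled__2021-10-01", "task_id": "extract",
     "end_date": datetime(2021, 11, 25, tzinfo=UTC)},
    {"dag_id": "example_dag", "run_id": "scheduled__2021-10-01", "task_id": "load",
     "end_date": datetime(2021, 10, 1, 1, tzinfo=UTC)},
    {"dag_id": "example_dag", "run_id": "scheduled__2021-11-20", "task_id": "extract",
     "end_date": datetime(2021, 11, 20, 1, tzinfo=UTC)},
]


def _key(ti):
    return (ti["dag_id"], ti["run_id"], ti["task_id"])


def expired_by_end_date(max_date):
    """Rows selected when the age check uses the task instance's own end_date."""
    return {_key(ti) for ti in TASK_INSTANCES if ti["end_date"] <= max_date}


def expired_by_run_execution_date(max_date):
    """Rows selected when the age check goes through the owning DagRun."""
    return {
        _key(ti)
        for ti in TASK_INSTANCES
        if DAG_RUNS[(ti["dag_id"], ti["run_id"])] <= max_date
    }


if __name__ == "__main__":
    cutoff = datetime(2021, 11, 1, tzinfo=UTC)
    print(sorted(expired_by_end_date(cutoff)))
    print(sorted(expired_by_run_execution_date(cutoff)))
```

With a cutoff of 2021-11-01, the end_date filter selects only the old run's load task, while the execution_date filter also selects its re-run extract task, so an end_date-based cleanup would leave that task instance behind even though its DagRun is old enough to be removed.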

@utkarshgupta137

GoogleCloudPlatform/python-docs-samples#7847 fixed it for me.

@e-compagno

e-compagno commented Oct 24, 2022

The problem is still present in version 2.4.1. The Cloud Composer version does not fix TaskReschedule cleanup and also breaks the XCom cleanup, since execution_date is not a column in the db. Is there any update on resolving this issue?

Would

"airflow_db_model": XCom,
"age_check_column": XCom.timestamp,

fix the issue?

@vincenzobaz

XCom.timestamp solves the error on XCom, not the others

@lphcreat

> XCom.timestamp solves the error on XCom, not the others

You can find the time columns in the model files. For example, for TaskInstance you can use queued_dttm or end_date.
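Following that suggestion, one way to choose an age column is to take the first candidate that actually exists in the model's table. A sketch only: the candidate list just collects the columns mentioned in this thread (end_date, queued_dttm, timestamp), pick_age_column is a hypothetical helper, and the column sets below are abbreviated, made-up examples, so check them against your Airflow version's models:

```python
from typing import Collection, Optional, Sequence

# Candidate age columns mentioned in this thread, in order of preference.
# This list is an assumption, not an official Airflow setting.
DEFAULT_CANDIDATES = ("end_date", "queued_dttm", "timestamp")


def pick_age_column(
    table_columns: Collection[str],
    candidates: Sequence[str] = DEFAULT_CANDIDATES,
) -> Optional[str]:
    """Return the first candidate that is a real column of the table, or None."""
    for name in candidates:
        if name in table_columns:
            return name
    return None


if __name__ == "__main__":
    # Hypothetical, abbreviated column sets for illustration only.
    print(pick_age_column({"task_id", "dag_id", "run_id", "queued_dttm", "end_date"}))
    print(pick_age_column({"dag_id", "task_id", "run_id", "key", "value", "timestamp"}))
```

Returning None instead of guessing leaves it to the caller to skip or fail loudly for a model with no suitable timestamp column.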

@chawocecil

chawocecil commented Dec 5, 2022

> The problem is still present in version 2.4.1. The Cloud Composer version does not fix TaskReschedule cleanup and also breaks the XCom cleanup, since execution_date is not a column in the db. Is there any update on resolving this issue?
>
> Would
>
> "airflow_db_model": XCom,
> "age_check_column": XCom.timestamp,
>
> fix the issue?

It fixes the issue for XCom. For RenderedTaskInstanceFields, use RenderedTaskInstanceFields.run_id to fix the issue.


8 participants