Is it possible to make the Python VirtualEnv Operator aware of local modules? #22175
-
Hello, I want to import a scraper script I've written, with its own dependencies, as an Apache Airflow DAG. But here's the thing: the virtualenv is a different environment where I can't access my local modules (this is not a bug, it's expected). Still, how can I solve my issue? Should I publish every script on pip so I can import it as a requirement? Is there any official hack?
```python
# amazing_scraper_dag.py
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    dag_id='amazing_scraper',
    schedule_interval='@daily',
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['scraper'],
)
def taskflow():
    @task.virtualenv(
        system_site_packages=True,
        requirements=[
            'aiohttp',
            'asyncio',
            'bs4',
            'pandas',
            'lxml',
        ],
    )
    def extract():
        """
        #### Extract task
        ERROR here: module solvolabs not found :'(
        """
        # This function has my task requirements as direct dependencies
        from solvolabs.my_scraper import scrape
        return scrape()

    extract()


dag = taskflow()
```

I've found a lot of posts everywhere on GitHub and many hacks, but I'd like something more ... solid if possible.
-
Package your `solvolabs` code in a `pip` package and add it as a requirement to the virtualenv. Implement a pipeline where you can release a new version of the code to `pip` via a "single click". This is the only approach that is not a hack that makes sense, I think.
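A minimal sketch of what that packaging could look like, assuming a plain setuptools layout; the name, version, and dependency list are illustrative, taken from the DAG above (`asyncio` is omitted because it ships with the standard library):

```python
# setup.py - illustrative configuration for a hypothetical "solvolabs"
# package, so it can be released to pip and listed in the virtualenv
# requirements instead of being imported from a local folder.
from setuptools import find_packages, setup

setup(
    name="solvolabs",
    version="0.0.1",
    packages=find_packages(),
    install_requires=[
        # third-party packages the scraper needs at runtime
        "aiohttp",
        "bs4",
        "pandas",
        "lxml",
    ],
)
```

Once built and published (for example with `python -m build` and `twine upload`), the task needs nothing more than `requirements=['solvolabs==0.0.1']`, and the local-import problem disappears.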
-
Hi! Sorry to come back here: I've finally packaged my script and published it on PyPI: https://pypi.org/project/globalfirepower-scraper/ Then my DAG script is refactored this way (so clean, I like it!):

```python
import logging
import shutil
from datetime import datetime

from airflow.decorators import dag, task

log = logging.getLogger(__name__)

if not shutil.which("virtualenv"):
    log.warning("This DAG requires virtualenv, please install it.")
else:
    @dag(
        dag_id='amazing_scraper',
        schedule_interval='@daily',
        start_date=datetime(2021, 1, 1),
        catchup=False,
        tags=['scraper'],
    )
    def taskflow():
        """
        ### TaskFlow API example using virtualenv
        """
        @task.virtualenv(
            requirements=[
                'globalfirepower-scraper==0.0.1',
            ],
            python_version=3.9,
            system_site_packages=True,
        )
        def extract():
            """
            #### Extract task
            Scrape the Global Fire Power countries ranking in 2022.
            """
            from globalfirepower import GlobalFirePowerScraper
            scraper = GlobalFirePowerScraper
            return scraper.get_country_ranking_table()

        countries_ranking = extract()

    dag = taskflow()
```

And ... it doesn't work, and I'm in pain finding out why and how I can fix it. Things I've checked before coming here:
This is the output of my DAG:
Any idea? I have exactly the same output on Heroku, where my Python version is 3.9 in both the setup and the VirtualenvOperator. Sorry if it's a silly question, I'm really new to Airflow and Data Engineering but eager to learn! Regards
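One thing that stands out from the snippet alone, since the DAG output was not captured above: `GlobalFirePowerScraper` is assigned without parentheses, so `get_country_ranking_table()` is looked up on the class rather than on an instance. A minimal sketch of the corrected task body, assuming the scraper is a regular class (if the method is actually a classmethod, the original call would already work and the cause lies elsewhere, e.g. a Python version mismatch between the worker and the virtualenv):

```python
def extract():
    # Assumption: GlobalFirePowerScraper is a regular class that must be
    # instantiated; the snippet above referenced it without parentheses,
    # so the method was looked up on the class itself.
    from globalfirepower import GlobalFirePowerScraper

    scraper = GlobalFirePowerScraper()  # note the parentheses
    return scraper.get_country_ranking_table()
```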
-
We've added the whole dags folder to the PYTHONPATH (`PYTHONPATH=/opt/airflow/dags`), so modules living next to the DAG files can be imported both at parse time and inside the virtualenv:

```python
# dag_do_some_etl.py
from acme_lib.salesforce_etl import utils_without_dependencies

...

@task.virtualenv(requirements=["some-required-dependency"])
def virtualenv_task():
    from acme_lib.salesforce_etl import utils_with_dependencies
    ...
```

BR