Here's an example of an asset that will launch a Databricks job using the /runs/submit endpoint:

```python
from dagster_databricks import DatabricksClientResource
from dagster import asset, Config, AssetExecutionContext, Definitions
from databricks.sdk.service import jobs
from pydantic import Field

_poll_interval_seconds = 5
_max_wait_time_seconds = 60 * 60 * 24  # 1 day


class DatabricksSubmitRunOpConfig(Config):
    poll_interval_seconds: float = Field(
        default=_poll_interval_seconds,
        description="Check whether the Databricks Job is done at this interval, in seconds.",
    )
    max_wait_time_seconds: int = Field(
        default=_max_wait_time_seconds,
        description=(
            "If the Databricks Job is not complete after this length of time, in seconds,"
            " raise an error."
        ),
    )
    job_configuration: dict = Field(
        description="Dictionary defining the parameters to be passed to the /runs/submit endpoint"
    )


@asset
def databricks_job_asset(
    context: AssetExecutionContext,
    config: DatabricksSubmitRunOpConfig,
    databricks_client: DatabricksClientResource,
):
    client = databricks_client.get_client()
    jobs_service = client.workspace_client.jobs

    run = jobs_service.submit(
        tasks=[jobs.SubmitTask.from_dict(config.job_configuration)],
    )
    run_id: int = run.bind()["run_id"]

    get_run_response = jobs_service.get_run(run_id=run_id)
    context.log.info(
        f"Launched databricks job run for '{get_run_response.run_name}' (`{run_id}`). URL:"
        f" {get_run_response.run_page_url}. Waiting for run to complete."
    )

    client.wait_for_run_to_complete(
        logger=context.log,
        databricks_run_id=run_id,
        poll_interval_sec=config.poll_interval_seconds,
        max_wait_time_sec=config.max_wait_time_seconds,
    )
```

As you scale this to multiple assets, you'll either need to make an asset factory (a function that returns an `@asset`-decorated function), or you'll want to abstract the innards into a regular Python function so you can define assets with upstream dependencies. If you're not planning on having Dagster actually load any of the data and just want it for scheduling notebook job runs, then this is a bit easier with an asset factory, because you can just define the dependencies with `deps`:

```python
from dagster import AssetsDefinition


def databricks_job_asset_factory(upstream_deps: list[AssetsDefinition]):
    @asset(deps=upstream_deps)
    def _an_asset(
        context: AssetExecutionContext,
        config: DatabricksSubmitRunOpConfig,
        databricks_client: DatabricksClientResource,
    ):
        ...

    return _an_asset
```
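For intuition, `wait_for_run_to_complete` is essentially a polling loop governed by the two config values above. This is a standalone sketch of that pattern, not the dagster-databricks implementation; `get_run_state` is a hypothetical callable standing in for a Databricks runs/get status lookup:

```python
import time


def wait_for_run_to_complete(get_run_state, poll_interval_sec=5.0, max_wait_time_sec=60.0):
    """Poll `get_run_state` until it reports a terminal state, or raise on timeout.

    `get_run_state` is a hypothetical callable returning one of
    "PENDING", "RUNNING", or "TERMINATED".
    """
    deadline = time.monotonic() + max_wait_time_sec
    while time.monotonic() < deadline:
        state = get_run_state()
        if state == "TERMINATED":
            return state
        # Not done yet: back off before polling again.
        time.sleep(poll_interval_sec)
    raise TimeoutError(f"Run did not complete within {max_wait_time_sec} seconds")


# Simulated run that reaches a terminal state on the third poll:
states = iter(["PENDING", "RUNNING", "TERMINATED"])
print(wait_for_run_to_complete(lambda: next(states), poll_interval_sec=0.01))  # TERMINATED
```

A short `poll_interval_seconds` makes the asset react faster at the cost of more API calls; `max_wait_time_seconds` bounds how long a stuck run can block the Dagster step before failing.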
-
The user wants to create an asset that will launch a Databricks job using the /runs/submit endpoint. There is no existing functionality for this in dagster-databricks.