Skip to content

Source

commands

bundle

command(metadata, default_key, overwrite, resource_generator, env, conf_source, pipeline, params)

Databricks Asset Bundle commands

Source code in src/kedro_databricks/commands/bundle.py
@click.command(name="bundle")
@option.default_key
@option.overwrite
@option.resource_generator
@option.env
@option.conf_source
@option.pipeline
@option.params
@click.pass_obj
def command(
    metadata: ProjectMetadata,
    default_key: str,
    overwrite: bool,
    resource_generator: str,
    env: str,
    conf_source: str,
    pipeline: str | None,
    params: str | None,
):
    """Databricks Asset Bundle commands"""
    # Flow: ensure the environment's configuration directory exists, load the
    # overrides for that environment, generate the job resources, apply the
    # overrides per resource type, then save the result.

    # Kedro loads no configuration from a directory that does not exist, so
    # the directory is created up front.
    env_conf_dir = metadata.project_path / conf_source / env
    if not env_conf_dir.exists():
        log.warning(f"Creating {env_conf_dir.relative_to(metadata.project_path)}")
        env_conf_dir.mkdir(parents=True)

    with KedroSession.create(project_path=metadata.project_path, env=env) as session:
        overrides = _load_kedro_env_config(session=session)
        if not overrides:
            raise NoOverridesError(Path(conf_source))
        if "resources" not in overrides:
            raise NoResourcesKeyError(env)

        generator_cls = RESOURCE_GENERATOR_RESOLVER.resolve(resource_generator)
        generator = generator_cls(
            session=session,
            metadata=metadata,
            conf_source=conf_source,
            params=params,
        )
        generated = {"jobs": generator.generate_jobs(pipeline)}

        result = {}
        for resource_type, type_overrides in overrides["resources"].items():
            generated_items = generated.get(resource_type, {})
            overrider = RESOURCE_OVERRIDER_RESOLVER.resolve(resource_type)()
            # Cover resources that were generated as well as resources that
            # only exist in the override configuration.
            candidate_keys = set(generated_items.keys()).union(
                set(type_overrides.keys())
            )
            # The default key and regex-prefixed keys are override sources,
            # not resources of their own, so they are left out of the output.
            # Each call gets its own deep copy of the overrides.
            result[resource_type] = {
                key: overrider.override(
                    resource_key=key,
                    resource=generated_items.get(key, {}),
                    overrides=copy.deepcopy(type_overrides),
                    default_key=default_key,
                )
                for key in candidate_keys
                if key != default_key and not key.startswith(config.regex_prefix)
            }

        _save_resources(
            metadata=metadata,
            env=env,
            resources=result,
            overwrite=overwrite,
        )

deploy

command(ctx, env, bundle, default_key, resource_generator, conf_source, pipeline, params, runtime_params, databricks_args)

Deploy the Databricks Asset Bundle.

This function deploys the Databricks Asset Bundle in the current project directory. It also creates a Databricks configuration file and a Databricks target configuration file.

Source code in src/kedro_databricks/commands/deploy.py
@click.command(name="deploy")
@option.env
@option.bundle
@option.default_key
@option.resource_generator
@option.conf_source
@option.pipeline
@option.params
@option.databricks_args
@option.runtime_params
@click.pass_context
def command(
    ctx: click.Context,
    env: str,
    bundle: bool,
    default_key: str,
    resource_generator: str,
    conf_source: str,
    pipeline: str | None,
    params: str | None,
    runtime_params: str | None,
    databricks_args: tuple[str, ...],
):
    """Deploy the Databricks Asset Bundle.

    This function deploys the Databricks Asset Bundle in the current project
    directory. It also creates a Databricks configuration file and a
    Databricks target configuration file.
    """
    # '--runtime-params' is the former name of '--params'; when it is given it
    # replaces whatever '--params' carried.
    if runtime_params:
        warnings.warn(
            "'--runtime-params' has been renamed to '--params' to be consistent with `kedro databricks bundle`"
        )
        params = runtime_params

    project_metadata = ctx.obj
    if not isinstance(project_metadata, ProjectMetadata):
        raise TypeError("Project metadata is not available in the context.")

    # Optionally regenerate the bundle resources first, always overwriting.
    if bundle:
        bundle_kwargs = {
            "env": env,
            "default_key": default_key,
            "conf_source": conf_source,
            "resource_generator": resource_generator,
            "pipeline": pipeline,
            "params": params,
            "overwrite": True,
        }
        ctx.invoke(bundle_command, **bundle_kwargs)

    cli = DatabricksCli(
        project_metadata, env=env, additional_args=list(databricks_args)
    )
    cli.deploy()
    log.info(f"Deployed Databricks Asset Bundle in {project_metadata.project_path}")
    cli.upload()
    log.info(
        f"Uploaded project data to Databricks from {project_metadata.project_path}"
    )
    cli.summary()

destroy

command(metadata, env, databricks_args)

Databricks Asset Bundle Destroy commands

Source code in src/kedro_databricks/commands/destroy.py
@click.command(name="destroy")
@option.env
@option.databricks_args
@click.pass_obj
def command(metadata: ProjectMetadata, env: str, databricks_args: tuple[str, ...]):
    """Databricks Asset Bundle Destroy commands"""
    # The extra arguments arrive as a tuple; DatabricksCli is handed a list.
    extra_args = list(databricks_args)
    DatabricksCli(metadata=metadata, env=env, additional_args=extra_args).destroy()

init

command(metadata, catalog, schema, default_key, env, conf_source, resource_generator, regex_prefix, overwrite, databricks_args)

Initialize a Kedro project for Databricks Asset Bundles.

Source code in src/kedro_databricks/commands/init.py
@click.command(name="init")
@option.catalog
@option.schema
@option.default_key
@option.env
@option.conf_source
@option.resource_generator
@option.regex_prefix
@option.overwrite
@option.databricks_args
@click.pass_obj
def command(
    metadata: ProjectMetadata,
    catalog: str,
    schema: str,
    default_key: str,
    env: str,
    conf_source: str,
    resource_generator: str,
    regex_prefix: str,
    overwrite: bool,
    databricks_args: tuple[str, ...],
):
    """Initialize a Kedro project for Databricks Asset Bundles."""
    log.info("Initializing Databricks Asset Bundle...")
    cli = DatabricksCli(metadata, additional_args=list(databricks_args))
    assets_dir, template_params = _prepare_template(metadata)

    # An existing databricks.yml is only replaced when overwriting was requested.
    bundle_config = metadata.project_path / "databricks.yml"
    if bundle_config.exists():
        if not overwrite:
            raise FileExistsError(
                f"`databricks.yml` already exist at {bundle_config}"
            )
        bundle_config.unlink(missing_ok=True)

    validated_conf = cli.init(assets_dir, template_params)
    log.info(f"Initialized Databricks Asset Bundle in {metadata.project_path}")

    _create_target_configs(
        metadata,
        default_key=default_key,
        default_catalog=catalog,
        default_schema=schema,
        validated_conf=validated_conf,
    )
    _update_gitignore(metadata)

    # The options used for this run are handed to _update_pyproject as a Config.
    plugin_config = Config(
        init_catalog=catalog,
        init_schema=schema,
        default_env=env,
        conf_source=conf_source,
        workflow_default_key=default_key,
        workflow_generator=resource_generator,
        regex_prefix=regex_prefix,
    )
    _update_pyproject(metadata, plugin_config)

    # Only projects that ship a hooks.py get the Spark hook transformation.
    hooks_file = metadata.project_path / "src" / metadata.package_name / "hooks.py"
    if hooks_file.exists():
        _transform_spark_hook(hooks_file.as_posix())

    if require_databricks_run_script():  # pragma: no cover - Might be removed in future
        log.warning(
            "Kedro version less than 0.19.8 requires a script to run tasks on Databricks. "
        )
        _write_databricks_run_script(metadata)

    log.info(
        f"Successfully initialized Databricks Asset Bundle in {metadata.project_path}"
    )

run

command(metadata, env, pipeline, databricks_args)

Databricks Asset Bundle Run commands

Source code in src/kedro_databricks/commands/run.py
@click.command(name="run")
@option.pipeline_arg
@option.env
@option.databricks_args
@click.pass_obj
def command(
    metadata: ProjectMetadata,
    env: str,
    pipeline: str,
    databricks_args: tuple[str, ...],
):
    """Databricks Asset Bundle Run commands"""
    # When the user passes only pass-through options, e.g.
    # `kedro databricks run -- --profile prod`, the first option ends up in
    # ``pipeline``. Move it back to the front of the Databricks arguments.
    # The ``pipeline and`` guard keeps a missing (None) or empty pipeline from
    # raising here, so it reaches the fallback below instead.
    if pipeline and pipeline.startswith("--"):
        databricks_args = (pipeline, *databricks_args)
        pipeline = ""

    # No pipeline given: fall back to the project's package name.
    if not pipeline:
        pipeline = metadata.package_name

    dbcli = DatabricksCli(
        metadata=metadata,
        env=env,
        additional_args=list(databricks_args),
    )
    dbcli.run(pipeline)
    log.info(
        f"Successfully triggered Databricks job for pipeline '{pipeline}' in project {metadata.project_path}"
    )

config

Config

Bases: BaseSettings

conf_source = Field(default='conf') class-attribute instance-attribute

Path of a directory where project configuration is stored

default_env = Field(default='dev') class-attribute instance-attribute

Default target environment for kedro-databricks commands.

init_catalog = Field(default='workspace') class-attribute instance-attribute

Default catalog for kedro databricks init

init_schema = Field(default='default') class-attribute instance-attribute

Default schema for kedro databricks init

regex_prefix = Field(default='re:') class-attribute instance-attribute

Prefix to use for discovering regex workflow or task overrides. Must end in ':'

workflow_default_key = Field(default='default') class-attribute instance-attribute

Default key to use for overrides in kedro databricks bundle

workflow_generator = Field(default='node') class-attribute instance-attribute

Default generator to use for generating Databricks Asset Bundle resources

constants

JOB_KEY_ORDER = ['name', 'description', 'parameters', 'environments', 'job_clusters', 'tasks', 'access_control_list', 'budget_policy_id', 'continuous', 'deployment', 'edit_mode', 'email_notifications', 'format', 'git_source', 'health', 'max_concurrent_runs', 'notification_settings', 'performance_target', 'queue', 'run_as', 'schedule', 'tags', 'timeout_seconds', 'trigger', 'webhook_notifications'] module-attribute

Order of keys in the job configuration for Databricks jobs.

KEDRO_VERSION = Version(metadata.version('kedro')) module-attribute

Kedro version used to build this plugin.

MAX_TASK_KEY_LENGTH = 100 module-attribute

Maximum number of characters in a task key in Databricks jobs.

MINIMUM_DATABRICKS_VERSION = [0, 205, 0] module-attribute

Minimum Databricks version required for this plugin.

TASK_KEY_ORDER = ['task_key', 'description', 'depends_on', 'environment_key', 'job_cluster_key', 'existing_cluster_id', 'libraries', 'new_cluster', 'compute', 'disable_auto_optimization', 'health', 'run_if', 'max_retries', 'min_retry_interval_millis', 'retry_on_timeout', 'timeout_seconds', 'notification_settings', 'email_notifications', 'webhook_notifications', 'alert_task', 'clean_rooms_notebook_task', 'condition_task', 'dashboard_task', 'dbt_task', 'for_each_task', 'notebook_task', 'pipeline_task', 'power_bi_task', 'python_wheel_task', 'run_job_task', 'spark_jar_task', 'spark_python_task', 'spark_submit_task', 'sql_task'] module-attribute

Order of keys in the task configuration for Databricks jobs.

plugin

commands()

Entry point for Kedro-Databricks commands

Source code in src/kedro_databricks/plugin.py
@click.group(name="Kedro-Databricks")
def commands():
    """Entry point for Kedro-Databricks commands"""
    # Intentionally empty: the group exists so sub-groups can be attached to it.

databricks_commands()

Databricks Asset Bundle commands

These commands are used to manage Databricks Asset Bundles in a Kedro project. They allow you to initialize, bundle, deploy, run, and destroy Databricks asset bundles.

Source code in src/kedro_databricks/plugin.py
@commands.group(
    cls=Plugin,
    commands_dir=Path(__file__).parent / "commands",
    name="databricks",
)
def databricks_commands():
    """Databricks Asset Bundle commands

    These commands are used to manage Databricks Asset Bundles in a Kedro project.
    They allow you to initialize, bundle, deploy, run, and destroy Databricks asset bundles.
    """
    # Intentionally empty. Presumably the Plugin group class supplies the
    # subcommands from ``commands_dir`` — confirm against the Plugin class.

resource_generator

Resource generators for building Databricks bundle resources.

This package exposes concrete generators that transform Kedro pipelines into Databricks Asset Bundle resources (jobs). Select the appropriate generator via RESOURCE_GENERATORS to produce resources at node or pipeline granularity.

AbstractResourceGenerator(session, metadata, conf_source='conf', params=None)

Bases: ABC

Generates Databricks resources for the given pipelines.

Finds all pipelines in the project and generates Databricks asset bundle resources for each according to the Databricks REST API

Source code in src/kedro_databricks/resource_generator/abstract_resource_generator.py
def __init__(
    self,
    session: KedroSession,
    metadata: ProjectMetadata,
    conf_source: str = "conf",
    params: str | None = None,
) -> None:
    """Store the project metadata and load the Kedro context.

    Args:
        session: Kedro session; its loaded context is kept on ``self.context``.
        metadata: Project metadata, kept on ``self.metadata``.
        conf_source: Configuration directory name appended to the
            ``${workspace.file_path}`` placeholder to form ``self.remote_conf_dir``.
        params: Optional parameters string, stored unchanged on ``self.params``.
    """
    self.metadata = metadata
    self.params = params
    # Doubled braces emit the literal ``${workspace.file_path}`` placeholder.
    self.remote_conf_dir = f"/${{workspace.file_path}}/{conf_source}"
    self.context = session.load_context()
    # ``pipelines`` is the module-level name in scope, not an argument.
    self.pipelines: MutableMapping = pipelines

generate_jobs(pipeline_name=None)

Generate Databricks resources for the given pipelines.

Finds all pipelines in the project and generates Databricks asset bundle resources for each according to the Databricks REST API

Parameters:

Name Type Description Default
pipeline_name str | None

The name of the pipeline for which Databricks asset bundle resources should be generated. If None, generates all pipelines.

None

Returns:

Type Description
dict[str, dict[str, Any]]

dict[str, dict[str, Any]]: A dictionary of pipeline names and their Databricks resources

Source code in src/kedro_databricks/resource_generator/abstract_resource_generator.py
def generate_jobs(
    self, pipeline_name: str | None = None
) -> dict[str, dict[str, Any]]:
    """Generate Databricks resources for the given pipelines.

    Finds all pipelines in the project and generates Databricks asset bundle resources
    for each according to the Databricks REST API

    Args:
        pipeline_name (str | None): The name of the pipeline for which Databricks asset bundle resources should be generated.
            If None, generates all pipelines.

    Returns:
        dict[str, dict[str, Any]]: A dictionary of pipeline names and their Databricks resources
    """
    jobs = {}
    pipeline = self.pipelines.get(pipeline_name)
    if pipeline_name and pipeline:
        log.info(f"Generating resources for pipeline '{pipeline_name}'")
        name = self._make_job_name(self.metadata.package_name, pipeline_name)
        jobs[name] = self._create_job(
            name=name,
            pipeline=pipeline,
            pipeline_name=pipeline_name,
        )
        return jobs
    if pipeline_name:
        raise KeyError(
            f"Pipeline '{pipeline_name}' not found. Available pipelines: {list(self.pipelines.keys())}"
        )

    for registered_pipeline_name, registered_pipeline in self.pipelines.items():
        if len(registered_pipeline.nodes) == 0:
            continue
        name = self._make_job_name(
            self.metadata.package_name, registered_pipeline_name
        )
        job = self._create_job(
            name=name,
            pipeline=registered_pipeline,
            pipeline_name=registered_pipeline_name,
        )
        log.debug(f"Job '{name}' successfully created.")
        log.debug(job)
        jobs[name] = job

    return jobs

NodeResourceGenerator(session, metadata, conf_source='conf', params=None)

Bases: AbstractResourceGenerator

Generate a job with one Databricks task per Kedro node.

Source code in src/kedro_databricks/resource_generator/node_resource_generator.py
def __init__(
    self,
    session: KedroSession,
    metadata: ProjectMetadata,
    conf_source: str = "conf",
    params: str | None = None,
) -> None:
    """Run the base initialisation, then fail if any memory datasets are reported.

    Raises:
        MemoryDatasetError: If ``_get_memory_datasets`` returns a non-empty
            collection.
    """
    super().__init__(session, metadata, conf_source, params)
    memory_datasets = self._get_memory_datasets()
    if len(memory_datasets) == 0:
        return
    raise MemoryDatasetError(self, memory_datasets)

PipelineResourceGenerator(session, metadata, conf_source='conf', params=None)

Bases: AbstractResourceGenerator

Generate a job with a single task for the whole pipeline.

Source code in src/kedro_databricks/resource_generator/abstract_resource_generator.py
def __init__(
    self,
    session: KedroSession,
    metadata: ProjectMetadata,
    conf_source: str = "conf",
    params: str | None = None,
) -> None:
    """Store the project metadata and load the Kedro context.

    Args:
        session: Kedro session; its loaded context is kept on ``self.context``.
        metadata: Project metadata, kept on ``self.metadata``.
        conf_source: Configuration directory name appended to the
            ``${workspace.file_path}`` placeholder to form ``self.remote_conf_dir``.
        params: Optional parameters string, stored unchanged on ``self.params``.
    """
    self.metadata = metadata
    self.params = params
    # Doubled braces emit the literal ``${workspace.file_path}`` placeholder.
    self.remote_conf_dir = f"/${{workspace.file_path}}/{conf_source}"
    self.context = session.load_context()
    # ``pipelines`` is the module-level name in scope, not an argument.
    self.pipelines: MutableMapping = pipelines

abstract_resource_generator

Base interfaces and helpers for Databricks resource generation.

This module defines the abstract generator responsible for converting Kedro pipelines into Databricks jobs according to the Databricks REST API. Concrete implementations specify how tasks are laid out (e.g., per-node or per-pipeline).

AbstractResourceGenerator(session, metadata, conf_source='conf', params=None)

Bases: ABC

Generates Databricks resources for the given pipelines.

Finds all pipelines in the project and generates Databricks asset bundle resources for each according to the Databricks REST API

Source code in src/kedro_databricks/resource_generator/abstract_resource_generator.py
def __init__(
    self,
    session: KedroSession,
    metadata: ProjectMetadata,
    conf_source: str = "conf",
    params: str | None = None,
) -> None:
    """Store the project metadata and load the Kedro context.

    Args:
        session: Kedro session; its loaded context is kept on ``self.context``.
        metadata: Project metadata, kept on ``self.metadata``.
        conf_source: Configuration directory name appended to the
            ``${workspace.file_path}`` placeholder to form ``self.remote_conf_dir``.
        params: Optional parameters string, stored unchanged on ``self.params``.
    """
    self.metadata = metadata
    self.params = params
    # Doubled braces emit the literal ``${workspace.file_path}`` placeholder.
    self.remote_conf_dir = f"/${{workspace.file_path}}/{conf_source}"
    self.context = session.load_context()
    # ``pipelines`` is the module-level name in scope, not an argument.
    self.pipelines: MutableMapping = pipelines
generate_jobs(pipeline_name=None)

Generate Databricks resources for the given pipelines.

Finds all pipelines in the project and generates Databricks asset bundle resources for each according to the Databricks REST API

Parameters:

Name Type Description Default
pipeline_name str | None

The name of the pipeline for which Databricks asset bundle resources should be generated. If None, generates all pipelines.

None

Returns:

Type Description
dict[str, dict[str, Any]]

dict[str, dict[str, Any]]: A dictionary of pipeline names and their Databricks resources

Source code in src/kedro_databricks/resource_generator/abstract_resource_generator.py
def generate_jobs(
    self, pipeline_name: str | None = None
) -> dict[str, dict[str, Any]]:
    """Generate Databricks resources for the given pipelines.

    Finds all pipelines in the project and generates Databricks asset bundle resources
    for each according to the Databricks REST API

    Args:
        pipeline_name (str | None): The name of the pipeline for which Databricks asset bundle resources should be generated.
            If None, generates all pipelines.

    Returns:
        dict[str, dict[str, Any]]: A dictionary of pipeline names and their Databricks resources
    """
    jobs = {}
    pipeline = self.pipelines.get(pipeline_name)
    if pipeline_name and pipeline:
        log.info(f"Generating resources for pipeline '{pipeline_name}'")
        name = self._make_job_name(self.metadata.package_name, pipeline_name)
        jobs[name] = self._create_job(
            name=name,
            pipeline=pipeline,
            pipeline_name=pipeline_name,
        )
        return jobs
    if pipeline_name:
        raise KeyError(
            f"Pipeline '{pipeline_name}' not found. Available pipelines: {list(self.pipelines.keys())}"
        )

    for registered_pipeline_name, registered_pipeline in self.pipelines.items():
        if len(registered_pipeline.nodes) == 0:
            continue
        name = self._make_job_name(
            self.metadata.package_name, registered_pipeline_name
        )
        job = self._create_job(
            name=name,
            pipeline=registered_pipeline,
            pipeline_name=registered_pipeline_name,
        )
        log.debug(f"Job '{name}' successfully created.")
        log.debug(job)
        jobs[name] = job

    return jobs

node_resource_generator

Node-level Databricks resource generator.

Creates a Databricks job where each Kedro node becomes an individual task with appropriate dependencies derived from the pipeline graph.

NodeResourceGenerator(session, metadata, conf_source='conf', params=None)

Bases: AbstractResourceGenerator

Generate a job with one Databricks task per Kedro node.

Source code in src/kedro_databricks/resource_generator/node_resource_generator.py
def __init__(
    self,
    session: KedroSession,
    metadata: ProjectMetadata,
    conf_source: str = "conf",
    params: str | None = None,
) -> None:
    """Run the base initialisation, then fail if any memory datasets are reported.

    Raises:
        MemoryDatasetError: If ``_get_memory_datasets`` returns a non-empty
            collection.
    """
    super().__init__(session, metadata, conf_source, params)
    memory_datasets = self._get_memory_datasets()
    if len(memory_datasets) == 0:
        return
    raise MemoryDatasetError(self, memory_datasets)

pipeline_resource_generator

Pipeline-level Databricks resource generator.

Creates a Databricks job with a single task that runs an entire Kedro pipeline in one go.

PipelineResourceGenerator(session, metadata, conf_source='conf', params=None)

Bases: AbstractResourceGenerator

Generate a job with a single task for the whole pipeline.

Source code in src/kedro_databricks/resource_generator/abstract_resource_generator.py
def __init__(
    self,
    session: KedroSession,
    metadata: ProjectMetadata,
    conf_source: str = "conf",
    params: str | None = None,
) -> None:
    """Store the project metadata and load the Kedro context.

    Args:
        session: Kedro session; its loaded context is kept on ``self.context``.
        metadata: Project metadata, kept on ``self.metadata``.
        conf_source: Configuration directory name appended to the
            ``${workspace.file_path}`` placeholder to form ``self.remote_conf_dir``.
        params: Optional parameters string, stored unchanged on ``self.params``.
    """
    self.metadata = metadata
    self.params = params
    # Doubled braces emit the literal ``${workspace.file_path}`` placeholder.
    self.remote_conf_dir = f"/${{workspace.file_path}}/{conf_source}"
    self.context = session.load_context()
    # ``pipelines`` is the module-level name in scope, not an argument.
    self.pipelines: MutableMapping = pipelines

resource_overrider

Resource overriders for templating Databricks bundle resources.

This package exposes concrete overriders that customize Databricks Asset Bundle resources through overrides. Select the appropriate overrider via RESOURCE_OVERRIDERS to produce the desired behavior.

AbstractResourceOverrider

Bases: ABC

Abstract base class for resource overriders.

DefaultResourceOverrider

Bases: AbstractResourceOverrider

A default resource overrider that shallow-merges the default and resource-specific overrides onto the resource.

override(resource_key, resource, overrides, default_key=config.workflow_default_key)

Return the resource with the default and resource-specific overrides merged on top.

Parameters:

Name Type Description Default
resource_key str

The key identifying the resource.

required
default_key str

The default key for overrides.

workflow_default_key
resource dict[str, Any]

The original resource dictionary.

required
overrides dict[str, Any]

The overrides to apply.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: resource with overrides applied (unchanged).

Source code in src/kedro_databricks/resource_overrider/default_resource_overrider.py
def override(
    self,
    resource_key: str,
    resource: dict[str, Any],
    overrides: dict[str, Any],
    default_key: str = config.workflow_default_key,
) -> dict[str, Any]:
    """Merge the default and resource-specific overrides into the resource.

    The merge is shallow (top-level keys only) and later sources win:
    ``resource``, then the entry under ``default_key``, then the entry under
    ``resource_key``. The ``overrides`` mapping passed in is not modified.

    Args:
        resource_key: The key identifying the resource.
        default_key: The default key for overrides.
        resource: The original resource dictionary.
        overrides: All overrides for this resource type, keyed by resource
            key (including the default key).

    Returns:
        dict[str, Any]: The merged resource, passed through ``sort_dict``.

    Raises:
        ValueError: If ``resource`` or ``overrides`` is not a dictionary.
    """
    if not isinstance(resource, dict):
        raise ValueError(f"resource must be a dictionary not {type(resource)}")
    if not isinstance(overrides, dict):
        raise ValueError(f"overrides must be a dictionary not {type(overrides)}")
    # Read rather than pop so the caller's mapping stays intact. A key that is
    # present but empty (None) is treated as "no overrides".
    default_overrides = overrides.get(default_key) or {}
    specific_overrides = overrides.get(resource_key) or {}
    return sort_dict({**resource, **default_overrides, **specific_overrides})

JobsResourceOverrider

Bases: AbstractResourceOverrider

Override a Databricks jobs resource with the default key.

override(resource_key, resource, overrides, default_key=config.workflow_default_key)

Override the resources in a Databricks bundle.

This function applies the given overrides to the resources in a Databricks bundle.

Parameters:

Name Type Description Default
resource_key str

the key identifying the resource

required
resource Dict

the Databricks jobs to override

required
overrides Dict

the overrides to apply

required
default_key str

the default key to use for overrides

workflow_default_key

Raises:

Type Description
ValueError

if the job or overrides are not dictionaries

ValueError

if the key in overrides is not found in OVERRIDE_KEY_MAP

Returns:

Type Description
dict[str, Any]

Dict[str, Any]: the Databricks bundle with the overrides applied

Source code in src/kedro_databricks/resource_overrider/jobs_resource_overrider.py
def override(
    self,
    resource_key: str,
    resource: dict[str, Any],
    overrides: dict[str, Any],
    default_key: str = config.workflow_default_key,
) -> dict[str, Any]:
    """Override the resources in a Databricks bundle.

    The job is combined with three override sources, applied in this order:
    the entry under ``default_key``, the values returned by
    ``get_regex_values`` for ``resource_key``, and the entry under
    ``resource_key``. The ``overrides`` mapping passed in is not modified.

    Args:
        resource_key (str): the key identifying the resource
        resource (Dict): the Databricks jobs to override
        overrides (Dict): the overrides to apply
        default_key (str): the default key to use for overrides

    Raises:
        ValueError: if the job or overrides are not dictionaries

    Returns:
        Dict[str, Any]: the Databricks bundle with the overrides applied
    """
    # NOTE(review): the previous docstring also listed a ValueError for keys
    # missing from OVERRIDE_KEY_MAP; nothing in this function raises it
    # directly — confirm against the merge helpers.
    if not isinstance(resource, dict):
        raise ValueError(f"resource must be a dictionary not {type(resource)}")
    if not isinstance(overrides, dict):
        raise ValueError(f"overrides must be a dictionary not {type(overrides)}")
    overrider = create_merge_factory(
        merge_functions={
            "tasks": lambda old, new: _tasks_overrider(
                old, new, default_key=default_key
            ),
            "environments": lambda old, new: merge_list_of_dicts_by_key(
                old or [], new or [], key="environment_key"
            ),
            "job_clusters": lambda old, new: merge_list_of_dicts_by_key(
                old or [], new or [], key="job_cluster_key"
            ),
            "access_control_list": _access_control_list_overrider,
            "webhook_notifications": _notification_overrider,
            "health": _health_overrider,
            # Same None-guards as the other keyed list merges above, so a
            # side without parameters is treated as an empty list.
            "parameters": lambda old, new: merge_list_of_dicts_by_key(
                old or [], new or [], key="name"
            ),
        },
        key_order=JOB_KEY_ORDER,
    )
    # Pop from a shallow copy so the caller's mapping is left intact. The
    # original order is kept: the default entry is removed before the regex
    # lookup, the resource's own entry after it.
    remaining = dict(overrides)
    default_overrides = remaining.pop(default_key, {})
    regex_overrides = get_regex_values(resource_key, remaining)
    resource_overrides = remaining.pop(resource_key, {})
    return reduce(
        overrider,
        [
            resource,
            default_overrides,
            regex_overrides,
            resource_overrides,
        ],
    )

abstract_resource_overrider

AbstractResourceOverrider

Bases: ABC

Abstract base class for resource overriders.

default_resource_overrider

DefaultResourceOverrider

Bases: AbstractResourceOverrider

A default resource overrider that shallow-merges the default and resource-specific overrides onto the resource.

override(resource_key, resource, overrides, default_key=config.workflow_default_key)

Return the resource with the default and resource-specific overrides merged on top.

Parameters:

Name Type Description Default
resource_key str

The key identifying the resource.

required
default_key str

The default key for overrides.

workflow_default_key
resource dict[str, Any]

The original resource dictionary.

required
overrides dict[str, Any]

The overrides to apply.

required

Returns:

Type Description
dict[str, Any]

dict[str, Any]: resource with overrides applied (unchanged).

Source code in src/kedro_databricks/resource_overrider/default_resource_overrider.py
def override(
    self,
    resource_key: str,
    resource: dict[str, Any],
    overrides: dict[str, Any],
    default_key: str = config.workflow_default_key,
) -> dict[str, Any]:
    """Merge the default and resource-specific overrides into the resource.

    The merge is shallow (top-level keys only) and later sources win:
    ``resource``, then the entry under ``default_key``, then the entry under
    ``resource_key``. The ``overrides`` mapping passed in is not modified.

    Args:
        resource_key: The key identifying the resource.
        default_key: The default key for overrides.
        resource: The original resource dictionary.
        overrides: All overrides for this resource type, keyed by resource
            key (including the default key).

    Returns:
        dict[str, Any]: The merged resource, passed through ``sort_dict``.

    Raises:
        ValueError: If ``resource`` or ``overrides`` is not a dictionary.
    """
    if not isinstance(resource, dict):
        raise ValueError(f"resource must be a dictionary not {type(resource)}")
    if not isinstance(overrides, dict):
        raise ValueError(f"overrides must be a dictionary not {type(overrides)}")
    # Read rather than pop so the caller's mapping stays intact. A key that is
    # present but empty (None) is treated as "no overrides".
    default_overrides = overrides.get(default_key) or {}
    specific_overrides = overrides.get(resource_key) or {}
    return sort_dict({**resource, **default_overrides, **specific_overrides})

jobs_resource_overrider

JobsResourceOverrider

Bases: AbstractResourceOverrider

Override a Databricks jobs resource with the default key.

override(resource_key, resource, overrides, default_key=config.workflow_default_key)

Override the resources in a Databricks bundle.

This function applies the given overrides to the resources in a Databricks bundle.

Parameters:

Name Type Description Default
resource_key str

the key identifying the resource

required
resource Dict

the Databricks jobs to override

required
overrides Dict

the overrides to apply

required
default_key str

the default key to use for overrides

workflow_default_key

Raises:

Type Description
ValueError

if the job or overrides are not dictionaries

ValueError

if the key in overrides is not found in OVERRIDE_KEY_MAP

Returns:

Type Description
dict[str, Any]

Dict[str, Any]: the Databricks bundle with the overrides applied

Source code in src/kedro_databricks/resource_overrider/jobs_resource_overrider.py
def override(
    self,
    resource_key: str,
    resource: dict[str, Any],
    overrides: dict[str, Any],
    default_key: str = config.workflow_default_key,
) -> dict[str, Any]:
    """Apply layered overrides to a single Databricks job resource.

    Four layers are folded together, left to right, with a merge function
    built by ``create_merge_factory``: the original ``resource``, the entry
    stored under ``default_key``, the entry returned by ``get_regex_values``
    for ``resource_key``, and finally the entry stored under
    ``resource_key`` itself. The ``default_key`` and ``resource_key``
    entries are popped from ``overrides``, so the caller's mapping is
    modified.

    Args:
        resource_key (str): the key identifying the resource
        resource (Dict): the Databricks job to override
        overrides (Dict): the overrides to apply
        default_key (str): the default key to use for overrides

    Raises:
        ValueError: if ``resource`` or ``overrides`` is not a dictionary

    Returns:
        Dict[str, Any]: the job with every override layer applied
    """
    if not isinstance(resource, dict):
        raise ValueError(f"resource must be a dictionary not {type(resource)}")
    if not isinstance(overrides, dict):
        raise ValueError(f"overrides must be a dictionary not {type(overrides)}")

    def _merge_tasks(old, new):
        # Tasks need the default key to resolve their own defaults.
        return _tasks_overrider(old, new, default_key=default_key)

    def _merge_environments(old, new):
        return merge_list_of_dicts_by_key(
            old or [], new or [], key="environment_key"
        )

    def _merge_job_clusters(old, new):
        return merge_list_of_dicts_by_key(
            old or [], new or [], key="job_cluster_key"
        )

    def _merge_parameters(old, new):
        # Unlike the two helpers above, values are passed through as-is.
        return merge_list_of_dicts_by_key(old, new, key="name")

    merge_layer = create_merge_factory(
        merge_functions={
            "tasks": _merge_tasks,
            "environments": _merge_environments,
            "job_clusters": _merge_job_clusters,
            "access_control_list": _access_control_list_overrider,
            "webhook_notifications": _notification_overrider,
            "health": _health_overrider,
            "parameters": _merge_parameters,
        },
        key_order=JOB_KEY_ORDER,
    )

    # Statement order matters: the default entry is already removed when the
    # regex lookup runs, while the exact-key entry is still present.
    default_layer = overrides.pop(default_key, {})
    regex_layer = get_regex_values(resource_key, overrides)
    exact_layer = overrides.pop(resource_key, {})

    layers = [resource, default_layer, regex_layer, exact_layer]
    return reduce(merge_layer, layers)

utilities

common

get_arg_value(args, arg_name)

Get the value of an argument from a list of arguments.

Parameters:

Name Type Description Default
args List[str]

list of arguments

required
arg_name str

name of the argument to get the value for

required

Returns:

Type Description
str | None

str | None: value of the argument or None if not found

Source code in src/kedro_databricks/utilities/common.py
def get_arg_value(args: list[str], arg_name: str) -> str | None:
    """Get the value of an argument from a list of arguments.

    Args:
        args (List[str]): list of arguments
        arg_name (str): name of the argument to get the value for

    Returns:
        str | None: value of the argument or None if not found
    """
    for i, arg in enumerate(args):
        if "=" in arg:
            _arg, value = arg.split("=", 1)
            if _arg == arg_name:
                return value
        elif arg == arg_name:
            return args[i + 1]

get_entry_point(project_name)

Get the entry point for a project.

Parameters:

Name Type Description Default
project_name str

name of the project

required

Returns:

Name Type Description
str str

entry point for the project

Source code in src/kedro_databricks/utilities/common.py
def get_entry_point(project_name: str) -> str:
    """Derive the entry point name from a project name.

    The name is trimmed and lower-cased, runs of spaces are collapsed, every
    character other than an ASCII letter or underscore becomes a dash, and
    dashes at either end are dropped.

    Args:
        project_name (str): name of the project

    Returns:
        str: entry point for the project
    """
    normalized = project_name.strip().lower()
    single_spaced = re.sub(r" +", " ", normalized)
    dashed = re.sub(r"[^a-zA-Z_]", "-", single_spaced)
    # No newline can survive the substitution above, so stripping dashes from
    # both ends matches the anchored leading/trailing removals.
    return dashed.strip("-")

get_regex_values(lookup_key, values, regex_prefix='re:')

Get values matching a regex

Parameters:

Name Type Description Default
lookup_key str

key to match regexes to

required
values dict[str, Any]

The overrides to apply.

required
regex_prefix str

prefix that identifies a key as a regex

're:'

Returns:

Type Description
dict[str, Any]

dict[str, Any]: values where lookup key matches the regex

Source code in src/kedro_databricks/utilities/common.py
def get_regex_values(
    lookup_key: str,
    values: dict[str, Any],
    regex_prefix: str = "re:",
) -> dict[str, Any]:
    """Return the value of the first regex key that matches ``lookup_key``.

    Keys starting with ``regex_prefix`` are treated as patterns (the prefix
    is cut off) and tried in the mapping's iteration order with ``re.match``,
    i.e. anchored at the start of ``lookup_key``. Only the first hit is
    returned; later matching patterns are ignored.

    Args:
        lookup_key (str): key to match regexes to
        values (dict[str, Any]): The overrides to apply.
        regex_prefix (str): prefix that identifies a key as a regex

    Returns:
        dict[str, Any]: value stored under the first matching regex key, or
        an empty dictionary when nothing matches
    """
    prefix_length = len(regex_prefix)
    # Lazy generator: evaluation stops at the first matching pattern.
    hits = (
        candidate
        for name, candidate in values.items()
        if name.startswith(regex_prefix)
        and re.match(name[prefix_length:], lookup_key)
    )
    return next(hits, {})

get_value_from_dotpath(ddict, dotpath)

Get a value from a dictionary using a dotpath.

Parameters:

Name Type Description Default
ddict Any

dictionary to get the value from

required
dotpath str

dotpath to get the value for

required

Returns:

Name Type Description
Any Any

value from the dictionary or None if not found

Source code in src/kedro_databricks/utilities/common.py
def get_value_from_dotpath(ddict, dotpath) -> Any:
    """Look up a nested value by following a dot-separated path.

    Args:
        ddict (Any): dictionary to get the value from
        dotpath (str): dotpath to get the value for

    Returns:
        Any: the value at the end of the path, or None when ``ddict`` is not
        a dictionary, a segment is missing or maps to None, or a non-final
        segment resolves to something that is not a dictionary
    """
    if not isinstance(ddict, dict):
        return None

    node = ddict
    for segment in dotpath.split("."):
        # Descending into a non-dict means the path cannot be resolved.
        if not isinstance(node, dict):
            return None
        node = node.get(segment)
        if node is None:
            return None
    return node

remove_nulls(value)

Remove None values from a dictionary or list.

Parameters:

Name Type Description Default
value Dict[Any, Any] | List[Dict[Any, Any]]

dictionary or list to remove None values from

required

Returns:

Type Description
dict[str, Any] | list[Any]

Dict[Any, Any] | List[Dict[Any, Any]]: dictionary or list with None values removed

Source code in src/kedro_databricks/utilities/common.py
def remove_nulls(value: dict[str, Any] | list[Any]) -> dict[str, Any] | list[Any]:
    """Remove None values from a dictionary or list.

    Args:
        value (Dict[Any, Any] | List[Dict[Any, Any]]): dictionary or list to remove None values from

    Returns:
        Dict[Any, Any] | List[Dict[Any, Any]]: dictionary or list with None values removed
    """
    non_null = copy.deepcopy(value)
    if isinstance(non_null, dict):
        _remove_nulls_from_dict(non_null)
    elif isinstance(non_null, list):
        _remove_nulls_from_list(non_null)
    return non_null

require_databricks_run_script(_version=KEDRO_VERSION)

Check if the current Kedro version is less than 0.19.8.

Kedro 0.19.8 introduced a new run_script method that is required for running tasks on Databricks. This method is not available in earlier versions of Kedro. This function checks if the current Kedro version is less than 0.19.8.

Returns:

Name Type Description
bool bool

whether the current Kedro version is less than 0.19.8

Source code in src/kedro_databricks/utilities/common.py
def require_databricks_run_script(_version=KEDRO_VERSION) -> bool:
    """Tell whether the given Kedro version predates 0.19.8.

    Kedro 0.19.8 introduced a new `run_script` method that is required for
    running tasks on Databricks; earlier versions do not provide it. The
    check is a plain comparison of ``_version`` against ``Version("0.19.8")``.

    Returns:
        bool: whether the current Kedro version is less than 0.19.8
    """
    first_version_with_run_script = Version("0.19.8")
    return _version < first_version_with_run_script

sanitize_name(node)

Sanitize the node name to be used as a task key in Databricks.

Parameters:

Name Type Description Default
node Node | str

Kedro node object or node name

required

Returns:

Name Type Description
str str

sanitized task key

Source code in src/kedro_databricks/utilities/common.py
def sanitize_name(node: Node | str) -> str:
    """Turn a node (or node name) into a usable Databricks task key.

    A name made up solely of word characters and dashes is kept as it is.
    Otherwise a warning is logged, every non-word character is replaced by
    an underscore, repeated underscores are collapsed and underscores at
    both ends are removed. Finally the name is cut to
    ``MAX_TASK_KEY_LENGTH`` characters, again with a warning.

    Args:
        node (Node | str): Kedro node object or node name

    Returns:
        str: sanitized task key
    """
    task_key = node if isinstance(node, str) else node.name

    is_valid = re.match(r"^[\w\-\_]+$", task_key) is not None
    if not is_valid:
        log.warning(
            f"Node name '{task_key}' contains invalid characters and will be sanitized. "
            "To avoid this use an explicit node name as `node(..., name='valid_name')`."
        )
        underscored = re.sub(r"[^\w\_]", "_", task_key)
        task_key = re.sub(r"_{2,}", "_", underscored).strip("_")

    if len(task_key) <= MAX_TASK_KEY_LENGTH:
        return task_key

    # The warning shows the already sanitized name, not the raw input.
    log.warning(
        f"Node name '{task_key}' is too long. "
        f"Truncating to {MAX_TASK_KEY_LENGTH} characters."
    )
    return task_key[:MAX_TASK_KEY_LENGTH]

sort_dict(d, key_order=None)

Sort the top-level keys of a dictionary, placing the keys listed in key_order first (nested dictionaries are left untouched).

Parameters:

Name Type Description Default
d Dict[Any, Any]

dictionary to sort

required
key_order List[str]

list of keys to sort by

None

Returns:

Type Description
dict[Any, Any]

Dict[Any, Any]: dictionary with ordered values

Source code in src/kedro_databricks/utilities/common.py
def sort_dict(d: dict[Any, Any], key_order: list[str] | None = None) -> dict[Any, Any]:
    """Recursively sort the keys of a dictionary.

    Args:
        d (Dict[Any, Any]): dictionary to sort
        key_order (List[str]): list of keys to sort by

    Returns:
        Dict[Any, Any]: dictionary with ordered values
    """
    if key_order is None:
        key_order = []
    other_keys = [k for k in d.keys() if k not in key_order]
    order = key_order + other_keys

    return dict(sorted(d.items(), key=lambda x: order.index(x[0])))

version_to_str(version)

Convert a version list to a string.

Requires version to follow semantic versioning (3 parts: major, minor, patch).

Parameters:

Name Type Description Default
version List[int]

version list to convert

required

Returns:

Name Type Description
str str

version string

Source code in src/kedro_databricks/utilities/common.py
def version_to_str(version: list[int]) -> str:
    """Render a three-part version as a dotted string.

    Requires version to follow semantic versioning (3 parts: major, minor, patch).

    Args:
        version (List[int]): version list to convert

    Returns:
        str: version string

    Raises:
        ValueError: if ``version`` does not have exactly three parts
    """
    semver_parts = 3  # major, minor, patch
    if len(version) == semver_parts:
        return ".".join(map(str, version))
    raise ValueError(f"Invalid version: {version}")

databricks_cli

DatabricksCli(metadata, env=config.default_env, additional_args=None)

Databricks CLI command collection.

Parameters:

Name Type Description Default
additional_args list[str] | None

Additional arguments to be passed to the databricks CLI.

None
Source code in src/kedro_databricks/utilities/databricks_cli.py
def __init__(
    self,
    metadata: ProjectMetadata,
    env: str = config.default_env,
    additional_args: list[str] | None = None,
):
    """Set up the Databricks CLI command collection.

    Stores the project metadata, the environment name and the extra CLI
    arguments on the instance, creates the ``databricks_cli`` logger and
    finishes by calling ``self._check_self(warn=False)``.

    Args:
        metadata (ProjectMetadata): Project metadata kept as ``self.metadata``.
        env (str): Environment name kept as ``self.env``.
        additional_args (list[str] | None): Additional arguments to be passed to the
            `databricks` CLI. ``None`` is stored as an empty list.
    """
    self.log = get_logger("databricks_cli")
    self.metadata = metadata
    self.env = env
    self.args = [] if additional_args is None else additional_args
    self._check_self(warn=False)

logger

Logger configuration for the kedro-databricks package.

This module sets up a logger for the kedro-databricks package, allowing for structured logging and easier debugging. The logger is configured to log messages at the INFO level by default, but this can be overridden by setting the LOG_LEVEL environment variable.

get_logger(name)

Get a logger with the specified name.

This function retrieves a logger instance with the given name, which is a child of the root logger for the kedro-databricks package.

Parameters:

Name Type Description Default
name str

The name of the logger.

required

Returns:

Type Description
Logger

logging.Logger: The logger instance.

Source code in src/kedro_databricks/utilities/logger.py
def get_logger(name: str) -> logging.Logger:
    """Return the child logger ``name`` of the package logger.

    The parent is the logger registered as ``kedro-databricks``; the returned
    logger is obtained from it through ``getChild``.

    Args:
        name (str): The name of the logger.

    Returns:
        logging.Logger: The logger instance.
    """
    return logging.getLogger("kedro-databricks").getChild(name)

plugin

Plugin(commands_dir, **kwargs)

Bases: Group

Kedro-Databricks plugin for Kedro CLI

Source code in src/kedro_databricks/utilities/plugin.py
def __init__(self, commands_dir: Path, **kwargs) -> None:
    """Initialise the parent class and store the commands directory.

    Args:
        commands_dir: Path kept on the instance as ``self.commands_dir``.
        **kwargs: Forwarded unchanged to the parent class initialiser.
    """
    super().__init__(**kwargs)
    self.commands_dir = commands_dir

resolver_generics

Generic, composable resource resolvers.

This module defines a small set of primitives to resolve arbitrary "resources" from string identifiers. It includes:

  • RegistryResourceResolver: resolves from a given mapping/registry.
  • ModuleResourceResolver: resolves a dotted path (module.attr) and optionally validates the resulting attribute.
  • CompositeResourceResolver: chains multiple resolvers and aggregates their errors for better diagnostics.

These utilities allow flexible lookups (e.g., built-in names via a registry, or user-provided dotted paths) while surfacing clear error messages.

CompositeResourceResolver(resolvers) dataclass

Bases: Generic[ResourceType], ResourceResolver[ResourceType]

resolve(value)

Try multiple resolvers in order, returning the first success.

If all resolvers fail, a combined ResourceNotFoundError is raised containing the individual failure messages for easier debugging.

Source code in src/kedro_databricks/utilities/resolver_generics.py
def resolve(self, value: str) -> ResourceType:
    """Return the result of the first resolver that succeeds.

    Resolvers are tried in the order of ``self.resolvers``. Every
    ``ResourceResolverError`` is collected; when no resolver succeeds, a
    combined ``ResourceNotFoundError`` built from all collected failures is
    raised so that each individual message remains visible.
    """
    failures = []
    for candidate in self.resolvers:
        try:
            resolved = candidate.resolve(value)
        except ResourceResolverError as failure:
            failures.append(failure)
        else:
            return resolved
    raise ResourceNotFoundError.for_errors(failures)

ModuleResourceResolver(validate_fn=None) dataclass

Bases: Generic[ResourceType], ResourceResolver[ResourceType]

resolve(value)

Resolve a dotted path (module.attr) to a Python attribute.

If validate_fn is provided, the resolved attribute must satisfy it; otherwise a ResourceInvalidError is raised.

Raises:

Type Description
ResourceImportError

If the value is not of the form module.attribute or the module import fails.

ResourceNotFoundError

If the module is found but the attribute is missing.

ResourceInvalidError

If validate_fn is provided and returns False for the resolved attribute.

Source code in src/kedro_databricks/utilities/resolver_generics.py
def resolve(self, value: str) -> ResourceType:
    """Resolve a dotted path (``module.attr``) to a Python attribute.

    The value is split at its last dot into a module path and an attribute
    name. If ``validate_fn`` is provided, the resolved attribute must
    satisfy it; otherwise a ``ResourceInvalidError`` is raised.

    Args:
        value: Dotted path of the form ``module.attribute``.

    Returns:
        The attribute found on the imported module.

    Raises:
        ResourceImportError: If the value is not of the form
            ``module.attribute`` (no dot, or nothing before the last dot)
            or the module import fails.
        ResourceNotFoundError: If the module is found but the attribute
            is missing.
        ResourceInvalidError: If ``validate_fn`` is provided and returns
            ``False`` for the resolved attribute.
    """
    module_path, _, attribute_name = value.rpartition(".")
    if not module_path:
        # Covers "name" (no dot) as well as ".name" (empty module part). The
        # latter would otherwise reach ``__import__("")``, which raises a
        # plain ValueError outside the resolver error hierarchy.
        raise ResourceImportError.for_value(
            value, "Invalid resource path format, expected 'module.attribute'"
        )

    try:
        # A non-empty ``fromlist`` makes ``__import__`` return the leaf
        # module instead of the top-level package.
        module = __import__(module_path, fromlist=[attribute_name])
    except ImportError as exc:
        raise ResourceImportError.for_value(
            value, f"Module '{module_path}' could not be imported"
        ) from exc

    try:
        attribute = getattr(module, attribute_name)
    except AttributeError as exc:
        raise ResourceNotFoundError.for_value(
            value,
            f"Attribute '{attribute_name}' not found in module '{module_path}'",
        ) from exc

    if self.validate_fn and not self.validate_fn(attribute):
        raise ResourceInvalidError.for_value(value)

    return attribute

RegistryResourceResolver(registry, default=None) dataclass

Bases: Generic[ResourceType], ResourceResolver[ResourceType]

resolve(value)

Resolve using a static registry mapping.

Raises ResourceNotFoundError when the key is missing.

Source code in src/kedro_databricks/utilities/resolver_generics.py
def resolve(self, value: str) -> ResourceType:
    """Look ``value`` up in the static registry mapping.

    A registered key returns its registry entry. An unknown key returns
    ``self.default`` when one is set and raises ``ResourceNotFoundError``
    otherwise.
    """
    if value in self.registry:
        return self.registry[value]
    if self.default is None:
        raise ResourceNotFoundError.for_value(value)
    return self.default

ResourceImportError

Bases: ResourceResolverError

Raised when a resource cannot be imported.

Typical causes include: the value is not a dotted path of the form module.attribute, or the specified module cannot be imported.

This error is commonly emitted by ModuleResourceResolver when the path format is invalid or the module import fails.

for_value(value, message=None) classmethod

Construct an error for a single value.

Parameters:

Name Type Description Default
value str

The unresolved identifier.

required
message str | None

Optional details about why resolution failed.

None

Returns:

Name Type Description
ResourceImportError ResourceImportError

A descriptive error instance.

Source code in src/kedro_databricks/utilities/resolver_generics.py
@classmethod
def for_value(cls, value: str, message: str | None = None) -> "ResourceImportError":
    """Construct an error for a single value.

    Args:
        value: The unresolved identifier.
        message: Optional details about why resolution failed.

    Returns:
        ResourceImportError: A descriptive error instance.
    """
    return cls(
        f"Resource '{value}' could not be imported: {message}."
        if message
        else f"Resource '{value}' could not be imported."
    )

ResourceInvalidError

Bases: ResourceResolverError

Raised when a resource is invalid.

Emitted when a resolved value fails a post-resolution validation (e.g., does not satisfy a provided validate_fn). This is distinct from ResourceNotFoundError, which indicates the resource could not be located at all.

for_value(value, message=None) classmethod

Construct an error for a single value.

Parameters:

Name Type Description Default
value str

The unresolved identifier.

required
message str | None

Optional details about why resolution failed.

None

Returns:

Name Type Description
ResourceInvalidError ResourceInvalidError

A descriptive error instance.

Source code in src/kedro_databricks/utilities/resolver_generics.py
@classmethod
def for_value(
    cls, value: str, message: str | None = None
) -> "ResourceInvalidError":
    """Construct an error for a single value.

    Args:
        value: The unresolved identifier.
        message: Optional details about why resolution failed.

    Returns:
        ResourceInvalidError: A descriptive error instance.
    """
    return cls(
        f"Resource '{value}' is invalid: {message}."
        if message
        else f"Resource '{value}' is invalid."
    )

ResourceNotFoundError

Bases: ResourceResolverError

Raised when a resource cannot be resolved.

Provides helpers to produce consistent, user-friendly error messages for single- and multi-resolver failures.

for_value(value, message=None) classmethod

Construct an error for a single value.

Parameters:

Name Type Description Default
value str

The unresolved identifier.

required
message str | None

Optional details about why resolution failed.

None

Returns:

Name Type Description
ResourceNotFoundError ResourceNotFoundError

A descriptive error instance.

Source code in src/kedro_databricks/utilities/resolver_generics.py
@classmethod
def for_value(
    cls, value: str, message: str | None = None
) -> "ResourceNotFoundError":
    """Construct an error for a single value.

    Args:
        value: The unresolved identifier.
        message: Optional details about why resolution failed.

    Returns:
        ResourceNotFoundError: A descriptive error instance.
    """
    return cls(
        f"Resource '{value}' not found: {message}."
        if message
        else f"Resource '{value}' not found."
    )

ResourceResolver

Bases: ABC, Generic[ResourceType]

Abstract protocol for resolving a value into a resource of type ResourceType.

resolve(value) abstractmethod

Resolve a string identifier into a resource.

Parameters:

Name Type Description Default
value str

The identifier to resolve.

required

Returns:

Type Description
ResourceType

The resolved resource.

Raises:

Type Description
ResourceNotFoundError

If the resource cannot be found.

Source code in src/kedro_databricks/utilities/resolver_generics.py
@abstractmethod
def resolve(self, value: str) -> ResourceType:
    """Resolve a string identifier into a resource.

    This method is abstract and has no implementation here; concrete
    resolvers provide the lookup strategy.

    Args:
        value: The identifier to resolve.

    Returns:
        The resolved resource.

    Raises:
        ResourceNotFoundError: If the resource cannot be found.
    """

ResourceResolverError

Bases: Exception

Base error for all resource resolver failures.

This is the common ancestor for all resolver exceptions and is also used by composite resolvers to aggregate multiple failures into a single exception with a readable, concatenated message.

for_errors(errors) classmethod

Construct an aggregated resolver error.

Parameters:

Name Type Description Default
errors Sequence[ResourceResolverError]

The list of individual ResourceResolverError instances raised by underlying resolvers.

required

Returns:

Name Type Description
ResourceResolverError ResourceResolverError

A single error whose message concatenates the messages of all provided errors, separated by semicolons.

Source code in src/kedro_databricks/utilities/resolver_generics.py
@classmethod
def for_errors(
    cls, errors: Sequence["ResourceResolverError"]
) -> "ResourceResolverError":
    """Combine several resolver failures into one error.

    Args:
        errors: The individual ``ResourceResolverError`` instances raised by
            the underlying resolvers.

    Returns:
        ResourceResolverError: An instance of the class this is called on,
        whose message joins the string form of every given error with
        ``"; "``.
    """
    joined = "; ".join(map(str, errors))
    return cls(f"Multiple resource resolution errors: {joined}")