Introduction
In this blog post, we will learn how to orchestrate a Databricks job using Airflow.
Architecture
Prerequisites
1. Airflow environment
2. Databricks workflow
Configuration
Step 1: Create Connection in Airflow for Databricks workflow
1.1 Log in to Airflow as Admin
1.2 Add the connection
1.3 Add the PAT (personal access token)
1.4 Test the connection and save
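The same connection can also be defined without the UI. The following is a minimal sketch, assuming Airflow 2.3 or later (which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` environment variables) and a personal access token stored in the connection's password field. The helper name `build_databricks_conn`, the workspace URL, and the token value are placeholders for illustration, not values from this setup.

```python
import json
import os


def build_databricks_conn(host: str, token: str) -> str:
    """Serialize a Databricks connection in Airflow's JSON connection format."""
    return json.dumps({
        "conn_type": "databricks",
        "host": host,        # Databricks workspace URL
        "password": token,   # the PAT goes in the password field
    })


# Hypothetical placeholder values; substitute your own workspace URL and token.
conn_json = build_databricks_conn(
    host="https://<your-workspace>.cloud.databricks.com",
    token="<your-pat-token>",
)

# Airflow resolves the connection id "databricks_default" from this variable name.
os.environ["AIRFLOW_CONN_DATABRICKS_DEFAULT"] = conn_json
```

Setting the variable from Python only affects the current process. In practice the same JSON string would be exported in the environment of the scheduler, webserver, and workers.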
Step 2: Create DAG (Directed Acyclic Graph)
from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow'
}

with DAG('databricks_dag',
         start_date=days_ago(2),
         schedule_interval=None,  # no schedule: the DAG runs only when triggered
         default_args=default_args
         ) as dag:
    # Trigger an existing Databricks job through the connection created in Step 1.
    opr_run_now = DatabricksRunNowOperator(
        task_id='run_now',
        databricks_conn_id='databricks_default',
        job_id=JOB_ID  # placeholder: replace with your Databricks job ID (Step 3)
    )
Step 3: In the code above, replace JOB_ID with your Databricks job ID and save the file as databricks_dag.py in the DAG working folder.
Step 4: Once you place databricks_dag.py in the dags folder, you will see the DAG in the portal.
The example here is from my setup. Locate your own Airflow installation folder and place the file in its dags folder.
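Placing the file can also be scripted. The sketch below assumes the dags folder has not been overridden in airflow.cfg, so it sits at `$AIRFLOW_HOME/dags` (with `AIRFLOW_HOME` defaulting to `~/airflow`). The function names `default_dags_folder` and `deploy_dag` are hypothetical helpers, not part of Airflow.

```python
import os
import shutil
from pathlib import Path


def default_dags_folder() -> Path:
    """Return $AIRFLOW_HOME/dags, falling back to ~/airflow/dags."""
    airflow_home = os.environ.get("AIRFLOW_HOME", str(Path.home() / "airflow"))
    return Path(airflow_home) / "dags"


def deploy_dag(dag_file: str, dags_folder: Path) -> Path:
    """Copy a DAG file into the given dags folder and return the new path."""
    dags_folder.mkdir(parents=True, exist_ok=True)
    return Path(shutil.copy(dag_file, dags_folder))


# Example usage (run from the directory that holds the DAG file):
# deploy_dag("databricks_dag.py", default_dags_folder())
```

If your installation sets a custom `dags_folder`, pass that path to `deploy_dag` instead of the default.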
Step 5: Verify the DAG is visible in the portal.
Running the DAG
We can run the DAG manually, or we can schedule it.
How to run manually
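Besides the trigger button in the portal, a manual run can be started programmatically. The following is a rough sketch, assuming an Airflow 2.x webserver with the stable REST API and the basic-auth API backend enabled. The base URL and the admin credentials are hypothetical, and `build_trigger_request` is an illustrative helper that only builds the request without sending it.

```python
import base64
import json
import urllib.request


def build_trigger_request(base_url: str, dag_id: str,
                          user: str, password: str) -> urllib.request.Request:
    """Build (but do not send) a POST request that starts a new DAG run."""
    credentials = base64.b64encode(f"{user}:{password}".encode()).decode()
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns",
        data=json.dumps({"conf": {}}).encode(),  # empty run configuration
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Basic {credentials}",
        },
        method="POST",
    )


# Hypothetical local webserver and credentials.
request = build_trigger_request(
    "http://localhost:8080", "databricks_dag", "admin", "admin"
)

# To actually trigger the run, send it:
# urllib.request.urlopen(request)
```

The DAG must be unpaused for the triggered run to start executing.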
GitHub Code
Troubleshooting
Item: 1
The Databricks connection type is missing
Install the Databricks provider on the worker node and the webserver node:
pip install apache-airflow-providers-databricks
Item: 2
Broken DAG: [/opt/airflow/dags/databricks_dag.py] Traceback (most recent call last):
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/opt/airflow/dags/databricks_dag.py", line 2, in <module>
    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator
ModuleNotFoundError: No module named 'airflow.providers.databricks'
Install apache-airflow-providers-databricks on all servers.
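Both items above come down to the provider package being absent from a node's Python environment. A quick way to check a given node is sketched below. It assumes you run it with the same Python interpreter that Airflow uses on that node, and `module_available` is an illustrative helper name.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if `name` can be located by the current interpreter."""
    try:
        return importlib.util.find_spec(name) is not None
    except ModuleNotFoundError:
        # Raised when a parent package (for example `airflow` itself) is missing.
        return False


if module_available("airflow.providers.databricks"):
    print("Databricks provider found")
else:
    print("Databricks provider missing: "
          "pip install apache-airflow-providers-databricks")
```

Running this on each worker, scheduler, and webserver node shows which ones still need the package.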
Run Failure:
run failed with error message Could not acquire access token.