Orchestrating data pipelines
Data orchestration refers to the automated and coordinated management of data workflows across various stages of the analytics life cycle. This process involves integrating, cleansing, transforming, and moving data from diverse sources to a data warehouse, where it can be readily accessed and analyzed for insights.
We have already covered how dbt empowers data engineers and data analysts to move away from repetitive stored procedures and employ version control and testing found in traditional software engineering to help run data workflows. dbt is largely used for the transformation part of the analytics workflow.
Data orchestration in analytics engineering, on the other hand, aims to streamline the flow of data through different tools and processes, ensuring that it is accurate, timely, and in the right format for analysis. This involves scheduling dbt jobs in the correct sequence, managing dependencies between tasks, handling errors gracefully, and ensuring data quality and compliance standards are met throughout the data pipeline.
In this section, we will talk about designing an automated workflow and introduce certain tools and libraries that help automate the loading and moving of data.
First, we’ll look at some considerations when designing an automated workflow.
Designing an automated workflow – considerations
When designing an automated workflow for orchestrating data, keep the following in mind:
- Complexity versus simplicity: Choose a tool that matches your team’s skill set and the complexity of your workflows. Integration efforts will vary significantly depending on whether data is coming from APIs, databases, file systems, or cloud services.
- Scalability: Design workflows so that they can handle future growth in data volume and complexity without significant rework.
- Integration: Ensure the tool integrates well with your data stack.
- Maintenance: Consider the effort required to maintain and update your workflows.
- Cost: Factor in both direct costs and the indirect costs of tool complexity and maintenance.
Scalability largely depends on how well the tool copes with increased data volume or new requirements for ingesting and extracting data. Ask yourself whether the volume of your data is likely to grow by an order of magnitude and whether your current tooling can adapt to this without incurring extra costs. Then ask whether your current setup can be ported to different tooling and whether it is modular enough to be maintained easily. Check whether you can update or modify parts of the workflow without impacting the entire system, and whether the tool offers enough support for other tools you may adopt in the future.
These questions will inevitably come up when deciding on an orchestration solution, and getting them right is crucial if you want to avoid technical debt and the extra costs of unforeseen maintenance.
Now that we have challenged you on how to approach automating your data orchestration, let’s have a look at certain tools that integrate well with dbt and are most commonly encountered within analytics engineering teams.
dbt Cloud
In this book, we have mostly focused on dbt and highlighted it as our main go-to tool in the analytics engineering toolbox. In Chapter 8, Hands-On Analytics Engineering, you got a glimpse of the features of dbt Cloud, and you saw how to set up dbt Cloud for use with Google BigQuery. This is why we will briefly discuss the orchestration feature here. dbt Cloud allows you to develop and define jobs, orchestrate them, and view documentation about your project, but its range of features is largely determined by the subscription you have.
As a hosted service, dbt Cloud comes with a subscription cost, which might be a consideration for smaller teams or projects. The main benefit is that dbt Cloud offers built-in job scheduling and monitoring, eliminating the need for external orchestration tools.
We will not go into detail on how to create and schedule jobs in dbt Cloud as the process is fairly straightforward if you follow the official dbt guide (https://docs.getdbt.com/docs/deploy/deploy-jobs). dbt Cloud is a standalone tool and luckily, you do not need to think about provisioning and maintaining the underlying infrastructure for orchestration.
However, keep in mind that while it offers convenience, some users might prefer the control and customization that’s possible and the lower price tag offered by self-hosted orchestration tools. As mentioned in Chapter 8, there are more sophisticated tools for job orchestration. Tools such as Airflow, Prefect, and Dagster have far better integration with other tooling compared to using dbt Cloud as your orchestration environment – for example, Airflow is configured in Python, which allows you to extend your workflow so that it includes orchestration of other notebooks or ML scripts. You can deploy and orchestrate it on whatever infrastructure you want and maintain everything in code, which is not possible with dbt Cloud.
The next tool we’ll discuss is open source and popular within the data engineering community. We will not do a deep dive into it as that would require writing another book! Rather, we will explain the basics of the tool and highlight some features that are interesting when combined with dbt. We chose to focus on Airflow as it is the most commonly used data orchestration tool. We also encourage you to check out alternatives such as Dagster, Prefect, and Mage, a newer and promising tool.
Airflow
You may have heard of Apache Airflow, which is the most commonly encountered open-source tool for orchestrating data pipelines. It is frequently hailed as the Swiss Army knife of data engineering. Airflow allows you to author, schedule, and monitor workflows. It is highly customizable and can integrate with many data processing frameworks, databases, and cloud services (https://airflow.apache.org/).
A great benefit of sticking with Airflow is its widespread use within the data engineering and analytics community. Plenty of documentation is available, as are integrations with other tools. Airflow is also highly flexible: workflows are defined in Python, which makes even complex workflows manageable, and its rich UI lets you monitor and manage them.
Using Airflow to orchestrate your dbt jobs typically requires dbt and Airflow to be installed in the same environment, such as a Docker container. Airflow uses a database to track task status and manage metadata. You’ll need a running instance of a compatible database (for example, PostgreSQL or MySQL) and you’ll need to configure Airflow to use it so that this metadata can be stored. This metadata is useful for tracking task completion and failure over time.
When it comes to using Airflow with dbt, the most important thing to get right is workflow design, specifically task dependencies and error handling. You need to understand the dependencies between your dbt models to properly orchestrate them in Airflow tasks. This includes knowing which models need to be built before others can be run. Also, you need to plan for how to handle errors in your dbt runs, including retries, alerts, and potentially skipping or marking tasks as failed.
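The dependency and error-handling concerns above can be sketched without Airflow at all. The following minimal example uses only Python's standard library to order a set of hypothetical dbt models by their dependencies, retry a failing model a fixed number of times, and skip anything downstream of a failure. The model names, the run_model callable, and the retry count are illustrative assumptions. In practice, Airflow provides these behaviors through task dependencies and retry settings.

```python
from graphlib import TopologicalSorter
from typing import Callable

# Hypothetical dbt models mapped to the models they depend on (their parents).
DEPENDENCIES: dict[str, set[str]] = {
    "stg_orders": set(),
    "stg_customers": set(),
    "fct_orders": {"stg_orders", "stg_customers"},
    "rpt_revenue": {"fct_orders"},
}


def run_in_order(
    dependencies: dict[str, set[str]],
    run_model: Callable[[str], None],
    max_retries: int = 2,
) -> dict[str, str]:
    """Run each model after its parents and return a status per model."""
    status: dict[str, str] = {}
    # static_order() yields every model only after all of its parents.
    for model in TopologicalSorter(dependencies).static_order():
        parents = dependencies.get(model, set())
        # Skip a model if any of its parents did not succeed.
        if any(status[parent] != "success" for parent in parents):
            status[model] = "skipped"
            continue
        for attempt in range(max_retries + 1):
            try:
                run_model(model)
                status[model] = "success"
                break
            except Exception:
                # Mark the model as failed once all retries are exhausted.
                if attempt == max_retries:
                    status[model] = "failed"
    return status
```

Calling run_in_order(DEPENDENCIES, some_callable) returns a dictionary such as {"stg_orders": "success", ...}, which makes it easy to see which models ran, which failed, and which were skipped as a consequence.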
Airflow relies on operators. Operators are the building blocks of workflows: they define the individual tasks that make up a directed acyclic graph (DAG), a concept similar to the DAG of models in dbt.
Each operator represents a single task in a workflow, specifying what action to perform when that task is executed. Operators are designed to do one thing well and can be combined in a DAG to perform complex workflows. There are several types of operators in Airflow, each serving different purposes. We will not dive into all the operators available, but the most common ones are action operators.
These operators execute a specific action or task, such as running a Python function, executing a SQL query, or sending an HTTP request. Here are some examples:
- BashOperator: Executes a Bash command or script. This can be used to run a dbt build command, for example, as dbt Core is used from the command line.
- PythonOperator: Executes a Python function.
- HttpOperator: Sends an HTTP request.
Note
This chapter features code snippets designed to showcase best practices or usage. However, executing these snippets may need extra setup, not covered in this book. Readers should view these snippets as illustrative examples and adapt the underlying best practices to their unique scenarios.
The following code snippet illustrates the use of a BashOperator for the simple task of running the dbt run --models . command from the project directory. This will run all dbt models under the models directory:
from pendulum import datetime
from airflow.decorators import dag
from airflow.operators.bash import BashOperator

PATH_TO_DBT_PROJECT = "<path to your dbt project>"
PATH_TO_DBT_VENV = "<path to your venv activate binary>"


@dag(
    start_date=datetime(2023, 3, 23),
    schedule="@daily",
    catchup=False,
)
def simple_dbt_dag():
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="source $PATH_TO_DBT_VENV && dbt run --models .",
        env={"PATH_TO_DBT_VENV": PATH_TO_DBT_VENV},
        cwd=PATH_TO_DBT_PROJECT,
    )


simple_dbt_dag()
The downside of using a default BashOperator is limited observability: the whole dbt run appears as a single Airflow task, so the Airflow UI does not show which part of the dbt project went wrong. If a single model fails in a large project, you cannot see from the task status which model it was, and it can be costly to rerun everything. You will have to check your run_results.json artifact file (which is generated after each dbt run and contains metadata on model execution) to see what has gone wrong.
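To make that check less manual, you could summarize the artifact with a few lines of Python. The sketch below assumes the artifact has a top-level results list whose entries carry unique_id, status, and message fields, and that the status values "error" and "fail" mark failed models and tests. This matches the run_results.json schema as we understand it, but verify the field names against your dbt version. The default target directory in load_run_results is also an assumption.

```python
import json
from pathlib import Path


def failed_nodes(run_results: dict) -> list[dict]:
    """Return the nodes whose status signals an error or a failed test."""
    failures = []
    for result in run_results.get("results", []):
        # "error" is reported for failed models, "fail" for failed tests (assumed values).
        if result.get("status") in {"error", "fail"}:
            failures.append(
                {
                    "unique_id": result.get("unique_id"),
                    "status": result.get("status"),
                    "message": result.get("message"),
                }
            )
    return failures


def load_run_results(target_dir: str = "target") -> dict:
    """Read run_results.json from the dbt target directory (path is an assumption)."""
    return json.loads(Path(target_dir, "run_results.json").read_text())
```

For example, failed_nodes(load_run_results()) run from the dbt project directory would return one dictionary per failed model or test, which could then be logged or sent as an alert.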
Luckily, there is a specific operator that better integrates with dbt. This operator can execute dbt Cloud jobs, poll for status, and download job artifacts and, under the hood, leverages the dbt Cloud Administrative API to trigger jobs. We will explain the features of this API in more detail later.
You can use DbtCloudRunJobOperator to trigger a run of a dbt Cloud job. By default, the operator periodically checks the status of the triggered job, once every check_interval seconds, until the job finishes or exceeds the timeout set for its execution (https://airflow.apache.org/docs/apache-airflow-providers-dbt-cloud/stable/operators.html):
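from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

# Define this task inside a DAG; job_id must refer to an existing dbt Cloud job.
trigger_job_run1 = DbtCloudRunJobOperator(
    task_id="trigger_job_run1",
    job_id=48617,
    check_interval=10,  # seconds between status checks
    timeout=300,  # maximum seconds to wait for the job to finish
)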
The preceding code snippet illustrates the use of this operator: it triggers the job with job_id 48617 in dbt Cloud, checks its status every 10 seconds, and has a timeout set (which is also a configurable parameter when calling the API) to limit execution time.
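The polling behavior controlled by check_interval and timeout can be illustrated with a small, library-free sketch. This is not the operator's actual implementation. The get_status callable, the status labels, and the injectable sleep and clock parameters are hypothetical and exist only to show the idea of checking repeatedly until the job finishes or the time budget runs out.

```python
import time
from typing import Callable

# Illustrative status labels; these are not the values used by the dbt Cloud API.
TERMINAL_STATUSES = {"success", "error", "cancelled"}


def wait_for_job(
    get_status: Callable[[], str],
    check_interval: float = 10,
    timeout: float = 300,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    """Poll get_status until a terminal status is seen or timeout seconds pass."""
    start = clock()
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        # Give up once the job has been running longer than the allowed time.
        if clock() - start >= timeout:
            raise TimeoutError(f"Job did not finish within {timeout} seconds")
        sleep(check_interval)
```

A shorter check_interval detects completion sooner at the cost of more status requests, while timeout protects the workflow from waiting indefinitely on a stuck job.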
The biggest limitation here is that you have to use dbt Cloud. Essentially, this operator just calls dbt Cloud under the hood. It has some parameters that can help you monitor your run better compared to just using BashOperator, such as certain polling parameters, but there is a better alternative available!
The better alternative is Cosmos, which is an open-source library that dynamically generates Airflow DAGs from other tools or frameworks. In Chapter 9, Data Quality and Observability (Figure 9.4), we saw that DAGs are a way to visualize data transformation: each node in the chart represents a model, which helps you understand how tables are related to each other. In the context of Airflow, a DAG collects all tasks together and organizes them with dependencies and relationships that determine how and in which order they run. Cosmos automatically creates Airflow tasks from dbt models.
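To get a feel for the information such a tool needs, consider dbt's manifest.json artifact, which records each node and its upstream dependencies. The sketch below is not how Cosmos is implemented. It is a minimal illustration that assumes the manifest has a top-level nodes mapping whose entries carry resource_type and depends_on.nodes fields, as in the dbt artifact schema we are familiar with. The function name is hypothetical.

```python
def model_dependencies(manifest: dict) -> dict[str, list[str]]:
    """Map each model's unique_id to the models it directly depends on."""
    nodes = manifest.get("nodes", {})
    # Keep only models; tests, seeds, and snapshots are ignored in this sketch.
    model_ids = {
        unique_id
        for unique_id, node in nodes.items()
        if node.get("resource_type") == "model"
    }
    return {
        unique_id: sorted(
            parent
            for parent in nodes[unique_id].get("depends_on", {}).get("nodes", [])
            if parent in model_ids  # drop sources and other non-model parents
        )
        for unique_id in sorted(model_ids)
    }
```

Each key in the returned dictionary corresponds to a task, and each listed parent corresponds to an upstream task that must finish first, which is exactly the shape of an Airflow DAG.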
Here are the benefits of using Cosmos with Airflow and dbt Core:
- We can use Airflow’s data-aware scheduling and Airflow sensors to run models that depend on other events.
- Each dbt model can be turned into a task, complete with Airflow features such as retries and error notifications, as well as full observability into past runs directly in the Airflow UI.
- We can run dbt tests on tables created by individual models immediately after a model has completed. We can catch issues early and configure some other data quality tools to run alongside dbt tests.
- We can run dbt projects using Airflow connections instead of dbt profiles. We can store all our connections in one place, directly within Airflow, or by using a secrets backend. This centralizes secrets and connection details, which makes them easier to maintain.
- It leverages native support for installing and running dbt in a virtual environment to avoid dependency conflicts with Airflow.
Have a look at the following code snippet, which imports the necessary packages from Airflow and Cosmos, including PostgresOperator and DbtTaskGroup, to trigger a dbt run and subsequently query this model in Postgres:
from airflow.decorators import dag
from airflow.providers.postgres.operators.postgres import PostgresOperator
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig

# adjust for other database types
from cosmos.profiles import PostgresUserPasswordProfileMapping
from pendulum import datetime
import os

YOUR_NAME = "<your_name>"
CONNECTION_ID = "db_conn"
DB_NAME = "<your_db_name>"
SCHEMA_NAME = "<your_schema_name>"
MODEL_TO_QUERY = "model2"

# The path to the dbt project
DBT_PROJECT_PATH = f"{os.environ['AIRFLOW_HOME']}/dags/dbt/my_simple_dbt_project"

# The path where Cosmos will find the dbt executable
# in the virtual environment created in the Dockerfile
DBT_EXECUTABLE_PATH = f"{os.environ['AIRFLOW_HOME']}/dbt_venv/bin/dbt"
Notice the DBT_PROJECT_PATH constant, which is built from the AIRFLOW_HOME environment variable and points to your dbt project, as well as DBT_EXECUTABLE_PATH, which points to the dbt executable in your virtual environment. You can see how straightforward it is to get started with orchestrating your dbt models from Airflow with Cosmos. The following snippet shows the remainder of the definition of the DAG:
profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id=CONNECTION_ID,
        profile_args={"schema": SCHEMA_NAME},
    ),
)

execution_config = ExecutionConfig(
    dbt_executable_path=DBT_EXECUTABLE_PATH,
)


@dag(
    start_date=datetime(2023, 8, 1),
    schedule=None,
    catchup=False,
    params={"my_name": YOUR_NAME},
)
def my_simple_dbt_dag():
    transform_data = DbtTaskGroup(
        group_id="transform_data",
        project_config=ProjectConfig(DBT_PROJECT_PATH),
        profile_config=profile_config,
        execution_config=execution_config,
        operator_args={
            "vars": '{"my_name": {{ params.my_name }} }',
        },
        default_args={"retries": 2},
    )

    query_table = PostgresOperator(
        task_id="query_table",
        postgres_conn_id=CONNECTION_ID,
        sql=f"SELECT * FROM {DB_NAME}.{SCHEMA_NAME}.{MODEL_TO_QUERY}",
    )

    transform_data >> query_table


my_simple_dbt_dag()
Notice how simple the DAG is: it leverages the DbtTaskGroup class to create a task group from the models in your dbt project. The dependencies between dbt models are turned into dependencies between Airflow tasks automatically, which saves a lot of time compared with defining them yourself. All we had to specify was profile_config and execution_config, which determine the profile to use for the connection and the path to the dbt executable. Cosmos will parse your dbt project files to define a DAG for you. Moreover, we can also run a query on Postgres using PostgresOperator, which allows us to combine several tasks into one workflow.
Now that you have a better understanding of the considerations involved in automating data workflows and recognize the added flexibility of using a tool such as Airflow, let’s dive into another important workflow that helps us automate the testing of our data: CI.