Why connect Workstream to dbt Core?
When you connect your Workstream.io instance to dbt Core, we will detect data incidents, alert your team in Slack, and streamline your data incident management from end to end.
This includes knowing when your dbt models, tests, and sources fail; registering associated incidents for your team to investigate and manage; alerting your team via Slack; and populating a status page for any asset that is configured as an exposure in dbt.
Workstream also presents failing dashboards and data incidents on our environment insights page, giving your team a centralized place to monitor data quality trends and more easily identify urgent issues with your assets.
How do I connect Workstream to dbt Core?
Please note that the process for connecting Workstream.io to dbt Core depends on which orchestrator you use for your production deployment of dbt Core. We have guides below for Deploying on GitHub Actions and Deploying on Apache Airflow. If you use Dagster, Prefect, or another solution, please reach out to us directly via Intercom or [email protected]; we are more than happy to help get you going.
The rest of this article covers the general process and principles that apply to most deployments, including local development and testing.
Step 1. Obtain API credentials from Workstream.
After logging into Workstream, click your workspace name in the top-left corner, then click Settings > Connections, and then click the “+ Connect” button under dbt Core. Copy these credentials to a safe place, such as a GitHub Secret in your dbt project repository.
Step 2. Install the `workstream-dbt-core` Python package.
Add the `workstream-dbt-core` package (PyPI link) to the Python project you use to run dbt Core. That could mean adding the package name to `requirements.txt`, running `poetry add workstream-dbt-core`, or (with your virtual environment activated) simply running `pip install workstream-dbt-core`.
This package has no dependencies other than dbt (>= 1.5.1, < 2.0), so it should not cause any package conflicts.
The `workstream-dbt-core` package installs a CLI command, `workstream report`. That command looks for the `target` directory produced by an invocation of dbt and posts the `manifest.json`, `run_results.json`, and `sources.json` files in that directory to a Workstream API endpoint. Workstream never sees your dbt project code or the data in your data warehouse, only the metadata from your dbt invocations.
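The artifact-gathering behavior described above can be sketched in a few lines of Python. This is only an illustration of what the client does, not its actual implementation, and the `collect_artifacts` helper name is hypothetical:

```python
from pathlib import Path

# the three metadata files workstream report looks for in target/
ARTIFACTS = ("manifest.json", "run_results.json", "sources.json")

def collect_artifacts(target_path):
    """Return the dbt artifact files present in a target directory.

    Hypothetical sketch of what `workstream report` gathers before
    posting to the Workstream API: only invocation metadata, never
    project code or warehouse data.
    """
    target = Path(target_path)
    return {name: target / name for name in ARTIFACTS if (target / name).is_file()}
```

Note that a file is simply skipped if dbt did not produce it; for example, `sources.json` only exists after a `dbt source freshness` invocation.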
Step 3. Authorize and configure the `workstream-dbt-core` client.
Configuration is simple and can be done through command-line options or environment variables. At a minimum, you will need to pass the Workstream API client ID and secret from Step 1 to the client, either through the CLI options `--client-id` and `--client-secret`, or through the environment variables `WORKSTREAM_DBT_CLIENT_ID` and `WORKSTREAM_DBT_CLIENT_SECRET`. You may also need to pass `--target-path` or set `WORKSTREAM_DBT_TARGET_PATH` if you do not invoke `workstream report` from your dbt project directory, or if you have configured a non-default target path in your dbt project.
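The two configuration channels can be sketched as a small resolver. This assumes, as is conventional for CLI tools, that an explicit option wins over the environment variable; the real client may resolve differently, and the `resolve_setting` helper name is hypothetical:

```python
import os

def resolve_setting(cli_value, env_name):
    """Return a setting from a CLI option, falling back to an environment
    variable, or None if neither is set.

    Sketch of the documented configuration channels; assumes explicit
    CLI options take precedence over the environment.
    """
    if cli_value is not None:
        return cli_value
    return os.environ.get(env_name)
```

For example, `resolve_setting(None, "WORKSTREAM_DBT_CLIENT_ID")` falls back to the environment, while a value passed on the command line is used as-is.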
Step 4. Run `workstream report`.
You will need to run `workstream report` after each individual dbt command, because dbt overwrites the `manifest.json` and `run_results.json` files in your `target` directory with every invocation, and `workstream report` depends on those files.
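That interleaving can be sketched as a small Python driver. The `run` parameter is injectable here only to make the sketch easy to exercise; the default assumes the `dbt` and `workstream` executables are on your PATH:

```python
import subprocess

def run_pipeline(dbt_commands, run=subprocess.run):
    """Run `workstream report` immediately after each dbt command, before
    the next dbt invocation overwrites the target/ artifacts.

    Sketch only; `run` defaults to subprocess.run, and each dbt failure
    is tolerated so the report step still executes.
    """
    for args in dbt_commands:
        run(["dbt", *args])            # report even if this command fails
        run(["workstream", "report"])

# e.g. run_pipeline([["run"], ["test"]]) issues, in order:
#   dbt run; workstream report; dbt test; workstream report
```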
If you run dbt from a shell script, you need to be careful with the exit code of each command. dbt exits with a non-zero code on a freshness error, run error, or test failure; that exit code could prevent `workstream report` from being invoked by the script. To force your script to continue to the next command even if dbt fails, suffix the dbt command with `|| true`. For example, the following script runs `workstream report` after `dbt run`, even if `dbt run` fails:

```shell
dbt run || true
workstream report
```
If you rely on dbt's exit codes to cause your workflow to fail, you can configure `workstream report` to exit with a non-zero code on a dbt failure by passing the `--exit-nonzero` option:

```shell
dbt run || true
workstream report --exit-nonzero
```
Deploying on GitHub Actions
Since a GitHub Actions workflow is a series of shell scripts, many of the points above apply.
1. Obtain credentials from Workstream. We recommend you store these as GitHub Repository Secrets or GitHub Environment Secrets so you can access them in your workflow.
2. Add the `workstream-dbt-core` package (PyPI link) to the Python project you use to run dbt Core. That could mean adding the package name to `requirements.txt`, running `poetry add workstream-dbt-core`, or adding a step with `pip install workstream-dbt-core` to your GitHub Actions workflow.
3. Update the GitHub Actions workflow that defines your production dbt runs to invoke the `workstream report` command after each `dbt run`, `dbt test`, or `dbt source freshness`. Be careful with exit codes: GitHub will abort the workflow if any shell command exits with a non-zero code. By default, `workstream report` always exits with a code of 0; however, if you *want* the GitHub workflow to fail on a dbt failure, use `workstream report --exit-nonzero`. For example, this workflow step will never stop after the freshness call, but may stop if `dbt run` or `dbt test` fails:

```yaml
- name: Run dbt build pipeline
  run: |
    dbt source freshness || true
    workstream report
    dbt seed
    dbt run || true
    workstream report --exit-nonzero
    dbt test || true
    workstream report --exit-nonzero
  env:
    WORKSTREAM_DBT_CLIENT_ID: ${{ secrets.WORKSTREAM_DBT_CLIENT_ID }}
    WORKSTREAM_DBT_CLIENT_SECRET: ${{ secrets.WORKSTREAM_DBT_CLIENT_SECRET }}
```

After merging your PR, your next production run of dbt will report its results to Workstream, so you can use Workstream’s incident management features to manage dbt failures.
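Putting that step into context, a complete minimal workflow might look like the sketch below. The workflow name, schedule, Python version, and requirements file are assumptions; adapt them to your project:

```yaml
name: dbt production run  # hypothetical workflow name

on:
  schedule:
    - cron: "0 6 * * *"   # assumed daily schedule; adjust to taste
  workflow_dispatch:

jobs:
  dbt:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      # requirements.txt is assumed to include dbt and workstream-dbt-core
      - run: pip install -r requirements.txt
      - name: Run dbt build pipeline
        run: |
          dbt run || true
          workstream report --exit-nonzero
        env:
          WORKSTREAM_DBT_CLIENT_ID: ${{ secrets.WORKSTREAM_DBT_CLIENT_ID }}
          WORKSTREAM_DBT_CLIENT_SECRET: ${{ secrets.WORKSTREAM_DBT_CLIENT_SECRET }}
```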
Deploying on Apache Airflow
There are many ways to deploy dbt Core on Airflow. Here we'll cover the two most common, using the `BashOperator` and the `PythonOperator`. The most important aspect of this solution (for most Airflow deployments) is that the Workstream report step must run in the same operator as the dbt invocation, since it depends on files created by that invocation. This requirement can be relaxed if your dbt operator copies its files to durable storage, like an S3 bucket.
We'll follow the same steps as the local deployment:
1. Obtain credentials from Workstream. We recommend you store these as Airflow Variables or Secrets, if you have a Secrets backend configured. The worker executing your `BashOperator` or `PythonOperator` will need access to these credentials.
2. Add the `workstream-dbt-core` package (PyPI link) to your Airflow project or to the environment the operator executes in (if using `PythonVirtualenvOperator`, `ExternalPythonOperator`, `DockerOperator`, `KubernetesPodOperator`, etc.).
3. Update the task that invokes dbt to also invoke `workstream report` or call its Python API. It is important that this occurs in the same task that invokes dbt, and not a subsequent task, which won't have access to the dbt artifact files.

Here is an example DAG that invokes `dbt run` and `dbt test` in two separate `BashOperator` tasks, with each reporting its results to Workstream:
```python
from pathlib import Path
import textwrap

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG("dbt-workstream") as dag:
    dbt_proj_path = Path.home() / "..." / "dbt-proj"

    dbt_run = BashOperator(
        task_id="run_dbt",
        bash_command=textwrap.dedent(
            f"""
            dbt run --project-dir {dbt_proj_path} || true
            workstream report \
                --exit-nonzero \
                --target-path {dbt_proj_path / "target"}
            """.strip()
        ),
        env={
            "WORKSTREAM_DBT_CLIENT_ID": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_ID') }}",
            "WORKSTREAM_DBT_CLIENT_SECRET": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_SECRET') }}",
        },
        append_env=True,
    )

    dbt_test = BashOperator(
        task_id="test_dbt",
        bash_command=textwrap.dedent(
            f"""
            dbt test --project-dir {dbt_proj_path} || true
            workstream report \
                --exit-nonzero \
                --target-path {dbt_proj_path / "target"}
            """.strip()
        ),
        env={
            "WORKSTREAM_DBT_CLIENT_ID": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_ID') }}",
            "WORKSTREAM_DBT_CLIENT_SECRET": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_SECRET') }}",
        },
        append_env=True,
    )

    dbt_run >> dbt_test
```
Here is an equivalent example that uses the `PythonOperator` and the Python API for dbt and `workstream_dbt_core`:
```python
from pathlib import Path

from airflow import DAG, AirflowException
from airflow.decorators import task
from airflow.models import Variable

with DAG("dbt-workstream-py") as dag:
    dbt_proj_path = Path.home() / "..." / "dbt-proj"

    @task(task_id="run_dbt")
    def dbt_run():
        from dbt.cli.main import dbtRunner
        from workstream_dbt_core.api import report_invocation

        dbt = dbtRunner()
        dbt_result = dbt.invoke(["run", "--project-dir", str(dbt_proj_path)])
        ws_result = report_invocation(
            target_path=dbt_proj_path / "target",
            client_id=Variable.get("WORKSTREAM_DBT_CLIENT_ID"),
            client_secret=Variable.get("WORKSTREAM_DBT_CLIENT_SECRET"),
        )
        ws_result.print_report(exit_nonzero=True)
        if not dbt_result.success:
            raise AirflowException("dbt run failed.")

    @task(task_id="test_dbt")
    def dbt_test():
        from dbt.cli.main import dbtRunner
        from workstream_dbt_core.api import report_invocation

        dbt = dbtRunner()
        dbt_result = dbt.invoke(["test", "--project-dir", str(dbt_proj_path)])
        ws_result = report_invocation(
            target_path=dbt_proj_path / "target",
            client_id=Variable.get("WORKSTREAM_DBT_CLIENT_ID"),
            client_secret=Variable.get("WORKSTREAM_DBT_CLIENT_SECRET"),
        )
        ws_result.print_report(exit_nonzero=True)
        if not dbt_result.success:
            raise AirflowException("dbt test failed.")

    dbt_run() >> dbt_test()
```

After updating your DAG, your next run of Airflow will report its results to Workstream, so you can use Workstream’s incident management features to manage dbt failures.