Connecting my dbt Core instance

Connect dbt Core to detect and alert your team of data incidents

Written by Nicholas Freund
Updated over a week ago

Why connect Workstream to dbt Core?

When you connect your Workstream.io instance to dbt Core, we will detect data incidents, alert your team in Slack, and streamline your data incident management from end to end.

This includes knowing when your dbt models, tests, and sources fail; registering associated incidents for your team to investigate and manage; alerting your team via Slack; and populating a status page for any asset that is configured as an exposure in dbt.

Workstream also presents failing dashboards and data incidents on our environment insights page. This gives your team a centralized place to monitor data quality trends and more easily identify urgent issues with your assets.

How do I connect Workstream to dbt Core?

Please note that the process for connecting Workstream.io to dbt Core depends on which orchestrator you use for your production deployment of dbt Core. We have guides below for Deploying on GitHub Actions and Deploying on Apache Airflow. If you use Dagster, Prefect, or another solution, please reach out to us directly via Intercom or [email protected]. We are more than happy to help get you going.

The rest of this article covers the general process and principles that apply to most deployments, including local development and testing.

Step 1. Obtain API credentials from Workstream.

After logging into Workstream, click your workspace name in the top-left corner, then click Settings > Connections, and click the “+ Connect” button under dbt Core. Workstream will generate a client ID and secret; copy these credentials to a safe place, such as a GitHub Secret in your dbt project's repository.
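If you plan to deploy with GitHub Actions, one convenient option is to store the credentials as repository secrets using the GitHub CLI; a sketch, assuming gh is installed and the placeholder values below are replaced with the real ID and secret that Workstream displays:

# Placeholder values; paste the credentials from Workstream.
gh secret set WORKSTREAM_DBT_CLIENT_ID --body "your-client-id"
gh secret set WORKSTREAM_DBT_CLIENT_SECRET --body "your-client-secret"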

Step 2. Install the workstream-dbt-core Python package.

Add the workstream-dbt-core package (PyPI link) to the Python project you use to run dbt Core. That could mean adding the package name to requirements.txt, running poetry add workstream-dbt-core, or (with your virtual environment activated) simply running pip install workstream-dbt-core.
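For example, either of the following will work, depending on how you manage your environment:

# with pip, inside your activated virtual environment
pip install workstream-dbt-core

# or with Poetry
poetry add workstream-dbt-core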

This package has zero dependencies other than dbt (≥ 1.5.1, < 2.0), so it should not cause any package conflicts.

The workstream-dbt-core package installs a CLI command, workstream report. That command looks for the target directory produced by an invocation of dbt, and posts the manifest.json, run_results.json, and sources.json files in that directory to a Workstream API endpoint. Workstream never sees your dbt project code or the data in your data warehouse, only the metadata from your dbt invocations.
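For example, after a dbt invocation you can confirm from your dbt project directory that the artifacts workstream report uploads are present (note that sources.json is only written by dbt source freshness, so it may be absent):

ls target/manifest.json target/run_results.json target/sources.json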

Step 3. Authorize and configure the workstream-dbt-core client.

Configuration is simple and can be done through command-line options or environment variables. At a minimum, you will need to pass the Workstream API client ID and secret from Step 1 to the client, either through the CLI options --client-id and --client-secret, or through the environment variables WORKSTREAM_DBT_CLIENT_ID and WORKSTREAM_DBT_CLIENT_SECRET. You may also need to pass a --target-path or set WORKSTREAM_DBT_TARGET_PATH if you do not invoke workstream report from your dbt project directory, or if you have configured a non-default target path in your dbt project.
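For example, the following two invocations are equivalent ways to authorize the client (the credential values and project path are placeholders):

# Option 1: environment variables, run from the dbt project directory
export WORKSTREAM_DBT_CLIENT_ID="your-client-id"
export WORKSTREAM_DBT_CLIENT_SECRET="your-client-secret"
workstream report

# Option 2: CLI options, with an explicit target path
workstream report \
  --client-id "your-client-id" \
  --client-secret "your-client-secret" \
  --target-path /path/to/dbt-project/target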

Step 4. Run workstream report.

You will need to run workstream report after each individual dbt command. This is because dbt will overwrite the manifest.json and run_results.json files in your target directory with every invocation, and workstream report depends on those files.

If you run dbt from a shell script, be careful with the exit codes of each command. dbt will exit with a non-zero code on a freshness error, run error, or test failure; that exit code could prevent workstream report from being invoked by the script. To force your script to continue to the next command even if dbt fails, suffix the dbt command with || true. For example, the following script will run workstream report after dbt run, even if dbt run fails:

dbt run || true
workstream report

If you rely on dbt's exit codes to cause your workflow to fail, you can configure workstream report to exit with a non-zero code on a dbt failure by passing the --exit-nonzero option:

dbt run || true
workstream report --exit-nonzero

Deploying on GitHub Actions

Since a GitHub Actions workflow is a series of shell scripts, many of the points above apply.

  1. Obtain credentials from Workstream. We recommend you store these as GitHub Repository Secrets or GitHub Environment Secrets so you can access them in your workflow.

  2. Add the workstream-dbt-core package (PyPI link) to the Python project you use to run dbt Core. That could mean adding the package name to requirements.txt, running poetry add workstream-dbt-core, or, in your GitHub Actions workflow, adding a step with pip install workstream-dbt-core.

  3. Update the GitHub Actions workflow that defines your production dbt runs to invoke the workstream report command after each dbt run, dbt test, or dbt source freshness. Be careful with exit codes: GitHub will abort the workflow if any shell command exits with a nonzero code. By default, workstream report will always exit with a code of 0; however, if you *want* the GitHub workflow to fail on a dbt failure, you should use workstream report --exit-nonzero. For example, this workflow step will never stop after the freshness call, but may stop if dbt run or dbt test fails:

    - name: Run dbt build pipeline
      run: |
        dbt source freshness || true
        workstream report
        dbt seed
        dbt run || true
        workstream report --exit-nonzero
        dbt test || true
        workstream report --exit-nonzero
      env:
        WORKSTREAM_DBT_CLIENT_ID: ${{ secrets.WORKSTREAM_DBT_CLIENT_ID }}
        WORKSTREAM_DBT_CLIENT_SECRET: ${{ secrets.WORKSTREAM_DBT_CLIENT_SECRET }}

  4. After merging your PR, your next production run of dbt will report its results to Workstream, so you can use Workstream’s incident management features to manage dbt failures.

Deploying on Apache Airflow

There are many ways to deploy dbt Core on Airflow. Here we'll cover the two most common, using the BashOperator and the PythonOperator. The most important aspect of this solution (for most Airflow deployments) is that the workstream report step must run in the same operator as the dbt invocation, since it depends on files created by that invocation. This requirement can be relaxed if your dbt operator copies its artifact files to durable storage, like an S3 bucket.

We'll follow the same steps as the local deployment:

  1. Obtain credentials from Workstream. We recommend you store these as Airflow Variables or Secrets, if you have a Secrets backend configured. The worker executing your BashOperator or PythonOperator will need access to these credentials.
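    For example, you could set them as Airflow Variables from the Airflow CLI (a sketch; the values are placeholders):

    airflow variables set WORKSTREAM_DBT_CLIENT_ID "your-client-id"
    airflow variables set WORKSTREAM_DBT_CLIENT_SECRET "your-client-secret"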

  2. Add the workstream-dbt-core package (PyPI link) to your Airflow project or to the environment the operator executes in (if using PythonVirtualenvOperator, ExternalPythonOperator, DockerOperator, KubernetesPodOperator, etc.).

  3. Update the task that invokes dbt to also invoke workstream report or call its Python API. It is important that this occurs in the same task that invokes dbt, and not a subsequent task, which won't have access to the dbt artifact files.

    Here is an example DAG that invokes dbt run and dbt test in two separate BashOperator tasks, each reporting its results to Workstream:

    from pathlib import Path
    import textwrap

    from airflow import DAG
    from airflow.operators.bash import BashOperator

    with DAG("dbt-workstream") as dag:
        dbt_proj_path = Path.home() / "..." / "dbt-proj"

        dbt_run = BashOperator(
            task_id="run_dbt",
            bash_command=textwrap.dedent(
                f"""
                dbt run --project-dir {dbt_proj_path} || true
                workstream report \
                    --exit-nonzero \
                    --target-path {dbt_proj_path / "target"}
                """
            ).strip(),
            env={
                "WORKSTREAM_DBT_CLIENT_ID": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_ID') }}",
                "WORKSTREAM_DBT_CLIENT_SECRET": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_SECRET') }}",
            },
            append_env=True,
        )

        dbt_test = BashOperator(
            task_id="test_dbt",
            bash_command=textwrap.dedent(
                f"""
                dbt test --project-dir {dbt_proj_path} || true
                workstream report \
                    --exit-nonzero \
                    --target-path {dbt_proj_path / "target"}
                """
            ).strip(),
            env={
                "WORKSTREAM_DBT_CLIENT_ID": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_ID') }}",
                "WORKSTREAM_DBT_CLIENT_SECRET": "{{ var.value.get('WORKSTREAM_DBT_CLIENT_SECRET') }}",
            },
            append_env=True,
        )

        dbt_run >> dbt_test


    Here is an equivalent example that uses the TaskFlow @task decorator (which wraps the PythonOperator) and the Python APIs for dbt and workstream_dbt_core:

    from pathlib import Path

    from airflow import DAG, AirflowException
    from airflow.decorators import task
    from airflow.models import Variable

    with DAG("dbt-workstream-py") as dag:
        dbt_proj_path = Path.home() / "..." / "dbt-proj"

        @task(task_id="run_dbt")
        def dbt_run():
            from dbt.cli.main import dbtRunner
            from workstream_dbt_core.api import report_invocation

            dbt = dbtRunner()
            dbt_result = dbt.invoke(["run", "--project-dir", str(dbt_proj_path)])
            ws_result = report_invocation(
                target_path=dbt_proj_path / "target",
                client_id=Variable.get("WORKSTREAM_DBT_CLIENT_ID"),
                client_secret=Variable.get("WORKSTREAM_DBT_CLIENT_SECRET"),
            )
            ws_result.print_report(exit_nonzero=True)
            if not dbt_result.success:
                raise AirflowException("dbt run failed.")

        @task(task_id="test_dbt")
        def dbt_test():
            from dbt.cli.main import dbtRunner
            from workstream_dbt_core.api import report_invocation

            dbt = dbtRunner()
            dbt_result = dbt.invoke(["test", "--project-dir", str(dbt_proj_path)])
            ws_result = report_invocation(
                target_path=dbt_proj_path / "target",
                client_id=Variable.get("WORKSTREAM_DBT_CLIENT_ID"),
                client_secret=Variable.get("WORKSTREAM_DBT_CLIENT_SECRET"),
            )
            ws_result.print_report(exit_nonzero=True)
            if not dbt_result.success:
                raise AirflowException("dbt test failed.")

        dbt_run() >> dbt_test()

  4. After updating your DAG, its next run will report results to Workstream, so you can use Workstream’s incident management features to manage dbt failures.
