How to trigger a single Dataform Workflow from Composer

The title explains most of it.

I am using Dataform and Composer. Both instances are very simple; I just followed the quickstart guides to get them up and running.

I have 2-3 workflows in Dataform and added unique tags to all of them. The workflows deal with data and tables in my BigQuery instance.

I am trying to create a Composer DAG that will trigger the execution of a workflow, just as if I were running it manually from the UI. However, I can't get it to work, and the documentation looks very limited.

Can somebody share an example of how to do it?

Thanks in advance!


7 REPLIES

To trigger a Dataform workflow from Google Cloud Composer, you use Composer's Apache Airflow environment to programmatically execute a Dataform job. This typically involves using Airflow's HTTP operator to make a call to the Dataform API, which in turn triggers the workflow.

Here's a step-by-step guide on how to set this up:

1. Set Up Google Cloud Composer

Ensure that your Google Cloud Composer environment is up and running; Apache Airflow is installed as part of this environment.

2. Obtain Dataform API Credentials

To trigger a Dataform workflow, you need to authenticate with the Dataform API. In practice this means an OAuth 2.0 access token (for example, for a service account) rather than an API key. Refer to the Dataform authentication documentation for how to obtain these credentials.

3. Create an Airflow DAG

You will create a Directed Acyclic Graph (DAG) in Airflow to define the workflow. This DAG will include a task to trigger the Dataform job.

Here's a basic example of what the DAG might look like in Python:

 
import json
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator

# Default arguments
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG('trigger_dataform_workflow',
          default_args=default_args,
          description='Trigger a Dataform workflow',
          schedule_interval=timedelta(days=1))

# Task to trigger the Dataform workflow via an HTTP call
trigger_dataform = SimpleHttpOperator(
    task_id='trigger_dataform',
    http_conn_id='dataform_api_connection',  # Replace with your connection ID
    endpoint='your/dataform/api/endpoint',  # Replace with your Dataform API endpoint
    method='POST',
    # The bearer value must be a valid OAuth access token for the Dataform API
    headers={"Content-Type": "application/json", "Authorization": "Bearer YOUR_API_KEY"},
    data=json.dumps({"tag": "your_workflow_tag"}),  # Replace with your workflow tag
    dag=dag,
)
 

4. Set Up Airflow Connections

In the Airflow UI, set up a new HTTP connection (dataform_api_connection in the example) with the details of your Dataform API endpoint. This includes the API URL and any authentication headers required.

5. Deploy and Test the DAG

Deploy this DAG to your Airflow environment and test it to ensure it triggers the Dataform workflow as expected.
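As an aside: if your Composer environment includes a recent apache-airflow-providers-google package (Composer 2 images generally do), you can avoid hand-rolling the HTTP call and use the provider's dedicated Dataform operators instead. The following is only a minimal sketch under that assumption; the project, region, repository, and tag values are placeholders, and the tag filter via invocation_config is my reading of how the API fields map, so adjust it to your setup:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

# Placeholder values -- replace with your own project/repository details.
PROJECT_ID = "your-project-id"
REGION = "us-central1"
REPOSITORY_ID = "your-repository-id"

with DAG(
    "dataform_operators_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Compile the repository (here: the tip of the main branch).
    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={"git_commitish": "main"},
    )

    # Invoke the compiled workflow, optionally limited to actions with a tag.
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": {"included_tags": ["your_workflow_tag"]},
        },
    )

    create_compilation_result >> create_workflow_invocation

The first task compiles the repository and the second starts a workflow invocation for that compilation result, which is effectively what the manual execution button in the Dataform UI does.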

@ms4446 Thank you so much for your prompt response.

I created a Composer instance and it runs sample jobs just fine. My Dataform instance also works when I run jobs manually, but I can't figure out the details of the HTTP connection on Composer. I can't find documentation that says whether I need any headers or how I would authenticate with Dataform. I also tried making the call from another app such as Postman, but I couldn't get it to work.

I followed this guide (https://cloud.google.com/dataform/docs/authentication?cloudshell=true#client-libs) but I don't know if I did it the right way. Do I need to execute it on the Composer machine or anything like that?

Please let me know if you can help me move forward or if you need any other details.

 

Thanks in advance!

To authenticate and trigger a Dataform workflow from Google Cloud Composer, you typically need to set up an HTTP connection in Composer that includes the necessary authentication details to interact with Dataform's API. Here are the steps you can follow:

  1. Obtain Dataform API Credentials:

    • You need the appropriate credentials to authenticate with Dataform's API. For the managed Google Cloud Dataform API this means an OAuth 2.0 access token (for example, for a service account) rather than an API key.
    • The guide you followed for authentication should provide you with these credentials. Make sure you have followed all the steps correctly.
  2. Setting Up HTTP Connection in Composer:

    • In your Composer (Airflow) environment, you need to set up an HTTP connection that includes the Dataform API's base URL and the necessary authentication headers.
    • The connection setup typically involves specifying the API base URL as the 'Host' and adding the authentication token in the 'Extra' field in JSON format (e.g., {"Authorization": "Bearer YOUR_API_KEY"}); a sketch of such a connection, created programmatically, follows this list.
  3. Testing the Connection:

    • Before integrating this connection into your DAG, you can test it using tools like Postman.
    • In Postman, set up a request to the Dataform API endpoint and include the authentication headers. If you face issues, double-check the API endpoint and the credentials.
  4. Integration in DAG:

    • Once the connection is set up and tested, you can use it in your DAG to trigger Dataform workflows. This is usually done using the SimpleHttpOperator or a similar operator in Airflow, where you specify the HTTP connection ID and the necessary parameters for the API request.
  5. Execution Context:

    • You don't need to execute the authentication steps on the Composer machine. The key is to ensure that the HTTP connection in Composer has the correct configuration to authenticate with Dataform's API.
  6. Documentation and Support:

    • If the documentation you're following is unclear or you're encountering specific issues, consider reaching out to Dataform's support or community forums for more targeted assistance.
  7. Troubleshooting:

    • If you encounter errors, check the logs in your Composer environment for clues. Common issues include incorrect API endpoints, invalid credentials, or network-related problems.
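To make step 2 concrete, here is a rough sketch of the same HTTP connection defined from Python rather than through the Airflow UI. This is only for illustration: the conn_id matches the earlier DAG example, the host assumes you are calling the managed Dataform API at dataform.googleapis.com, and a hard-coded bearer token is only workable for a quick test because OAuth access tokens expire after roughly an hour:

import json

from airflow import settings
from airflow.models import Connection

# Placeholder values -- normally you would enter these in the Airflow UI instead.
conn = Connection(
    conn_id="dataform_api_connection",        # referenced by http_conn_id in the DAG
    conn_type="http",
    host="https://dataform.googleapis.com",   # base URL of the Dataform API
    extra=json.dumps(
        {"Authorization": "Bearer YOUR_ACCESS_TOKEN"}  # short-lived OAuth token, for testing only
    ),
)

# Persist the connection in the Airflow metadata database
# (running this twice would create a duplicate entry).
session = settings.Session()
session.add(conn)
session.commit()

In day-to-day use you would normally just fill the same Host and Extra fields in the Airflow UI (Admin > Connections) instead of scripting it.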

Hi @ms4446, thanks again for your help. I took longer than I wanted to test it again and, unfortunately, I still can't get to a simple execution that combines Dataform + Composer. I thought it would be easy since both are inside GCP, but it doesn't seem so.

In your responses it seems pretty straightforward; however, I am struggling with the basics and I can't set up a simple HTTP connection to Dataform as you describe in point number 2.

I did follow this authentication guide (https://cloud.google.com/dataform/docs/authentication) and I am not 100% sure what the "Bearer YOUR_API_KEY" should be. I also generated an API key via GCP and it still doesn't work.

I am also having a hard time identifying the endpoint I should submit my request to. Looking at this doc (https://cloud.google.com/dataform/reference/rest?_ga=2.163805177.-799383516.1697759444) it seems simple, but I can't make it work.

Would you kindly help me with these details?
Thanks!

Hi @fbnz ,

I understand your frustration with setting up a connection between Dataform and Composer, especially regarding authentication and endpoint details. Here is some clarification:

Understanding "Bearer YOUR_API_KEY":

  • In that example, "Bearer YOUR_API_KEY" stands for an OAuth 2.0 access token generated in GCP (for example, the output of gcloud auth print-access-token), used for authenticating your requests to the Dataform API; the managed Dataform API does not accept simple API keys.
  • The actual token is a long string, and when used in requests it should be prefixed with "Bearer " (note the space after Bearer).

Identifying the Endpoint:

  • The specific endpoint depends on the action you're trying to perform. For triggering a run of your compiled code, the Dataform REST API uses workflow invocations: POST https://dataform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/repositories/{repository_id}/workflowInvocations, with a request body that references a previously created compilation result.
  • Replace {project_id}, {location}, and {repository_id} with your actual project ID, repository location (e.g. us-central1), and repository ID, which you can find in the Dataform UI or your project configuration.

Troubleshooting Steps:

  1. Double-check Authentication:

    • Verify that you're using a valid access token; OAuth access tokens typically expire after about an hour, so a token that worked earlier may need to be regenerated.
    • In your Airflow connection, the "Authorization" header should be set as "Bearer {YOUR_ACCESS_TOKEN}", with your actual token in place of {YOUR_ACCESS_TOKEN}.
  2. Validate Endpoint:

    • Confirm the endpoint URL is correct for the specific action you're attempting, such as triggering a workflow.
    • Ensure you're using the correct IDs for your project, repository location, and repository.
  3. Test the Connection:

    • Use a tool like Postman (or the Python sketch after this list) to send a test request to the Dataform API endpoint. Include your authentication details and payload to check whether the issue is with Airflow or with the Dataform API.
  4. Review Error Messages:

    • Carefully examine any error messages in Airflow. Look for HTTP status codes and specific error details that can help identify the issue.
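If it helps, here is a minimal Python sketch of such a test request, as an alternative to Postman. It assumes the google-auth and requests packages, Application Default Credentials with Dataform permissions, and that you already have a compilation result whose resource name you can paste in; every "your-..." value is a placeholder, so treat this as a starting point rather than a definitive recipe:

import google.auth
import google.auth.transport.requests
import requests

# Obtain an OAuth 2.0 access token from Application Default Credentials.
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
credentials.refresh(google.auth.transport.requests.Request())

# Placeholder values -- replace with your own.
project = "your-project-id"
location = "us-central1"
repository = "your-repository-id"
compilation_result = (
    f"projects/{project}/locations/{location}/repositories/{repository}"
    "/compilationResults/your-compilation-result-id"
)

url = (
    f"https://dataform.googleapis.com/v1beta1/projects/{project}"
    f"/locations/{location}/repositories/{repository}/workflowInvocations"
)
headers = {
    "Authorization": f"Bearer {credentials.token}",
    "Content-Type": "application/json",
}
body = {
    "compilationResult": compilation_result,
    "invocationConfig": {"includedTags": ["your_workflow_tag"]},
}

response = requests.post(url, headers=headers, json=body)
print(response.status_code, response.text)

A successful response returns the new workflow invocation resource; a 401/403 usually points at the token or IAM permissions, and a 404 usually means the project, location, or repository ID in the URL is wrong.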

I'm attempting to do the same. Can't I use the Dataform client library to authenticate using my Composer service account? https://cloud.google.com/python/docs/reference/dataform/latest
Then I just need to trigger a workflow; the client library documentation isn't clear on how to do that. Do you have any examples?

Yes, you can use the Dataform client library to authenticate using your Composer service account. This approach leverages Google Cloud's IAM (Identity and Access Management) for authentication, which is a secure and scalable way to manage access to your Dataform resources.

Steps to authenticate with Dataform using a Composer service account:

  1. Grant Permissions to a Service Account: If you haven't already, grant the necessary Dataform permissions (for example, the Dataform Editor role) to a service account in Google Cloud IAM. For code running inside Composer, this is typically the Composer environment's own service account.

  2. Rely on Application Default Credentials: Inside Composer, the environment's service account is picked up automatically through Application Default Credentials, so you normally don't need a key file at all.

  3. Set GOOGLE_APPLICATION_CREDENTIALS Only If Needed: Only when running outside Google Cloud do you need to download a JSON key file for the service account and point the GOOGLE_APPLICATION_CREDENTIALS environment variable at it.

  4. Use the Dataform Client Library: Your code can then use the Dataform client library, which automatically picks up whichever credentials are available in the environment.

Example:

 
from google.cloud import dataform_v1beta1

# Initialize the Dataform client (it picks up Application Default Credentials)
client = dataform_v1beta1.DataformClient()

# Specify your project, location, and repository
project_id = "your-project-id"
location = "your-location"  # e.g., "us-central1"
repository_id = "your-repository-id"

# Construct the full repository name
repository_name = f"projects/{project_id}/locations/{location}/repositories/{repository_id}"

# 1. Compile the repository (here: the tip of the main branch)
compilation_result = client.create_compilation_result(
    parent=repository_name,
    compilation_result={"git_commitish": "main"},
)

# 2. Trigger a workflow invocation for that compilation result,
#    optionally restricted to actions carrying a given tag
workflow_invocation = client.create_workflow_invocation(
    parent=repository_name,
    workflow_invocation={
        "compilation_result": compilation_result.name,
        "invocation_config": {"included_tags": ["your_workflow_tag"]},
    },
)
print("Workflow invocation created:", workflow_invocation.name)

Please Note:

  • Methods and parameters might vary slightly based on your Dataform client library version.
  • Replace placeholders like "your-project-id" with the values specific to your project.
  • If you need more assistance with the library, refer to its documentation, source code, or contact Google Cloud or the Dataform community.
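To connect this back to Composer: the same two calls can live inside a task in your DAG, running under the Composer environment's service account. A minimal sketch, assuming the google-cloud-dataform package is installed in your Composer environment and using placeholder IDs and a placeholder tag:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from google.cloud import dataform_v1beta1


def trigger_dataform_workflow():
    """Compile the repository and start a tagged workflow invocation."""
    # Uses the Composer environment's service account via Application Default Credentials.
    client = dataform_v1beta1.DataformClient()
    repository = "projects/your-project-id/locations/us-central1/repositories/your-repository-id"

    compilation = client.create_compilation_result(
        parent=repository,
        compilation_result={"git_commitish": "main"},
    )
    invocation = client.create_workflow_invocation(
        parent=repository,
        workflow_invocation={
            "compilation_result": compilation.name,
            "invocation_config": {"included_tags": ["your_workflow_tag"]},
        },
    )
    print("Started:", invocation.name)


with DAG(
    "dataform_client_library_example",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    PythonOperator(
        task_id="trigger_dataform_workflow",
        python_callable=trigger_dataform_workflow,
    )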