Tutorial

How to manage PDTs at scale with Looker’s new Apache Airflow and Cloud Composer integrations

  • 15 March 2022

Looker persistent derived tables (PDTs) provide the ultimate data modeling flexibility. They put robust data transformation capability into the hands of analysts and data modelers by enabling them to use LookML to write materialized query results back to the database. This means performant reporting based on complex analytical patterns behind the scenes (think cohorting, retention, user behavior patterns, etc.) can be in the hands of users faster.

However, as our customers have leaned into the magic of PDTs, some have run into challenges with scalability. Monitoring and managing PDTs that refresh on varying schedules becomes exponentially more complex as the number of PDTs grows. As a solution to this, we are thrilled to share that we have released a new integration with Apache Airflow that is now also available in Cloud Composer, Google’s managed workflow orchestration service built on Apache Airflow. This new integration provides a pathway for our customers to scale their PDT usage through external data orchestration alongside other ETL and ELT processes. Read on for details on how you can get started.

 

Before You Start

In order to take advantage of this new integration, you will need to be using the following versions of Looker and Airflow:

  • Looker 22.2+

  • Airflow 2 or Cloud Composer using an Airflow 2 environment

  • Google providers package 6.5.0+ for Airflow  

This integration uses the Looker SDK to connect Looker and Airflow. The SDK will call the Looker API 4.0 using your API credentials. To learn more about authenticating with the Looker API, check out our documentation.
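As a rough sketch of what that authentication looks like in code (assuming the `looker-sdk` Python package and its standard `LOOKERSDK_*` environment variables; the helper names below are illustrative, not part of the SDK):

```python
import os

def looker_env(base_url, client_id, client_secret):
    """Map API credentials to the environment variables the Looker SDK reads."""
    return {
        "LOOKERSDK_BASE_URL": base_url,            # e.g. https://your.looker.com
        "LOOKERSDK_CLIENT_ID": client_id,          # API3 client ID
        "LOOKERSDK_CLIENT_SECRET": client_secret,  # API3 client secret
    }

def init_sdk(base_url, client_id, client_secret):
    """Authenticate against the Looker API 4.0 (requires `pip install looker-sdk`)."""
    os.environ.update(looker_env(base_url, client_id, client_secret))
    import looker_sdk  # imported lazily so the helper above stays dependency-free
    return looker_sdk.init40()
```

Calling `init_sdk(...)` and then `sdk.me()` on the returned client is a quick way to confirm your credentials are valid before wiring them into Airflow.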

 

Setting Up Your Connection

Because this integration will leverage Looker’s new functionality for managing PDTs externally via API, you’ll need to ensure that the toggle called Enable PDT API Control is turned on within your connection settings. If you have more than one connection, you’ll need to enable this for all connections with PDTs that will be managed using Airflow.

Once you have enabled PDT API control in Looker, you’ll need to set up your Looker connection within Apache Airflow or Cloud Composer.

Apache Airflow

[Screenshot: the connection setup form in the Airflow UI]

Here’s a brief overview of the parameters available and recommended settings for each:

Connection Id: your_conn_id #give your connection a name of your choosing

Connection Type: HTTP

Host: https://your.looker.com #base URL for Looker API (do not include /api/* in the URL)

Login: YourClientID

Password: YourClientSecret

Port:  # optional - Looker will use the default API port for your instance if left blank

Extra: {"verify_ssl": "true", "timeout": "120"}  # optional

For more information on these settings, see the Looker API path and port section of Looker’s documentation. To generate your API login (ClientID) and password (ClientSecret), you’ll need to do the following:

  1. Create API3 credentials on the Users page in the Admin section of your Looker instance. If you’re not a Looker admin, ask your Looker admin to create the API3 credentials for you.

  2. Copy and paste the generated client ID into your Airflow connection as your login and the generated client secret as your password. For more information, see our documentation on Authentication with an SDK.

There are some additional optional parameters that can be set using JSON format within the Extra section. These include:

  • verify_ssl: This should be set to false ONLY if testing locally against self-signed certs. Otherwise, this will default to true and does not need to be specified.

  • timeout: You can use this to set the timeout (in seconds) for HTTP requests. This will default to 120 seconds (2 minutes) if not specified.

Please note that there isn’t an associated test_connection hook for this connection type, so the Test Connection button will not work. The best way to test your connection is to create and trigger a simple DAG.

Cloud Composer

When setting up a Looker connection in Cloud Composer, we recommend using Secret Manager, which gives you full secret-management capabilities for your Looker client secret. Here are the steps for setting up a connection in Cloud Composer using Secret Manager:

1. Enable Secret Manager in your project.

2. Ensure that the Service Account used by Cloud Composer has the proper permissions.

3. Set secrets/backend Airflow configuration override to indicate that Secret Manager will be used for secrets.

4. Create a secret whose name follows the required naming pattern and whose value uses the connection's URI representation (this is where you will pass in your Looker instance and connection details).

For testing purposes or if you would prefer not to use Secret Manager, you can set up a connection in Airflow directly for your Composer environment. To do that, you would need to open the Airflow UI from your Composer environment and follow the steps outlined above for creating a connection within Airflow directly.
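To make step 4 above concrete, here is a small sketch of how the secret name and value could be assembled. Cloud Composer's Secret Manager backend looks up connections under names like `airflow-connections-<connection_id>`, and the value is the connection's URI representation; the helper name is illustrative, and the exact URI shape should be verified against Airflow's connection documentation:

```python
from urllib.parse import quote

def looker_connection_secret(conn_id, host, client_id, client_secret):
    """Build (secret name, secret value) for the Secret Manager backend.

    The name follows the airflow-connections-<connection_id> pattern and the
    value is Airflow's URI representation of an HTTP connection."""
    name = f"airflow-connections-{conn_id}"
    # URL-encode the credentials in case they contain characters like '/'
    value = "http://{}:{}@{}".format(
        quote(client_id, safe=""), quote(client_secret, safe=""), host)
    return name, value

name, value = looker_connection_secret(
    "your_conn_id", "your.looker.com", "YourClientID", "YourClientSecret")
# name  -> airflow-connections-your_conn_id
# value -> http://YourClientID:YourClientSecret@your.looker.com
```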

 

Creating a DAG

In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. Once you have set up your connection to Looker, you’re ready to create a DAG for PDTs. To do this, you should follow the same steps that you would for any other DAG you might create in Airflow. An example DAG can be found at the end of this article.

Each task in your DAG will be responsible for specific PDT builds using two new Looker Operators:

  • LookerStartPdtBuildOperator initiates materialization for a PDT based on a specified model name and view name and returns the materialization ID.

  • LookerCheckPdtBuildSensor checks the status of a PDT build based on a provided materialization ID for the PDT build job.

Using these operators, there are two different types of tasks that you might create: synchronous and asynchronous. Let’s take a closer look at how each would be set up.

 

Synchronous Tasks

Synchronous mode is the default mode for tasks. In synchronous tasks, the LookerStartPdtBuildOperator is used to both start the task and check its status, so it is the only operator that you will need to use inside your PDT build task.

To use this operator, you simply need to provide the model name and view name for the PDT that you want to build. These are the only required parameters, but there are a few additional optional parameters that you can include:

  • wait_time: Specify the number of seconds to wait in between status checks for the materialization job. The default is 10 seconds, but this parameter enables you to set a custom interval.

  • wait_timeout: Specify a build timeout in seconds. For example, to force a build to time out after one hour, you would set wait_timeout to 3600.

  • query_params: Further specify the materialization build job by including additional settings specific to the PDT build.

    • force_rebuild: This can be set to True or False. When set to True, this will force the rebuild of the specified PDT in your task plus any other PDTs that it depends on, even if they are already materialized. If this parameter is not specified, then it will default to False.

    • force_full_incremental: This can be set to True or False. When set to True, this will force an incremental PDT to fully rebuild (as opposed to just appending a new increment). If this parameter is not specified, then it will default to False.

    • workspace: This is a string value that specifies the workspace in which the PDT should be materialized. It can either be set to dev or production. If unset, this value will default to production.

Here’s example code for a synchronous PDT build task:

build_pdt_task = LookerStartPdtBuildOperator(
    task_id='build_pdt_task',
    looker_conn_id='your_conn_id',
    model='your_lookml_model',
    view='your_lookml_view',
    wait_time=30,
    wait_timeout=3600,
    query_params={
        "force_rebuild": True,
        "force_full_incremental": True,
        "workspace": "dev",
    },
)

 

And here’s what this would look like within Airflow:

[Screenshot: the synchronous PDT build task in the Airflow UI]

Once started, the build task will block DAG execution until the build is finished, meaning that the task will run until it succeeds, errors, or is canceled.

 

Asynchronous Tasks

In asynchronous mode, a PDT build task is separated into start and status tasks. Using asynchronous mode and separating the start and status tasks in this way can be useful when submitting long-running PDT build jobs. The LookerStartPdtBuildOperator is used to start the build task and should be set up as outlined above. The only difference is that you’ll include the following additional parameter within the task:

start_pdt_task_async = LookerStartPdtBuildOperator(
    …
    asynchronous=True,
    …
)

You will then need to include a separate Airflow sensor, which can be created using the LookerCheckPdtBuildSensor operator. This operator is a custom sensor task that can be used to check the status of a PDT build. When the DAG is executing, the start task will finish immediately and pass execution over to the status task. 

The only required parameter for the LookerCheckPdtBuildSensor is the materialization_id for the PDT build job, which is an output of the start task. You can also, optionally, include a custom poke_interval, which sets the interval for checking the status of the job (in seconds). The default poke_interval is 60 seconds (1 minute).

Here’s example code for a status task:

check_pdt_task_async_sensor = LookerCheckPdtBuildSensor(
    task_id='check_pdt_task_async_sensor',
    looker_conn_id='your_conn_id',
    materialization_id=start_pdt_task_async.output,
    poke_interval=10,  # optional, poke every 10 sec
)

 

Within Airflow, you would chain these tasks together and specify the relationship between them within your DAG:

   start_pdt_task_async >> check_pdt_task_async_sensor

 

Here’s how your DAG would be represented in Airflow: 

[Screenshot: the asynchronous start and status tasks in the Airflow DAG view]

Just like with synchronous mode, once running, the status task will block DAG execution until the build is finished, meaning that the task will run until it succeeds, errors, or is canceled.

 

Canceling a PDT Build

Both the LookerStartPdtBuildOperator and the LookerCheckPdtBuildSensor support PDT build cancellation. This does not require a separate operator. Either type of task can be canceled by manually marking a task as Failed or Success in Airflow. 

[Screenshot: manually marking a task as Failed or Success in the Airflow UI]

Manually marking a task as Failed or Success will trigger an API call back to Looker to cancel the build. This will result in the cancellation of the PDT materialization in Looker and within your database (if supported).
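The same cancellation can also be issued programmatically: the Looker API 4.0 exposes a stop_pdt_build method, so a thin wrapper (illustrative sketch; `sdk` is assumed to be an authenticated Looker SDK 4.0 client) might look like:

```python
def cancel_pdt_build(sdk, materialization_id):
    """Ask Looker to stop a running PDT materialization.

    `sdk` is an authenticated Looker SDK 4.0 client, and materialization_id
    is the ID returned when the build was started."""
    return sdk.stop_pdt_build(materialization_id=materialization_id)
```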

 

Complete DAG Example

Below is a complete example of how you might set up a DAG in Airflow for managing Looker PDT builds. If using Airflow, DAGs should be placed within the dags folder inside the file structure of your Airflow implementation. If using Cloud Composer, DAGs should be placed inside the DAGs folder for your environment, which can be found on the Environment Details page.

from datetime import datetime

from airflow import models
from airflow.providers.google.cloud.operators.looker import LookerStartPdtBuildOperator
from airflow.providers.google.cloud.sensors.looker import LookerCheckPdtBuildSensor

with models.DAG(
    dag_id='looker_pdt_build',
    schedule_interval='0 0 * * *',
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:

    # Start PDT build in asynchronous mode
    start_pdt_task_async = LookerStartPdtBuildOperator(
        task_id='start_pdt_build_async',
        looker_conn_id='my_connection_name',
        model='my_model_name',
        view='my_view_name',
        asynchronous=True,
    )

    # Check the status of the build using the materialization ID
    check_pdt_task_async_sensor = LookerCheckPdtBuildSensor(
        task_id='check_pdt_async',
        looker_conn_id='my_connection_name',
        materialization_id=start_pdt_task_async.output,
        poke_interval=10,  # optional, poke every 10 sec (default is 1 minute)
    )

    start_pdt_task_async >> check_pdt_task_async_sensor

This new integration with Apache Airflow creates a pathway for scaling PDT management and maintenance. Not only can you implement automated processes to govern PDT rebuilds, but you can also orchestrate these data transformations alongside your other ETL and ELT workflows.

These new operators are also available within Cloud Composer, Google Cloud’s managed workflow orchestration solution built on top of Apache Airflow, so watch for more information on that in the coming weeks.


3 replies

I’ve been trying to set up a pipeline to trigger a PDT build using the `LookerStartPdtBuildOperator` without any luck. The error reported in the logs isn’t useful either.

LookerStartPdtBuildOperator(
    task_id='test_trigger_looker_pdt',
    looker_conn_id='looker',
    model='my_model',
    view='my_view_name',
)

The error I get is:

 

looker_sdk.error.SDKError: {"message":"An error has occurred.","documentation_url":"https://docs.looker.com/"}

 

  • The view does exist (I tried using the name of a non-existent view and it complained that the view did not exist, so I assume that, at the very least, it can see it)
  • I enabled PDT API control
  • Gave permissions to the Airflow Service Account


Any thoughts what I might be missing here?


Additional logs:

 

[2022-06-27, 15:39:16 UTC] {requests_transport.py:72} INFO - POST(https://withplum.cloud.looker.com/api/4.0/login)
[2022-06-27, 15:39:17 UTC] {requests_transport.py:72} INFO - GET(https://withplum.cloud.looker.com/api/4.0/versions)
[2022-06-27, 15:39:17 UTC] {requests_transport.py:72} INFO - GET(https://withplum.cloud.looker.com/api/4.0/derived_table/my_model/my_view_name/start)
[2022-06-27, 15:39:17 UTC] {taskinstance.py:1776} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/operators/looker.py", line 79, in execute
resp = self.hook.start_pdt_build(
File "/opt/python3.8/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/looker.py", line 71, in start_pdt_build
resp = sdk.start_pdt_build(model_name=model, view_name=view, source=self.source, **query_params)
File "/opt/python3.8/lib/python3.8/site-packages/looker_sdk/sdk/api40/methods.py", line 4967, in start_pdt_build
self.get(
File "/opt/python3.8/lib/python3.8/site-packages/looker_sdk/rtl/api_methods.py", line 143, in get
return self._return(response, structure)
File "/opt/python3.8/lib/python3.8/site-packages/looker_sdk/rtl/api_methods.py", line 87, in _return
raise error.SDKError(response.value.decode(encoding=encoding))
looker_sdk.error.SDKError: {"message":"An error has occurred.","documentation_url":"https://docs.looker.com/"}

 


@gmyrianthous shot in the dark, but are you sure the derived table you’re targeting has a persistence strategy defined? If it’s not a persistent derived table, that’s the (unhelpful) error you get.

@gmyrianthous Sorry for the late reply, but you can also check Looker’s logs; they may have more information, as the API may not be handling the error message properly.

You can also try to start the build directly using Looker’s API to see if it works:

https://developers.looker.com/api/explorer/4.0/methods/DerivedTable/start_pdt_build
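For example, with the Python SDK the same direct call could be sketched like this (illustrative wrapper; `sdk` is assumed to be an authenticated Looker SDK 4.0 client):

```python
def debug_start_pdt_build(sdk, model_name, view_name):
    """Start a PDT build directly and return its materialization ID.

    Calling the SDK outside Airflow surfaces the raw response or error,
    which is often more informative than the operator's log output."""
    resp = sdk.start_pdt_build(model_name=model_name, view_name=view_name)
    return resp.materialization_id
```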

Finally, don’t hesitate to reach out to our support so we can take a look into that.

Thanks!
