[Dataform] How to create an incremental table with a list of columns pulled from INFORMATION_SCHEMA?

Use case:
We have a table `aggdata.locations` that is frequently being updated with new columns.
We want to capture this table in weekly snapshots using an incremental table in Dataform.

Issue:
The problem arises from using SELECT * in the SQL statement. If SELECT * is used, we cannot keep a timestamp column at the trailing end of the snapshot table.

Let's say a new column is added to aggdata.locations. I already have a table set up, called `locations_snapshots`, in which I capture weekly snapshots. I manually add a new column to my `locations_snapshots` table to accommodate the new field. The statement below errors if a new column is added to aggdata.locations after the snapshot table was initially built, because the columns of locations and locations_snapshots no longer match.

 

SELECT 
*, 
CURRENT_TIMESTAMP AS snapshot_timestamp 
FROM aggdata.locations

 

Instead of explicitly listing all of the fields in the SQL statement, a list of fields pulled from INFORMATION_SCHEMA would be a great alternative.

Below is some pseudocode showing how I imagine it would work:

 

config { type: "incremental",
         name: "locations_snapshots",
         schema: "aggdata",
        tags: ["snapshot"],
        }

pre_operations {
    declare fields default (
            SELECT STRING_AGG(column_name,',\n')||"," 
            FROM aggdata.INFORMATION_SCHEMA.COLUMNS 
            WHERE table_name = 'locations'
            )
}


SELECT
    fields,
    CURRENT_TIMESTAMP AS snapshot_timestamp
FROM ${ref("aggdata","locations")}

 

The above code obviously does not work as intended. It just enters the fields as a value for each record in the table. I'm just trying to convey the concept. 

Accepted solution:

There is currently no way to create an incremental table with a list of columns pulled from INFORMATION_SCHEMA in Dataform. However, there are a few workarounds you can use.

Option 1:

One workaround is to create a new table with the desired columns and then use a MERGE statement to insert or update the data from the aggdata.locations table. For example:

 

CREATE TABLE aggdata.locations_snapshots (
  location_id INT NOT NULL,
  ...
  snapshot_timestamp TIMESTAMP NOT NULL
);

MERGE INTO aggdata.locations_snapshots
USING (SELECT * FROM aggdata.locations) AS t
ON locations_snapshots.location_id = t.location_id
WHEN MATCHED THEN
  UPDATE SET
    ...
    snapshot_timestamp = CURRENT_TIMESTAMP
WHEN NOT MATCHED THEN
  INSERT (
    location_id,
    ...
    snapshot_timestamp
  ) VALUES (
    t.location_id,
    ...
    CURRENT_TIMESTAMP
  );

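This approach keeps the locations_snapshots table up to date with the latest data from aggdata.locations, but only for the columns listed in the MERGE: when a new column is added, it must also be added to the snapshot table and to the UPDATE and INSERT lists. Note also that matched rows are overwritten, so the table holds the latest state per location_id rather than one row per weekly snapshot.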
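
Before the MERGE can reference a new column, the snapshot table has to contain it. As a rough sketch, assuming BigQuery scripting and the table names from the question, the missing columns could be added from INFORMATION_SCHEMA as shown here. It compares top-level column names only and adds each new column as nullable; it is an illustration, not a tested Dataform feature.

```sql
-- Sketch only: add every column that exists in locations
-- but is not yet present in locations_snapshots.
FOR col IN (
  SELECT column_name, data_type
  FROM aggdata.INFORMATION_SCHEMA.COLUMNS
  WHERE table_name = 'locations'
    AND column_name NOT IN (
      SELECT column_name
      FROM aggdata.INFORMATION_SCHEMA.COLUMNS
      WHERE table_name = 'locations_snapshots'
    )
  ORDER BY ordinal_position
)
DO
  -- The column name and type come from INFORMATION_SCHEMA, so the DDL is built as a string.
  EXECUTE IMMEDIATE FORMAT(
    'ALTER TABLE aggdata.locations_snapshots ADD COLUMN IF NOT EXISTS `%s` %s',
    col.column_name, col.data_type
  );
END FOR;
```

Running this before the weekly load would replace the manual step of adding the new field to locations_snapshots.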

Option 2:

Another workaround is to use a stored procedure to create the locations_snapshots table. The stored procedure can dynamically generate the SQL statement to create the table, based on the columns that are currently in the aggdata.locations table. This approach is more complex, but it is more flexible and can be used to create incremental tables for any table in the database.

Here is an example of a stored procedure that you could use to create an incremental table with a list of columns pulled from INFORMATION_SCHEMA:

 
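-- BigQuery scripting version; the source dataset is passed separately
-- because INFORMATION_SCHEMA.COLUMNS is queried per dataset.
CREATE OR REPLACE PROCEDURE aggdata.create_incremental_table(
  source_dataset STRING,
  source_table STRING,
  snapshot_table STRING
)
BEGIN
  DECLARE fields STRING;

  -- Build "`col` TYPE, ..." from the source table's current columns.
  EXECUTE IMMEDIATE FORMAT("""
    SELECT STRING_AGG(CONCAT('`', column_name, '` ', data_type), ', ' ORDER BY ordinal_position)
    FROM `%s`.INFORMATION_SCHEMA.COLUMNS
    WHERE table_name = @source_table
  """, source_dataset)
  INTO fields
  USING source_table AS source_table;

  -- Create the snapshot table with those columns plus the timestamp.
  EXECUTE IMMEDIATE FORMAT(
    'CREATE TABLE `%s` (%s, snapshot_timestamp TIMESTAMP NOT NULL)',
    snapshot_table, fields);
END;

To use the stored procedure, call it with the dataset and name of the source table and the name of the snapshot table that you want to create. For example:

 
CALL aggdata.create_incremental_table('aggdata', 'locations', 'aggdata.locations_snapshots');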

Once the stored procedure has been executed, the aggdata.locations_snapshots table will be created with the same columns as the aggdata.locations table, plus a snapshot_timestamp column. You can then use a MERGE statement to insert or update the data from the aggdata.locations table into the aggdata.locations_snapshots table, just like in the first example.
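
The load step can avoid a hand-written column list in the same way. The following is a minimal sketch, assuming BigQuery scripting and that locations_snapshots already contains every column of locations. It appends one snapshot per run with INSERT instead of MERGE, which is my assumption about what a weekly snapshot should do. In Dataform it would most likely live in an action of type "operations" rather than "incremental".

```sql
-- Sketch only: build the column list from INFORMATION_SCHEMA, then append a snapshot.
DECLARE fields STRING DEFAULT (
  SELECT STRING_AGG(CONCAT('`', column_name, '`'), ', ' ORDER BY ordinal_position)
  FROM aggdata.INFORMATION_SCHEMA.COLUMNS
  WHERE table_name = 'locations'
);

-- The same list is used for the target columns and the SELECT,
-- so column order in the two tables no longer matters.
EXECUTE IMMEDIATE FORMAT("""
  INSERT INTO aggdata.locations_snapshots (%s, snapshot_timestamp)
  SELECT %s, CURRENT_TIMESTAMP()
  FROM aggdata.locations
""", fields, fields);
```

Because the columns are named explicitly on both sides, snapshot_timestamp can stay wherever it already sits in the snapshot table.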

Which workaround you choose will depend on your requirements. If you need a simple solution for a single table, Option 1 is the better fit. If you need a more flexible solution that can create snapshot tables for any table in the database, Option 2 is the better fit.
