Use custom merge logic for incremental table in Dataform

I'm creating an incremental table in Dataform, but want to change how duplicate rows are merged. The default behavior is for duplicate rows to overwrite existing rows in the table (Merge rows in an incremental table). Instead, I would like to sum values in the new duplicate rows to the existing rows' values.

For example, consider the source table below. Here, sales can may be added to the table after the date that they occur (insert_date = date the sales were added to table, transaction_date = date the sales occurred).

 

insert_datetransaction_datesales_quantity
2023-12-012023-12-0110
2023-12-022023-12-0220
2023-12-032023-12-015

I would like to incrementally (based on insert_date) generate a table that shows the total sales_quantity on a given transaction_date, where transaction_date is the unique key. The result would be as follows.

 

transaction_datesales_quantity
2023-12-0115
2023-12-0220

So I would like to sum the sales_quantity for duplicate rows, rather than overwrite it. But with the default behavior running on the sample code below, if I ran the incremental execution on 2023-12-01, 2023-12-02, and 2023-12-03 (where future insert_date rows don't exist in the source table on the date of the execution), I would end up with the following results at the end. During the last execution, the new row with a transaction_date of 2023-12-01 overwrote the previously added row, instead of adding to it.

SQLX file:

 

config {
  type: "incremental",
  uniqueKey: ["transaction_date"]
}

pre_operations {
  DECLARE insert_date_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT MAX(insert_date) FROM ${self()}`,
    `SELECT DATE("2000-01-01")`)}
  )
}

SELECT
  transaction_date,
  SUM(sales_quantity) AS sales_quantity
FROM
  ${ref("table")}
WHERE
  insert_date > insert_date_checkpoint
GROUP BY
  transaction_date

 

Results:

transaction_datesales_quantity
2023-12-015
2023-12-0220

Is there a way to update the merge logic associated with an incremental table to use something other than overwrite? Or would I need to create a specific operations SQLX file to accomplish this?

Doing a full refresh of the data every time wouldn't be a good solution since the sales table can be very large/costly and I want to take advantage of its partitioning on the insert_date column.

Solved Solved
0 2 851
1 ACCEPTED SOLUTION

You're correct that the default behavior of Dataform's incremental tables is to overwrite existing rows with matching unique keys. To sum values in duplicate rows instead, you have two main options:

  1. Modifying the Incremental Table Definition:

    • Custom MERGE Statement: Utilize a MERGE statement with custom logic to manage duplicate rows. In your SELECT statement, include a subquery to identify existing rows for each transaction date. Then, in the MERGE statement, use a WHEN MATCHED clause to update sales_quantity by adding the new row's value to the existing one. This approach offers granular control over the merge behavior but requires additional coding.

    • Custom pre_operations: Implement a custom pre-operation to aggregate and temporarily store the total sales quantity for each transaction date. In the main SELECT statement, join this temporary data with your source table and sum the sales_quantity values. This method can be more efficient for frequent updates but may add complexity for large datasets.

  2. Creating a Custom Operations SQLX File:

    • This method allows for fully customized SQL logic to handle the incremental update as per your requirements. You can effectively use Dataform functions like incremental(), self(), and previous_data() to manage the data flow. This option offers the most flexibility but requires a higher level of SQL expertise.

Additional Considerations:

  • Partitioning: Partitioning the table on transaction_date can improve performance, especially for large datasets. This strategy is beneficial for managing and querying large volumes of data efficiently.

  • Handling Future Dates: Consider using a UNION ALL strategy to combine current data with a separate query for potential future transaction dates. This ensures comprehensive and up-to-date data handling.

Testing and Validation:

  • Ensure thorough testing and validation of your custom logic to handle all possible scenarios and maintain data integrity.

Documentation and Maintenance:

  • Given the complexity of custom solutions, proper documentation and clear comments within the SQLX file are crucial. This practice aids in understanding the logic and facilitates future modifications.

The choice between modifying the incremental table definition and creating a custom operations SQLX file depends on your specific data requirements, the complexity of your pipeline, and your comfort level with SQL. Both approaches have their merits, and the most suitable one will depend on balancing flexibility, complexity, and performance considerations.

View solution in original post

2 REPLIES 2

You're correct that the default behavior of Dataform's incremental tables is to overwrite existing rows with matching unique keys. To sum values in duplicate rows instead, you have two main options:

  1. Modifying the Incremental Table Definition:

    • Custom MERGE Statement: Utilize a MERGE statement with custom logic to manage duplicate rows. In your SELECT statement, include a subquery to identify existing rows for each transaction date. Then, in the MERGE statement, use a WHEN MATCHED clause to update sales_quantity by adding the new row's value to the existing one. This approach offers granular control over the merge behavior but requires additional coding.

    • Custom pre_operations: Implement a custom pre-operation to aggregate and temporarily store the total sales quantity for each transaction date. In the main SELECT statement, join this temporary data with your source table and sum the sales_quantity values. This method can be more efficient for frequent updates but may add complexity for large datasets.

  2. Creating a Custom Operations SQLX File:

    • This method allows for fully customized SQL logic to handle the incremental update as per your requirements. You can effectively use Dataform functions like incremental(), self(), and previous_data() to manage the data flow. This option offers the most flexibility but requires a higher level of SQL expertise.

Additional Considerations:

  • Partitioning: Partitioning the table on transaction_date can improve performance, especially for large datasets. This strategy is beneficial for managing and querying large volumes of data efficiently.

  • Handling Future Dates: Consider using a UNION ALL strategy to combine current data with a separate query for potential future transaction dates. This ensures comprehensive and up-to-date data handling.

Testing and Validation:

  • Ensure thorough testing and validation of your custom logic to handle all possible scenarios and maintain data integrity.

Documentation and Maintenance:

  • Given the complexity of custom solutions, proper documentation and clear comments within the SQLX file are crucial. This practice aids in understanding the logic and facilitates future modifications.

The choice between modifying the incremental table definition and creating a custom operations SQLX file depends on your specific data requirements, the complexity of your pipeline, and your comfort level with SQL. Both approaches have their merits, and the most suitable one will depend on balancing flexibility, complexity, and performance considerations.

@ms4446 thanks for the detailed response. All 3 suggestions are clean and clear. It shouldn't be an issue implementing the incremental summation merge I was hoping for with one of these approaches. I appreciate the help!