I’ve got this incremental PDT:
# Incremental persistent derived table that gives every user a recency,
# frequency and monetary score (from ranked model centroids) plus a combined
# rfm_score, stamped with the time of the build that produced the row.
derived_table: {
  # NOTE(review): the query below exposes a column named `last_update`;
  # presumably a time dimension_group called `last_update` provides the
  # `last_update_time` field referenced here - verify against the view.
  increment_key: "last_update_time"
  # NOTE(review): per the Looker documentation, increment_offset is the number
  # of earlier increment periods that are rebuilt (replaced) on every run, not
  # a number of past snapshots to retain - confirm 5 is the intended value.
  increment_offset: 5
  sql:
    WITH
    -- Rank the recency centroids (highest numerical_value gets score 1) and
    -- attach the resulting score to each user's predicted centroid.
    -- NOTE(review): assumes ML.CENTROIDS yields one row per centroid, i.e. the
    -- model has a single numeric feature - TODO confirm.
    recency_scores AS (
      SELECT
        userId,
        recency,
        recency_centroid_score AS recency_score
      FROM (
        SELECT
          centroid_id AS recency_centroid_id,
          ROW_NUMBER() OVER (ORDER BY numerical_value DESC) AS recency_centroid_score
        FROM ML.CENTROIDS(MODEL ${recency_model.SQL_TABLE_NAME})
      )
      INNER JOIN ${recency_model_predictions.SQL_TABLE_NAME} USING (recency_centroid_id)
    ),
    -- Same ranking for frequency, but ascending: the lowest numerical_value
    -- gets score 1.
    frequency_scores AS (
      SELECT
        userId,
        frequency,
        frequency_centroid_score AS frequency_score
      FROM (
        SELECT
          centroid_id AS frequency_centroid_id,
          ROW_NUMBER() OVER (ORDER BY numerical_value ASC) AS frequency_centroid_score
        FROM ML.CENTROIDS(MODEL ${frequency_model.SQL_TABLE_NAME})
      )
      INNER JOIN ${frequency_model_predictions.SQL_TABLE_NAME} USING (frequency_centroid_id)
    ),
    -- Monetary ranking (ascending). This CTE also creates last_update: the
    -- current time truncated to the second, so every row written by one build
    -- carries the same value.
    monetary_scores AS (
      SELECT
        userId,
        total_monetary,
        monetary_centroid_score AS monetary_score,
        TIMESTAMP(TIMESTAMP_TRUNC(CURRENT_DATETIME(), SECOND)) AS last_update
      FROM (
        SELECT
          centroid_id AS monetary_centroid_id,
          ROW_NUMBER() OVER (ORDER BY numerical_value ASC) AS monetary_centroid_score
        FROM ML.CENTROIDS(MODEL ${monetary_model.SQL_TABLE_NAME})
      )
      INNER JOIN ${monetary_model_predictions.SQL_TABLE_NAME} USING (monetary_centroid_id)
    )
    SELECT
      userId,
      recency,
      frequency,
      total_monetary,
      recency_score,
      frequency_score,
      monetary_score,
      -- NOTE(review): the operators between the three scores were missing from
      -- the original text (most likely '*' swallowed as Markdown emphasis);
      -- the product is a reconstruction - confirm the intended formula.
      recency_score * frequency_score * monetary_score AS rfm_score,
      last_update
    FROM recency_scores
    INNER JOIN frequency_scores USING (userId)
    INNER JOIN monetary_scores USING (userId)
    -- Looker substitutes the increment filter on last_update here.
    WHERE {% incrementcondition %} last_update {% endincrementcondition %}
  ;;
The trigger fires every 5 minutes, but the data is not being appended to the table. We always have only the latest update, never the previous ones (we want the latest and the 4 previous updates, as specified with increment_offset: 5).