stored procedure to refresh reporting data with data transferred in after the fact?

Hi All,

Use case: 

My current process (inherited) involves a daily data transfer from SA360 for a number of digital marketing clients. Since this data in its raw form comes in as roughly 20 different tables, I also run a scheduled query after each day's transfer that transforms the raw data into a set of tables, one per day, ready to serve to Looker Studio (i.e. sa360_stats_{run_time-25h|"%Y%m%d"} is the "destination table" for each run).

Issue: 

This process does not account for the fact that additional data, with a data_date up to 2 weeks in the past, can keep coming in with each day's transfer.

Solution:

Seeing as how I have a large number of accounts, and the amount of data being added is random (to me), I figured just re-running these scheduled queries for every client for the last two weeks every day would be very inefficient(?).

I have finally been able to write a stored procedure script that scans my whole project for relevant datasets (those with sa360 data AND those that have had data added after the fact). It then programmatically goes through each dataset and refreshes each day's table where the date is part of the current list of dates that have been *changed*.

Question/inquiry:

Because I am somewhat new to writing automation scripts and don't have anyone to talk things through with, I do kind of feel like I MacGyvered this together. If anyone is interested in checking it out, I would love to know if there are any suggestions for making it simpler or more efficient, or whether I made this solution way more complicated than it needed to be.

Thank you so much in advance! See below for my code

 

 

 

BEGIN
  -- Purpose: rebuild the per-day SA360 reporting tables for every
  -- (client dataset, SA360 account id) whose raw stats received late rows.
  --
  -- Steps:
  --   1. Scan the INFORMATION_SCHEMA of every dataset listed in
  --      my_project.my_dataset.sa360_datasets for SA360 account tables and
  --      store the (client, id) pairs in my_dataset.sa360_reporting_data.
  --   2. Overwrite my_dataset.sa360_reporting_data with one row per pair,
  --      carrying the list of data dates that were appended after the fact.
  --   3. Re-create <client>.sa360_stats_test_<yyyymmdd> for each changed date.
  --
  -- NOTE(review): dataset names and ids are spliced into the generated SQL as
  -- identifiers (identifiers cannot be bound as parameters), so the
  -- sa360_datasets table should only be writable by trusted users.

  -- Datasets to scan; the config table is expected to expose a single column.
  DECLARE dataset_names ARRAY<STRING>;
  -- Client and id are kept together in one array so a dataset can never be
  -- paired with another dataset's account id.
  DECLARE targets ARRAY<STRUCT<client STRING, id STRING>>;
  DECLARE discovery_sql STRING;
  DECLARE changed_dates_sql STRING;
  DECLARE refresh_sql STRING;

  -- One branch of the discovery UNION ALL; {dataset} is substituted per dataset.
  DECLARE discovery_template STRING DEFAULT """
    select
      table_schema,
      split(table_name, '_')[safe_offset(2)] as sa360_id
    from my_project.{dataset}.INFORMATION_SCHEMA.TABLES
    where contains_substr(table_name, 'p_AccountDeviceStat')
      and contains_substr(ddl, 'dpb_revenue')
      and contains_substr(ddl, 'dpb_transactions')
  """;

  -- One branch of the changed-dates UNION ALL. A row appended on a day later
  -- than its own data date counts as a late arrival.
  -- NOTE(review): the 6-day look-back is kept from the original script;
  -- presumably it was chosen to stay inside the tables' time-travel window.
  DECLARE changed_dates_template STRING DEFAULT """
    (select struct(
       '{client}' as table_schema,
       '{id}' as id,
       ARRAY(
         SELECT DISTINCT FORMAT_DATE('%Y%m%d', date) AS date_table_suffix
         FROM APPENDS(
           TABLE `my_project.{client}.p_KeywordFloodlightAndDeviceStats_{id}`,
           TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 6 DAY),
           CURRENT_TIMESTAMP())
         WHERE EXTRACT(date FROM _change_timestamp) > date
           AND _change_type = 'INSERT'
       ) AS dates
     ) as sa360_keys)
  """;

  -- Per-day rebuild statement. Apart from whitespace this is the statement the
  -- original script generated; {client}, {id} and {date} are substituted.
  DECLARE refresh_template STRING DEFAULT """
    create or replace table `{client}.sa360_stats_test_{date}` as (
      WITH campaign AS (
        SELECT distinct(campaignId), ARRAY_AGG(campaign ORDER BY lastModifiedTimestamp DESC LIMIT 1)[OFFSET (0)] campaign
        FROM `my_project.{client}.Campaign_{id}`
        group by campaignid
        order by campaignid
      ),
      adgroup AS (
        SELECT distinct(adGroupId), ARRAY_AGG(adgroup ORDER BY lastModifiedTimestamp DESC LIMIT 1)[OFFSET (0)] adGroup
        FROM `my_project.{client}.AdGroup_{id}`
        group by adgroupId
      ),
      keyword AS (
        SELECT distinct(keywordId), ARRAY_AGG(keywordText ORDER BY lastModifiedTimestamp DESC LIMIT 1)[OFFSET (0)] keywordText
        FROM `my_project.{client}.Keyword_{id}`
        group by keywordId
      ),
      revenue AS (
        SELECT date, sum(dfarevenue) as dfarevenue, cast(sum(dfatransactions) as float64) as dfatransactions, adid, adGroupId, keywordId, campaignid, deviceSegment
        FROM `my_project.{client}.KeywordFloodlightAndDeviceStats_{id}`
        group by date, adgroupid, keywordid, campaignid, deviceSegment, adid
      )
      SELECT
        stats.date,
        stats.campaignId,
        stats.keywordId,
        stats.adGroupId,
        stats.adid,
        keyword.keywordText AS keyword,
        adgroup.adGroup AS adgroup,
        campaign.campaign AS campaign,
        deviceSegment AS device,
        sum(impr) AS impressions,
        sum(clicks) AS clicks,
        sum(cost) AS cost,
        sum(cost*1.1) AS spend,
        sum(cast(revenue.dfatransactions as float64)) AS transactions,
        sum(dpb_room_nights) AS room_nights,
        sum(revenue.dfarevenue) AS revenue
      FROM `my_project.{client}.KeywordDeviceStats_{id}` AS stats
      LEFT JOIN revenue using(keywordid, adgroupid, campaignid, devicesegment, date, adid)
      left join campaign USING (campaignId)
      LEFT JOIN adgroup USING (adGroupID)
      LEFT JOIN keyword USING (keywordId)
      WHERE keywordId != '0' and date = PARSE_DATE('%Y%m%d', '{date}')
      group by date, campaignid, keywordid, adgroupid, keyword, adgroup, campaign, devicesegment, adid
      ORDER BY date desc
    )
  """;

  -- ---------------------------------------------------------------- step 1
  SET dataset_names = ARRAY(
    SELECT
      *
    FROM
      my_project.my_dataset.sa360_datasets);

  -- STRING_AGG builds the whole UNION ALL in one pass, so no special handling
  -- of the last element is needed. It yields NULL when there is nothing to join.
  SET discovery_sql = (
    SELECT
      STRING_AGG(REPLACE(discovery_template, '{dataset}', dataset_name), ' union all ')
    FROM UNNEST(dataset_names) AS dataset_name
    WHERE dataset_name IS NOT NULL
  );

  -- Nothing configured: stop instead of failing on an out-of-range index.
  IF discovery_sql IS NULL THEN
    RETURN;
  END IF;

  EXECUTE IMMEDIATE CONCAT(
    "CREATE OR REPLACE TABLE `my_dataset.sa360_reporting_data` as (",
    "select struct(table_schema as client, sa360_id as id) as sa360_keys from (",
    discovery_sql,
    "))");

  -- Read client and id as pairs. Rows whose table name carried no id token are
  -- skipped because a NULL would turn the generated SQL into NULL.
  SET targets = ARRAY(
    SELECT DISTINCT AS STRUCT
      sa360_keys.client AS client,
      sa360_keys.id AS id
    FROM `my_dataset.sa360_reporting_data`
    WHERE sa360_keys.client IS NOT NULL
      AND sa360_keys.id IS NOT NULL
  );

  -- ---------------------------------------------------------------- step 2
  SET changed_dates_sql = (
    SELECT
      STRING_AGG(
        REPLACE(REPLACE(changed_dates_template, '{client}', t.client), '{id}', t.id),
        ' union all ')
    FROM UNNEST(targets) AS t
  );

  -- No SA360 account tables found: nothing to refresh.
  IF changed_dates_sql IS NULL THEN
    RETURN;
  END IF;

  EXECUTE IMMEDIATE CONCAT(
    "CREATE OR REPLACE TABLE `my_dataset.sa360_reporting_data` as ",
    changed_dates_sql);

  -- ---------------------------------------------------------------- step 3
  -- One iteration per (client, id, changed date). Driving the loop from the
  -- flattened table removes the manual counters, so every client and every
  -- date is visited exactly once and the loop always terminates.
  FOR target IN (
    SELECT
      sa360_keys.table_schema AS client,
      sa360_keys.id AS id,
      date_suffix
    FROM `my_dataset.sa360_reporting_data`
    CROSS JOIN UNNEST(sa360_keys.dates) AS date_suffix
  )
  DO
    SET refresh_sql = REPLACE(
      REPLACE(
        REPLACE(refresh_template, '{client}', target.client),
        '{id}', target.id),
      '{date}', target.date_suffix);
    EXECUTE IMMEDIATE refresh_sql;
  END FOR;
END;

 

 

 

 

0 0 201
0 REPLIES 0