Display historical pipeline with the HubSpot deal_stage table from Fivetran

  • 10 June 2022
  • 0 replies
  • 27 views

When I am working on CRM data, I always use the CRM block from Keboola with a bit of the Salesforce block.
I was working on HubSpot data, but the Fivetran connector provides no snapshot data, only this deal_stage table, which basically records the timestamp when the deal stage changes.
To make the historical_snapshot view from Keboola work, I needed to refactor this table so that I could use the great dashboard from the block, which I did using the following derived table (BigQuery):

 

view: deal_stage_refactored {
# Builds a synthetic daily snapshot from the `hubspot.deal_stage` change log
# plus the current stage stored on `hubspot.deal`.
# Output columns: date, value (stage id), deal_id, previous_stage.
derived_table: {
  sql:
    WITH dates AS (
      -- Calendar spine: one row per day from the fixed start date through today.
      SELECT day
      FROM UNNEST(GENERATE_DATE_ARRAY(DATE('2020-06-01'), CURRENT_DATE(), INTERVAL 1 DAY)) AS day
    ),
    daily_stage AS (
      -- Collapse several stage changes on the same day into the one entered last.
      -- MAX(value) would pick the alphabetically greatest stage id, not the latest.
      SELECT
        ds.deal_id,
        DATE(ds.date_entered) AS entered_on,
        ARRAY_AGG(ds.value ORDER BY ds.date_entered DESC LIMIT 1)[OFFSET(0)] AS value
      FROM `hubspot.deal_stage` AS ds
      -- Only rows flagged inactive; the current stage is read from the deal table below.
      WHERE ds._fivetran_active IS FALSE
      GROUP BY ds.deal_id, entered_on
    ),
    deal_stage_refactored AS (
      SELECT
        deal_id,
        entered_on AS date_entered,
        value,
        -- Interval end (exclusive): the next change date, or one day after the
        -- last recorded change when there is no later row.
        COALESCE(
          LEAD(entered_on) OVER deal_history,
          DATE_ADD(entered_on, INTERVAL 1 DAY)
        ) AS next_date,
        LAG(value) OVER deal_history AS previous_stage
      FROM daily_stage
      WINDOW deal_history AS (PARTITION BY deal_id ORDER BY entered_on ASC)
    )
    -- Historical stages: repeat each stage for every day of its interval.
    SELECT
      dates.day AS date,
      deal_stage_refactored.value AS value,
      deal_stage_refactored.deal_id AS deal_id,
      deal_stage_refactored.previous_stage AS previous_stage
    FROM dates
    LEFT JOIN deal_stage_refactored
      ON dates.day >= deal_stage_refactored.date_entered
      AND dates.day < deal_stage_refactored.next_date
    UNION ALL
    -- Current stage from the deal table, kept only for days that no historical
    -- interval already covers, so a deal is not reported twice for the same day
    -- (the snapshot_id primary key is built from deal_id and date).
    SELECT
      dates.day AS date,
      deal.deal_pipeline_stage_id AS value,
      deal.deal_id AS deal_id,
      NULL AS previous_stage
    FROM dates
    LEFT JOIN `hubspot.deal` AS deal
      ON dates.day >= DATE(deal.property_createdate)
    LEFT JOIN deal_stage_refactored AS covered
      ON covered.deal_id = deal.deal_id
      AND dates.day >= covered.date_entered
      AND dates.day < covered.next_date
    WHERE covered.deal_id IS NULL ;;
}


# Deal identifier read from the derived table's deal_id column.
dimension: deal_id {
  sql: ${TABLE}.deal_id ;;
  type: number
}

# Snapshot day from the derived table's date column, exposed as a date-typed
# time group with timezone conversion switched off.
dimension_group: snapshot {
  type: time
  datatype: date
  convert_tz: no
  timeframes: [raw, date, week, month, quarter, year]
  sql: ${TABLE}.date ;;
}

# Stage value of the snapshot row; sorted by the display order of the joined
# historical_deal_pipeline_stage view.
dimension: stage {
  sql: ${TABLE}.value ;;
  type: string
  order_by_field: historical_deal_pipeline_stage.display_order
}

# Yes when the row has no previous stage or the previous stage differs from
# the current one.
dimension: stage_change {
  type: yesno
  description: "Defines if the stage has changed since the previous snapshot"
  sql: ${previous_stage} IS NULL OR ${previous_stage} != ${stage} ;;
}

# Previous stage value of the snapshot row; sorted by the display order of the
# joined previous_deal_pipeline_stage view.
dimension: previous_stage {
  sql: ${TABLE}.previous_stage ;;
  type: string
  order_by_field: previous_deal_pipeline_stage.display_order
}

# Hidden primary key: deal id and snapshot day joined with an underscore.
dimension: snapshot_id {
  primary_key: yes
  hidden: yes
  type: string
  sql: CONCAT(${deal_id}, '_', ${snapshot_raw}) ;;
}

# Compares the opportunity value weighted by the snapshot stage's probability
# with the same value weighted by the previous stage's probability.
dimension: opportunity_value_change {
  type: yesno
  description: "Defines if the value has changed since the previous snapshot"
  sql:
    ${opportunity.opportunity_value_dimension} * ${historical_deal_pipeline_stage.probability}
    <> ${opportunity.opportunity_value_dimension} * ${previous_deal_pipeline_stage.probability} ;;
}

# Hidden helper: opportunity value multiplied by the previous stage's probability.
dimension: previous_opportunity_value_dimension {
  type: number
  hidden: yes
  sql: ${opportunity.opportunity_value_dimension} * ${previous_deal_pipeline_stage.probability} ;;
}

# Yes when the snapshot stage's probability differs from the previous stage's.
dimension: probability_change {
  type: yesno
  description: "Defines if the probability has changed since the previous snapshot"
  sql: ${historical_deal_pipeline_stage.probability} <> ${previous_deal_pipeline_stage.probability} ;;
}

# Distinct sum of the opportunity view's value dimension; drills into the
# company/opportunity/employee set plus this measure.
measure: opportunity_value {
  type: sum_distinct
  description: "Opportunity value at the time of snapshot"
  sql: ${opportunity.opportunity_value_dimension} ;;
  value_format: "#,##0"
  drill_fields: [company_opportunity_employee*, opportunity_value]
}

# Distinct sum of the opportunity view's value dimension.
# NOTE(review): the sql is the same expression used by the opportunity_value
# measure, so both return the same number; confirm that no previous-snapshot
# value is available to use here.
measure: previous_opportunity_value {
  type: sum_distinct
  description: "Opportunity value of the previous snapshot"
  sql: ${opportunity.opportunity_value_dimension} ;;
  value_format: "#,##0"
}

# Distinct sum of opportunity value multiplied by the snapshot stage's probability.
measure: opportunity_value_weighted {
  type: sum_distinct
  description: "Weighted Opportunity value at the time of snapshot"
  sql: ${opportunity.opportunity_value_dimension} * ${historical_deal_pipeline_stage.probability} ;;
  value_format: "#,##0"
  drill_fields: [company_opportunity_employee*, opportunity_value_weighted]
}

# Distinct sum of opportunity value multiplied by the previous stage's probability.
measure: previous_opportunity_value_weighted {
  type: sum_distinct
  description: "Weighted Opportunity value of the previous snapshot"
  sql: ${opportunity.opportunity_value_dimension} * ${previous_deal_pipeline_stage.probability} ;;
  value_format: "#,##0"
}

# Distinct sum of the snapshot stage's probability, formatted as a percentage.
measure: probability {
  type: sum_distinct
  description: "Probability in the time of snapshot"
  sql: ${historical_deal_pipeline_stage.probability} ;;
  value_format: "##0%"
}

# Distinct sum of the previous stage's probability, formatted as a percentage.
measure: previous_probability {
  type: sum_distinct
  description: "Probability of the previous snapshot"
  sql: ${previous_deal_pipeline_stage.probability} ;;
  value_format: "##0%"
}

# Number of distinct deal ids among the snapshot rows.
measure: opportunities {
  sql: ${deal_id} ;;
  type: count_distinct
  drill_fields: [company_opportunity_employee*, opportunity.opportunity_value]
}

# Row count of the view; drills into the company/opportunity/employee set.
measure: count {
  drill_fields: [company_opportunity_employee*, count]
  type: count
}

# Field set used by the drill_fields of the measures in this view.
set: company_opportunity_employee {
  fields: [
    company.company,
    opportunity.opportunity,
    employee.employee
  ]
}
# End of view: deal_stage_refactored
}

What this does is first create a dates table, as per this post, which I have used many times:
 

-- Calendar spine: one row per day from 2020-06-01 through the current date.
WITH dates AS (
    SELECT day
    FROM UNNEST(
        GENERATE_DATE_ARRAY(DATE('2020-06-01'), CURRENT_DATE(), INTERVAL 1 DAY)
    ) AS day
)

Then, as I only want one stage per day, I only use the latest one in case there are two changes on the same day:

-- One stage per deal per day: the stage entered last on that day.
-- MAX(value) would return the alphabetically greatest stage id instead.
SELECT
    ds.deal_id,
    DATE(ds.date_entered) AS date_entered,
    ARRAY_AGG(ds.value ORDER BY ds.date_entered DESC LIMIT 1)[OFFSET(0)] AS value
FROM `hubspot.deal_stage` AS ds
WHERE ds._fivetran_active IS FALSE
GROUP BY ds.deal_id, DATE(ds.date_entered)

Then I use two window functions on this table to get the next date and, when there is no next date, add one day (you will see later why). We also ignore the latest change, as we will get it from the deal table later. We also need to get the previous stage:

-- One row per deal and stage-change day, with the day the stage ends
-- (exclusive) and the stage that preceded it.
deal_stage_refactored AS (
    SELECT
        deal_id,
        date_entered,
        value,
        -- Next change date, or one day after the last recorded change.
        COALESCE(
            LEAD(date_entered) OVER (PARTITION BY deal_id ORDER BY date_entered ASC),
            DATE_ADD(date_entered, INTERVAL 1 DAY)
        ) AS next_date,
        LAG(value) OVER (PARTITION BY deal_id ORDER BY date_entered ASC) AS previous_stage
    FROM (
        -- Keep the stage entered last on each day rather than MAX(value),
        -- which would pick the alphabetically greatest stage id.
        SELECT
            ds.deal_id,
            DATE(ds.date_entered) AS date_entered,
            ARRAY_AGG(ds.value ORDER BY ds.date_entered DESC LIMIT 1)[OFFSET(0)] AS value
        FROM `hubspot.deal_stage` AS ds
        WHERE ds._fivetran_active IS FALSE
        GROUP BY ds.deal_id, DATE(ds.date_entered)
    )
)

Then we mimic the structure of the snapshot table from Salesforce by filling in the missing dates using the join on our dates table:

-- Repeat each historical stage for every calendar day inside its interval.
SELECT
    dates.day AS date,
    history.value AS value,
    history.deal_id AS deal_id,
    history.previous_stage AS previous_stage
FROM dates
LEFT JOIN deal_stage_refactored AS history
    ON history.date_entered <= dates.day
    AND history.next_date > dates.day

Then we also get the data from the deal table for all the deals whose stage didn't change, so they don't show up in the deal_stage table:

UNION ALL
-- Current stage from the deal table, kept only for days that no historical
-- interval already covers, so a deal is not reported twice for the same day.
SELECT
    dates.day AS date,
    deal.deal_pipeline_stage_id AS value,
    deal.deal_id AS deal_id,
    NULL AS previous_stage
FROM dates
LEFT JOIN `hubspot.deal` AS deal
    ON dates.day >= DATE(deal.property_createdate)
LEFT JOIN deal_stage_refactored AS covered
    ON covered.deal_id = deal.deal_id
    AND dates.day >= covered.date_entered
    AND dates.day < covered.next_date
WHERE covered.deal_id IS NULL

Then, in our model, we also join our deal_pipeline_stage view for the stage and the previous stage:
 

  # Attaches the daily snapshot rows (view deal_stage_refactored, aliased as
  # opportunity_snapshot) to each opportunity by deal id.
  join: opportunity_snapshot {
    from: deal_stage_refactored
    relationship: one_to_many
    type: left_outer
    sql_on: ${opportunity.opportunity_id} = ${opportunity_snapshot.deal_id} ;;
  }

# Looks up the deal_pipeline_stage row matching the snapshot's stage.
join: historical_deal_pipeline_stage {
  from: deal_pipeline_stage
  sql_on: ${opportunity_snapshot.stage} = ${historical_deal_pipeline_stage.stage_id} ;;
  relationship: many_to_one
}

# Looks up the deal_pipeline_stage row matching the snapshot's previous stage.
join: previous_deal_pipeline_stage {
  from: deal_pipeline_stage
  sql_on: ${opportunity_snapshot.previous_stage} = ${previous_deal_pipeline_stage.stage_id} ;;
  relationship: many_to_one
}

And then we can get our nice graph using the dashboard from the Keboola block.
 

 


0 replies

Be the first to reply!

Reply