Security Analytics - Brute Force Attack

Summary

Log data (also called machine data) contains a wealth of information and supports use cases ranging from Security Analytics to IT Operations and Monitoring. However, its structure and sheer volume often make it difficult to extract meaning from it.

Looker can be used to give log data meaning and make it easier for end users to extract insights from a dataset that is traditionally difficult to work with. Detecting a “Brute Force Attack” is a common Security Analytics pattern: it flags potentially malicious activity when a user’s attempted logins are denied several times in a row before eventually succeeding.

Looker’s threshold-based alerts, schedules, and actions can be used to detect this activity, alert someone via email, Slack, or text, and trigger other workflows such as opening a support ticket.

Dataset

This should be set up on “Access” or “Audit” logs that contain information on users and attempted logins. In this example, we are using GCP Audit Logs, which contain the following required fields: Login Time, Service Name (the name of the service the user is trying to access), User Email, and Login Granted (Yes/No). Your data may vary depending on which logs you collect, but we will use these four columns as the basis of our analysis.

[Image: Sample Data]

Solution

Because we need to understand the specific order and timing in which these events occurred, we need to use a derived table that leverages SQL window functions and user-input Liquid parameters. With these user inputs (parameters), we can construct a query that helps identify security threats. Scheduling this query to alert you when certain criteria are met can help you stay on top of security threats without having to manually check logs. Our solution uses four CTEs to calculate all of the fields needed to identify threats. Below, we dissect each CTE used in our analysis.


Disclaimer: This analysis is written in BigQuery. Some SQL functions may need to be altered if you are using a different dialect. The goal of this article is to give you the necessary building blocks so that you can implement this within your Looker instance.

CTE Breakdown

 

rank_logins CTE: The first CTE in our derived table is used to calculate three columns, Login Rank, Login Grant Rank, and Previous Login Time.

  • Login Rank - What number login is this for a given user? This rank is cumulative as time progresses and increases by one each login regardless of whether or not the user had a successful login.

  • Login Grant Rank - What number login is this for a given user and service, partitioned by whether or not it was a successful login? Again, this rank increases by one for each new login, but in this case we rank the successful logins separately from the unsuccessful logins.

  • Previous Login Time - What is the time of the previous login?

Both rank columns are calculated using the ROW_NUMBER window function from BigQuery. The Previous Login Time column is calculated using the LAG window function.

Using date parameters we can grab the start and end date based on a user's filter selection to only capture logs within a given timeframe. This timeframe can be relative (last 24 hours) or absolute (between 2020-10-01 and 2020-10-15). 

There is an optional filter if you'd only like to capture logs from certain users (e.g. only look at logs for users with an @company.com email).

rank_logins AS (
  SELECT
    activity.activity_timestamp AS activity_timestamp_date,
    activity.service_name AS service_name,
    activity.principal_email AS principal_email,
    activity.granted,
    -- Running count of every login per user
    ROW_NUMBER() OVER (PARTITION BY activity.principal_email
      ORDER BY activity.principal_email, activity.activity_timestamp, activity.service_name) AS login_rank,
    -- Running count per user, service, and outcome (granted or not)
    ROW_NUMBER() OVER (PARTITION BY activity.principal_email, activity.service_name, granted
      ORDER BY activity.principal_email, activity.activity_timestamp, activity.service_name) AS login_grant_rank,
    -- Time of the same user's previous login (NULL for a user's first login)
    LAG(activity.activity_timestamp) OVER (PARTITION BY activity.principal_email
      ORDER BY activity.activity_timestamp, activity.service_name) AS previous_login_time
  FROM ${security_logs.SQL_TABLE_NAME} AS activity
  -- Only include logins for a given timeframe (variable that user needs to input)
  WHERE CAST(activity.activity_timestamp AS TIMESTAMP) >= {% date_start date_filter %}
    AND CAST(activity.activity_timestamp AS TIMESTAMP) < {% date_end date_filter %}
  -- OPTIONAL - only include users with a certain email
  -- AND activity.principal_email LIKE '%@google.com'
  GROUP BY 1,2,3,4
  ORDER BY 3,5)
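
To make the two ranks and the previous login time easier to picture, here is a minimal Python sketch of the same idea on a handful of made-up rows. The sample data, the function name rank_logins, and the dictionary keys are assumptions for illustration only; the sketch also tracks the previous login time per user, which is a choice made for this example.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical sample rows: (login_time, service_name, user_email, granted)
logins = [
    (datetime(2020, 10, 1, 9, 1), "storage", "ana@example.com", "No"),
    (datetime(2020, 10, 1, 9, 0), "storage", "ana@example.com", "No"),
    (datetime(2020, 10, 1, 9, 5), "storage", "bo@example.com", "Yes"),
    (datetime(2020, 10, 1, 9, 2), "storage", "ana@example.com", "Yes"),
]


def rank_logins(rows):
    """Add login_rank, login_grant_rank, and previous_login_time to each row."""
    login_rank = defaultdict(int)        # like ROW_NUMBER per user
    login_grant_rank = defaultdict(int)  # like ROW_NUMBER per user, service, outcome
    previous_time = {}                   # like LAG, tracked per user in this sketch
    ranked = []
    # Walk the rows ordered by user, then time, then service.
    for ts, service, email, granted in sorted(rows, key=lambda r: (r[2], r[0], r[1])):
        login_rank[email] += 1
        login_grant_rank[(email, service, granted)] += 1
        ranked.append({
            "login_time": ts,
            "service_name": service,
            "user_email": email,
            "granted": granted,
            "login_rank": login_rank[email],
            "login_grant_rank": login_grant_rank[(email, service, granted)],
            "previous_login_time": previous_time.get(email),
        })
        previous_time[email] = ts
    return ranked


for row in rank_logins(logins):
    print(row["user_email"], row["granted"], row["login_rank"], row["login_grant_rank"])
```

For the first user, the overall rank counts 1, 2, 3 across all attempts, while the grant rank restarts for the successful login because it is counted separately from the denied ones.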

conseq_logins CTE: The second CTE in our derived table is used to calculate two new columns, Conseq Login Rank and Event ID Timestamp. Conseq Login Rank is calculated from Login Rank and Login Grant Rank from the rank_logins CTE.

  • Conseq Login Rank - How many consecutive logins is this for a given user and service, assuming we start the count over whenever we switch from successful to unsuccessful logins (or vice versa)? If you have four successful logins followed by three unsuccessful logins followed by two successful logins, we would rank these: 1, 2, 3, 4, 1, 2, 3, 1, 2

  • Event ID Timestamp - What’s the max timestamp for a given series of logins? Each unbroken series of successful or unsuccessful logins is considered an event.

For this CTE, we use the ROW_NUMBER and MAX window functions.

conseq_logins AS (
  SELECT *,
    -- login_rank - login_grant_rank stays constant within an unbroken streak, so it identifies the streak
    ROW_NUMBER() OVER (PARTITION BY principal_email, service_name, granted, login_rank - login_grant_rank
      ORDER BY principal_email, login_rank) AS conseq_login_rank,
    -- Latest timestamp within the same streak
    MAX(rank_logins.activity_timestamp_date) OVER (PARTITION BY principal_email, service_name, granted, login_rank - login_grant_rank) AS event_id_timestamp
  FROM rank_logins
  ORDER BY principal_email, login_rank
)
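
The streak numbering relies on the difference between the two ranks staying constant while the outcome repeats. The following Python sketch reproduces the example sequence above for a single user and service; the function name conseq_login_ranks and the input format are assumptions made for illustration.

```python
from collections import defaultdict


def conseq_login_ranks(outcomes):
    """Number each login within its streak of identical outcomes.

    `outcomes` is a chronological list of "Yes"/"No" values for one user and service.
    """
    grant_rank = defaultdict(int)   # running count per outcome (login_grant_rank)
    streak_size = defaultdict(int)  # running count per streak
    ranks = []
    for login_rank, granted in enumerate(outcomes, start=1):
        grant_rank[granted] += 1
        # The difference stays constant while the outcome repeats and
        # changes once a different outcome interrupts the streak.
        streak_key = (granted, login_rank - grant_rank[granted])
        streak_size[streak_key] += 1
        ranks.append(streak_size[streak_key])
    return ranks


outcomes = ["Yes"] * 4 + ["No"] * 3 + ["Yes"] * 2
print(conseq_login_ranks(outcomes))  # [1, 2, 3, 4, 1, 2, 3, 1, 2]
```

The first four successes all have a difference of 0, the three failures a difference of 4, and the last two successes a difference of 3, which is why each streak is numbered on its own.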

login_facts CTE: The third CTE in our derived table is used to calculate two new "facts" about our logins, Latest Successful Login and Max # of Failed Consecutive Logins.

  • Latest Successful Login - When was our last successful login (within the given timeframe from the first CTE)?

  • Max # of Failed Consecutive Logins - What was the max number of consecutive unsuccessful logins?

These columns are calculated for a given user and service.

login_facts AS (
  SELECT
    principal_email,
    service_name,
    -- Most recent successful login within the selected timeframe
    MAX(CASE WHEN granted='Yes' THEN activity_timestamp_date ELSE NULL END) AS latest_successful_login,
    -- Length of the longest streak of failed logins
    MAX(CASE WHEN granted='No' THEN conseq_login_rank ELSE NULL END) AS max_not_granted_rank
  FROM conseq_logins
  GROUP BY 1,2
)
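
As a rough illustration of this aggregation, the Python sketch below groups a few made-up rows by user and service and derives the same two facts. The sample rows, the function name login_facts, and the dictionary layout are assumptions for this example.

```python
from datetime import datetime

# Hypothetical rows shaped like the output of conseq_logins:
# (principal_email, service_name, granted, login_time, conseq_login_rank)
rows = [
    ("ana@example.com", "storage", "No", datetime(2020, 10, 1, 9, 0), 1),
    ("ana@example.com", "storage", "No", datetime(2020, 10, 1, 9, 1), 2),
    ("ana@example.com", "storage", "Yes", datetime(2020, 10, 1, 9, 2), 1),
    ("ana@example.com", "compute", "Yes", datetime(2020, 10, 1, 9, 3), 1),
]


def login_facts(rows):
    """Return the latest successful login and longest failed streak per (user, service)."""
    facts = {}
    for email, service, granted, ts, streak_rank in rows:
        fact = facts.setdefault(
            (email, service),
            {"latest_successful_login": None, "max_not_granted_rank": None},
        )
        if granted == "Yes":
            current = fact["latest_successful_login"]
            if current is None or ts > current:
                fact["latest_successful_login"] = ts
        elif granted == "No":
            current = fact["max_not_granted_rank"]
            if current is None or streak_rank > current:
                fact["max_not_granted_rank"] = streak_rank
    return facts


for key, fact in login_facts(rows).items():
    print(key, fact)
```

A user and service with no failed logins keeps a max_not_granted_rank of None, which plays the role of NULL in the SQL version.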

max_failed_login CTE: The last CTE in our derived table is used to calculate one new column, Max Failed Login Time.

  • Max Failed Login Time - What is the latest time associated with the longest streak of consecutive failed logins? Note: This is not just our most recent failed login time. We specifically want the time at which the failed streak reached its maximum length.

This column is calculated for a given user and service.

max_failed_login AS (
  SELECT
    conseq_logins.principal_email,
    conseq_logins.service_name,
    -- Time of the failed login that completed the longest failed streak
    MAX(CASE WHEN max_not_granted_rank = conseq_login_rank AND granted = 'No' THEN activity_timestamp_date ELSE NULL END) AS max_failed_login_time
  FROM conseq_logins
  LEFT JOIN login_facts
    ON login_facts.principal_email = conseq_logins.principal_email
    AND login_facts.service_name = conseq_logins.service_name
  GROUP BY 1, 2
)
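
The distinction in the note above can be seen with a small Python sketch. In the made-up data below, a streak of three failures is followed by a success and then by one more isolated failure; the function name max_failed_login_time and the row layout are assumptions for illustration.

```python
from datetime import datetime

# Hypothetical conseq_logins rows for one user and service:
# (granted, login_time, conseq_login_rank)
rows = [
    ("No", datetime(2020, 10, 1, 9, 0), 1),
    ("No", datetime(2020, 10, 1, 9, 1), 2),
    ("No", datetime(2020, 10, 1, 9, 2), 3),
    ("Yes", datetime(2020, 10, 1, 9, 3), 1),
    ("No", datetime(2020, 10, 1, 9, 10), 1),
]


def max_failed_login_time(rows):
    """Return the time at which the longest failed streak reached its full length."""
    failed = [(rank, ts) for granted, ts, rank in rows if granted == "No"]
    if not failed:
        return None
    longest_streak = max(rank for rank, _ in failed)
    # If several streaks tie for the longest, keep the latest one.
    return max(ts for rank, ts in failed if rank == longest_streak)


print(max_failed_login_time(rows))  # 2020-10-01 09:02:00
```

The result is 09:02, the end of the three-failure streak, even though the most recent failed login happened at 09:10.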

Putting It All Together

Now that we have all of the necessary calculations, we will join the CTEs together to complete our analysis. In this final table, we will add a few filters to ensure we only pull back security threats that are relevant. 

  • Filter for events where the latest successful login time is greater than or equal to the max failed login time. We only want to capture events where an individual was able to gain access to our system after several unsuccessful logins.

  • Filter for failed logins above our threshold. One of the filters required for this analysis is a failed_login_threshold. This tells us how many consecutive failed logins are required before we consider something to be suspicious. For example, if you set this threshold at five, the analysis will only return events that had at least five failed logins in a row for a given service and where there was a successful login after that fifth (or higher) unsuccessful login.

  • Filter for the most recent successful login after our series of failed logins (above our threshold) so that we can see that this user successfully logged into our system after a suspicious number of unsuccessful logins.
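
The three filters above can be read as a single row-level predicate. The Python sketch below is one way to express it, assuming each row is a dictionary carrying the columns from the CTEs; the function name keep_row and the dictionary keys are hypothetical.

```python
from datetime import datetime


def keep_row(row, threshold):
    """Decide whether a login row belongs in the brute-force result set."""
    latest_success = row["latest_successful_login"]
    max_failed_time = row["max_failed_login_time"]
    # A comparison against NULL is never true in SQL, so rows missing
    # either fact are dropped.
    if latest_success is None or max_failed_time is None:
        return False
    # A successful login must come at or after the longest failed streak.
    if latest_success < max_failed_time:
        return False
    if row["granted"] == "No":
        # Keep failed logins at or beyond the threshold within their streak.
        return row["conseq_login_rank"] >= threshold
    if row["granted"] == "Yes":
        # Keep the successful login that directly follows the longest failed streak.
        return (
            row["previous_login_time"] == max_failed_time
            and row["max_not_granted_rank"] >= threshold
        )
    return False


facts = {
    "latest_successful_login": datetime(2020, 10, 1, 9, 3),
    "max_failed_login_time": datetime(2020, 10, 1, 9, 2),
    "max_not_granted_rank": 3,
}
third_failure = dict(facts, granted="No", conseq_login_rank=3,
                     previous_login_time=datetime(2020, 10, 1, 9, 1))
success = dict(facts, granted="Yes", conseq_login_rank=1,
               previous_login_time=datetime(2020, 10, 1, 9, 2))
print(keep_row(third_failure, threshold=3), keep_row(success, threshold=3))
```

With a threshold of three, both the third failed login and the success that follows it are kept; raising the threshold to five would drop both.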

include: "/views/**/*.view"

view: failed_logins {
  derived_table: {
    sql:
      -- Create a temp table that has all logins by date, service, and email, and rank them by date per user and by date per user, service, and granted status
      WITH rank_logins AS (
        SELECT
          activity.activity_timestamp AS activity_timestamp_date,
          activity.service_name AS service_name,
          activity.principal_email AS principal_email,
          activity.granted,
          ROW_NUMBER() OVER (PARTITION BY activity.principal_email
            ORDER BY activity.principal_email, activity.activity_timestamp, activity.service_name) AS login_rank,
          ROW_NUMBER() OVER (PARTITION BY activity.principal_email, activity.service_name, granted
            ORDER BY activity.principal_email, activity.activity_timestamp, activity.service_name) AS login_grant_rank,
          -- Time of the same user's previous login (NULL for a user's first login)
          LAG(activity.activity_timestamp) OVER (PARTITION BY activity.principal_email
            ORDER BY activity.activity_timestamp, activity.service_name) AS previous_login_time
        FROM ${security_logs.SQL_TABLE_NAME} AS activity
        -- Only include logins for a given timeframe (variable that user needs to input)
        WHERE CAST(activity.activity_timestamp AS TIMESTAMP) >= {% date_start date_filter %}
          AND CAST(activity.activity_timestamp AS TIMESTAMP) < {% date_end date_filter %}
        -- OPTIONAL - only include users with a certain email
        -- AND activity.principal_email LIKE '%@google.com'
        GROUP BY 1,2,3,4
        ORDER BY 3,5),

      -- Working off the temp table above, number consecutive logins, split by successful and unsuccessful logins.
      conseq_logins AS (
        SELECT *,
          ROW_NUMBER() OVER (PARTITION BY principal_email, service_name, granted, login_rank - login_grant_rank
            ORDER BY principal_email, login_rank) AS conseq_login_rank,
          MAX(rank_logins.activity_timestamp_date) OVER (PARTITION BY principal_email, service_name, granted, login_rank - login_grant_rank) AS event_id_timestamp
        FROM rank_logins
        ORDER BY principal_email, login_rank
      ),

      -- Get the latest successful login and the longest streak of failed logins
      login_facts AS (
        SELECT
          principal_email,
          service_name,
          MAX(CASE WHEN granted='Yes' THEN activity_timestamp_date ELSE NULL END) AS latest_successful_login,
          MAX(CASE WHEN granted='No' THEN conseq_login_rank ELSE NULL END) AS max_not_granted_rank
        FROM conseq_logins
        GROUP BY 1,2
      ),

      -- Get the time at which the longest streak of failed logins reached its full length
      max_failed_login AS (
        SELECT
          conseq_logins.principal_email,
          conseq_logins.service_name,
          MAX(CASE WHEN max_not_granted_rank = conseq_login_rank AND granted = 'No' THEN activity_timestamp_date ELSE NULL END) AS max_failed_login_time
        FROM conseq_logins
        LEFT JOIN login_facts
          ON login_facts.principal_email = conseq_logins.principal_email
          AND login_facts.service_name = conseq_logins.service_name
        GROUP BY 1, 2
      )
      -- Select all relevant data from the temp tables above
      SELECT
        conseq_logins.activity_timestamp_date AS activity_timestamp_date_time,
        conseq_logins.service_name AS service_name,
        conseq_logins.principal_email AS principal_email,
        conseq_logins.granted AS granted,
        conseq_logins.login_rank AS login_rank,
        conseq_logins.login_grant_rank AS login_grant_rank,
        conseq_logins.conseq_login_rank,
        login_facts.latest_successful_login AS max_granted_time,
        max_failed_login.max_failed_login_time AS max_not_granted_time,
        login_facts.max_not_granted_rank AS max_not_granted_rank,
        conseq_logins.login_rank - conseq_logins.login_grant_rank AS event_id,
        conseq_logins.event_id_timestamp AS event_id_timestamp,
        conseq_logins.previous_login_time AS previous_login_time
      FROM conseq_logins
      LEFT JOIN login_facts
        ON login_facts.principal_email = conseq_logins.principal_email
        AND login_facts.service_name = conseq_logins.service_name
      LEFT JOIN max_failed_login
        ON max_failed_login.principal_email = conseq_logins.principal_email
        AND max_failed_login.service_name = conseq_logins.service_name
      WHERE (
          (conseq_logins.conseq_login_rank >= {% parameter failed_login_threshold %} AND conseq_logins.granted = 'No')
          OR (conseq_logins.granted = 'Yes'
            AND conseq_logins.previous_login_time = max_failed_login.max_failed_login_time
            AND login_facts.max_not_granted_rank >= {% parameter failed_login_threshold %})
        )
        AND login_facts.latest_successful_login >= max_failed_login.max_failed_login_time
      GROUP BY 1,2,3,4,5,6,7,8,9,10,11,12,13
      ORDER BY 3,5 ;;
  }

  filter: date_filter {
    type: date
  }

  parameter: failed_login_threshold {
    type: number
    default_value: "3"
  }

  dimension_group: previous_login_timestamp {
    type: time
    sql: CAST(${TABLE}.previous_login_time AS TIMESTAMP) ;;
  }

  dimension_group: activity_timestamp_date_time {
    type: time
    timeframes: [millisecond, time, hour, date, month, quarter, year]
    sql: CAST(${TABLE}.activity_timestamp_date_time AS TIMESTAMP) ;;
  }

  dimension: service_name {
    type: string
    sql: ${TABLE}.service_name ;;
  }

  dimension: event_id {
    type: number
    sql: ${TABLE}.event_id ;;
  }

  dimension: principal_email {
    type: string
    sql: ${TABLE}.principal_email ;;
  }

  dimension: granted {
    type: string
    sql: CASE WHEN ${TABLE}.granted = 'Yes' THEN 'Granted'
      ELSE 'Denied' END ;;
    html:
      {% if value == 'Granted' %}
      <div style="background: #8BC34A; border-radius: 2px; color: #fff; display: inline-block; font-size: 11px; font-weight: bold; line-height: 1; padding: 3px 4px; width: 100%; text-align: center;">{{ rendered_value }}</div>
      {% elsif value == 'Denied' %}
      <div style="background: #FF0000; border-radius: 2px; color: #fff; display: inline-block; font-size: 11px; font-weight: bold; line-height: 1; padding: 3px 4px; width: 100%; text-align: center;">{{ rendered_value }}</div>
      {% endif %} ;;
  }

  dimension: conseq_login_rank {
    type: number
    sql: ${TABLE}.conseq_login_rank ;;
  }

  dimension: login_rank {
    type: number
    sql: ${TABLE}.login_rank ;;
  }

  dimension: login_grant_rank {
    type: number
    sql: ${TABLE}.login_grant_rank ;;
  }

  dimension_group: event_id_timestamp {
    type: time
    sql: CAST(${TABLE}.event_id_timestamp AS TIMESTAMP) ;;
  }

  dimension_group: max_granted_time {
    type: time
    sql: CAST(${TABLE}.max_granted_time AS TIMESTAMP) ;;
  }

  dimension_group: max_not_granted_time {
    type: time
    sql: CAST(${TABLE}.max_not_granted_time AS TIMESTAMP) ;;
  }

  dimension: max_not_granted_rank {
    label: "Max # of Consecutive Denies"
    type: number
    sql: ${TABLE}.max_not_granted_rank ;;
  }

  set: detail {
    fields: [
      activity_timestamp_date_time_time,
      service_name,
      principal_email,
      granted,
      login_rank,
      login_grant_rank,
      max_granted_time_time
    ]
  }
}