Connect to AWS RDS (Entrata PostgreSQL) using Composer

Hi Team,

I'm trying to perform an incremental load of data from AWS RDS (Entrata PostgreSQL) to Cloud SQL for PostgreSQL. I want to set up this pipeline using Cloud Composer, submitting it as a Dataproc Serverless job. Can you please help me with the best connection options, including networking and VPC settings, between GCP and AWS?

Thanks,

Vigneswar Jeyaraj


To set up an incremental data load from AWS RDS (PostgreSQL) to Cloud SQL (PostgreSQL) using Cloud Composer and submitting jobs to Dataproc Serverless, you need to consider several aspects including networking, security, and data transfer methods. Here are some steps that might help configure the necessary settings:

Networking and VPC Settings

You have several options depending on your desired level of security and network isolation:

  • Public IPs with Firewall Rules

    • Simplest Setup: If both your AWS RDS and Cloud SQL for PostgreSQL instances have public IPs, you can allow inbound traffic on port 5432 (PostgreSQL) in the RDS instance's security group, restricted to the IP range your Composer/Dataproc environment egresses from.
    • Specific Configuration: In the AWS RDS security group, open port 5432 only for the egress IPs of your Cloud Composer and Dataproc Serverless workloads. Neither has a fixed public IP range by default, so route their egress through Cloud NAT with a reserved static IP address and allow-list that address.
    • Drawbacks: Less secure, as it exposes the RDS instance to the public internet. Not recommended for production.
  • Site-to-Site VPN (Cloud VPN to AWS VPN)

    • More Secure: Creates a private, encrypted link between your Google Cloud VPC and your AWS VPC (VPC peering is not possible across clouds, so a site-to-site VPN, or Cross-Cloud Interconnect for higher bandwidth, is the usual approach). Composer/Dataproc can then reach RDS over private IPs, avoiding public internet exposure.
    • Setup:
      1. Create a Cloud VPN (ideally HA VPN) gateway in your Google Cloud VPC and a Site-to-Site VPN connection in AWS, then bring up the tunnels.
      2. Configure routes (static or BGP) on both sides so traffic can flow between the two VPCs' CIDR ranges.
      3. Update the RDS security group to allow port 5432 traffic from the Google Cloud subnet used by Dataproc Serverless.
  • AWS PrivateLink

    • Hybrid Approach: Exposes the RDS instance as a private endpoint within your AWS VPC; you then reach that endpoint over the private VPN or Interconnect link for increased security.
    • Note: Setting up PrivateLink can be complex but provides excellent security. Whichever option you choose, verify connectivity from the Dataproc Serverless network before building the full pipeline; see the sketch after this list.
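
A minimal connectivity check you could run as a small Python job from the Dataproc Serverless network, assuming the placeholder below is replaced with your RDS hostname (or its private IP over the VPN); it only confirms that port 5432 is reachable:

import socket

# Placeholder endpoint; replace with your RDS hostname (or its private IP over the VPN)
rds_host = "<RDS_HOSTNAME>"
rds_port = 5432

try:
    # A successful TCP handshake means routing, firewall, and security group rules line up
    with socket.create_connection((rds_host, rds_port), timeout=5):
        print(f"TCP connection to {rds_host}:{rds_port} succeeded")
except OSError as exc:
    print(f"Could not reach {rds_host}:{rds_port}: {exc}")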

Data Transfer

For incremental data transfer, consider this setup:

  • Dataproc Serverless with PySpark:

    • Create a PySpark job on Dataproc Serverless (orchestrated via Cloud Composer).
    • Use the PostgreSQL JDBC driver in your PySpark job to connect to both the AWS RDS and Cloud SQL databases.
  • Change Data Capture (CDC):

    • Timestamps: Use if RDS database tables have timestamp columns for filtering new/updated data.
    • AWS Database Migration Service (DMS): Great for initial migrations, might be overkill for ongoing incremental loads unless dealing with high data volume and change frequency.
    • Third-Party CDC Tools: Like Debezium (requires additional infrastructure like Kafka). Excellent for real-time data sync but adds complexity.

Example
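
A minimal PySpark sketch, assuming the source table has a last_modified_timestamp column and that the Composer DAG passes the previous watermark to the job as its first command-line argument (the angle-bracket placeholders are yours to fill in):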

 
import sys
from datetime import datetime, timezone

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# JDBC connection details
rds_jdbc_url = "jdbc:postgresql://<RDS_HOSTNAME>:<PORT>/<DATABASE_NAME>"
cloud_sql_jdbc_url = "jdbc:postgresql://<CLOUD_SQL_HOSTNAME>:<PORT>/<DATABASE_NAME>"

# Create a Spark session
spark = SparkSession.builder.appName("incremental_load").getOrCreate()

# Previous watermark, passed in as a job argument by the Composer DAG
# (a Dataproc Serverless job cannot read Airflow Variables directly)
last_load_time = sys.argv[1]

# Timestamp of this run; it becomes the next watermark
current_load_time = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")

# Load data from AWS RDS
df_rds = spark.read.format("jdbc") \
    .option("url", rds_jdbc_url) \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "<RDS_USERNAME>") \
    .option("password", "<RDS_PASSWORD>") \
    .option("dbtable", "<TABLE_NAME>") \
    .load()

# Filter for new/updated rows based on the timestamp column
# (for large tables, push this filter down via a SQL subquery in the dbtable option)
df_new_data = df_rds.filter(F.col("last_modified_timestamp") > last_load_time)

# Write incremental data to Cloud SQL
df_new_data.write.format("jdbc") \
    .option("url", cloud_sql_jdbc_url) \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "<CLOUD_SQL_USERNAME>") \
    .option("password", "<CLOUD_SQL_PASSWORD>") \
    .option("dbtable", "<TABLE_NAME>") \
    .mode("append") \
    .save()

# Emit the new watermark so the DAG can persist it (e.g. in an Airflow Variable)
print(f"new_last_load_time={current_load_time}")
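
Note that the PostgreSQL JDBC driver jar has to be supplied to the Dataproc Serverless batch (for example via the batch's jar_file_uris), and in practice the database passwords are better read from Secret Manager than hard-coded in the script.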

Scheduling

  • Regular Execution: Use Cloud Composer's DAGs to schedule this job at defined intervals for regular data updates; a minimal DAG sketch follows.
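
A sketch of such a DAG, assuming the PySpark script above is uploaded to a GCS bucket, an Airflow Variable named last_load_time holds the watermark, and the project, region, bucket, subnetwork, and driver-jar placeholders are replaced with your own values:

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataproc import DataprocCreateBatchOperator

with DAG(
    dag_id="rds_to_cloudsql_incremental",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@hourly",  # adjust to your latency requirements
    catchup=False,
) as dag:
    run_incremental_load = DataprocCreateBatchOperator(
        task_id="run_incremental_load",
        project_id="<PROJECT_ID>",
        region="<REGION>",
        batch_id="rds-incremental-{{ ts_nodash | lower }}",
        batch={
            "pyspark_batch": {
                "main_python_file_uri": "gs://<BUCKET>/jobs/incremental_load.py",
                # Previous watermark, passed to the job as its first argument
                "args": ["{{ var.value.get('last_load_time', '1970-01-01 00:00:00') }}"],
                # PostgreSQL JDBC driver jar for the Spark job
                "jar_file_uris": ["gs://<BUCKET>/jars/postgresql-<VERSION>.jar"],
            },
            "environment_config": {
                "execution_config": {
                    # Subnet with private (VPN/NAT) connectivity to AWS RDS
                    "subnetwork_uri": "<SUBNETWORK>",
                },
            },
        },
    )

Updating last_load_time after a successful run (for example from a downstream task that parses the batch output) is left out of the sketch for brevity.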