Trigger Postgres stored procedure using PySpark

Hi Folks,

I have created a stored procedure in Postgres SQL named "source".add_missing_columns. I want to trigger it using PySpark or Python code.

Below is the code Im trying to execute but getting error,

from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder \
.appName("Trigger PostgreSQL Stored Procedure") \
.getOrCreate()

# Define PostgreSQL connection properties
url = "jdbc:postgresql://[HOST]:[PORT]/[DATABASE]"
properties = {
"user": "[USERNAME]",
"password": "[PASSWORD]",
"driver": "org.postgresql.Driver"
}

# Define the name of the stored procedure
procedure_name = "\"Stg\".add_missing_columns"

# Define the call to the stored procedure with parameters
procedure_call = "CALL " + procedure_name + "('InLd', 'InStg', 'AnalytEDW')"

# Print the SQL query statement
print("SQL Query Statement:", procedure_call)

# Execute the stored procedure using a SQL query
spark.read.jdbc(url=url, table="(SELECT " + procedure_call + ")", properties=properties)

# Stop the SparkSession
spark.stop()

SQL Query Statement: CALL "StgEntr".add_missing_columns('InLd', 'InStg', 'AnalytEDW')

Error - py4j.protocol.Py4JJavaError: An error occurred while calling o69.jdbc.
: org.postgresql.util.PSQLException: ERROR: syntax error at or near "."

Has anyone tried this in any case? Any leads on this are much appreciated!

Thanks,

Vigneswar Jeyaraj

0 1 107
1 REPLY 1

The error message you're encountering, "ERROR: syntax error at or near .,", indicates a likely issue with how the stored procedure's name is being specified or handled within the SQL context.

When using this method, be mindful of connection management. Ensure connections are properly closed to avoid resource leaks. Consider using a connection pool or context manager for better handling. See the following code as an example:

from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
.appName("Trigger PostgreSQL Stored Procedure") \
.getOrCreate()

# Define PostgreSQL connection properties
url = "jdbc:postgresql://[HOST]:[PORT]/[DATABASE]"
properties = {
"user": "[USERNAME]",
"password": "[PASSWORD]",
"driver": "org.postgresql.Driver"
}

# Correctly quote the schema and procedure name if necessary
procedure_name = "\"source\".add_missing_columns"

# Construct the SQL command to call the stored procedure with parameters
procedure_call = f"CALL {procedure_name}('InLd', 'InStg', 'AnalytEDW')"

# Display the SQL command to be executed
print("SQL Query Statement:", procedure_call)

# Try executing the stored procedure using Spark SQL
try:
spark.sql(procedure_call)
print("Procedure executed successfully via Spark SQL")
except Exception as e:
print("Failed via Spark SQL. Error:", e)
# If Spark SQL fails, attempt direct JDBC execution
try:
with spark._jvm.java.sql.DriverManager.getConnection(url, properties["user"], properties["password"]) as conn:
with conn.prepareCall(procedure_call) as stmt:
stmt.execute() # Execute the procedure without fetching results
print("Procedure executed successfully via direct JDBC")
except Exception as jdbc_error:
print("Failed via direct JDBC. Error:", jdbc_error)

# Clean up by stopping the Spark session
spark.stop()