Improve app performance through pipelining queries to Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL

by Anjali Dhanerwal and Mohammed Asadulla Baig

PostgreSQL is an open-source object-relational database system with over 30 years of active development that has earned it a strong reputation for reliability, feature robustness, and performance. Amazon Web Services offers Amazon Relational Database Service (Amazon RDS) and Amazon Aurora as fully managed relational database services. Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL-Compatible Edition make it easy to set up, operate, and scale PostgreSQL deployments in the cloud.

A considerable proportion of OLTP and OLAP applications use PostgreSQL and run frequent queries to insert, update, delete, or retrieve data. Time is of the essence, and end users expect results from these queries without delay. In this post, we discuss PostgreSQL pipeline mode, which was introduced in PostgreSQL 14 and is available in Amazon RDS for PostgreSQL and Aurora PostgreSQL-Compatible. By proactively managing write queues, query pipelining can improve application performance by sending queries and receiving their results in a single network round trip.

In this post, we review an example of a retail company with multiple stores that needs to analyze its sales data, which is stored in an RDS for PostgreSQL or Aurora PostgreSQL database. The company wants to calculate the total sales for each store and generate a report that includes the store name, total sales, and the percentage contribution of each store to the overall sales. We show how pipeline mode in PostgreSQL can optimize report generation by maximizing how much data is sent in a single network round trip.

Solution overview

Before we start using pipeline mode, it’s important to understand how the client and the PostgreSQL server communicate with each other. When the client issues a query, the statement is transmitted to the PostgreSQL server as a request, which constitutes the first half of the network trip. After the server computes the result, it transmits it back to the client, completing the round trip for that request in two steps.

Let’s say we have to send 50 requests to a server with a network latency of 200 milliseconds. Run one after another, the sequence of queries requires 50 × 200 ms = 10 seconds for the network trips alone. Pipeline mode can reduce this time spent waiting on the network, but it can use more memory on both the client and the server; this can be mitigated by carefully managing the send/receive queues. The following diagram illustrates this workflow.

With pipeline mode, the network trip wait time of our example is reduced to just 0.2 seconds, because only a single trip is required. The following diagram illustrates the updated workflow.

In the following sections, we walk through how to use pipeline mode with the Python language.

Switch connection mode

In PostgreSQL, the connection mode refers to the way a client application communicates with the PostgreSQL server. To use pipelines, the application must switch the connection to pipeline mode. Psycopg 3, a PostgreSQL database adapter for Python, provides the Connection.pipeline() method to enable pipeline mode. See the following code:

import psycopg
conn = psycopg.connect(host="xxxx", dbname="xxxx", user="xxxx", password="xxxx")

with conn.pipeline() as p:
    # <<Send burst queries here>>
    ...

Queries with pipeline mode

After the connection used by the application is set to pipeline mode, the connection can group multiple operations into longer streams of messages. These requests are queued on the client side until they are flushed to the server, which occurs when a sync() operation is called to establish a synchronization point in the pipeline. The server starts running the commands in the pipeline immediately, without waiting for the end of the pipeline, and returns results in the same order the statements were received from the client. As a result, the client can receive multiple responses in a single round trip.

Let’s run two operations using our earlier connection:

with conn.pipeline() as p:
    conn.execute("INSERT INTO mytable (name,mode) VALUES ('query1','pipeline')") 
    conn.execute("SELECT data FROM mytable WHERE mode = %s", ["pipeline"]) 
    p.sync()

We can run multiple operations within the pipeline block using Connection.execute(), Cursor.execute(), and Cursor.executemany(), with one or more cursors:

with conn.pipeline() as p:
    conn.execute("INSERT INTO mytable (name,mode) VALUES ('query1','pipeline')")
    conn.execute("SELECT data FROM mytable WHERE mode = %s", ["pipeline"]) 
    with conn.cursor() as cur:
        cur.execute("INSERT INTO mytable (name,mode) VALUES ('query1','pipeline')")
        cur.executemany("INSERT INTO inventory VALUES (%s)",[("item1",), ("item2",), ("item3",)])
        p.sync()

Process results

In pipeline mode, unlike in normal mode, Psycopg doesn’t wait to receive the result of each query from the server. Instead, the client receives results in batches when the server flushes its output buffer.

When a flush (or a sync) is performed, all pending results are sent back to the cursors that ran them. If a cursor had run more than one query, it will receive more than one result, in their run order. See the following code:

with conn.pipeline() as p:
    conn.execute("INSERT INTO mytable (name,mode) VALUES ('query1','pipeline')")
    conn.execute("SELECT data FROM mytable WHERE mode = %s", ["pipeline"])
    with conn.cursor() as cur:
        # RETURNING (and returning=True) make the cursor's statements produce
        # result sets that can be fetched after the sync
        cur.execute("INSERT INTO mytable (name,mode) VALUES ('query1','pipeline') RETURNING name")
        cur.executemany(
            "INSERT INTO inventory VALUES (%s) RETURNING *",
            [("item1",), ("item2",), ("item3",)],
            returning=True,
        )
        p.sync()
        # Print each result set received by this cursor, in run order
        while True:
            print(cur.fetchall())
            if not cur.nextset():
                break
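Because Connection.execute() also returns a cursor, the application can hold on to those cursors and read their results after the sync. Tying this back to our retail example, the following is a minimal sketch, assuming a hypothetical sales table with store_name and amount columns (not part of the code shown earlier), that queues one aggregate query per store, flushes them in a single round trip, and computes each store’s percentage contribution on the client side:

# Minimal sketch for the retail report. The sales table, its columns, and the
# store names are assumptions for illustration only.
stores = ["store_a", "store_b", "store_c"]

with conn.pipeline() as p:
    # Queue one aggregate query per store; results are not read until the pipeline is flushed
    cursors = {
        store: conn.execute(
            "SELECT COALESCE(SUM(amount), 0) FROM sales WHERE store_name = %s",
            (store,),
        )
        for store in stores
    }
    p.sync()  # a single round trip flushes every queued query

    # After the sync, each cursor holds the total for its store
    totals = {store: cur.fetchone()[0] for store, cur in cursors.items()}

overall = sum(totals.values())
for store, total in totals.items():
    share = (total / overall * 100) if overall else 0
    print(f"{store}: total={total}, contribution={share:.2f}%")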

Errors and exceptions

It’s important to note that the server encapsulates all the statements sent in pipeline mode within an implicit transaction, which is only committed after the sync is received. As a result, if one statement in the group fails, the effects of the statements run since the previous sync are discarded, and the error is propagated to the following sync.

In the following code, the table inventory doesn’t exist, which causes the insert statement in the block to fail:

with conn.pipeline() as p:
    conn.execute("INSERT INTO mytable (name,mode) VALUES ('query1','pipeline')")
    conn.execute("SELECT data FROM mytable WHERE mode = %s", ["pipeline"]) 
    with conn.cursor() as cur:
        try:
            cur.execute("INSERT INTO mytable (name,mode) VALUES ('query2','pipeline')")
            p.sync()
            cur.execute("INSERT INTO mytable (name,mode) VALUES ('query3','pipeline')")
            cur.executemany("INSERT INTO inventory VALUES (%s)",[("item1",), ("item2",), ("item3",)])
            p.sync()
        except psycopg.errors.UndefinedTable:
            pass

The exception is raised by the sync() call. At the end of the block, the mytable table contains the following:

SELECT * FROM mytable;
+--------+----------+
|  name  |   mode   |
+--------+----------+
| query1 | pipeline |
| query2 | pipeline |
+--------+----------+
(2 rows)
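The same contents can be confirmed from Python on the same connection with a plain, non-pipelined query:

# Verify from the same connection what the pipeline left in mytable
with conn.cursor() as cur:
    cur.execute("SELECT name, mode FROM mytable ORDER BY name")
    print(cur.fetchall())  # includes ('query1', 'pipeline') and ('query2', 'pipeline')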

Limitations of pipeline mode

Although pipeline mode reduces the impact of network latency, it may not be suitable for every query and can add complexity to the application. Only asynchronous operations that use the extended query protocol are permitted. Certain command strings are not allowed, including statements that contain multiple SQL commands and COPY. Pipeline mode isn’t helpful when the client needs information from one operation to produce the next one; in such cases, the client must introduce a synchronization point and wait for a full client-server round trip to obtain the required results, which negates the benefits of using pipeline mode. Carefully consider and test pipeline mode before implementing it in production environments. Additionally, pipeline mode requires adequate memory resources on both the client and the server to be effective.
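As a hypothetical illustration of that dependent-query case (the id lookup below is made up for this sketch and isn’t part of the earlier examples), the client has to sync and fetch before it can queue the next statement:

# Hypothetical dependent-query pattern: pipelining adds little here, because a
# sync (a full round trip) is needed before the second statement can be built.
with conn.pipeline() as p:
    cur = conn.execute("SELECT id FROM mytable WHERE name = %s", ("query1",))  # id column assumed
    p.sync()              # wait for the round trip...
    row = cur.fetchone()  # ...to obtain the value the next query needs
    parent_id = row[0] if row else None
    conn.execute(
        "INSERT INTO mytable (name, mode) VALUES (%s, %s)",
        (f"child-of-{parent_id}", "pipeline"),
    )
    p.sync()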

Conclusion

In this post, we learned how pipelining PostgreSQL queries can help improve overall application performance by reducing query latency over the network. Although pipeline mode can provide performance benefits for some workloads, it’s important to carefully evaluate the specific requirements of each query before deciding to use pipeline mode. Overall, PostgreSQL pipeline mode is a powerful feature that can improve the performance of applications that rely on PostgreSQL for data processing.

If you have any comments or questions about this post, submit them in the comments section.


About the Authors

Anjali Dhanerwal is a Technical Account Manager with Enterprise Support, India. She joined Amazon Web Services in 2022 and supports customers in designing and building solutions that are highly scalable, resilient, and secure. She is also part of the Database and Cloud Operations communities, focusing on Amazon Aurora, Amazon RDS for PostgreSQL, architectural health, and cloud optimization.

Mohammed Asadulla Baig is a Senior Technical Account Manager with Enterprise Support, India. He joined Amazon Web Services in 2017 and helps customers plan and build highly scalable, resilient, and secure solutions. Along with his work as a Sr. TAM, he specializes in databases such as Amazon Aurora and Amazon RDS for PostgreSQL. He has assisted multiple enterprise customers in adopting a variety of AWS services and provided guidance on achieving operational excellence.

