Database in Rust: Wire Protocol and Result Set Serialization

  1. The Wire Protocol Overview
  2. Connection Startup
  3. Simple Query Protocol
  4. Extended Query Protocol
  5. Complete Query Execution Flow
  6. PostgreSQL Type OIDs
  7. Challenges Building in Rust
  8. How AI Accelerated This
  9. Summary: Wire Protocol in One Diagram

In Part 4, we built WAL and crash recovery. Our database can now survive power failures. But there's a problem.

How do clients actually talk to our database?

┌─────────────┐                          ┌─────────────┐
│   psql      │                          │  Vaultgres  │
│   client    │                          │   server    │
│             │     ??? How to talk ???  │             │
└─────────────┘                          └─────────────┘

We could invent our own protocol. But then we'd need to build a client from scratch.

Better approach: Speak PostgreSQL's wire protocol. Then psql, JDBC, libpq, and every other existing tool just work.

Today: implementing the PostgreSQL wire protocol in Rust, from startup handshake to result set serialization.


1 The Wire Protocol Overview

Frontend/Backend Model

PostgreSQL uses a frontend/backend architecture:

┌─────────────────────────────────────────────────────────────┐
│                     PostgreSQL Protocol                     │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│  Frontend (Client)          Backend (Server)                │
│  - psql                     - Vaultgres                     │
│  - libpq (C driver)         - Query processor               │
│  - JDBC/ODBC                - Storage engine                │
│  - psycopg (Python)         - Transaction manager           │
│                                                             │
│  Communication: TCP/IP (usually port 5432)                  │
│  Message format: Length-prefixed binary protocol            │
│                                                             │
└─────────────────────────────────────────────────────────────┘

Message Structure

Every message has the same format:

┌─────────────────────────────────────────────────────────────┐
│ Message Format                                              │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┬─────────────────────────────────────────┐   │
│ │ Type (1B)   │ Length (4B, includes itself)            │   │
│ ├─────────────┴─────────────────────────────────────────┤   │
│ │ Payload (variable)                                    │   │
│ └───────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────┘

Example: Query ('Q')
┌─────────────────────────────────────────────────────────────┐
│ 'Q' │ 0x00 0x00 0x00 0x18 │ "SELECT * FROM users\0"         │
│  1B │      4B (24 bytes)  │ variable (null-terminated)      │
└─────────────────────────────────────────────────────────────┘

Key insight: Length is big-endian (network byte order) and includes itself (not the type byte).
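The framing rule is easy to get off by four, so here is a minimal sketch of it using only the standard library. The names `frame_message` and `parse_header` are hypothetical helpers for illustration, not part of the Vaultgres code shown in this post.

```rust
/// Frame a typed protocol message: 1-byte type, 4-byte big-endian length, payload.
/// The length counts its own 4 bytes plus the payload, but not the type byte.
pub fn frame_message(msg_type: u8, payload: &[u8]) -> Vec<u8> {
    let len = (4 + payload.len()) as u32;
    let mut out = Vec::with_capacity(1 + len as usize);
    out.push(msg_type);
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}

/// Split a 5-byte header into (type, payload length).
/// Returns None when the length is below 4, which can never be valid
/// because the length field counts itself.
pub fn parse_header(header: [u8; 5]) -> Option<(u8, usize)> {
    let len = u32::from_be_bytes([header[1], header[2], header[3], header[4]]);
    let payload_len = len.checked_sub(4)? as usize;
    Some((header[0], payload_len))
}
```

For the query `SELECT * FROM users` the payload is 19 characters plus the null terminator, so the length field holds 24 (0x18) and the whole message occupies 25 bytes on the wire.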


Message Types

Type               Code     Direction  Purpose
StartupMessage     (none)   F→B        Initial connection (no type byte)
AuthenticationOk   'R'      B→F        Login successful
Query              'Q'      F→B        Simple query (SQL string)
RowDescription     'T'      B→F        Column metadata
DataRow            'D'      B→F        Actual row data
CommandComplete    'C'      B→F        Query finished
ReadyForQuery      'Z'      B→F        Server ready for next query
ErrorResponse      'E'      B→F        Something went wrong
Parse              'P'      F→B        Extended query: prepare
Bind               'B'      F→B        Extended query: bind parameters
Execute            'E'      F→B        Extended query: run
Sync               'S'      F→B        Extended query: finish batch

F→B = Frontend to Backend, B→F = Backend to Frontend
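One detail in this table is worth spelling out: a type byte is only meaningful together with its direction. 'E' is Execute when the client sends it and ErrorResponse when the server sends it, and 'S' is Sync from the client but ParameterStatus from the server. A small sketch of that lookup, with hypothetical function names and only the messages covered in this post:

```rust
/// Name of a frontend (client-to-server) message for a given type byte.
pub fn frontend_message(tag: u8) -> Option<&'static str> {
    match tag {
        b'Q' => Some("Query"),
        b'P' => Some("Parse"),
        b'B' => Some("Bind"),
        b'E' => Some("Execute"),
        b'S' => Some("Sync"),
        b'X' => Some("Terminate"),
        _ => None,
    }
}

/// Name of a backend (server-to-client) message for a given type byte.
pub fn backend_message(tag: u8) -> Option<&'static str> {
    match tag {
        b'R' => Some("Authentication"),
        b'S' => Some("ParameterStatus"),
        b'T' => Some("RowDescription"),
        b'D' => Some("DataRow"),
        b'C' => Some("CommandComplete"),
        b'Z' => Some("ReadyForQuery"),
        b'E' => Some("ErrorResponse"),
        _ => None,
    }
}
```

Since a server only ever parses frontend messages, the overlap causes no ambiguity in practice, but it matters when reading packet captures.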


2 Connection Startup

The Handshake Flow

sequenceDiagram
    participant Client
    participant Server
    Client->>Server: StartupMessage (user, database, options)
    Server->>Client: AuthenticationOk
    Server->>Client: ParameterStatus (server_version, encoding, ...)
    Server->>Client: ReadyForQuery (idle)
    Client->>Server: Query / Extended Query
    Server->>Client: RowDescription (for SELECT)
    Server->>Client: DataRow × N
    Server->>Client: CommandComplete
    Server->>Client: ReadyForQuery (idle)

StartupMessage

The first message is special. It has no type byte, only a length:

┌─────────────────────────────────────────────────────────────┐
│ StartupMessage                                              │
├─────────────────────────────────────────────────────────────┤
│ Length (4B): 8 + parameters                                 │
│ Protocol Version (4B): 196608 (3.0)                         │
│ Parameters (null-terminated key/value pairs):               │
│   "user\0neo\0database\0vaultgres\0\0"                      │
└─────────────────────────────────────────────────────────────┘
// src/wire_protocol/startup.rs
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

pub struct StartupMessage {
    pub user: String,
    pub database: String,
    pub options: HashMap<String, String>,
}

impl StartupMessage {
    pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        // Read length (4 bytes, big-endian)
        let mut len_buf = [0u8; 4];
        stream.read_exact(&mut len_buf).await?;
        let len = u32::from_be_bytes(len_buf);

        // Read protocol version
        let mut version_buf = [0u8; 4];
        stream.read_exact(&mut version_buf).await?;
        let version = u32::from_be_bytes(version_buf);

        if version != 196608 {
            return Err(ProtocolError::UnsupportedVersion(version));
        }

        // Read parameters (null-terminated key/value pairs)
        let mut params = HashMap::new();
        // Subtract length and version bytes; saturate so a bogus length cannot underflow
        let mut remaining = len.saturating_sub(8);

        // Loop until the declared length is used up, so the final
        // terminator byte is consumed and does not leak into the next message
        while remaining > 0 {
            let mut key = Vec::new();
            let mut byte = [0u8; 1];

            loop {
                stream.read_exact(&mut byte).await?;
                remaining = remaining.saturating_sub(1);
                if byte[0] == 0 { break; }
                key.push(byte[0]);
            }

            if key.is_empty() { break; }  // Empty key = end of parameters

            let mut value = Vec::new();
            loop {
                stream.read_exact(&mut byte).await?;
                remaining = remaining.saturating_sub(1);
                if byte[0] == 0 { break; }
                value.push(byte[0]);
            }

            let key = String::from_utf8(key)?;
            let value = String::from_utf8(value)?;
            params.insert(key, value);
        }

        Ok(Self {
            user: params.remove("user").unwrap_or_default(),
            database: params.remove("database").unwrap_or_default(),
            options: params,
        })
    }
}
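One practical wrinkle: the 4 bytes in the version slot are not always 196608. Clients built on libpq, psql included, commonly open with an SSLRequest, which has the same layout but carries a special code instead of a protocol version. A server without TLS answers an SSLRequest with the single byte 'N', and the client then sends its real StartupMessage on the same connection. The sketch below classifies the code; the enum and function names are hypothetical, while the numeric codes are the ones defined by the protocol.

```rust
/// Codes a client may send in the version slot of its first packet.
pub const PROTOCOL_3_0: u32 = 196608;      // 3 << 16
pub const CANCEL_REQUEST: u32 = 80877102;  // (1234 << 16) | 5678
pub const SSL_REQUEST: u32 = 80877103;     // (1234 << 16) | 5679
pub const GSSENC_REQUEST: u32 = 80877104;  // (1234 << 16) | 5680

#[derive(Debug, PartialEq)]
pub enum StartupKind {
    /// A regular protocol 3.0 StartupMessage; parameters follow.
    Startup,
    /// Client asks for TLS; reply with b'N' to decline, then read again.
    SslRequest,
    /// Client asks for GSSAPI encryption; reply with b'N' to decline.
    GssEncRequest,
    /// Client wants to cancel a query running on another connection.
    CancelRequest,
    /// Anything else, including older protocol versions.
    Unsupported(u32),
}

pub fn classify_startup(code: u32) -> StartupKind {
    match code {
        PROTOCOL_3_0 => StartupKind::Startup,
        SSL_REQUEST => StartupKind::SslRequest,
        GSSENC_REQUEST => StartupKind::GssEncRequest,
        CANCEL_REQUEST => StartupKind::CancelRequest,
        other => StartupKind::Unsupported(other),
    }
}
```

In a handler this check would sit right after reading the version, looping back to read a fresh length and version after each declined request.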

Authentication and ParameterStatus

// src/wire_protocol/messages.rs
pub struct MessageBuilder {
    buffer: Vec<u8>,
}

impl MessageBuilder {
    pub fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    pub fn authentication_ok(&mut self) -> &[u8] {
        // 'R' (1B) + Length (4B) + Auth Type (4B = 0 for Ok)
        self.buffer.clear();
        self.buffer.push(b'R');
        self.buffer.extend_from_slice(&8u32.to_be_bytes());   // Length: 4 (itself) + 4 (auth type)
        self.buffer.extend_from_slice(&0u32.to_be_bytes());   // AuthOk
        &self.buffer
    }

    pub fn parameter_status(&mut self, name: &str, value: &str) -> &[u8] {
        // 'S' (1B) + Length (4B) + name\0 + value\0
        self.buffer.clear();
        self.buffer.push(b'S');
        
        let payload_len = 4 + name.len() + 1 + value.len() + 1;
        self.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
        self.buffer.extend_from_slice(name.as_bytes());
        self.buffer.push(0);
        self.buffer.extend_from_slice(value.as_bytes());
        self.buffer.push(0);
        
        &self.buffer
    }

    pub fn ready_for_query(&mut self, status: TransactionStatus) -> &[u8] {
        // 'Z' (1B) + Length (4B) + Status (1B)
        self.buffer.clear();
        self.buffer.push(b'Z');
        self.buffer.extend_from_slice(&5u32.to_be_bytes());
        self.buffer.push(status as u8);
        &self.buffer
    }
}

#[derive(Debug, Clone, Copy)]
#[repr(u8)]
pub enum TransactionStatus {
    Idle = b'I',
    InTransaction = b'T',
    InFailedTransaction = b'E',
}

Server sends these parameters:

Parameter          Value  Purpose
server_version     16.0   PostgreSQL version we're emulating
server_encoding    UTF8   Character encoding
client_encoding    UTF8   Client's encoding
integer_datetimes  on     64-bit integer timestamps
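To make the handshake reply concrete, here is a standalone sketch that assembles AuthenticationOk, the ParameterStatus messages, and ReadyForQuery into one byte vector. It uses only the standard library; `push_message` and `startup_reply` are hypothetical names and do not replace the `MessageBuilder` above.

```rust
/// Append one typed message: type byte, length (itself + payload), payload.
fn push_message(out: &mut Vec<u8>, msg_type: u8, payload: &[u8]) {
    out.push(msg_type);
    out.extend_from_slice(&((4 + payload.len()) as u32).to_be_bytes());
    out.extend_from_slice(payload);
}

/// Build the full reply to a StartupMessage when no password is required.
pub fn startup_reply(params: &[(&str, &str)]) -> Vec<u8> {
    let mut out = Vec::new();

    // AuthenticationOk: 'R', length 8, auth code 0.
    push_message(&mut out, b'R', &0u32.to_be_bytes());

    // One ParameterStatus per setting: name\0value\0.
    for (name, value) in params {
        let mut payload = Vec::with_capacity(name.len() + value.len() + 2);
        payload.extend_from_slice(name.as_bytes());
        payload.push(0);
        payload.extend_from_slice(value.as_bytes());
        payload.push(0);
        push_message(&mut out, b'S', &payload);
    }

    // ReadyForQuery: 'Z', length 5, status 'I' (idle).
    push_message(&mut out, b'Z', b"I");
    out
}
```

With the single parameter `("server_version", "16.0")` the reply is 40 bytes: 9 for AuthenticationOk, 25 for ParameterStatus, and 6 for ReadyForQuery.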

3 Simple Query Protocol

Query Flow

Client: Query("SELECT id, name FROM users WHERE id = 1")
Server: RowDescription (column metadata)
Server: DataRow (row 1)
Server: DataRow (row 2)
...
Server: CommandComplete ("SELECT 2")
Server: ReadyForQuery ('I')

RowDescription: Telling Clients About Columns

// src/wire_protocol/row_description.rs
pub struct FieldDescription {
    pub name: String,
    pub table_oid: u32,
    pub column_attr_num: i16,
    pub type_oid: u32,
    pub type_size: i16,
    pub type_modifier: i32,
    pub format_code: i16,  // 0 = text, 1 = binary
}

pub struct RowDescription {
    pub fields: Vec<FieldDescription>,
}

impl RowDescription {
    pub fn serialize(&self, builder: &mut MessageBuilder) -> &[u8] {
        // 'T' (1B) + Length (4B) + Num Fields (2B) + Fields...
        builder.buffer.clear();
        builder.buffer.push(b'T');
        
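        // Calculate length: the length field itself (4) + field count (2)
        // + per field: name with null terminator + 18 fixed bytes
        // (table OID 4, column number 2, type OID 4, type size 2, modifier 4, format 2)
        let payload_len = 4 + 2 + (self.fields.len() * 18) +
            self.fields.iter().map(|f| f.name.len() + 1).sum::<usize>();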
        
        builder.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
        builder.buffer.extend_from_slice(&(self.fields.len() as i16).to_be_bytes());
        
        for field in &self.fields {
            builder.buffer.extend_from_slice(field.name.as_bytes());
            builder.buffer.push(0);  // Null terminator
            builder.buffer.extend_from_slice(&field.table_oid.to_be_bytes());
            builder.buffer.extend_from_slice(&field.column_attr_num.to_be_bytes());
            builder.buffer.extend_from_slice(&field.type_oid.to_be_bytes());
            builder.buffer.extend_from_slice(&field.type_size.to_be_bytes());
            builder.buffer.extend_from_slice(&field.type_modifier.to_be_bytes());
            builder.buffer.extend_from_slice(&field.format_code.to_be_bytes());
        }
        
        &builder.buffer
    }
}

Example output:

SELECT id, name FROM users

RowDescription:
┌─────────────────────────────────────────────────────────────┐
│ 'T' │ Length │ 2 fields                                     │
├─────────────────────────────────────────────────────────────┤
│ Field 1: "id"                                               │
│   table_oid: 16384                                          │
│   column_attr_num: 1                                        │
│   type_oid: 23 (INT4)                                       │
│   type_size: 4                                              │
│   type_modifier: -1                                         │
│   format_code: 0 (text)                                     │
├─────────────────────────────────────────────────────────────┤
│ Field 2: "name"                                             │
│   table_oid: 16384                                          │
│   column_attr_num: 2                                        │
│   type_oid: 25 (TEXT)                                       │
│   type_size: -1 (variable)                                  │
│   type_modifier: -1                                         │
│   format_code: 0 (text)                                     │
└─────────────────────────────────────────────────────────────┘
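The Length value in this example can be worked out by hand, and doing so is a useful check on any serializer. A sketch of the arithmetic, with a hypothetical helper name:

```rust
/// Fixed bytes per field after the name: table OID (4) + column number (2)
/// + type OID (4) + type size (2) + type modifier (4) + format code (2).
const FIELD_FIXED_BYTES: usize = 18;

/// Value of the RowDescription length field for the given column names.
/// Counts the length field itself (4) and the field count (2),
/// then each name with its null terminator plus the fixed part.
pub fn row_description_len(field_names: &[&str]) -> u32 {
    let names: usize = field_names.iter().map(|n| n.len() + 1).sum();
    (4 + 2 + field_names.len() * FIELD_FIXED_BYTES + names) as u32
}
```

For the columns `id` and `name` this gives 4 + 2 + (3 + 18) + (5 + 18) = 50, so the complete message is 51 bytes including the 'T'.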

DataRow: Serializing Actual Rows

// src/wire_protocol/data_row.rs
pub struct DataRow {
    pub values: Vec<Option<Vec<u8>>>,  // None = NULL
    pub format_codes: Vec<i16>,
}

impl DataRow {
    pub fn serialize(&self, builder: &mut MessageBuilder) -> &[u8] {
        // 'D' (1B) + Length (4B) + Num Values (2B) + Values...
        builder.buffer.clear();
        builder.buffer.push(b'D');
        
        // Calculate length: the length field itself (4) + num values (2)
        let mut payload_len = 6u32;
        for value in &self.values {
            payload_len += 4;  // Length prefix
            if let Some(data) = value {
                payload_len += data.len() as u32;
            }
        }
        
        builder.buffer.extend_from_slice(&payload_len.to_be_bytes());
        builder.buffer.extend_from_slice(&(self.values.len() as i16).to_be_bytes());
        
        for value in &self.values {
            match value {
                None => {
                    // NULL: length = -1
                    builder.buffer.extend_from_slice(&(-1i32).to_be_bytes());
                }
                Some(data) => {
                    // Non-NULL: length + data
                    builder.buffer.extend_from_slice(&(data.len() as i32).to_be_bytes());
                    builder.buffer.extend_from_slice(data);
                }
            }
        }
        
        &builder.buffer
    }
}

Example:

Row: id=1, name="Alice", email=NULL

DataRow:
┌─────────────────────────────────────────────────────────────┐
│ 'D' │ Length │ 3 values                                     │
├─────────────────────────────────────────────────────────────┤
│ Value 1: 1 byte  │ "1"                                      │
│ Value 2: 5 bytes │ "Alice"                                  │
│ Value 3: -1 (NULL)                                          │
└─────────────────────────────────────────────────────────────┘
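Here is the same row as actual bytes. This is a standalone sketch using only the standard library; `encode_data_row` is a hypothetical free function that mirrors what a DataRow serializer has to produce.

```rust
/// Encode one DataRow. `None` is SQL NULL and is written as length -1 with no data.
pub fn encode_data_row(values: &[Option<&[u8]>]) -> Vec<u8> {
    // Each value costs a 4-byte length prefix plus its data (nothing for NULL).
    let body: usize = values.iter().map(|v| 4 + v.map_or(0, |d| d.len())).sum();
    // Length field counts itself (4) and the value count (2) as well.
    let len = (4 + 2 + body) as u32;

    let mut out = Vec::with_capacity(1 + len as usize);
    out.push(b'D');
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(&(values.len() as i16).to_be_bytes());
    for value in values {
        match value {
            None => out.extend_from_slice(&(-1i32).to_be_bytes()),
            Some(data) => {
                out.extend_from_slice(&(data.len() as i32).to_be_bytes());
                out.extend_from_slice(data);
            }
        }
    }
    out
}
```

For `id=1, name="Alice", email=NULL` in text format the length field is 24: 4 + 2 + (4 + 1) + (4 + 5) + 4. Note that the NULL still costs four bytes for its -1 length.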

Text vs. Binary Format

Text format (format_code = 0): Human-readable strings

INT4: "42"
TEXT: "Alice"
TIMESTAMP: "2026-03-29 14:30:00.123456+00"

Binary format (format_code = 1): Native representation

// src/wire_protocol/type_encoding.rs
pub fn encode_int4(value: i32, format: i16) -> Vec<u8> {
    match format {
        0 => value.to_string().into_bytes(),  // Text
        1 => value.to_be_bytes().to_vec(),    // Binary
        _ => panic!("Invalid format code"),
    }
}

pub fn encode_text(value: &str, format: i16) -> Vec<u8> {
    match format {
        // TEXT is the same UTF-8 bytes in both formats. The enclosing DataRow
        // already carries each value's length, so no extra prefix is added here.
        0 | 1 => value.as_bytes().to_vec(),
        _ => panic!("Invalid format code"),
    }
}

pub fn encode_timestamp(value: chrono::DateTime<chrono::Utc>, format: i16) -> Vec<u8> {
    match format {
        // The value is UTC, so the offset is written as the literal "+00" (%z would print "+0000")
        0 => value.format("%Y-%m-%d %H:%M:%S%.6f+00").to_string().into_bytes(),
        1 => {
            // PostgreSQL epoch: 2000-01-01 00:00:00 UTC
            let epoch = chrono::DateTime::from_timestamp(946684800, 0).unwrap();
            let micros = value.signed_duration_since(epoch).num_microseconds().unwrap();
            micros.to_be_bytes().to_vec()
        }
        _ => panic!("Invalid format code"),
    }
}
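The encoders above have a mirror image on the input side, because Bind parameters arrive in the same two formats. Since the format code comes from the client, a decoder is a good place to return an error instead of panicking. A minimal sketch for INT4, with a hypothetical `DecodeError` type:

```rust
use std::convert::TryFrom;

#[derive(Debug, PartialEq)]
pub enum DecodeError {
    /// Format code was neither 0 (text) nor 1 (binary).
    InvalidFormatCode(i16),
    /// Text value was not valid UTF-8 or not a valid integer.
    InvalidText,
    /// Binary value did not have exactly 4 bytes.
    WrongLength(usize),
}

/// Decode an INT4 parameter sent in text or binary format.
pub fn decode_int4(data: &[u8], format: i16) -> Result<i32, DecodeError> {
    match format {
        0 => std::str::from_utf8(data)
            .ok()
            .and_then(|s| s.parse::<i32>().ok())
            .ok_or(DecodeError::InvalidText),
        1 => {
            let bytes = <[u8; 4]>::try_from(data)
                .map_err(|_| DecodeError::WrongLength(data.len()))?;
            Ok(i32::from_be_bytes(bytes))
        }
        other => Err(DecodeError::InvalidFormatCode(other)),
    }
}
```

The same shape would work for the encoders: returning a `Result` lets the handler turn a bad format code into an ErrorResponse rather than taking the whole connection task down.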

4 Extended Query Protocol

Why Extended Query?

Simple Query: the whole statement travels as one SQL string, so values have to be spliced into the text (SQL injection risk), and there are no prepared statements

Client: Query("SELECT * FROM users WHERE id = " + user_input)
→ SQL injection vulnerability!

Extended Query: Prepared statements, parameter binding

Client: Parse("SELECT * FROM users WHERE id = $1")
Client: Bind([42])
Client: Execute()
→ Safe from SQL injection!

Extended Query Flow

sequenceDiagram
    participant Client
    participant Server
    Client->>Server: Parse (SQL, parameter types)
    Server->>Client: ParseComplete
    Client->>Server: Bind (parameter values)
    Server->>Client: BindComplete
    loop Multiple executions
        Client->>Server: Execute (max_rows)
        Server->>Client: DataRow × N
        Server->>Client: CommandComplete (or PortalSuspended)
    end
    Client->>Server: Sync
    Server->>Client: ReadyForQuery

Parse: Preparing a Statement

// src/wire_protocol/parse.rs
pub struct ParseMessage {
    pub statement_name: String,
    pub query: String,
    pub parameter_types: Vec<u32>,  // OID for each parameter
}

impl ParseMessage {
    pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        // statement_name (null-terminated)
        let statement_name = read_null_terminated(stream).await?;
        
        // query (null-terminated)
        let query = read_null_terminated(stream).await?;
        
        // num_parameter_types (2B)
        let mut num_types_buf = [0u8; 2];
        stream.read_exact(&mut num_types_buf).await?;
        let num_types = i16::from_be_bytes(num_types_buf);
        
        // parameter_types (4B each)
        let mut parameter_types = Vec::new();
        for _ in 0..num_types {
            let mut type_buf = [0u8; 4];
            stream.read_exact(&mut type_buf).await?;
            parameter_types.push(u32::from_be_bytes(type_buf));
        }
        
        Ok(Self {
            statement_name,
            query,
            parameter_types,
        })
    }
}

// Server response
pub fn parse_complete(builder: &mut MessageBuilder) -> &[u8] {
    // '1' (1B) + Length (4B = 4)
    builder.buffer.clear();
    builder.buffer.push(b'1');
    builder.buffer.extend_from_slice(&4u32.to_be_bytes());
    &builder.buffer
}
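The helpers `read_null_terminated`, `read_i16`, and `read_i32` are not shown in this post. One way to build them, sketched here as an assumption rather than the Vaultgres implementation, is to read the whole message body into memory using the length prefix and then parse from the slice. That keeps parsing synchronous and makes truncated input easy to detect. `BodyReader` and `parse_parse_body` are hypothetical names.

```rust
/// Cursor over an in-memory message body.
pub struct BodyReader<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> BodyReader<'a> {
    pub fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Take the next `n` bytes, or None if the body is too short.
    pub fn bytes(&mut self, n: usize) -> Option<&'a [u8]> {
        let end = self.pos.checked_add(n)?;
        let slice = self.buf.get(self.pos..end)?;
        self.pos = end;
        Some(slice)
    }

    pub fn i16(&mut self) -> Option<i16> {
        let b = self.bytes(2)?;
        Some(i16::from_be_bytes([b[0], b[1]]))
    }

    pub fn u32(&mut self) -> Option<u32> {
        let b = self.bytes(4)?;
        Some(u32::from_be_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Read a null-terminated UTF-8 string and skip its terminator.
    pub fn cstr(&mut self) -> Option<&'a str> {
        let rest = self.buf.get(self.pos..)?;
        let nul = rest.iter().position(|&b| b == 0)?;
        let s = std::str::from_utf8(&rest[..nul]).ok()?;
        self.pos += nul + 1;
        Some(s)
    }
}

/// Parse the body of a Parse message: statement name, query, parameter type OIDs.
pub fn parse_parse_body(body: &[u8]) -> Option<(String, String, Vec<u32>)> {
    let mut r = BodyReader::new(body);
    let name = r.cstr()?.to_owned();
    let query = r.cstr()?.to_owned();
    let count = r.i16()?;
    let mut types = Vec::new();
    for _ in 0..count.max(0) {
        types.push(r.u32()?);
    }
    Some((name, query, types))
}
```

Returning `Option` keeps the sketch short; a real handler would likely map `None` to a protocol error variant.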

Bind: Creating a Portal

// src/wire_protocol/bind.rs
pub struct BindMessage {
    pub portal_name: String,
    pub statement_name: String,
    pub parameter_format_codes: Vec<i16>,
    pub parameter_values: Vec<Option<Vec<u8>>>,
    pub result_format_codes: Vec<i16>,
}

impl BindMessage {
    pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        // portal_name (null-terminated)
        let portal_name = read_null_terminated(stream).await?;
        
        // statement_name (null-terminated)
        let statement_name = read_null_terminated(stream).await?;
        
        // num_parameter_format_codes (2B)
        let num_formats = read_i16(stream).await?;
        
        // parameter_format_codes
        let mut parameter_format_codes = Vec::new();
        for _ in 0..num_formats {
            parameter_format_codes.push(read_i16(stream).await?);
        }
        
        // num_parameter_values (2B)
        let num_values = read_i16(stream).await?;
        
        // parameter_values
        let mut parameter_values = Vec::new();
        for _ in 0..num_values {
            let len = read_i32(stream).await?;
            if len == -1 {
                parameter_values.push(None);  // NULL
            } else {
                let mut data = vec![0u8; len as usize];
                stream.read_exact(&mut data).await?;
                parameter_values.push(Some(data));
            }
        }
        
        // num_result_format_codes (2B)
        let num_result_formats = read_i16(stream).await?;
        
        // result_format_codes
        let mut result_format_codes = Vec::new();
        for _ in 0..num_result_formats {
            result_format_codes.push(read_i16(stream).await?);
        }
        
        Ok(Self {
            portal_name,
            statement_name,
            parameter_format_codes,
            parameter_values,
            result_format_codes,
        })
    }
}

// Server response
pub fn bind_complete(builder: &mut MessageBuilder) -> &[u8] {
    // '2' (1B) + Length (4B = 4)
    builder.buffer.clear();
    builder.buffer.push(b'2');
    builder.buffer.extend_from_slice(&4u32.to_be_bytes());
    &builder.buffer
}
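The format code lists in Bind follow a compact convention that is easy to miss: zero codes means every value is text, exactly one code applies to every value, and otherwise there must be one code per value. A sketch of that rule, with a hypothetical function name:

```rust
/// Resolve the format code (0 = text, 1 = binary) for the value at `index`.
/// Returns None when the list has several codes but none for this index,
/// which a handler would treat as a protocol error.
pub fn resolve_format_code(codes: &[i16], index: usize) -> Option<i16> {
    match codes.len() {
        0 => Some(0),                      // No codes: everything is text
        1 => Some(codes[0]),               // One code: applies to all values
        _ => codes.get(index).copied(),    // Otherwise: one code per value
    }
}
```

The same rule applies to both `parameter_format_codes` and `result_format_codes`, so a single helper can serve decoding parameters and encoding result columns.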

Execute: Running the Prepared Statement

// src/wire_protocol/execute.rs
pub struct ExecuteMessage {
    pub portal_name: String,
    pub max_rows: i32,  // 0 = all rows
}

impl ExecuteMessage {
    pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        let portal_name = read_null_terminated(stream).await?;
        let max_rows = read_i32(stream).await?;
        
        Ok(Self { portal_name, max_rows })
    }
}

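Server response: DataRow messages, then CommandComplete once the portal has run to completion (or PortalSuspended if max_rows was reached first). There is no specific "ExecuteComplete" message.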
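The `max_rows` field is what lets a client fetch a large result in batches. A rough sketch of the bookkeeping, assuming the portal's rows are already materialized in a `Vec` (the names here are hypothetical, and a streaming executor would look different):

```rust
#[derive(Debug, PartialEq)]
pub enum ExecuteOutcome {
    /// No rows remain; the reply ends with CommandComplete ('C').
    Complete,
    /// The row limit was hit with rows remaining; the reply ends with PortalSuspended ('s').
    Suspended,
}

/// Drain up to `max_rows` rows from a portal's pending rows.
/// A `max_rows` of zero (or less) means no limit.
pub fn execute_portal<T>(pending: &mut Vec<T>, max_rows: i32) -> (Vec<T>, ExecuteOutcome) {
    let limit = if max_rows <= 0 {
        pending.len()
    } else {
        (max_rows as usize).min(pending.len())
    };
    let batch: Vec<T> = pending.drain(..limit).collect();
    let outcome = if pending.is_empty() {
        ExecuteOutcome::Complete
    } else {
        ExecuteOutcome::Suspended
    };
    (batch, outcome)
}
```

This sketch reports completion as soon as the last row is handed out. A server that produces rows lazily may not know it has reached the end when the limit is hit exactly, so its behavior at that boundary can differ.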


Sync: Finishing the Batch

// src/wire_protocol/sync.rs
pub struct SyncMessage;

impl SyncMessage {
    pub async fn read_from(_stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        // Sync has no body, just the message header
        Ok(SyncMessage)
    }
}

// Server response
pub fn sync_complete(builder: &mut MessageBuilder, status: TransactionStatus) -> &[u8] {
    // Sync is answered with ReadyForQuery alone. CommandComplete belongs to the
    // Execute response, so it is not written here.
    builder.ready_for_query(status)
}

5 Complete Query Execution Flow

Putting It All Together

// src/wire_protocol/handler.rs
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::query_executor::QueryExecutor;
use crate::storage::buffer_pool::BufferPool;

pub struct ProtocolHandler {
    // The stream is passed to each method instead of being stored here,
    // which keeps its borrow separate from the rest of the handler.
    executor: QueryExecutor,
    builder: MessageBuilder,
    prepared_statements: HashMap<String, PreparedStatement>,
    portals: HashMap<String, Portal>,
}

impl ProtocolHandler {
    // Takes &mut self because the body uses self.builder and the self.handle_* methods
    pub async fn handle_connection(&mut self, mut stream: TcpStream) -> Result<(), ProtocolError> {
        // 1. Read startup message
        let startup = StartupMessage::read_from(&mut stream).await?;
        
        // 2. Send authentication
        stream.write_all(self.builder.authentication_ok()).await?;
        
        // 3. Send parameter status
        stream.write_all(self.builder.parameter_status("server_version", "16.0")).await?;
        stream.write_all(self.builder.parameter_status("server_encoding", "UTF8")).await?;
        stream.write_all(self.builder.parameter_status("client_encoding", "UTF8")).await?;
        
        // 4. Send ready for query
        stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
        
        // 5. Main message loop
        loop {
            let mut type_buf = [0u8; 1];
            stream.read_exact(&mut type_buf).await?;
            
            // Every typed message carries a 4-byte length after the type byte.
            // Consume it here so each handler starts at the message body.
            let mut len_buf = [0u8; 4];
            stream.read_exact(&mut len_buf).await?;
            let _len = u32::from_be_bytes(len_buf);
            
            match type_buf[0] as char {
                'Q' => self.handle_simple_query(&mut stream).await?,
                'P' => self.handle_parse(&mut stream).await?,
                'B' => self.handle_bind(&mut stream).await?,
                'E' => self.handle_execute(&mut stream).await?,
                'S' => self.handle_sync(&mut stream).await?,
                'X' => {
                    // Terminate
                    return Ok(());
                }
                _ => return Err(ProtocolError::UnknownMessage(type_buf[0])),
            }
        }
    }
    
    async fn handle_simple_query(&mut self, stream: &mut TcpStream) -> Result<(), ProtocolError> {
        // Read query string
        let query = read_null_terminated(stream).await?;
        
        // Execute query
        let result = self.executor.execute(&query).await?;
        
        // Send RowDescription (if SELECT)
        if let Some(columns) = result.columns {
            let row_desc = self.create_row_description(&columns);
            stream.write_all(row_desc.serialize(&mut self.builder)).await?;
            
            // Send DataRows
            for row in result.rows {
                let data_row = self.create_data_row(&row);
                stream.write_all(data_row.serialize(&mut self.builder)).await?;
            }
        }
        
        // Send CommandComplete
        self.builder.command_complete(&result.command_tag);
        stream.write_all(&self.builder.buffer).await?;
        
        // Send ReadyForQuery
        stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
        
        Ok(())
    }
}
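In `handle_simple_query`, a failed query propagates through `?` and ends the connection. The protocol has a gentler path: send an ErrorResponse, then ReadyForQuery, and keep serving. The message table lists ErrorResponse but no code builds it, so here is a standalone sketch. The function name is hypothetical; the field codes 'S' (severity), 'C' (SQLSTATE), and 'M' (message) are the ones the protocol defines.

```rust
/// Build an ErrorResponse: 'E', length, then a list of fields.
/// Each field is a 1-byte code followed by a null-terminated string,
/// and a single zero byte terminates the list.
pub fn error_response(severity: &str, sqlstate: &str, message: &str) -> Vec<u8> {
    let mut fields = Vec::new();
    for &(code, value) in [(b'S', severity), (b'C', sqlstate), (b'M', message)].iter() {
        fields.push(code);
        fields.extend_from_slice(value.as_bytes());
        fields.push(0);
    }
    fields.push(0); // End of field list

    let mut out = vec![b'E'];
    out.extend_from_slice(&((4 + fields.len()) as u32).to_be_bytes());
    out.extend_from_slice(&fields);
    out
}
```

A call such as `error_response("ERROR", "42601", "syntax error")` gives clients enough to display a familiar `ERROR:  syntax error` line. After sending it for a simple query, the server still owes the client a ReadyForQuery.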

Result Set Serialization Example

// src/wire_protocol/result_set.rs
use std::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
pub struct ResultSet {
    pub columns: Vec<Column>,
    pub rows: Vec<Row>,
    pub command_tag: String,
}

pub struct Column {
    pub name: String,
    pub type_oid: u32,
    pub type_size: i16,
}

pub struct Row {
    pub values: Vec<Option<String>>,
}

impl ResultSet {
    // async because tokio's write_all returns a future that must be awaited
    pub async fn send_to(&self, stream: &mut TcpStream, builder: &mut MessageBuilder) -> Result<(), io::Error> {
        // RowDescription
        let fields: Vec<FieldDescription> = self.columns.iter().map(|col| {
            FieldDescription {
                name: col.name.clone(),
                table_oid: 0,
                column_attr_num: 0,
                type_oid: col.type_oid,
                type_size: col.type_size,
                type_modifier: -1,
                format_code: 0,  // Text format
            }
        }).collect();
        
        let row_desc = RowDescription { fields };
        stream.write_all(row_desc.serialize(builder)).await?;
        
        // DataRows
        for row in &self.rows {
            let values: Vec<Option<Vec<u8>>> = row.values.iter()
                .map(|v| v.as_ref().map(|s| s.as_bytes().to_vec()))
                .collect();
            
            let data_row = DataRow {
                values,
                format_codes: vec![0; self.columns.len()],
            };
            stream.write_all(data_row.serialize(builder)).await?;
        }
        
        // CommandComplete
        builder.command_complete(&self.command_tag);
        stream.write_all(&builder.buffer).await?;
        
        Ok(())
    }
}

// Usage example
let result = ResultSet {
    columns: vec![
        Column { name: "id".to_string(), type_oid: 23, type_size: 4 },
        Column { name: "name".to_string(), type_oid: 25, type_size: -1 },
    ],
    rows: vec![
        Row { values: vec![Some("1".to_string()), Some("Alice".to_string())] },
        Row { values: vec![Some("2".to_string()), Some("Bob".to_string())] },
    ],
    command_tag: "SELECT 2".to_string(),
};

result.send_to(&mut stream, &mut builder).await?;

What psql receives:

┌─────────────────────────────────────────────────────────────┐
│ T (RowDescription)                                          │
│   2 columns: id (INT4), name (TEXT)                         │
├─────────────────────────────────────────────────────────────┤
│ D (DataRow)                                                 │
│   id=1, name="Alice"                                        │
├─────────────────────────────────────────────────────────────┤
│ D (DataRow)                                                 │
│   id=2, name="Bob"                                          │
├─────────────────────────────────────────────────────────────┤
│ C (CommandComplete)                                         │
│   "SELECT 2"                                                │
├─────────────────────────────────────────────────────────────┤
│ Z (ReadyForQuery)                                           │
│   Status: Idle                                              │
└─────────────────────────────────────────────────────────────┘
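The code above calls `builder.command_complete(...)`, which is not shown in this post. As a rough idea of what it has to produce, here is a standalone sketch with hypothetical free functions. The tag formats follow PostgreSQL's conventions for these four statement kinds; other statements use tags without a row count.

```rust
/// Build CommandComplete: 'C', length, then the null-terminated command tag.
pub fn command_complete(tag: &str) -> Vec<u8> {
    let mut out = vec![b'C'];
    // Length counts itself (4), the tag, and the null terminator.
    out.extend_from_slice(&((4 + tag.len() + 1) as u32).to_be_bytes());
    out.extend_from_slice(tag.as_bytes());
    out.push(0);
    out
}

/// Command tag for SELECT, INSERT, UPDATE, or DELETE with a row count.
/// INSERT carries an extra field that is always written as 0.
pub fn command_tag(kind: &str, rows: u64) -> String {
    match kind {
        "INSERT" => format!("INSERT 0 {}", rows),
        other => format!("{} {}", other, rows),
    }
}
```

Clients such as psql print this tag after the result, which is where the familiar `INSERT 0 1` line comes from.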

6 PostgreSQL Type OIDs

Common Types

Type Name         OID    Size  Description
BOOL              16     1     Boolean
INT2 (SMALLINT)   21     2     2-byte integer
INT4 (INTEGER)    23     4     4-byte integer
INT8 (BIGINT)     20     8     8-byte integer
TEXT              25     -1    Variable-length text
VARCHAR           1043   -1    Variable-length char
TIMESTAMP         1114   8     Timestamp without timezone
TIMESTAMPTZ       1184   8     Timestamp with timezone
FLOAT4 (REAL)     700    4     4-byte float
FLOAT8 (DOUBLE)   701    8     8-byte float
NUMERIC           1700   -1    Arbitrary precision
BYTEA             17     -1    Binary data
OID               26     4     Object identifier

// src/wire_protocol/oids.rs
pub mod oid {
    pub const BOOL: u32 = 16;
    pub const INT2: u32 = 21;
    pub const INT4: u32 = 23;
    pub const INT8: u32 = 20;
    pub const TEXT: u32 = 25;
    pub const VARCHAR: u32 = 1043;
    pub const TIMESTAMP: u32 = 1114;
    pub const TIMESTAMPTZ: u32 = 1184;
    pub const FLOAT4: u32 = 700;
    pub const FLOAT8: u32 = 701;
    pub const NUMERIC: u32 = 1700;
    pub const BYTEA: u32 = 17;
    pub const OID: u32 = 26;
}
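RowDescription needs a `type_size` next to every `type_oid`, so it helps to keep the Size column of the table in code as well. A minimal sketch with a hypothetical function name, covering only the types listed above:

```rust
/// Size in bytes reported in RowDescription for a type OID.
/// -1 marks a variable-length type; None means the OID is not in our table.
pub fn type_size(type_oid: u32) -> Option<i16> {
    match type_oid {
        16 => Some(1),                           // BOOL
        21 => Some(2),                           // INT2
        23 | 26 | 700 => Some(4),                // INT4, OID, FLOAT4
        20 | 701 | 1114 | 1184 => Some(8),       // INT8, FLOAT8, TIMESTAMP, TIMESTAMPTZ
        17 | 25 | 1043 | 1700 => Some(-1),       // BYTEA, TEXT, VARCHAR, NUMERIC
        _ => None,
    }
}
```

With this in place a `Column` could be built from the OID alone, which removes one way for the two fields to drift apart.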

7 Challenges Building in Rust

Challenge 1: Async I/O and Borrowing

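Problem: Async I/O methods take &mut self. If read_query() hands back a value that borrows from self (for example, a &str pointing into the read buffer), self stays mutably borrowed for as long as that value is alive, and the next call that touches self is rejected.

// ❌ Doesn't compile when read_query() returns a &str borrowed from self
impl ProtocolHandler {
    pub async fn handle_query(&mut self) -> Result<(), Error> {
        let query = self.read_query().await?;  // Mutable borrow of self lives as long as `query`
        let result = self.executor.execute(&query).await?;  // Borrows self again while `query` is alive
        // Error: cannot borrow as mutable more than once
    }
}

Solution: Restructure so that no borrow of self outlives the call that created it

// ✅ Works, assuming read_query() returns an owned String
impl ProtocolHandler {
    pub async fn handle_query(&mut self) -> Result<(), Error> {
        // Owned value: the borrow of self ends when read_query() returns
        let query: String = self.read_query().await?;

        // self is free to be borrowed again here
        let result = self.executor.execute(&query).await?;

        self.send_result(result).await?;
        Ok(())
    }
}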
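
The borrow rules at play are the same in synchronous code, so they can be demonstrated without an async runtime. In this minimal sketch, `Handler` and `Executor` are hypothetical stand-ins for the real types, and `read_query` is assumed to return an owned `String`. Because nothing borrowed from `self` escapes that call, the executor can be borrowed mutably right afterwards.

```rust
/// Hypothetical stand-in for the query executor.
struct Executor {
    executed: Vec<String>,
}

impl Executor {
    /// Records the query and returns how many queries have run so far.
    fn execute(&mut self, query: &str) -> usize {
        self.executed.push(query.to_string());
        self.executed.len()
    }
}

/// Hypothetical, synchronous stand-in for ProtocolHandler.
struct Handler {
    inbox: Vec<u8>,
    executor: Executor,
}

impl Handler {
    /// Returns an owned String, so no borrow of `self` outlives the call.
    fn read_query(&mut self) -> String {
        let bytes = std::mem::take(&mut self.inbox);
        String::from_utf8_lossy(&bytes).into_owned()
    }

    fn handle_query(&mut self) -> usize {
        let query = self.read_query(); // mutable borrow of `self` ends here
        self.executor.execute(&query) // so a second mutable borrow is accepted
    }
}
```

Changing `read_query` to return `&str` tied to `self.inbox` would bring the compile error back, which is the trade-off: the owned `String` costs one copy per query but keeps the handler usable between calls.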

Challenge 2: Zero-Copy vs. Allocation

Problem: Every result row has to be serialized into a wire message. Allocating a fresh Vec<u8> for each message puts the allocator on the hot path.

// ❌ Allocates on every message
pub fn serialize_row(&self) -> Vec<u8> {
    let mut buffer = Vec::new();
    buffer.push(b'D');
    // ... lots of allocations ...
    buffer
}

Solution: Reuse buffers

// ✅ Reuses allocated buffer
pub struct MessageBuilder {
    buffer: Vec<u8>,  // Pre-allocated, reused
}

impl MessageBuilder {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(capacity),
        }
    }
    
    pub fn data_row(&mut self, row: &Row) -> &[u8] {
        self.buffer.clear();  // Reuse capacity
        self.buffer.push(b'D');
        // ... write to buffer ...
        &self.buffer  // Return reference, not owned
    }
}
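
To make the elided "write to buffer" step concrete, here is a minimal sketch of a full text-format DataRow encoder on top of a reused buffer. The `Row` type (a list of optional strings) is an assumption made for this example; the real `Row` in the project may look different. The length field is written as a placeholder first and patched once the payload size is known.

```rust
/// Hypothetical row type for this sketch: each column is text or NULL.
pub struct Row {
    pub values: Vec<Option<String>>,
}

pub struct MessageBuilder {
    buffer: Vec<u8>, // allocated once, reused for every message
}

impl MessageBuilder {
    pub fn with_capacity(capacity: usize) -> Self {
        Self { buffer: Vec::with_capacity(capacity) }
    }

    /// Encodes a DataRow ('D') in text format into the reused buffer.
    pub fn data_row(&mut self, row: &Row) -> &[u8] {
        self.buffer.clear(); // drops old contents, keeps capacity
        self.buffer.push(b'D');
        self.buffer.extend_from_slice(&[0u8; 4]); // length placeholder
        self.buffer
            .extend_from_slice(&(row.values.len() as i16).to_be_bytes());
        for value in &row.values {
            match value {
                Some(text) => {
                    self.buffer
                        .extend_from_slice(&(text.len() as i32).to_be_bytes());
                    self.buffer.extend_from_slice(text.as_bytes());
                }
                // NULL is a length of -1 followed by no value bytes.
                None => self.buffer.extend_from_slice(&(-1i32).to_be_bytes()),
            }
        }
        // The length counts itself and the payload, but not the type byte.
        let len = (self.buffer.len() - 1) as i32;
        self.buffer[1..5].copy_from_slice(&len.to_be_bytes());
        &self.buffer
    }
}
```

Because `data_row` returns a slice borrowed from the builder, the caller has to write it to the socket before building the next message. That constraint is what makes the reuse safe.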

Challenge 3: Error Handling Across Layers

Problem: Wire protocol errors, query errors, and storage errors are all different types, and funneling them into one enum does not scale.

// ❌ Error type explosion
pub enum Error {
    Io(io::Error),
    Protocol(ProtocolError),
    Query(QueryError),
    Storage(StorageError),
    Transaction(TransactionError),
    // ... 20 more variants ...
}

Solution: Use thiserror and conversion traits

// ✅ Clean error handling
use std::io;

#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    
    #[error("Invalid message type: {0}")]
    UnknownMessage(u8),
    
    #[error("Query error: {0}")]
    Query(#[from] QueryError),
}

// Use the ? operator for automatic conversion
impl ProtocolHandler {
    pub async fn handle_query(&mut self) -> Result<(), ProtocolError> {
        let query = self.read_query().await?;  // io::Error → ProtocolError
        let result = self.executor.execute(&query).await?;  // QueryError → ProtocolError
        Ok(())
    }
}
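
It can help to see what `#[from]` boils down to. The sketch below writes the `From` impls by hand using only the standard library, which is roughly the conversion code thiserror generates. `QueryError`, `read_tag`, `run_query`, and `handle` are hypothetical stand-ins invented for this example.

```rust
use std::fmt;
use std::io;

/// Hypothetical stand-in for the query layer's error type.
#[derive(Debug)]
pub struct QueryError(pub String);

#[derive(Debug)]
pub enum ProtocolError {
    Io(io::Error),
    UnknownMessage(u8),
    Query(QueryError),
}

impl fmt::Display for ProtocolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProtocolError::Io(e) => write!(f, "IO error: {e}"),
            ProtocolError::UnknownMessage(t) => write!(f, "Invalid message type: {t}"),
            ProtocolError::Query(e) => write!(f, "Query error: {}", e.0),
        }
    }
}

impl std::error::Error for ProtocolError {}

// These two impls are what lets `?` convert errors automatically.
impl From<io::Error> for ProtocolError {
    fn from(e: io::Error) -> Self {
        ProtocolError::Io(e)
    }
}

impl From<QueryError> for ProtocolError {
    fn from(e: QueryError) -> Self {
        ProtocolError::Query(e)
    }
}

fn read_tag(input: &[u8]) -> Result<u8, io::Error> {
    input
        .first()
        .copied()
        .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "empty message"))
}

fn run_query(sql: &str) -> Result<usize, QueryError> {
    if sql.is_empty() {
        Err(QueryError("empty query".into()))
    } else {
        Ok(sql.len())
    }
}

pub fn handle(input: &[u8]) -> Result<usize, ProtocolError> {
    let tag = read_tag(input)?; // io::Error -> ProtocolError via From
    if tag != b'Q' {
        return Err(ProtocolError::UnknownMessage(tag));
    }
    let sql = std::str::from_utf8(&input[1..]).unwrap_or("");
    Ok(run_query(sql)?) // QueryError -> ProtocolError via From
}
```

Each layer keeps its own small error type, and only the boundary type (`ProtocolError` here) knows how to absorb the others.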

8 How AI Accelerated This

What AI Got Right

| Task | AI Contribution |
|---|---|
| Message format | Correct big-endian encoding |
| Extended query flow | Parse → Bind → Execute sequence |
| Type OIDs | Accurate PostgreSQL type OIDs |
| Null handling | -1 length prefix for NULLs |

What AI Got Wrong

| Issue | What Happened |
|---|---|
| Length calculation | First draft didn’t count the 4 length bytes in the length field |
| Startup message | Tried to add a type byte (the startup message has none) |
| Binary format | Suggested little-endian (PostgreSQL uses big-endian) |
| Portal lifetime | Missed when portals are destroyed (at transaction end, or for the unnamed portal when the next Bind replaces it) |

Pattern: The wire protocol is byte-exact. A length that is off by even one byte desynchronizes the stream, and every message after it is misparsed.
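
The length rule is easiest to see from the reading side. The function below is a minimal sketch (the name `split_message` is made up for this example) that splits one regular message off the front of a byte slice. It relies on the length field counting its own 4 bytes but not the type byte, so a writer that forgets the 4 bytes produces a frame the reader rejects.

```rust
/// Splits one message off the front of `input`.
/// Returns (type byte, payload, remaining bytes), or None if the
/// input is incomplete or the length field is malformed.
pub fn split_message(input: &[u8]) -> Option<(u8, &[u8], &[u8])> {
    if input.len() < 5 {
        return None; // need at least the type byte and the length
    }
    let tag = input[0];
    let len = i32::from_be_bytes([input[1], input[2], input[3], input[4]]);
    // The length includes its own 4 bytes, so anything below 4 is invalid.
    if len < 4 {
        return None;
    }
    let end = 1 + len as usize; // type byte + length-prefixed region
    if input.len() < end {
        return None; // payload not fully received yet
    }
    Some((tag, &input[5..end], &input[end..]))
}
```

This does not apply to the startup message, which has no type byte and starts directly with the length.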


Example: Debugging psql Connection

My question to AI:

“psql connects but immediately disconnects. What’s wrong?”

What I learned:

  1. psql expects specific ParameterStatus messages
  2. Missing server_version causes silent disconnect
  3. ReadyForQuery must be sent after authentication

Result: Added required parameters:

stream.write_all(self.builder.parameter_status("server_version", "16.0")).await?;
stream.write_all(self.builder.parameter_status("server_encoding", "UTF8")).await?;
stream.write_all(self.builder.parameter_status("client_encoding", "UTF8")).await?;
stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
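
For reference, the bytes behind those two builder calls can be sketched as follows. This is a standalone illustration with free functions that return a fresh `Vec<u8>`, not the buffer-reusing builder used above; the function names are assumptions made for this example.

```rust
/// Appends one framed message: type byte, big-endian length
/// (counting itself, not the type byte), then the payload.
fn frame(out: &mut Vec<u8>, tag: u8, payload: &[u8]) {
    out.push(tag);
    out.extend_from_slice(&((payload.len() + 4) as i32).to_be_bytes());
    out.extend_from_slice(payload);
}

/// ParameterStatus ('S'): name and value as NUL-terminated strings.
pub fn parameter_status(name: &str, value: &str) -> Vec<u8> {
    let mut payload = Vec::with_capacity(name.len() + value.len() + 2);
    payload.extend_from_slice(name.as_bytes());
    payload.push(0);
    payload.extend_from_slice(value.as_bytes());
    payload.push(0);
    let mut out = Vec::with_capacity(payload.len() + 5);
    frame(&mut out, b'S', &payload);
    out
}

/// ReadyForQuery ('Z') with status 'I': idle, not in a transaction.
pub fn ready_for_query_idle() -> Vec<u8> {
    let mut out = Vec::with_capacity(6);
    frame(&mut out, b'Z', b"I");
    out
}
```

Both strings in ParameterStatus carry a trailing NUL byte, and both NULs count toward the message length.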

Now psql connects successfully!

$ psql -h localhost -p 5432 -U neo vaultgres
psql (16.0, server 16.0 (Vaultgres))
Type "help" for help.

vaultgres=> SELECT 1;
 ?column? 
----------
        1
(1 row)

Summary: Wire Protocol in One Diagram

flowchart TD
    subgraph "Connection Startup"
        A[Client connects] --> B[StartupMessage]
        B --> C[AuthenticationOk]
        C --> D[ParameterStatus]
        D --> E[ReadyForQuery]
    end
    subgraph "Simple Query"
        F[Query 'Q'] --> G[RowDescription 'T']
        G --> H[DataRow 'D' × N]
        H --> I[CommandComplete 'C']
        I --> E
    end
    subgraph "Extended Query"
        J[Parse 'P'] --> K[ParseComplete '1']
        K --> L[Bind 'B']
        L --> M[BindComplete '2']
        M --> N[Execute 'E']
        N --> H
        O[Sync 'S'] --> E
    end
    subgraph "Message Format"
        P[Type 1B] --> Q[Length 4B BE]
        Q --> R[Payload]
    end
    subgraph "Result Serialization"
        S[Row: id=1, name='Alice'] --> T[DataRow: 'D' + len + values]
    end
    style B fill:#e3f2fd,stroke:#1976d2
    style F fill:#e8f5e9,stroke:#388e3c
    style J fill:#e8f5e9,stroke:#388e3c
    style P fill:#fff3e0,stroke:#f57c00

Key Takeaways:

| Concept | Why It Matters |
|---|---|
| Wire protocol | Compatibility with existing PostgreSQL tools |
| Message framing | Length-prefixed binary protocol |
| Simple vs. Extended | Quick queries vs. prepared statements |
| RowDescription | Column metadata for clients |
| DataRow | Actual row data (text or binary) |
| Type OIDs | PostgreSQL type identification |
| Null encoding | -1 length prefix |

