- 1 The Wire Protocol Overview
- 2 Connection Startup
- 3 Simple Query Protocol
- 4 Extended Query Protocol
- 5 Complete Query Execution Flow
- 6 PostgreSQL Type OIDs
- 7 Challenges Building in Rust
- 8 How AI Accelerated This
- Summary: Wire Protocol in One Diagram
In Part 4, we built WAL and crash recovery. Our database can now survive power failures. But there's a problem.
How do clients actually talk to our database?
+-------------+                        +-------------+
|    psql     |                        |  Vaultgres  |
|   client    |                        |   server    |
|             |  ??? How to talk ???   |             |
+-------------+                        +-------------+
We could invent our own protocol. But then we'd need to build a client from scratch.
Better approach: speak PostgreSQL's wire protocol. Then psql, JDBC, libpq, and every other existing tool just work.
Today: implementing the PostgreSQL wire protocol in Rust, from startup handshake to result set serialization.
1 The Wire Protocol Overview
Frontend/Backend Model
PostgreSQL uses a frontend/backend architecture:
+--------------------------------------------------------------+
|                     PostgreSQL Protocol                      |
+--------------------------------------------------------------+
|                                                              |
|  Frontend (Client)              Backend (Server)             |
|  - psql                         - Vaultgres                  |
|  - libpq (C driver)             - Query processor            |
|  - JDBC/ODBC                    - Storage engine             |
|  - psycopg (Python)             - Transaction manager        |
|                                                              |
|  Communication: TCP/IP (usually port 5432)                   |
|  Message format: Length-prefixed binary protocol             |
|                                                              |
+--------------------------------------------------------------+
Message Structure
Every message has the same format:
+--------------------------------------------------------------+
|                        Message Format                        |
+--------------------------------------------------------------+
|  +-------------+------------------------------------------+  |
|  |  Type (1B)  |  Length (4B, includes itself)            |  |
|  +-------------+------------------------------------------+  |
|  |  Payload (variable)                                    |  |
|  +--------------------------------------------------------+  |
+--------------------------------------------------------------+
Example: Query ('Q')
+--------------------------------------------------------------+
| 'Q' | 0x00 0x00 0x00 0x18 | "SELECT * FROM users\0"          |
| 1B  | 4B (value 24)       | 20 bytes (null-terminated)       |
+--------------------------------------------------------------+
Key insight: The length is a big-endian (network byte order) 32-bit integer that counts its own 4 bytes plus the payload (4 + 20 = 24), but not the type byte.
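To make the framing rule concrete, here is a minimal std-only sketch. The names encode_message and decode_message are hypothetical helpers, not Vaultgres code: one frames a typed message, the other splits one complete message off the front of a receive buffer.

```rust
/// Frame a typed message: type byte, then a big-endian i32 length that
/// counts the 4 length bytes plus the payload (but not the type byte).
pub fn encode_message(msg_type: u8, payload: &[u8]) -> Vec<u8> {
    let len = (4 + payload.len()) as i32;
    let mut buf = Vec::with_capacity(1 + 4 + payload.len());
    buf.push(msg_type);
    buf.extend_from_slice(&len.to_be_bytes());
    buf.extend_from_slice(payload);
    buf
}

/// Try to split one complete message off the front of `buf`.
/// Returns (type, payload, bytes consumed), or None if the buffer does not
/// hold a full message yet (this simplified sketch also returns None for a
/// malformed length; a real server would report an error instead).
pub fn decode_message(buf: &[u8]) -> Option<(u8, &[u8], usize)> {
    if buf.len() < 5 {
        return None; // Need at least the type byte and the length
    }
    let len = i32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]);
    if len < 4 {
        return None; // Malformed: the length must at least cover itself
    }
    let total = 1 + len as usize; // Type byte + everything the length counts
    if buf.len() < total {
        return None; // Message not fully received yet
    }
    Some((buf[0], &buf[5..total], total))
}
```

The decoder never reads past the advertised length, which is what keeps the stream in sync: after consuming `total` bytes, the next byte is always the next message's type.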
Message Types
| Type | Code | Direction | Purpose |
|---|---|---|---|
| StartupMessage | (none) | F→B | Initial connection (no type byte) |
| AuthenticationOk | 'R' | B→F | Login successful |
| ParameterStatus | 'S' | B→F | Server setting reported to the client |
| Query | 'Q' | F→B | Simple query (SQL string) |
| RowDescription | 'T' | B→F | Column metadata |
| DataRow | 'D' | B→F | Actual row data |
| CommandComplete | 'C' | B→F | Query finished |
| ReadyForQuery | 'Z' | B→F | Server ready for next query |
| ErrorResponse | 'E' | B→F | Something went wrong |
| Parse | 'P' | F→B | Extended query: prepare |
| ParseComplete | '1' | B→F | Extended query: Parse succeeded |
| Bind | 'B' | F→B | Extended query: bind parameters |
| BindComplete | '2' | B→F | Extended query: Bind succeeded |
| Execute | 'E' | F→B | Extended query: run |
| Sync | 'S' | F→B | Extended query: finish batch |
| Terminate | 'X' | F→B | Client is closing the connection |
F→B = Frontend to Backend, B→F = Backend to Frontend. The same code can mean different messages in each direction ('E', 'S').
2 Connection Startup
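The Handshake Flow
Client: StartupMessage (protocol 3.0, user, database)
Server: AuthenticationOk ('R')
Server: ParameterStatus ('S'), once per setting
Server: ReadyForQuery ('Z', status 'I')
Real PostgreSQL also sends BackendKeyData ('K') before ReadyForQuery so clients can cancel queries; the handler in section 5 skips it.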
StartupMessage
The first message is special: it has no type byte, only the length:
+--------------------------------------------------------------+
|                        StartupMessage                        |
+--------------------------------------------------------------+
| Length (4B): 8 + parameters                                  |
| Protocol Version (4B): 196608 (3.0, i.e. 3 << 16)            |
| Parameters (name\0value\0 pairs, then a final \0):           |
|   "user\0neo\0database\0vaultgres\0\0"                       |
+--------------------------------------------------------------+
// src/wire_protocol/startup.rs
use std::collections::HashMap;
use std::io;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;

/// Protocol 3.0: major version 3 in the high 16 bits, minor 0 in the low 16 bits.
const PROTOCOL_VERSION_3_0: u32 = 196608; // 3 << 16

pub struct StartupMessage {
    pub user: String,
    pub database: String,
    pub options: HashMap<String, String>,
}

impl StartupMessage {
    pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        // Read length (4 bytes, big-endian); it counts itself, the version, and the parameters
        let len = stream.read_u32().await? as usize;
        // Read protocol version (4 bytes, big-endian)
        let version = stream.read_u32().await?;
        // Note: psql's default sslmode=prefer sends an SSLRequest (code 80877103) first;
        // this check rejects it, so connect with sslmode=disable or handle it separately.
        if version != PROTOCOL_VERSION_3_0 {
            return Err(ProtocolError::UnsupportedVersion(version));
        }
        // Read the whole parameter block at once, including the final \0, so no
        // stray byte is left in the stream for the message loop to misread
        let body_len = len.checked_sub(8).ok_or_else(|| {
            io::Error::new(io::ErrorKind::InvalidData, "startup packet shorter than 8 bytes")
        })?;
        let mut body = vec![0u8; body_len];
        stream.read_exact(&mut body).await?;
        // Parameters: key\0value\0 pairs, terminated by one extra \0
        let mut params = HashMap::new();
        let mut fields = body.split(|&b| b == 0);
        while let Some(key) = fields.next() {
            if key.is_empty() { break; } // Empty key = end of parameters
            let value = fields.next().unwrap_or_default();
            let key = String::from_utf8(key.to_vec())?;
            let value = String::from_utf8(value.to_vec())?;
            params.insert(key, value);
        }
        Ok(Self {
            user: params.remove("user").unwrap_or_default(),
            database: params.remove("database").unwrap_or_default(),
            options: params,
        })
    }
}
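The reader above assumes the first packet is always a StartupMessage. In practice, psql's default sslmode=prefer first sends an SSLRequest: an 8-byte packet whose "version" field is the special code 80877103. A server without TLS answers with the single byte 'N', and the client then sends its normal StartupMessage on the same connection. The std-only sketch below parses the packet body that follows the 4-byte length and tells the cases apart. FirstPacket and parse_first_packet are hypothetical names, and the String error type is a simplification.

```rust
use std::collections::HashMap;

const PROTOCOL_3_0: u32 = 196_608; // 3 << 16
const SSL_REQUEST_CODE: u32 = 80_877_103; // 1234 << 16 | 5679

#[derive(Debug, PartialEq)]
pub enum FirstPacket {
    /// Client asks for TLS; reply b"N" to decline, then read the real startup packet.
    SslRequest,
    /// Protocol 3.0 startup with its key/value parameters.
    Startup(HashMap<String, String>),
    /// Any other protocol version or request code.
    Unsupported(u32),
}

/// Parse the bytes that follow the 4-byte length prefix.
pub fn parse_first_packet(body: &[u8]) -> Result<FirstPacket, String> {
    if body.len() < 4 {
        return Err("packet too short".into());
    }
    let code = u32::from_be_bytes([body[0], body[1], body[2], body[3]]);
    match code {
        SSL_REQUEST_CODE => Ok(FirstPacket::SslRequest),
        PROTOCOL_3_0 => {
            let mut params = HashMap::new();
            // key\0value\0 ... terminated by one extra \0
            let mut fields = body[4..].split(|&b| b == 0);
            while let Some(key) = fields.next() {
                if key.is_empty() {
                    break; // The terminating empty key
                }
                let value = fields.next().ok_or("missing value")?;
                let key = String::from_utf8(key.to_vec()).map_err(|e| e.to_string())?;
                let value = String::from_utf8(value.to_vec()).map_err(|e| e.to_string())?;
                params.insert(key, value);
            }
            Ok(FirstPacket::Startup(params))
        }
        other => Ok(FirstPacket::Unsupported(other)),
    }
}
```

On SslRequest, the caller would write b"N", read the next length-prefixed packet, and call parse_first_packet again. Working from a fully read body keeps the parsing synchronous and easy to test.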
Authentication and ParameterStatus
// src/wire_protocol/messages.rs
pub struct MessageBuilder {
    // pub(crate) so the RowDescription/DataRow serializers in sibling modules can write into it
    pub(crate) buffer: Vec<u8>,
}

impl MessageBuilder {
    pub fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    pub fn authentication_ok(&mut self) -> &[u8] {
        // 'R' (1B) + Length (4B) + Auth Type (4B = 0 for Ok)
        self.buffer.clear();
        self.buffer.push(b'R');
        self.buffer.extend_from_slice(&8u32.to_be_bytes()); // Length: 4 (itself) + 4 (auth type)
        self.buffer.extend_from_slice(&0u32.to_be_bytes()); // AuthOk
        &self.buffer
    }

    pub fn parameter_status(&mut self, name: &str, value: &str) -> &[u8] {
        // 'S' (1B) + Length (4B) + name\0 + value\0
        self.buffer.clear();
        self.buffer.push(b'S');
        let payload_len = 4 + name.len() + 1 + value.len() + 1;
        self.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
        self.buffer.extend_from_slice(name.as_bytes());
        self.buffer.push(0);
        self.buffer.extend_from_slice(value.as_bytes());
        self.buffer.push(0);
        &self.buffer
    }

    pub fn ready_for_query(&mut self, status: TransactionStatus) -> &[u8] {
        // 'Z' (1B) + Length (4B) + Status (1B)
        self.buffer.clear();
        self.buffer.push(b'Z');
        self.buffer.extend_from_slice(&5u32.to_be_bytes());
        self.buffer.push(status as u8);
        &self.buffer
    }

    pub fn command_complete(&mut self, tag: &str) -> &[u8] {
        // 'C' (1B) + Length (4B) + tag\0, e.g. "SELECT 2"
        self.buffer.clear();
        self.buffer.push(b'C');
        let payload_len = 4 + tag.len() + 1;
        self.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
        self.buffer.extend_from_slice(tag.as_bytes());
        self.buffer.push(0);
        &self.buffer
    }
}

#[derive(Debug, Clone, Copy)]
#[repr(u8)]
pub enum TransactionStatus {
    Idle = b'I',
    InTransaction = b'T',
    InFailedTransaction = b'E',
}
Server sends these parameters:
| Parameter | Value | Purpose |
|---|---|---|
| server_version | 16.0 | PostgreSQL version we're emulating |
| server_encoding | UTF8 | Server's character encoding |
| client_encoding | UTF8 | Client's character encoding |
| integer_datetimes | on | 64-bit integer timestamps |
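Because these messages always go out back to back, one option is to build the whole handshake reply in a single buffer and send it with one write. A minimal std-only sketch: handshake_reply and push_message are hypothetical helpers, and the parameter list is whatever the caller passes in.

```rust
/// Append one typed message (type byte + self-inclusive big-endian length + payload).
fn push_message(out: &mut Vec<u8>, msg_type: u8, payload: &[u8]) {
    out.push(msg_type);
    out.extend_from_slice(&((4 + payload.len()) as i32).to_be_bytes());
    out.extend_from_slice(payload);
}

/// AuthenticationOk, one ParameterStatus per (name, value), then ReadyForQuery('I').
pub fn handshake_reply(params: &[(&str, &str)]) -> Vec<u8> {
    let mut out = Vec::new();
    push_message(&mut out, b'R', &0i32.to_be_bytes()); // Auth type 0 = OK
    for (name, value) in params {
        // ParameterStatus payload: name\0value\0
        let mut payload = Vec::new();
        payload.extend_from_slice(name.as_bytes());
        payload.push(0);
        payload.extend_from_slice(value.as_bytes());
        payload.push(0);
        push_message(&mut out, b'S', &payload);
    }
    push_message(&mut out, b'Z', b"I"); // Idle: not inside a transaction
    out
}
```

In the async handler this would replace several write_all calls with a single one on the combined buffer.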
3 Simple Query Protocol
Query Flow
Client: Query("SELECT id, name FROM users")
Server: RowDescription (column metadata)
Server: DataRow (row 1)
Server: DataRow (row 2)
...
Server: CommandComplete ("SELECT 2")
Server: ReadyForQuery ('I')
RowDescription: Telling Clients About Columns
// src/wire_protocol/row_description.rs
use crate::wire_protocol::messages::MessageBuilder;

pub struct FieldDescription {
    pub name: String,
    pub table_oid: u32,
    pub column_attr_num: i16,
    pub type_oid: u32,
    pub type_size: i16,
    pub type_modifier: i32,
    pub format_code: i16, // 0 = text, 1 = binary
}

pub struct RowDescription {
    pub fields: Vec<FieldDescription>,
}

impl RowDescription {
    // The explicit lifetime ties the returned slice to the builder, not to &self
    pub fn serialize<'b>(&self, builder: &'b mut MessageBuilder) -> &'b [u8] {
        // 'T' (1B) + Length (4B) + Num Fields (2B) + Fields...
        builder.buffer.clear();
        builder.buffer.push(b'T');
        // Length = 4 (itself) + 2 (field count) + per field: name + \0 + 18 fixed bytes
        // (table_oid 4 + column_attr_num 2 + type_oid 4 + type_size 2 + type_modifier 4 + format_code 2)
        let payload_len = 4 + 2 + self.fields.iter().map(|f| f.name.len() + 1 + 18).sum::<usize>();
        builder.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
        builder.buffer.extend_from_slice(&(self.fields.len() as i16).to_be_bytes());
        for field in &self.fields {
            builder.buffer.extend_from_slice(field.name.as_bytes());
            builder.buffer.push(0); // Null terminator
            builder.buffer.extend_from_slice(&field.table_oid.to_be_bytes());
            builder.buffer.extend_from_slice(&field.column_attr_num.to_be_bytes());
            builder.buffer.extend_from_slice(&field.type_oid.to_be_bytes());
            builder.buffer.extend_from_slice(&field.type_size.to_be_bytes());
            builder.buffer.extend_from_slice(&field.type_modifier.to_be_bytes());
            builder.buffer.extend_from_slice(&field.format_code.to_be_bytes());
        }
        &builder.buffer
    }
}
Example output:
SELECT id, name FROM users
RowDescription:
+--------------------------------------------------------------+
| 'T' | Length | 2 fields                                      |
+--------------------------------------------------------------+
| Field 1: "id"                                                |
|   table_oid: 16384                                           |
|   column_attr_num: 1                                         |
|   type_oid: 23 (INT4)                                        |
|   type_size: 4                                               |
|   type_modifier: -1                                          |
|   format_code: 0 (text)                                      |
+--------------------------------------------------------------+
| Field 2: "name"                                              |
|   table_oid: 16384                                           |
|   column_attr_num: 2                                         |
|   type_oid: 25 (TEXT)                                        |
|   type_size: -1 (variable)                                   |
|   type_modifier: -1                                          |
|   format_code: 0 (text)                                      |
+--------------------------------------------------------------+
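The byte count for this two-column example can be checked with a small std-only encoder. It mirrors the serialize method above but takes plain tuples (a hypothetical simplification, with encode_row_description as an illustrative name) so it runs without the rest of the crate. Each field costs its name, a null terminator, and 18 fixed bytes.

```rust
/// (name, table_oid, column_attr_num, type_oid, type_size, type_modifier, format_code)
pub type Field<'a> = (&'a str, u32, i16, u32, i16, i32, i16);

pub fn encode_row_description(fields: &[Field]) -> Vec<u8> {
    let mut buf = vec![b'T', 0, 0, 0, 0]; // Type byte + length placeholder
    buf.extend_from_slice(&(fields.len() as i16).to_be_bytes());
    for &(name, table_oid, attnum, type_oid, type_size, typmod, format) in fields {
        buf.extend_from_slice(name.as_bytes());
        buf.push(0);
        buf.extend_from_slice(&table_oid.to_be_bytes()); // 4 bytes
        buf.extend_from_slice(&attnum.to_be_bytes()); // 2 bytes
        buf.extend_from_slice(&type_oid.to_be_bytes()); // 4 bytes
        buf.extend_from_slice(&type_size.to_be_bytes()); // 2 bytes
        buf.extend_from_slice(&typmod.to_be_bytes()); // 4 bytes
        buf.extend_from_slice(&format.to_be_bytes()); // 2 bytes: 18 fixed in total
    }
    // Patch the length: everything after the type byte, including the length itself
    let len = (buf.len() - 1) as i32;
    buf[1..5].copy_from_slice(&len.to_be_bytes());
    buf
}
```

For id and name, the length works out to 4 + 2 + (3 + 18) + (5 + 18) = 50, so the whole message is 51 bytes. Writing a placeholder and patching it afterwards avoids computing the length in a separate pass.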
DataRow: Serializing Actual Rows
// src/wire_protocol/data_row.rs
use crate::wire_protocol::messages::MessageBuilder;

pub struct DataRow {
    pub values: Vec<Option<Vec<u8>>>, // None = NULL
    pub format_codes: Vec<i16>,
}

impl DataRow {
    // The explicit lifetime ties the returned slice to the builder, not to &self
    pub fn serialize<'b>(&self, builder: &'b mut MessageBuilder) -> &'b [u8] {
        // 'D' (1B) + Length (4B) + Num Values (2B) + Values...
        builder.buffer.clear();
        builder.buffer.push(b'D');
        // Length = 4 (itself) + 2 (value count) + per value: 4-byte length + data
        let mut payload_len = 4u32 + 2;
        for value in &self.values {
            payload_len += 4; // Length prefix (-1 for NULL, which has no data bytes)
            if let Some(data) = value {
                payload_len += data.len() as u32;
            }
        }
        builder.buffer.extend_from_slice(&payload_len.to_be_bytes());
        builder.buffer.extend_from_slice(&(self.values.len() as i16).to_be_bytes());
        for value in &self.values {
            match value {
                None => {
                    // NULL: length = -1
                    builder.buffer.extend_from_slice(&(-1i32).to_be_bytes());
                }
                Some(data) => {
                    // Non-NULL: length + data
                    builder.buffer.extend_from_slice(&(data.len() as i32).to_be_bytes());
                    builder.buffer.extend_from_slice(data);
                }
            }
        }
        &builder.buffer
    }
}
Example:
Row: id=1, name="Alice", email=NULL
DataRow:
+--------------------------------------------------------------+
| 'D' | Length | 3 values                                      |
+--------------------------------------------------------------+
| Value 1: length 1  → "1"                                     |
| Value 2: length 5  → "Alice"                                 |
| Value 3: length -1 (NULL, no data bytes)                     |
+--------------------------------------------------------------+
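To double-check the lengths in this example, here is a std-only sketch that encodes a row of optional text values. encode_data_row is a hypothetical stand-alone variant of DataRow::serialize, not Vaultgres code.

```rust
/// Encode a DataRow from text-format values; None becomes SQL NULL (length -1).
pub fn encode_data_row(values: &[Option<&str>]) -> Vec<u8> {
    let mut buf = vec![b'D', 0, 0, 0, 0]; // Type byte + length placeholder
    buf.extend_from_slice(&(values.len() as i16).to_be_bytes());
    for value in values {
        match value {
            None => buf.extend_from_slice(&(-1i32).to_be_bytes()), // NULL: no data bytes follow
            Some(text) => {
                buf.extend_from_slice(&(text.len() as i32).to_be_bytes());
                buf.extend_from_slice(text.as_bytes());
            }
        }
    }
    let len = (buf.len() - 1) as i32; // The length excludes only the type byte
    buf[1..5].copy_from_slice(&len.to_be_bytes());
    buf
}
```

For (1, "Alice", NULL) the length is 4 + 2 + (4 + 1) + (4 + 5) + 4 = 24. In text format the integer 1 is the single character "1", so its value length is 1, not 4.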
Text vs. Binary Format
Text format (format_code = 0): Human-readable strings
INT4: "42"
TEXT: "Alice"
TIMESTAMP: "2026-03-29 14:30:00.123456+00"
Binary format (format_code = 1): Native representation
// src/wire_protocol/type_encoding.rs
use chrono::{DateTime, Utc};

/// Microseconds between the Unix epoch and PostgreSQL's epoch (2000-01-01 00:00:00 UTC).
const PG_EPOCH_UNIX_MICROS: i64 = 946_684_800 * 1_000_000;

pub fn encode_int4(value: i32, format: i16) -> Vec<u8> {
    match format {
        0 => value.to_string().into_bytes(), // Text: decimal digits
        1 => value.to_be_bytes().to_vec(),   // Binary: 4 bytes, big-endian
        _ => panic!("Invalid format code"),
    }
}

pub fn encode_text(value: &str, format: i16) -> Vec<u8> {
    // TEXT's binary form is the raw UTF-8 bytes, same as its text form;
    // the 4-byte length prefix belongs to the DataRow, not to the value
    match format {
        0 | 1 => value.as_bytes().to_vec(),
        _ => panic!("Invalid format code"),
    }
}

pub fn encode_timestamp(value: DateTime<Utc>, format: i16) -> Vec<u8> {
    match format {
        // Text: ISO style with the UTC offset, e.g. "2026-03-29 14:30:00.123456+00"
        0 => value.format("%Y-%m-%d %H:%M:%S%.6f+00").to_string().into_bytes(),
        // Binary: i64 microseconds since 2000-01-01 00:00:00 UTC, big-endian
        1 => (value.timestamp_micros() - PG_EPOCH_UNIX_MICROS).to_be_bytes().to_vec(),
        _ => panic!("Invalid format code"),
    }
}
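The binary timestamp encoding is plain arithmetic on the epoch offset: 946,684,800 seconds separate the Unix epoch (1970-01-01) from PostgreSQL's epoch (2000-01-01). The std-only sketch below takes Unix microseconds as input (a hypothetical stand-in for chrono's timestamp_micros) and converts to and from the 8-byte big-endian wire value.

```rust
/// Microseconds between 1970-01-01 and 2000-01-01 (both UTC).
const PG_EPOCH_OFFSET_MICROS: i64 = 946_684_800 * 1_000_000;

/// Binary TIMESTAMP/TIMESTAMPTZ: big-endian i64 microseconds since 2000-01-01 UTC.
pub fn timestamp_to_binary(unix_micros: i64) -> [u8; 8] {
    (unix_micros - PG_EPOCH_OFFSET_MICROS).to_be_bytes()
}

/// Inverse of timestamp_to_binary: back to microseconds since the Unix epoch.
pub fn timestamp_from_binary(bytes: [u8; 8]) -> i64 {
    i64::from_be_bytes(bytes) + PG_EPOCH_OFFSET_MICROS
}
```

Dates before 2000 simply become negative values. That is why the wire type is a signed i64.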
4 Extended Query Protocol
Why Extended Query?
Simple Query: parameters must be spliced into the SQL text, so string concatenation invites SQL injection; no prepared statements
Client: Query("SELECT * FROM users WHERE id = " + user_input)
→ SQL injection vulnerability!
Extended Query: Prepared statements, parameters bound separately from the SQL
Client: Parse("SELECT * FROM users WHERE id = $1")
Client: Bind([42])
Client: Execute()
Client: Sync()
→ Safe from SQL injection: the value never passes through the SQL parser!
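Extended Query Flow
Client: Parse("SELECT * FROM users WHERE id = $1")
Server: ParseComplete ('1')
Client: Bind(portal, statement, [42])
Server: BindComplete ('2')
Client: Execute(portal, max_rows = 0)
Server: DataRow (one per row), then CommandComplete
Client: Sync
Server: ReadyForQuery ('I')
Clients usually pipeline all four messages and read the replies afterwards.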
Parse: Preparing a Statement
// src/wire_protocol/parse.rs
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
pub struct ParseMessage {
pub statement_name: String,
pub query: String,
pub parameter_types: Vec<u32>, // OID for each parameter
}
impl ParseMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
// statement_name (null-terminated)
let statement_name = read_null_terminated(stream).await?;
// query (null-terminated)
let query = read_null_terminated(stream).await?;
// num_parameter_types (2B)
let mut num_types_buf = [0u8; 2];
stream.read_exact(&mut num_types_buf).await?;
let num_types = i16::from_be_bytes(num_types_buf);
// parameter_types (4B each)
let mut parameter_types = Vec::new();
for _ in 0..num_types {
let mut type_buf = [0u8; 4];
stream.read_exact(&mut type_buf).await?;
parameter_types.push(u32::from_be_bytes(type_buf));
}
Ok(Self {
statement_name,
query,
parameter_types,
})
}
}
// Server response
pub fn parse_complete(builder: &mut MessageBuilder) -> &[u8] {
// '1' (1B) + Length (4B = 4)
builder.buffer.clear();
builder.buffer.push(b'1');
builder.buffer.extend_from_slice(&4u32.to_be_bytes());
&builder.buffer
}
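The readers call read_null_terminated, read_i16, and read_i32, which the post does not show. Here is a minimal synchronous sketch over std::io::Read. The async versions would have the same shape with tokio's AsyncReadExt, and the String error type is a simplification, not Vaultgres's ProtocolError.

```rust
use std::io::Read;

/// Read bytes up to (and consuming) a 0 terminator, decoded as UTF-8.
pub fn read_null_terminated<R: Read>(r: &mut R) -> Result<String, String> {
    let mut bytes = Vec::new();
    let mut byte = [0u8; 1];
    loop {
        r.read_exact(&mut byte).map_err(|e| e.to_string())?;
        if byte[0] == 0 {
            break; // Terminator consumed, not stored
        }
        bytes.push(byte[0]);
    }
    String::from_utf8(bytes).map_err(|e| e.to_string())
}

/// Big-endian (network order) 16-bit integer.
pub fn read_i16<R: Read>(r: &mut R) -> Result<i16, String> {
    let mut buf = [0u8; 2];
    r.read_exact(&mut buf).map_err(|e| e.to_string())?;
    Ok(i16::from_be_bytes(buf))
}

/// Big-endian (network order) 32-bit integer.
pub fn read_i32<R: Read>(r: &mut R) -> Result<i32, String> {
    let mut buf = [0u8; 4];
    r.read_exact(&mut buf).map_err(|e| e.to_string())?;
    Ok(i32::from_be_bytes(buf))
}
```

Reading one byte per call is slow on a raw socket. Wrapping the stream in a buffered reader (std::io::BufReader, or tokio::io::BufReader in the async code) avoids a system call per byte.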
Bind: Creating a Portal
// src/wire_protocol/bind.rs
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
pub struct BindMessage {
pub portal_name: String,
pub statement_name: String,
pub parameter_format_codes: Vec<i16>,
pub parameter_values: Vec<Option<Vec<u8>>>,
pub result_format_codes: Vec<i16>,
}
impl BindMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
// portal_name (null-terminated)
let portal_name = read_null_terminated(stream).await?;
// statement_name (null-terminated)
let statement_name = read_null_terminated(stream).await?;
// num_parameter_format_codes (2B)
let num_formats = read_i16(stream).await?;
// parameter_format_codes
let mut parameter_format_codes = Vec::new();
for _ in 0..num_formats {
parameter_format_codes.push(read_i16(stream).await?);
}
// num_parameter_values (2B)
let num_values = read_i16(stream).await?;
// parameter_values
let mut parameter_values = Vec::new();
for _ in 0..num_values {
let len = read_i32(stream).await?;
if len == -1 {
parameter_values.push(None); // NULL
} else {
let mut data = vec![0u8; len as usize];
stream.read_exact(&mut data).await?;
parameter_values.push(Some(data));
}
}
// num_result_format_codes (2B)
let num_result_formats = read_i16(stream).await?;
// result_format_codes
let mut result_format_codes = Vec::new();
for _ in 0..num_result_formats {
result_format_codes.push(read_i16(stream).await?);
}
Ok(Self {
portal_name,
statement_name,
parameter_format_codes,
parameter_values,
result_format_codes,
})
}
}
// Server response
pub fn bind_complete(builder: &mut MessageBuilder) -> &[u8] {
// '2' (1B) + Length (4B = 4)
builder.buffer.clear();
builder.buffer.push(b'2');
builder.buffer.extend_from_slice(&4u32.to_be_bytes());
&builder.buffer
}
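One Bind detail is easy to get wrong. The parameter format-code list may have zero entries (every parameter is text), one entry (applied to every parameter), or exactly one entry per parameter, and result_format_codes follows the same rule. A std-only sketch of that lookup, plus decoding an INT4 parameter in either format (format_for and decode_int4_param are hypothetical helper names):

```rust
/// Resolve the format code for parameter `index` (0-based) out of `count` parameters.
pub fn format_for(codes: &[i16], index: usize, count: usize) -> Result<i16, String> {
    match codes.len() {
        0 => Ok(0),        // No codes: every parameter is text
        1 => Ok(codes[0]), // One code: applies to every parameter
        n if n == count => codes
            .get(index)
            .copied()
            .ok_or_else(|| format!("no parameter at index {}", index)),
        n => Err(format!("{} format codes for {} parameters", n, count)),
    }
}

/// Decode an INT4 parameter value received in text (0) or binary (1) format.
pub fn decode_int4_param(value: &[u8], format: i16) -> Result<i32, String> {
    match format {
        0 => std::str::from_utf8(value)
            .map_err(|e| e.to_string())?
            .parse::<i32>()
            .map_err(|e| e.to_string()),
        1 => {
            if value.len() != 4 {
                return Err(format!("binary INT4 must be 4 bytes, got {}", value.len()));
            }
            Ok(i32::from_be_bytes([value[0], value[1], value[2], value[3]]))
        }
        other => Err(format!("invalid format code {}", other)),
    }
}
```

Rejecting a mismatched code count up front surfaces a client bug immediately instead of silently misreading binary bytes as text.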
Execute: Running the Prepared Statement
// src/wire_protocol/execute.rs
pub struct ExecuteMessage {
pub portal_name: String,
pub max_rows: i32, // 0 = all rows
}
impl ExecuteMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
let portal_name = read_null_terminated(stream).await?;
let max_rows = read_i32(stream).await?;
Ok(Self { portal_name, max_rows })
}
}
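Server response: zero or more DataRow messages, then CommandComplete, or PortalSuspended ('s') if max_rows stopped the portal early. There is no separate "ExecuteComplete" message.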
Sync: Finishing the Batch
// src/wire_protocol/sync.rs
use tokio::net::TcpStream;

pub struct SyncMessage;

impl SyncMessage {
    pub async fn read_from(_stream: &mut TcpStream) -> Result<Self, ProtocolError> {
        // Sync has no body: its only content is the 4-byte length field (always 4),
        // which the caller must consume along with the type byte
        Ok(SyncMessage)
    }
}

// Server response: Sync ends the batch with ReadyForQuery and the current status.
// CommandComplete is not part of it; that is sent when each Execute finishes its portal.
pub fn sync_complete(builder: &mut MessageBuilder, status: TransactionStatus) -> &[u8] {
    builder.ready_for_query(status)
}
5 Complete Query Execution Flow
Putting It All Together
// src/wire_protocol/handler.rs
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::query_executor::QueryExecutor;
use crate::storage::buffer_pool::BufferPool;

pub struct ProtocolHandler {
    // The TcpStream is passed to handle_connection instead of living here,
    // so handlers can take &mut self and &mut stream at the same time
    executor: QueryExecutor,
    builder: MessageBuilder,
    prepared_statements: HashMap<String, PreparedStatement>,
    portals: HashMap<String, Portal>,
}

impl ProtocolHandler {
    pub async fn handle_connection(&mut self, mut stream: TcpStream) -> Result<(), ProtocolError> {
        // 1. Read startup message (no authentication check: every user is accepted)
        let _startup = StartupMessage::read_from(&mut stream).await?;
        // 2. Send authentication
        stream.write_all(self.builder.authentication_ok()).await?;
        // 3. Send parameter status
        stream.write_all(self.builder.parameter_status("server_version", "16.0")).await?;
        stream.write_all(self.builder.parameter_status("server_encoding", "UTF8")).await?;
        stream.write_all(self.builder.parameter_status("client_encoding", "UTF8")).await?;
        // 4. Send ready for query
        stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
        // 5. Main message loop
        loop {
            // Each message: type byte, then a 4-byte length that includes itself.
            // Reading both here leaves every handler positioned at the message body.
            let msg_type = stream.read_u8().await?;
            let _len = stream.read_i32().await?;
            match msg_type {
                b'Q' => self.handle_simple_query(&mut stream).await?,
                b'P' => self.handle_parse(&mut stream).await?,
                b'B' => self.handle_bind(&mut stream).await?,
                b'E' => self.handle_execute(&mut stream).await?,
                b'S' => self.handle_sync(&mut stream).await?,
                b'X' => return Ok(()), // Terminate
                // Describe ('D'), Close ('C'), and Flush ('H') are not handled in this sketch
                other => return Err(ProtocolError::UnknownMessage(other)),
            }
        }
    }

    async fn handle_simple_query(&mut self, stream: &mut TcpStream) -> Result<(), ProtocolError> {
        // Read query string
        let query = read_null_terminated(stream).await?;
        // Execute query
        let result = self.executor.execute(&query).await?;
        // Send RowDescription and DataRows (only for statements that return rows)
        if let Some(columns) = result.columns {
            let row_desc = self.create_row_description(&columns);
            stream.write_all(row_desc.serialize(&mut self.builder)).await?;
            for row in result.rows {
                let data_row = self.create_data_row(&row);
                stream.write_all(data_row.serialize(&mut self.builder)).await?;
            }
        }
        // Send CommandComplete
        self.builder.command_complete(&result.command_tag);
        stream.write_all(&self.builder.buffer).await?;
        // Send ReadyForQuery
        stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
        Ok(())
    }
}
Result Set Serialization Example
// src/wire_protocol/result_set.rs
use std::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
use crate::wire_protocol::data_row::DataRow;
use crate::wire_protocol::messages::MessageBuilder;
use crate::wire_protocol::row_description::{FieldDescription, RowDescription};

pub struct ResultSet {
    pub columns: Vec<Column>,
    pub rows: Vec<Row>,
    pub command_tag: String,
}

pub struct Column {
    pub name: String,
    pub type_oid: u32,
    pub type_size: i16,
}

pub struct Row {
    pub values: Vec<Option<String>>,
}

impl ResultSet {
    // async: tokio's write_all returns a future that must be awaited
    pub async fn send_to(&self, stream: &mut TcpStream, builder: &mut MessageBuilder) -> Result<(), io::Error> {
        // RowDescription
        let fields: Vec<FieldDescription> = self.columns.iter().map(|col| {
            FieldDescription {
                name: col.name.clone(),
                table_oid: 0,       // 0 = not a plain table column
                column_attr_num: 0,
                type_oid: col.type_oid,
                type_size: col.type_size,
                type_modifier: -1,
                format_code: 0, // Text format
            }
        }).collect();
        let row_desc = RowDescription { fields };
        stream.write_all(row_desc.serialize(builder)).await?;
        // DataRows
        for row in &self.rows {
            let values: Vec<Option<Vec<u8>>> = row.values.iter()
                .map(|v| v.as_ref().map(|s| s.as_bytes().to_vec()))
                .collect();
            let data_row = DataRow {
                values,
                format_codes: vec![0; self.columns.len()],
            };
            stream.write_all(data_row.serialize(builder)).await?;
        }
        // CommandComplete
        builder.command_complete(&self.command_tag);
        stream.write_all(&builder.buffer).await?;
        Ok(())
    }
}
// Usage example
let result = ResultSet {
columns: vec![
Column { name: "id".to_string(), type_oid: 23, type_size: 4 },
Column { name: "name".to_string(), type_oid: 25, type_size: -1 },
],
rows: vec![
Row { values: vec![Some("1".to_string()), Some("Alice".to_string())] },
Row { values: vec![Some("2".to_string()), Some("Bob".to_string())] },
],
command_tag: "SELECT 2".to_string(),
};
result.send_to(&mut stream, &mut builder).await?; // inside an async fn
What psql receives:
+--------------------------------------------------------------+
| T (RowDescription)                                           |
|   2 columns: id (INT4), name (TEXT)                          |
+--------------------------------------------------------------+
| D (DataRow)                                                  |
|   id=1, name="Alice"                                         |
+--------------------------------------------------------------+
| D (DataRow)                                                  |
|   id=2, name="Bob"                                           |
+--------------------------------------------------------------+
| C (CommandComplete)                                          |
|   "SELECT 2"                                                 |
+--------------------------------------------------------------+
| Z (ReadyForQuery)                                            |
|   Status: Idle                                               |
+--------------------------------------------------------------+
6 PostgreSQL Type OIDs
Common Types
| Type Name | OID | Size (bytes, -1 = variable) | Description |
|---|---|---|---|
| BOOL | 16 | 1 | Boolean |
| INT2 (SMALLINT) | 21 | 2 | 2-byte integer |
| INT4 (INTEGER) | 23 | 4 | 4-byte integer |
| INT8 (BIGINT) | 20 | 8 | 8-byte integer |
| TEXT | 25 | -1 | Variable-length text |
| VARCHAR | 1043 | -1 | Variable-length char |
| TIMESTAMP | 1114 | 8 | Timestamp without time zone |
| TIMESTAMPTZ | 1184 | 8 | Timestamp with time zone |
| FLOAT4 (REAL) | 700 | 4 | 4-byte float |
| FLOAT8 (DOUBLE PRECISION) | 701 | 8 | 8-byte float |
| NUMERIC | 1700 | -1 | Arbitrary precision |
| BYTEA | 17 | -1 | Binary data |
| OID | 26 | 4 | Object identifier |
// src/wire_protocol/oids.rs
pub mod oid {
pub const BOOL: u32 = 16;
pub const INT2: u32 = 21;
pub const INT4: u32 = 23;
pub const INT8: u32 = 20;
pub const TEXT: u32 = 25;
pub const VARCHAR: u32 = 1043;
pub const TIMESTAMP: u32 = 1114;
pub const TIMESTAMPTZ: u32 = 1184;
pub const FLOAT4: u32 = 700;
pub const FLOAT8: u32 = 701;
pub const NUMERIC: u32 = 1700;
pub const BYTEA: u32 = 17;
pub const OID: u32 = 26;
}
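RowDescription also needs each column's type_size. A small lookup keeps the Size column of the table in one place. In this sketch, type_size is a hypothetical helper, and it matches on the literal OIDs (named in the comments) so it compiles without the oid module.

```rust
/// Byte width for RowDescription.type_size; Some(-1) means variable length.
pub fn type_size(type_oid: u32) -> Option<i16> {
    match type_oid {
        16 => Some(1),                     // BOOL
        21 => Some(2),                     // INT2
        23 | 700 | 26 => Some(4),          // INT4, FLOAT4, OID
        20 | 701 | 1114 | 1184 => Some(8), // INT8, FLOAT8, TIMESTAMP, TIMESTAMPTZ
        25 | 1043 | 1700 | 17 => Some(-1), // TEXT, VARCHAR, NUMERIC, BYTEA
        _ => None,                         // Not covered by this sketch
    }
}
```

Returning Option instead of guessing -1 for unknown OIDs forces the caller to decide what to do with a type the table does not list.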
7 Challenges Building in Rust
Challenge 1: Async I/O and Borrowing
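Problem: Handler methods take &mut self, and reading a message fills a buffer owned by the handler. A &str borrowed from that buffer keeps self mutably borrowed, so the next call that needs self is rejected.
// ❌ Doesn't compile
impl ProtocolHandler {
    // Assumes read_query(&mut self) -> Result<&str, Error> returns a slice of self's read buffer
    pub async fn handle_query(&mut self) -> Result<(), Error> {
        let query = self.read_query().await?; // `query` keeps *self mutably borrowed
        let result = self.executor.execute(query).await?; // Also borrows self!
        // Error: cannot borrow `self.executor` while `*self` is mutably borrowed
        self.send_result(result).await
    }
}
Solution: End the first borrow before starting the next one, here by copying the query into an owned String
// ✅ Works
impl ProtocolHandler {
    pub async fn handle_query(&mut self) -> Result<(), Error> {
        // to_owned() copies the query, so the mutable borrow of self ends on this line
        let query: String = self.read_query().await?.to_owned();
        let result = self.executor.execute(&query).await?;
        self.send_result(result).await?;
        Ok(())
    }
}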
Challenge 2: Zero-Copy vs. Allocation
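Problem: Every wire protocol message must be serialized into bytes, and allocating a fresh Vec for each one (for example, every DataRow of a large result) adds an allocation per row.
// ❌ Allocates a new Vec for every message
pub fn serialize_row(&self) -> Vec<u8> {
    let mut buffer = Vec::new();
    buffer.push(b'D');
    // ... lots of allocations ...
    buffer
}
Solution: Reuse buffers
// ✅ Reuses the allocated buffer
pub struct MessageBuilder {
    buffer: Vec<u8>, // Allocated once, reused for every message
}

impl MessageBuilder {
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            buffer: Vec::with_capacity(capacity),
        }
    }

    pub fn data_row(&mut self, row: &Row) -> &[u8] {
        self.buffer.clear(); // Drops old contents but keeps the capacity
        self.buffer.push(b'D');
        self.buffer.extend_from_slice(&[0; 4]); // Length placeholder, patched below
        self.buffer.extend_from_slice(&(row.values.len() as i16).to_be_bytes());
        for value in &row.values {
            match value {
                None => self.buffer.extend_from_slice(&(-1i32).to_be_bytes()), // NULL
                Some(text) => {
                    self.buffer.extend_from_slice(&(text.len() as i32).to_be_bytes());
                    self.buffer.extend_from_slice(text.as_bytes());
                }
            }
        }
        // The length covers everything after the type byte
        let len = (self.buffer.len() - 1) as u32;
        self.buffer[1..5].copy_from_slice(&len.to_be_bytes());
        &self.buffer // Return a borrow, not an owned Vec
    }
}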
Challenge 3: Error Handling Across Layers
Problem: Wire protocol errors, query errors, storage errors: all different types.
// ❌ Error type explosion
pub enum Error {
    Io(io::Error),
    Protocol(ProtocolError),
    Query(QueryError),
    Storage(StorageError),
    Transaction(TransactionError),
    // ... 20 more variants ...
}
Solution: Use thiserror and conversion traits
// ✅ Clean error handling
use std::io;

#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
    #[error("IO error: {0}")]
    Io(#[from] io::Error),
    #[error("Invalid message type: {0}")]
    UnknownMessage(u8),
    #[error("Unsupported protocol version: {0}")]
    UnsupportedVersion(u32),
    #[error("Invalid UTF-8: {0}")]
    Utf8(#[from] std::string::FromUtf8Error),
    #[error("Query error: {0}")]
    Query(#[from] QueryError),
}

// Use the ? operator for automatic conversion (#[from] generates the From impls)
pub async fn handle_query(&mut self) -> Result<(), ProtocolError> {
    let query = self.read_query().await?; // io::Error → ProtocolError
    let _result = self.executor.execute(&query).await?; // QueryError → ProtocolError
    Ok(())
}
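A ProtocolError still has to reach the client. On the wire that is an ErrorResponse ('E'): a sequence of fields, each a one-byte field code followed by a null-terminated string, ending with a zero byte. The S (severity), C (SQLSTATE code), and M (message) fields are always present. Here is a std-only sketch; error_response is a hypothetical helper, and mapping each ProtocolError variant to a SQLSTATE is left to the caller.

```rust
/// Encode an ErrorResponse carrying the three always-present fields.
pub fn error_response(severity: &str, sqlstate: &str, message: &str) -> Vec<u8> {
    let mut buf = vec![b'E', 0, 0, 0, 0]; // Type byte + length placeholder
    for (code, value) in [(b'S', severity), (b'C', sqlstate), (b'M', message)] {
        buf.push(code); // Field type
        buf.extend_from_slice(value.as_bytes());
        buf.push(0); // Field value terminator
    }
    buf.push(0); // End of fields
    let len = (buf.len() - 1) as i32; // Everything after the type byte
    buf[1..5].copy_from_slice(&len.to_be_bytes());
    buf
}
```

For example, error_response("ERROR", "42601", "syntax error") reports a syntax error (SQLSTATE 42601). In the simple query protocol, the server follows it with ReadyForQuery. During an extended-query batch, the server discards incoming messages until the next Sync.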
8 How AI Accelerated This
What AI Got Right
| Task | AI Contribution |
|---|---|
| Message format | Correct big-endian encoding |
| Extended query flow | Parse → Bind → Execute sequence |
| Type OIDs | Accurate PostgreSQL type OIDs |
| Null handling | -1 length prefix for NULLs |
What AI Got Wrong
| Issue | What Happened |
|---|---|
| Length calculation | First draft didn't count the length field itself in the length |
| Startup message | Tried to add a type byte (startup has none!) |
| Binary format | Suggested little-endian (PostgreSQL uses big-endian) |
| Portal lifetime | Missed that portals last only until the end of the transaction, and that the next Bind to the unnamed portal replaces it |
Pattern: The wire protocol is unforgiving. A single off-by-one in a length field desynchronizes every message that follows.
Example: Debugging psql Connection
My question to AI:
"psql connects but immediately disconnects. What's wrong?"
What I learned:
- psql expects specific ParameterStatus messages
- Missing server_version causes a silent disconnect
- ReadyForQuery must be sent after authentication
Result: Added required parameters:
stream.write_all(self.builder.parameter_status("server_version", "16.0")).await?;
stream.write_all(self.builder.parameter_status("server_encoding", "UTF8")).await?;
stream.write_all(self.builder.parameter_status("client_encoding", "UTF8")).await?;
stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
Now psql connects successfully!
$ psql -h localhost -p 5432 -U neo vaultgres
psql (16.0, server 16.0 (Vaultgres))
Type "help" for help.
vaultgres=> SELECT 1;
?column?
----------
1
(1 row)
Summary: Wire Protocol in One Diagram
Key Takeaways:
| Concept | Why It Matters |
|---|---|
| Wire protocol | Compatibility with existing PostgreSQL tools |
| Message framing | Length-prefixed binary protocol |
| Simple vs. Extended | Quick queries vs. prepared statements |
| RowDescription | Column metadata for clients |
| DataRow | Actual row data (text or binary) |
| Type OIDs | PostgreSQL type identification |
| Null encoding | -1 length prefix |
Further Reading:
- PostgreSQL source: src/backend/tcop/postgres.c
- PostgreSQL source: src/include/libpq/pqformat.h
- PostgreSQL documentation, "Frontend/Backend Protocol": https://www.postgresql.org/docs/current/protocol.html
- libpq source: src/interfaces/libpq/
- Vaultgres repository: github.com/neoalienson/Vaultgres