In Part 4, we built WAL and crash recovery. Our database can now survive power failures. But there's a problem.
How do clients actually talk to our database?
[ psql client ]  <--- ??? How do we talk ??? --->  [ Vaultgres server ]
We could invent our own protocol, but then we'd need to build a client from scratch.
A better approach: speak PostgreSQL's wire protocol. Then psql, JDBC, libpq, and every other existing tool just work.
Today: implementing the PostgreSQL wire protocol in Rust, from startup handshake to result set serialization.
1 The Wire Protocol Overview
Frontend/Backend Model
PostgreSQL uses a frontend/backend architecture:
PostgreSQL protocol at a glance:
- Frontend (client): psql, libpq (C driver), JDBC/ODBC, psycopg (Python)
- Backend (server): Vaultgres, with its query processor, storage engine, and transaction manager
- Communication: TCP/IP (usually port 5432)
- Message format: length-prefixed binary protocol
Message Structure
Every message after the initial StartupMessage has the same format:
| Field | Size | Notes |
|---|---|---|
| Type | 1B | ASCII message code, e.g. 'Q' |
| Length | 4B | Big-endian; counts itself plus the payload |
| Payload | variable | Message-specific contents |
Example: Query ('Q')
| Type | Length | Payload |
|---|---|---|
| 'Q' | 0x00 0x00 0x00 0x18 (24) | "SELECT * FROM users\0" (19 characters + null terminator = 20 bytes) |
Key insight: Length is big-endian (network byte order) and counts its own 4 bytes plus the payload, but not the type byte: 4 + 20 = 24.
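To make the framing rule concrete, here is a minimal sketch of a generic framing helper. The name `frame` is hypothetical (it is not part of the Vaultgres code in this post); it simply prepends the type byte and a big-endian length that counts itself plus the payload.

```rust
/// Frames a message: type byte, then a big-endian i32 length that counts
/// the 4 length bytes plus the payload (never the type byte), then the payload.
pub fn frame(msg_type: u8, payload: &[u8]) -> Vec<u8> {
    let len = (4 + payload.len()) as i32;
    let mut out = Vec::with_capacity(1 + 4 + payload.len());
    out.push(msg_type);
    out.extend_from_slice(&len.to_be_bytes());
    out.extend_from_slice(payload);
    out
}
```

For the query above, `frame(b'Q', b"SELECT * FROM users\0")` produces a 25-byte message whose length field is 0x18.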
Message Types
| Type | Code | Direction | Purpose |
|---|---|---|---|
| StartupMessage | (none) | F→B | Initial connection (no type byte) |
| AuthenticationOk | 'R' | B→F | Login successful |
| ParameterStatus | 'S' | B→F | Server setting (e.g. server_version) |
| Query | 'Q' | F→B | Simple query (SQL string) |
| RowDescription | 'T' | B→F | Column metadata |
| DataRow | 'D' | B→F | Actual row data |
| CommandComplete | 'C' | B→F | Query finished |
| ReadyForQuery | 'Z' | B→F | Server ready for next query |
| ErrorResponse | 'E' | B→F | Something went wrong |
| Parse | 'P' | F→B | Extended query: prepare |
| ParseComplete | '1' | B→F | Parse succeeded |
| Bind | 'B' | F→B | Extended query: bind parameters |
| BindComplete | '2' | B→F | Bind succeeded |
| Execute | 'E' | F→B | Extended query: run |
| Sync | 'S' | F→B | Extended query: finish batch |
| Terminate | 'X' | F→B | Close the connection |
F→B = Frontend to Backend, B→F = Backend to Frontend. The same byte can mean different messages depending on direction ('E', 'S').
2 Connection Startup
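The Handshake Flow
Client: StartupMessage (protocol 3.0, user, database)
Server: AuthenticationOk ('R')
Server: ParameterStatus ('S'), one per setting
Server: ReadyForQuery ('Z')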
StartupMessage
The first message is special: there is no type byte, just a length.
StartupMessage layout:
- Length (4B): 8 + parameters
- Protocol version (4B): 196608 (3.0)
- Parameters (null-terminated key/value strings, ended by one extra \0): "user\0neo\0database\0vaultgres\0\0"
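The "8 + parameters" arithmetic is easiest to check from the client side. This is a minimal sketch with a hypothetical `build_startup` function that assembles a protocol 3.0 StartupMessage the way the layout above describes.

```rust
/// Builds a protocol 3.0 StartupMessage (client side). There is no type byte:
/// the message starts with a big-endian length that counts itself.
pub fn build_startup(params: &[(&str, &str)]) -> Vec<u8> {
    let mut body = Vec::new();
    body.extend_from_slice(&196_608u32.to_be_bytes()); // 3 << 16 = protocol 3.0
    for (key, value) in params {
        body.extend_from_slice(key.as_bytes());
        body.push(0);
        body.extend_from_slice(value.as_bytes());
        body.push(0);
    }
    body.push(0); // The extra \0 that terminates the parameter list
    let mut msg = ((4 + body.len()) as u32).to_be_bytes().to_vec();
    msg.extend_from_slice(&body);
    msg
}
```

For `user=neo, database=vaultgres` the parameter bytes total 29 ("user\0" 5 + "neo\0" 4 + "database\0" 9 + "vaultgres\0" 10 + final \0 1), so the length field is 8 + 29 = 37.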
// src/wire_protocol/startup.rs
use std::collections::HashMap;
use tokio::io::AsyncReadExt;
use tokio::net::TcpStream;
pub struct StartupMessage {
pub user: String,
pub database: String,
pub options: HashMap<String, String>,
}
impl StartupMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
// Read length (4 bytes, big-endian)
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).await?;
let len = u32::from_be_bytes(len_buf);
// Read protocol version
let mut version_buf = [0u8; 4];
stream.read_exact(&mut version_buf).await?;
let version = u32::from_be_bytes(version_buf);
if version != 196608 {
return Err(ProtocolError::UnsupportedVersion(version));
}
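// Read parameters: null-terminated key/value strings, ended by one extra \0
let mut params = HashMap::new();
let mut remaining = len.saturating_sub(8); // Subtract length and version bytes
// `> 0` (not `> 1`) so the final \0 is consumed by the empty-key check below;
// otherwise it stays in the socket and is misread as the next message's type byte
while remaining > 0 {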
let mut key = Vec::new();
let mut byte = [0u8; 1];
loop {
stream.read_exact(&mut byte).await?;
remaining -= 1;
if byte[0] == 0 { break; }
key.push(byte[0]);
}
if key.is_empty() { break; } // Empty key = end of parameters
let mut value = Vec::new();
loop {
stream.read_exact(&mut byte).await?;
remaining -= 1;
if byte[0] == 0 { break; }
value.push(byte[0]);
}
let key = String::from_utf8(key)?;
let value = String::from_utf8(value)?;
params.insert(key, value);
}
Ok(Self {
user: params.remove("user").unwrap_or_default(),
database: params.remove("database").unwrap_or_default(),
options: params,
})
}
}
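One gap worth knowing about: over TCP, psql (libpq's default `sslmode=prefer`) typically sends an 8-byte SSLRequest before the StartupMessage, and `read_from` as shown would reject its code as an unsupported version. The sketch below classifies the 4-byte code that follows the length, assuming Vaultgres supports neither TLS nor GSSAPI encryption yet and therefore declines both with `'N'`. `StartupAction` and `classify_startup` are hypothetical names; the numeric codes are the ones defined by the PostgreSQL protocol.

```rust
/// Codes that can appear where StartupMessage carries its protocol version.
pub const PROTOCOL_3_0: u32 = 196_608; // 3 << 16
pub const CANCEL_REQUEST: u32 = 80_877_102; // (1234 << 16) | 5678
pub const SSL_REQUEST: u32 = 80_877_103; // (1234 << 16) | 5679
pub const GSSENC_REQUEST: u32 = 80_877_104; // (1234 << 16) | 5680

#[derive(Debug, PartialEq)]
pub enum StartupAction {
    /// A real StartupMessage: go on to parse the parameters.
    Startup,
    /// Encryption request: write this single byte, then read the next
    /// length-prefixed startup packet from the same connection.
    Reply(u8),
    /// CancelRequest: handled separately (carries a process ID and secret key).
    Cancel,
    Unsupported(u32),
}

/// Decides what to do with the code that follows the 4-byte length.
pub fn classify_startup(code: u32) -> StartupAction {
    match code {
        PROTOCOL_3_0 => StartupAction::Startup,
        // Assumption: no TLS/GSSAPI support, so decline with 'N'
        SSL_REQUEST | GSSENC_REQUEST => StartupAction::Reply(b'N'),
        CANCEL_REQUEST => StartupAction::Cancel,
        other => StartupAction::Unsupported(other),
    }
}
```

After replying `'N'`, a client using `sslmode=prefer` falls back to an unencrypted connection and sends its StartupMessage, so the server would loop back and read another startup packet.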
Authentication and ParameterStatus
// src/wire_protocol/messages.rs
pub struct MessageBuilder {
pub(crate) buffer: Vec<u8>, // Other wire_protocol modules serialize into it directly
}
impl MessageBuilder {
pub fn new() -> Self {
Self { buffer: Vec::new() }
}
pub fn authentication_ok(&mut self) -> &[u8] {
// 'R' (1B) + Length (4B = 8, counts itself) + Auth Type (4B = 0 for Ok)
self.buffer.clear();
self.buffer.push(b'R');
self.buffer.extend_from_slice(&8u32.to_be_bytes()); // Length: 4 (length) + 4 (auth type)
self.buffer.extend_from_slice(&0u32.to_be_bytes()); // AuthOk
&self.buffer
}
pub fn parameter_status(&mut self, name: &str, value: &str) -> &[u8] {
// 'S' (1B) + Length (4B) + name\0 + value\0
self.buffer.clear();
self.buffer.push(b'S');
let payload_len = 4 + name.len() + 1 + value.len() + 1;
self.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
self.buffer.extend_from_slice(name.as_bytes());
self.buffer.push(0);
self.buffer.extend_from_slice(value.as_bytes());
self.buffer.push(0);
&self.buffer
}
pub fn ready_for_query(&mut self, status: TransactionStatus) -> &[u8] {
// 'Z' (1B) + Length (4B) + Status (1B)
self.buffer.clear();
self.buffer.push(b'Z');
self.buffer.extend_from_slice(&5u32.to_be_bytes());
self.buffer.push(status as u8);
&self.buffer
}
}
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
pub enum TransactionStatus {
Idle = b'I',
InTransaction = b'T',
InFailedTransaction = b'E',
}
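The handler later calls `command_complete`, which this `MessageBuilder` does not define, and nothing yet reports errors to the client. Below is a sketch of both, re-declaring a minimal `MessageBuilder` so it compiles on its own; the method names and the choice to send only the S, C, and M error fields are assumptions, not the actual Vaultgres code.

```rust
/// Minimal stand-in for the MessageBuilder above, re-declared so this compiles alone.
pub struct MessageBuilder {
    buffer: Vec<u8>,
}

impl MessageBuilder {
    pub fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// CommandComplete: 'C' + length + command tag + \0 (e.g. "SELECT 2").
    pub fn command_complete(&mut self, tag: &str) -> &[u8] {
        self.buffer.clear();
        self.buffer.push(b'C');
        self.buffer.extend_from_slice(&((4 + tag.len() + 1) as u32).to_be_bytes());
        self.buffer.extend_from_slice(tag.as_bytes());
        self.buffer.push(0);
        &self.buffer
    }

    /// ErrorResponse: 'E' + length + fields, each a one-byte field code plus a
    /// null-terminated string, with a final \0 ending the list. This sketch
    /// sends only S (severity), C (SQLSTATE code), and M (message).
    pub fn error_response(&mut self, sqlstate: &str, message: &str) -> &[u8] {
        self.buffer.clear();
        self.buffer.push(b'E');
        self.buffer.extend_from_slice(&[0; 4]); // Length placeholder, patched below
        for (code, value) in [(b'S', "ERROR"), (b'C', sqlstate), (b'M', message)] {
            self.buffer.push(code);
            self.buffer.extend_from_slice(value.as_bytes());
            self.buffer.push(0);
        }
        self.buffer.push(0); // End of fields
        let len = (self.buffer.len() - 1) as u32; // Everything except the type byte
        self.buffer[1..5].copy_from_slice(&len.to_be_bytes());
        &self.buffer
    }
}
```

Writing a placeholder and patching the length afterward avoids computing it by hand, which is exactly where off-by-four bugs creep in.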
The server sends these parameters:
| Parameter | Value | Purpose |
|---|---|---|
| server_version | 16.0 | PostgreSQL version we're emulating |
| server_encoding | UTF8 | Character encoding |
| client_encoding | UTF8 | Client's encoding |
| integer_datetimes | on | 64-bit integer timestamps |
3 Simple Query Protocol
Query Flow
Client: Query("SELECT id, name FROM users")
Server: RowDescription (column metadata)
Server: DataRow (row 1)
Server: DataRow (row 2)
...
Server: CommandComplete ("SELECT 2")
Server: ReadyForQuery ('I')
RowDescription: Telling Clients About Columns
// src/wire_protocol/row_description.rs
pub struct FieldDescription {
pub name: String,
pub table_oid: u32,
pub column_attr_num: i16,
pub type_oid: u32,
pub type_size: i16,
pub type_modifier: i32,
pub format_code: i16, // 0 = text, 1 = binary
}
pub struct RowDescription {
pub fields: Vec<FieldDescription>,
}
impl RowDescription {
pub fn serialize(&self, builder: &mut MessageBuilder) -> &[u8] {
// 'T' (1B) + Length (4B) + Num Fields (2B) + Fields...
builder.buffer.clear();
builder.buffer.push(b'T');
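// Length counts itself (4B) + num fields (2B) + per field: name\0 + 18 fixed bytes
// (table_oid 4 + attr_num 2 + type_oid 4 + type_size 2 + type_modifier 4 + format_code 2)
let payload_len = 4 + 2 + (self.fields.len() * 18) +
self.fields.iter().map(|f| f.name.len() + 1).sum::<usize>();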
builder.buffer.extend_from_slice(&(payload_len as u32).to_be_bytes());
builder.buffer.extend_from_slice(&(self.fields.len() as i16).to_be_bytes());
for field in &self.fields {
builder.buffer.extend_from_slice(field.name.as_bytes());
builder.buffer.push(0); // Null terminator
builder.buffer.extend_from_slice(&field.table_oid.to_be_bytes());
builder.buffer.extend_from_slice(&field.column_attr_num.to_be_bytes());
builder.buffer.extend_from_slice(&field.type_oid.to_be_bytes());
builder.buffer.extend_from_slice(&field.type_size.to_be_bytes());
builder.buffer.extend_from_slice(&field.type_modifier.to_be_bytes());
builder.buffer.extend_from_slice(&field.format_code.to_be_bytes());
}
&builder.buffer
}
}
Example output:
SELECT id, name FROM users
RowDescription:
Header: 'T' | Length | 2 fields
| Field | table_oid | column_attr_num | type_oid | type_size | type_modifier | format_code |
|---|---|---|---|---|---|---|
| "id" | 16384 | 1 | 23 (INT4) | 4 | -1 | 0 (text) |
| "name" | 16384 | 2 | 25 (TEXT) | -1 (variable) | -1 | 0 (text) |
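Building the body first and deriving the length from it avoids hand-counting bytes. This sketch uses a hypothetical free function `row_description` over `(name, type_oid, type_size)` tuples, and fills table OID and attribute number with 0 (as `ResultSet::send_to` does later) rather than the 16384/1/2 values in the example. For the two example columns the body is 2 + (3 + 18) + (5 + 18) = 46 bytes, so Length = 50.

```rust
/// Serializes a RowDescription for text-format columns that have no source
/// table (table OID 0, attribute number 0, type modifier -1).
pub fn row_description(columns: &[(&str, u32, i16)]) -> Vec<u8> {
    let mut body = Vec::new();
    body.extend_from_slice(&(columns.len() as i16).to_be_bytes());
    for &(name, type_oid, type_size) in columns {
        body.extend_from_slice(name.as_bytes());
        body.push(0);
        body.extend_from_slice(&0u32.to_be_bytes()); // table_oid
        body.extend_from_slice(&0i16.to_be_bytes()); // column_attr_num
        body.extend_from_slice(&type_oid.to_be_bytes());
        body.extend_from_slice(&type_size.to_be_bytes());
        body.extend_from_slice(&(-1i32).to_be_bytes()); // type_modifier
        body.extend_from_slice(&0i16.to_be_bytes()); // format_code: text
    }
    let mut msg = vec![b'T'];
    msg.extend_from_slice(&((4 + body.len()) as u32).to_be_bytes());
    msg.extend_from_slice(&body);
    msg
}
```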
DataRow: Serializing Actual Rows
// src/wire_protocol/data_row.rs
pub struct DataRow {
pub values: Vec<Option<Vec<u8>>>, // None = NULL
pub format_codes: Vec<i16>,
}
impl DataRow {
pub fn serialize(&self, builder: &mut MessageBuilder) -> &[u8] {
// 'D' (1B) + Length (4B) + Num Values (2B) + Values...
builder.buffer.clear();
builder.buffer.push(b'D');
// Calculate payload length
let mut payload_len = 4u32 + 2; // Length field itself (4B) + num values (2B)
for value in &self.values {
payload_len += 4; // Length prefix
if let Some(data) = value {
payload_len += data.len() as u32;
}
}
builder.buffer.extend_from_slice(&payload_len.to_be_bytes());
builder.buffer.extend_from_slice(&(self.values.len() as i16).to_be_bytes());
for value in &self.values {
match value {
None => {
// NULL: length = -1
builder.buffer.extend_from_slice(&(-1i32).to_be_bytes());
}
Some(data) => {
// Non-NULL: length + data
builder.buffer.extend_from_slice(&(data.len() as i32).to_be_bytes());
builder.buffer.extend_from_slice(data);
}
}
}
&builder.buffer
}
}
Example:
Row: id=1, name="Alice", email=NULL
DataRow (text format):
| Part | Contents |
|---|---|
| Header | 'D', Length, 3 values |
| Value 1 | length 1, "1" |
| Value 2 | length 5, "Alice" |
| Value 3 | length -1 (NULL, no bytes follow) |
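The same row can be serialized with a small standalone function. `data_row` here is a hypothetical helper over borrowed byte slices, a sketch rather than the `DataRow::serialize` above. For the example, the body is 2 + (4 + 1) + (4 + 5) + 4 = 20 bytes, so Length = 24 and the whole message is 25 bytes.

```rust
/// Serializes a DataRow: column count, then per value an i32 length followed
/// by the bytes; NULL is length -1 with no bytes after it.
pub fn data_row(values: &[Option<&[u8]>]) -> Vec<u8> {
    let mut body = Vec::new();
    body.extend_from_slice(&(values.len() as i16).to_be_bytes());
    for value in values {
        match *value {
            None => body.extend_from_slice(&(-1i32).to_be_bytes()),
            Some(bytes) => {
                body.extend_from_slice(&(bytes.len() as i32).to_be_bytes());
                body.extend_from_slice(bytes);
            }
        }
    }
    let mut msg = vec![b'D'];
    msg.extend_from_slice(&((4 + body.len()) as u32).to_be_bytes());
    msg.extend_from_slice(&body);
    msg
}
```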
Text vs. Binary Format
Text format (format_code = 0): Human-readable strings
INT4: "42"
TEXT: "Alice"
TIMESTAMPTZ: "2026-03-29 14:30:00.123456+00"
Binary format (format_code = 1): type-specific binary encodings, with integers in big-endian order
// src/wire_protocol/type_encoding.rs
pub fn encode_int4(value: i32, format: i16) -> Vec<u8> {
match format {
0 => value.to_string().into_bytes(), // Text
1 => value.to_be_bytes().to_vec(), // Binary
_ => panic!("Invalid format code"),
}
}
pub fn encode_text(value: &str, format: i16) -> Vec<u8> {
match format {
// TEXT is raw UTF-8 in both formats: the DataRow length prefix already
// frames the value, so no extra length header is added here
0 | 1 => value.as_bytes().to_vec(),
_ => panic!("Invalid format code"),
}
}
pub fn encode_timestamp(value: chrono::DateTime<chrono::Utc>, format: i16) -> Vec<u8> {
match format {
0 => value.format("%Y-%m-%d %H:%M:%S%.6f+00").to_string().into_bytes(), // UTC, so the offset is always +00
1 => {
// PostgreSQL epoch: 2000-01-01 00:00:00 UTC
let epoch = chrono::DateTime::from_timestamp(946684800, 0).unwrap();
let micros = value.signed_duration_since(epoch).num_microseconds().unwrap();
micros.to_be_bytes().to_vec()
}
_ => panic!("Invalid format code"),
}
}
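The binary branch boils down to an epoch shift. Here is a std-only sketch of the same conversion, assuming the input is already microseconds since the Unix epoch; `encode_timestamp_binary` and `decode_timestamp_binary` are hypothetical names, and 946684800 is the same constant used above.

```rust
/// Seconds from the Unix epoch (1970-01-01) to the PostgreSQL epoch (2000-01-01).
const PG_EPOCH_UNIX_SECS: i64 = 946_684_800;

/// Binary TIMESTAMP/TIMESTAMPTZ: big-endian i64 microseconds since 2000-01-01 UTC.
pub fn encode_timestamp_binary(unix_micros: i64) -> [u8; 8] {
    (unix_micros - PG_EPOCH_UNIX_SECS * 1_000_000).to_be_bytes()
}

/// Inverse conversion, e.g. for binary timestamp parameters received in Bind.
pub fn decode_timestamp_binary(bytes: [u8; 8]) -> i64 {
    i64::from_be_bytes(bytes) + PG_EPOCH_UNIX_SECS * 1_000_000
}
```

Midnight on 2000-01-01 UTC encodes as eight zero bytes, which makes a handy sanity check when inspecting captured traffic.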
4 Extended Query Protocol
Why Extended Query?
Simple Query: no prepared statements; building SQL by string concatenation invites injection
Client: Query("SELECT * FROM users WHERE id = " + user_input)
→ SQL injection vulnerability!
Extended Query: prepared statements with parameter binding
Client: Parse("SELECT * FROM users WHERE id = $1")
Client: Bind([42])
Client: Execute()
→ Safe from SQL injection: parameters travel separately from the SQL text
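Extended Query Flow
Client: Parse("SELECT * FROM users WHERE id = $1")
Server: ParseComplete ('1')
Client: Bind(statement, parameters), which creates a portal
Server: BindComplete ('2')
Client: Execute(portal)
Server: DataRow × N, then CommandComplete ('C')
Client: Sync
Server: ReadyForQuery ('Z')
Clients usually send Parse, Bind, Execute, and Sync back-to-back; the server processes them in order.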
Parse: Preparing a Statement
// src/wire_protocol/parse.rs
pub struct ParseMessage {
pub statement_name: String,
pub query: String,
pub parameter_types: Vec<u32>, // OID for each parameter
}
impl ParseMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
// statement_name (null-terminated)
let statement_name = read_null_terminated(stream).await?;
// query (null-terminated)
let query = read_null_terminated(stream).await?;
// num_parameter_types (2B)
let mut num_types_buf = [0u8; 2];
stream.read_exact(&mut num_types_buf).await?;
let num_types = i16::from_be_bytes(num_types_buf);
// parameter_types (4B each)
let mut parameter_types = Vec::new();
for _ in 0..num_types {
let mut type_buf = [0u8; 4];
stream.read_exact(&mut type_buf).await?;
parameter_types.push(u32::from_be_bytes(type_buf));
}
Ok(Self {
statement_name,
query,
parameter_types,
})
}
}
// Server response
pub fn parse_complete(builder: &mut MessageBuilder) -> &[u8] {
// '1' (1B) + Length (4B = 4)
builder.buffer.clear();
builder.buffer.push(b'1');
builder.buffer.extend_from_slice(&4u32.to_be_bytes());
&builder.buffer
}
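The Parse, Bind, and Execute readers rely on `read_null_terminated`, `read_i16`, and `read_i32`, which the post doesn't show. One possible shape, sketched as a hypothetical `BodyReader` over an in-memory body instead of async reads on the `TcpStream`: the dispatcher reads the length, pulls the whole body with one read, and every field read is then bounded by the declared length.

```rust
/// Reads big-endian integers and null-terminated strings from a message body
/// that was already read in full (using its length prefix).
pub struct BodyReader<'a> {
    buf: &'a [u8],
    pos: usize,
}

impl<'a> BodyReader<'a> {
    pub fn new(buf: &'a [u8]) -> Self {
        Self { buf, pos: 0 }
    }

    /// Returns the next `n` bytes, or an error if the body is too short.
    fn take(&mut self, n: usize) -> Result<&'a [u8], String> {
        let buf: &'a [u8] = self.buf;
        let end = self.pos + n;
        if end > buf.len() {
            return Err("message body too short".to_string());
        }
        let bytes = &buf[self.pos..end];
        self.pos = end;
        Ok(bytes)
    }

    pub fn read_i16(&mut self) -> Result<i16, String> {
        let b = self.take(2)?;
        Ok(i16::from_be_bytes([b[0], b[1]]))
    }

    pub fn read_i32(&mut self) -> Result<i32, String> {
        let b = self.take(4)?;
        Ok(i32::from_be_bytes([b[0], b[1], b[2], b[3]]))
    }

    /// Reads a null-terminated UTF-8 string and skips past the terminator.
    pub fn read_cstr(&mut self) -> Result<&'a str, String> {
        let buf: &'a [u8] = self.buf;
        let rest = &buf[self.pos..];
        let nul = rest
            .iter()
            .position(|&b| b == 0)
            .ok_or_else(|| "missing null terminator".to_string())?;
        let s = std::str::from_utf8(&rest[..nul]).map_err(|e| e.to_string())?;
        self.pos += nul + 1;
        Ok(s)
    }
}
```

Reading the body up front also means a malformed message produces an error instead of blocking forever on a socket read that never completes.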
Bind: Creating a Portal
// src/wire_protocol/bind.rs
pub struct BindMessage {
pub portal_name: String,
pub statement_name: String,
pub parameter_format_codes: Vec<i16>,
pub parameter_values: Vec<Option<Vec<u8>>>,
pub result_format_codes: Vec<i16>,
}
impl BindMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
// portal_name (null-terminated)
let portal_name = read_null_terminated(stream).await?;
// statement_name (null-terminated)
let statement_name = read_null_terminated(stream).await?;
// num_parameter_format_codes (2B)
let num_formats = read_i16(stream).await?;
// parameter_format_codes
let mut parameter_format_codes = Vec::new();
for _ in 0..num_formats {
parameter_format_codes.push(read_i16(stream).await?);
}
// num_parameter_values (2B)
let num_values = read_i16(stream).await?;
// parameter_values
let mut parameter_values = Vec::new();
for _ in 0..num_values {
let len = read_i32(stream).await?;
if len == -1 {
parameter_values.push(None); // NULL
} else {
let mut data = vec![0u8; len as usize];
stream.read_exact(&mut data).await?;
parameter_values.push(Some(data));
}
}
// num_result_format_codes (2B)
let num_result_formats = read_i16(stream).await?;
// result_format_codes
let mut result_format_codes = Vec::new();
for _ in 0..num_result_formats {
result_format_codes.push(read_i16(stream).await?);
}
Ok(Self {
portal_name,
statement_name,
parameter_format_codes,
parameter_values,
result_format_codes,
})
}
}
// Server response
pub fn bind_complete(builder: &mut MessageBuilder) -> &[u8] {
// '2' (1B) + Length (4B = 4)
builder.buffer.clear();
builder.buffer.push(b'2');
builder.buffer.extend_from_slice(&4u32.to_be_bytes());
&builder.buffer
}
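Bind's format-code arrays follow a compact rule: zero codes means every value is text, a single code applies to all values, and otherwise there is one code per value (result format codes work the same way). A sketch with hypothetical helpers `format_for` and `decode_int4`, showing how a parameter might be decoded once its format is known:

```rust
/// Resolves the format code for parameter (or result column) `index`.
/// Panics if there are several codes and `index` is out of range, so callers
/// should validate the counts first.
pub fn format_for(codes: &[i16], index: usize) -> i16 {
    match codes.len() {
        0 => 0,        // No codes: everything is text
        1 => codes[0], // One code: applies to every value
        _ => codes[index],
    }
}

/// Decodes an INT4 parameter value in text (0) or binary (1) format.
pub fn decode_int4(bytes: &[u8], format: i16) -> Result<i32, String> {
    match format {
        0 => {
            let text = std::str::from_utf8(bytes).map_err(|e| e.to_string())?;
            text.parse::<i32>().map_err(|e| e.to_string())
        }
        1 => {
            if bytes.len() != 4 {
                return Err(format!("binary INT4 needs 4 bytes, got {}", bytes.len()));
            }
            Ok(i32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]))
        }
        other => Err(format!("invalid format code {}", other)),
    }
}
```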
Execute: Running the Prepared Statement
// src/wire_protocol/execute.rs
pub struct ExecuteMessage {
pub portal_name: String,
pub max_rows: i32, // 0 = all rows
}
impl ExecuteMessage {
pub async fn read_from(stream: &mut TcpStream) -> Result<Self, ProtocolError> {
let portal_name = read_null_terminated(stream).await?;
let max_rows = read_i32(stream).await?;
Ok(Self { portal_name, max_rows })
}
}
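Server response: DataRow messages, then CommandComplete once the portal is exhausted, or PortalSuspended ('s') if max_rows was reached first. There is no separate "ExecuteComplete" message, and Execute does not send RowDescription (clients that need column metadata send Describe).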
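The `max_rows` limit is what makes a portal resumable. A sketch of that bookkeeping, assuming a hypothetical `PortalCursor` that holds materialized text rows and a cursor position (a real executor would stream rows instead):

```rust
#[derive(Debug, PartialEq)]
pub enum ExecuteEnd {
    /// Row limit hit with rows left: send PortalSuspended ('s'); a later
    /// Execute on the same portal resumes from here.
    Suspended,
    /// All rows sent: send CommandComplete.
    Complete,
}

/// Hypothetical portal state: materialized rows plus the next row to send.
pub struct PortalCursor {
    pub rows: Vec<Vec<Option<String>>>,
    pub next: usize,
}

impl PortalCursor {
    /// Returns the rows to send as DataRow messages and how this Execute ends.
    /// Zero means "no limit"; this sketch also treats negative values that way.
    pub fn execute(&mut self, max_rows: i32) -> (&[Vec<Option<String>>], ExecuteEnd) {
        let remaining = self.rows.len() - self.next;
        let take = if max_rows > 0 {
            remaining.min(max_rows as usize)
        } else {
            remaining
        };
        let start = self.next;
        self.next += take;
        let end = if self.next < self.rows.len() {
            ExecuteEnd::Suspended
        } else {
            ExecuteEnd::Complete
        };
        (&self.rows[start..self.next], end)
    }
}
```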
Sync: Finishing the Batch
// src/wire_protocol/sync.rs
pub struct SyncMessage;
impl SyncMessage {
pub async fn read_from(_stream: &mut TcpStream) -> Result<Self, ProtocolError> {
// Sync has no body, just the message header
Ok(SyncMessage)
}
}
// Server response: ReadyForQuery. CommandComplete was already sent when
// Execute finished, so Sync only reports the transaction status.
pub fn sync_complete(builder: &mut MessageBuilder, status: TransactionStatus) -> &[u8] {
builder.ready_for_query(status)
}
5 Complete Query Execution Flow
Putting It All Together
// src/wire_protocol/handler.rs
use std::collections::HashMap;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use crate::query_executor::QueryExecutor;
pub struct ProtocolHandler {
executor: QueryExecutor,
builder: MessageBuilder,
prepared_statements: HashMap<String, PreparedStatement>,
portals: HashMap<String, Portal>,
}
impl ProtocolHandler {
// The socket is a parameter (not a field of self), so writing to it while
// self.builder is borrowed never conflicts
pub async fn handle_connection(&mut self, mut stream: TcpStream) -> Result<(), ProtocolError> {
// 1. Read startup message
let _startup = StartupMessage::read_from(&mut stream).await?;
// 2. Send authentication (no password check yet)
stream.write_all(self.builder.authentication_ok()).await?;
// 3. Send parameter status
stream.write_all(self.builder.parameter_status("server_version", "16.0")).await?;
stream.write_all(self.builder.parameter_status("server_encoding", "UTF8")).await?;
stream.write_all(self.builder.parameter_status("client_encoding", "UTF8")).await?;
// 4. Send ready for query
stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
// 5. Main message loop: after startup, every message is type (1B) + length (4B) + body
loop {
let msg_type = stream.read_u8().await?;
// Consume the length (big-endian, counts itself) so each handler starts at the body
let _len = stream.read_i32().await?;
match msg_type {
b'Q' => self.handle_simple_query(&mut stream).await?,
b'P' => self.handle_parse(&mut stream).await?,
b'B' => self.handle_bind(&mut stream).await?,
b'E' => self.handle_execute(&mut stream).await?,
b'S' => self.handle_sync(&mut stream).await?,
b'X' => {
// Terminate
return Ok(());
}
other => return Err(ProtocolError::UnknownMessage(other)),
}
}
}
async fn handle_simple_query(&mut self, stream: &mut TcpStream) -> Result<(), ProtocolError> {
// Read query string
let query = read_null_terminated(stream).await?;
// Execute query
let result = self.executor.execute(&query).await?;
// Send RowDescription (if SELECT)
if let Some(columns) = result.columns {
let row_desc = self.create_row_description(&columns);
stream.write_all(row_desc.serialize(&mut self.builder)).await?;
// Send DataRows
for row in result.rows {
let data_row = self.create_data_row(&row);
stream.write_all(data_row.serialize(&mut self.builder)).await?;
}
}
// Send CommandComplete (command_complete returns the serialized bytes, like the other builders)
stream.write_all(self.builder.command_complete(&result.command_tag)).await?;
// Send ReadyForQuery
stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
Ok(())
}
}
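One rule the loop above doesn't implement yet: in the extended protocol, once an error has been reported, PostgreSQL's backend discards incoming messages until it sees Sync, then sends ReadyForQuery, so the client can resynchronize. A sketch of that state as a hypothetical `ErrorRecovery` helper the main loop could consult after reading each type byte (errors from a simple Query need no such handling):

```rust
#[derive(Debug, PartialEq)]
pub enum Dispatch {
    Process,
    Skip,
}

/// Tracks whether we are skipping messages after an extended-query error.
#[derive(Default)]
pub struct ErrorRecovery {
    discarding: bool,
}

impl ErrorRecovery {
    /// Call after sending ErrorResponse for a failed Parse/Bind/Execute.
    pub fn on_error(&mut self) {
        self.discarding = true;
    }

    /// Decides whether the message with this type byte should be handled.
    pub fn dispatch(&mut self, msg_type: u8) -> Dispatch {
        if !self.discarding {
            return Dispatch::Process;
        }
        match msg_type {
            // Sync ends the error state and is processed, so ReadyForQuery goes out
            b'S' => {
                self.discarding = false;
                Dispatch::Process
            }
            // Design choice in this sketch: still honor Terminate while discarding
            b'X' => Dispatch::Process,
            _ => Dispatch::Skip,
        }
    }
}
```

Skipped messages still need their bodies read (using the length prefix) so the stream stays aligned on message boundaries.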
Result Set Serialization Example
// src/wire_protocol/result_set.rs
use std::io;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
pub struct ResultSet {
pub columns: Vec<Column>,
pub rows: Vec<Row>,
pub command_tag: String,
}
pub struct Column {
pub name: String,
pub type_oid: u32,
pub type_size: i16,
}
pub struct Row {
pub values: Vec<Option<String>>,
}
impl ResultSet {
// Sends RowDescription, DataRows, and CommandComplete; the caller then sends ReadyForQuery
pub async fn send_to(&self, stream: &mut TcpStream, builder: &mut MessageBuilder) -> io::Result<()> {
// RowDescription
let fields: Vec<FieldDescription> = self.columns.iter().map(|col| {
FieldDescription {
name: col.name.clone(),
table_oid: 0,
column_attr_num: 0,
type_oid: col.type_oid,
type_size: col.type_size,
type_modifier: -1,
format_code: 0, // Text format
}
}).collect();
let row_desc = RowDescription { fields };
stream.write_all(row_desc.serialize(builder)).await?;
// DataRows
for row in &self.rows {
let values: Vec<Option<Vec<u8>>> = row.values.iter()
.map(|v| v.as_ref().map(|s| s.as_bytes().to_vec()))
.collect();
let data_row = DataRow {
values,
format_codes: vec![0; self.columns.len()],
};
stream.write_all(data_row.serialize(builder)).await?;
}
// CommandComplete
stream.write_all(builder.command_complete(&self.command_tag)).await?;
Ok(())
}
}
// Usage example
let result = ResultSet {
columns: vec![
Column { name: "id".to_string(), type_oid: 23, type_size: 4 },
Column { name: "name".to_string(), type_oid: 25, type_size: -1 },
],
rows: vec![
Row { values: vec![Some("1".to_string()), Some("Alice".to_string())] },
Row { values: vec![Some("2".to_string()), Some("Bob".to_string())] },
],
command_tag: "SELECT 2".to_string(),
};
result.send_to(&mut stream, &mut builder).await?;
What psql receives:
- T (RowDescription): 2 columns, id (INT4) and name (TEXT)
- D (DataRow): id=1, name="Alice"
- D (DataRow): id=2, name="Bob"
- C (CommandComplete): "SELECT 2"
- Z (ReadyForQuery): status Idle
6 PostgreSQL Type OIDs
Common Types
| Type Name | OID | Size (bytes) | Description |
|---|---|---|---|
| BOOL | 16 | 1 | Boolean |
| INT2 (SMALLINT) | 21 | 2 | 2-byte integer |
| INT4 (INTEGER) | 23 | 4 | 4-byte integer |
| INT8 (BIGINT) | 20 | 8 | 8-byte integer |
| TEXT | 25 | -1 | Variable-length text |
| VARCHAR | 1043 | -1 | Variable-length char |
| TIMESTAMP | 1114 | 8 | Timestamp without time zone |
| TIMESTAMPTZ | 1184 | 8 | Timestamp with time zone |
| FLOAT4 (REAL) | 700 | 4 | 4-byte float |
| FLOAT8 (DOUBLE) | 701 | 8 | 8-byte float |
| NUMERIC | 1700 | -1 | Arbitrary precision |
| BYTEA | 17 | -1 | Binary data |
| OID | 26 | 4 | Object identifier |
// src/wire_protocol/oids.rs
pub mod oid {
pub const BOOL: u32 = 16;
pub const INT2: u32 = 21;
pub const INT4: u32 = 23;
pub const INT8: u32 = 20;
pub const TEXT: u32 = 25;
pub const VARCHAR: u32 = 1043;
pub const TIMESTAMP: u32 = 1114;
pub const TIMESTAMPTZ: u32 = 1184;
pub const FLOAT4: u32 = 700;
pub const FLOAT8: u32 = 701;
pub const NUMERIC: u32 = 1700;
pub const BYTEA: u32 = 17;
pub const OID: u32 = 26;
}
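RowDescription also needs a `type_size` for each column, and the Size column above maps directly onto it. A sketch of a lookup with a hypothetical `type_size` function (literal OIDs so it compiles on its own; in Vaultgres it would naturally use the `oid` constants):

```rust
/// Fixed size in bytes for the built-in types listed above, or -1 for
/// variable-length types; None for OIDs this sketch doesn't know.
pub fn type_size(type_oid: u32) -> Option<i16> {
    match type_oid {
        16 => Some(1),                     // BOOL
        21 => Some(2),                     // INT2
        23 | 26 | 700 => Some(4),          // INT4, OID, FLOAT4
        20 | 701 | 1114 | 1184 => Some(8), // INT8, FLOAT8, TIMESTAMP, TIMESTAMPTZ
        25 | 1043 | 1700 | 17 => Some(-1), // TEXT, VARCHAR, NUMERIC, BYTEA
        _ => None,
    }
}
```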
7 Challenges Building in Rust
Challenge 1: Async I/O and Borrowing
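Problem: tokio's read methods need &mut access, and a &mut self method that returns data borrowed from self keeps self mutably borrowed for as long as that data lives.
// ❌ Doesn't compile
impl ProtocolHandler {
// Returns a slice borrowed from an internal read buffer
async fn read_query(&mut self) -> Result<&str, ProtocolError> { /* ... */ }
pub async fn handle_query(&mut self) -> Result<(), ProtocolError> {
let query = self.read_query().await?; // `self` stays mutably borrowed while `query` lives
let result = self.executor.execute(query).await?; // Needs `self.executor` while that borrow is active
// Error: cannot borrow `self.executor` because `*self` is already mutably borrowed
}
}
Solution: End the first borrow before the next operation, here by copying the query into an owned String
// ✅ Works
impl ProtocolHandler {
pub async fn handle_query(&mut self) -> Result<(), ProtocolError> {
// `.to_owned()` copies the text, so the borrow of `self` ends on this line
let query: String = self.read_query().await?.to_owned();
let result = self.executor.execute(&query).await?;
self.send_result(result).await?;
Ok(())
}
}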
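Borrowing disjoint fields is the other common fix; the handler's `handle_simple_query` sidesteps the issue by taking the `TcpStream` as a separate parameter. A std-only sketch in which a `Vec<u8>` stands in for the socket; `Conn` and `Builder` are hypothetical names:

```rust
/// Stand-in for the handler: `out` plays the role of the socket.
pub struct Conn {
    out: Vec<u8>,
    builder: Builder,
}

pub struct Builder {
    buffer: Vec<u8>,
}

impl Builder {
    /// ReadyForQuery with status 'I', returned as a borrow of the reused buffer.
    pub fn ready_for_query(&mut self) -> &[u8] {
        self.buffer.clear();
        self.buffer.push(b'Z');
        self.buffer.extend_from_slice(&5u32.to_be_bytes());
        self.buffer.push(b'I');
        &self.buffer
    }
}

impl Conn {
    pub fn send_ready(&mut self) {
        // Compiles: `self.out` and `self.builder` are disjoint fields, so each
        // can be mutably borrowed at the same time. A helper such as
        // `fn send(&mut self, bytes: &[u8])` called as
        // `self.send(self.builder.ready_for_query())` would be rejected, because
        // it needs all of `self` while the argument still borrows `self.builder`.
        self.out.extend_from_slice(self.builder.ready_for_query());
    }
}
```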
Challenge 2: Zero-Copy vs. Allocation
Problem: Every message must be serialized, and allocating a fresh Vec for each one (a query can produce thousands of DataRows) is wasteful.
// ❌ Allocates on every message
pub fn serialize_row(&self) -> Vec<u8> {
let mut buffer = Vec::new();
buffer.push(b'D');
// ... lots of allocations ...
buffer
}
Solution: Reuse buffers
// ✅ Reuses allocated buffer
pub struct MessageBuilder {
buffer: Vec<u8>, // Pre-allocated, reused
}
impl MessageBuilder {
pub fn with_capacity(capacity: usize) -> Self {
Self {
buffer: Vec::with_capacity(capacity),
}
}
pub fn data_row(&mut self, row: &Row) -> &[u8] {
self.buffer.clear(); // Reuse capacity
self.buffer.push(b'D');
// ... write to buffer ...
&self.buffer // Return reference, not owned
}
}
Challenge 3: Error Handling Across Layers
Problem: Wire protocol errors, query errors, storage errors: all different types.
// ❌ Error type explosion
pub enum Error {
Io(io::Error),
Protocol(ProtocolError),
Query(QueryError),
Storage(StorageError),
Transaction(TransactionError),
// ... 20 more variants ...
}
Solution: Use thiserror and conversion traits
// ✅ Clean error handling
use std::io;
#[derive(Debug, thiserror::Error)]
pub enum ProtocolError {
#[error("IO error: {0}")]
Io(#[from] io::Error),
#[error("Invalid message type: {0}")]
UnknownMessage(u8),
#[error("Unsupported protocol version: {0}")]
UnsupportedVersion(u32),
#[error("Invalid UTF-8: {0}")]
Utf8(#[from] std::string::FromUtf8Error),
#[error("Query error: {0}")]
Query(#[from] QueryError),
}
// Use ? operator for automatic conversion
pub async fn handle_query(&mut self) -> Result<(), ProtocolError> {
let query = self.read_query().await?; // io::Error → ProtocolError
let _result = self.executor.execute(&query).await?; // QueryError → ProtocolError
Ok(())
}
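What `#[from]` buys is a `From` impl, which is what `?` calls to convert errors. A std-only sketch of roughly what the derive generates for two of the variants, written by hand so it needs no dependencies; `read_type_byte` is a hypothetical helper used to exercise the conversion:

```rust
use std::fmt;
use std::io;

#[derive(Debug)]
pub enum ProtocolError {
    Io(io::Error),
    UnknownMessage(u8),
}

impl fmt::Display for ProtocolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ProtocolError::Io(e) => write!(f, "IO error: {}", e),
            ProtocolError::UnknownMessage(t) => write!(f, "Invalid message type: {}", t),
        }
    }
}

impl std::error::Error for ProtocolError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ProtocolError::Io(e) => Some(e),
            ProtocolError::UnknownMessage(_) => None,
        }
    }
}

// This impl is what lets `?` turn an io::Error into a ProtocolError
impl From<io::Error> for ProtocolError {
    fn from(e: io::Error) -> Self {
        ProtocolError::Io(e)
    }
}

pub fn read_type_byte(input: &mut impl io::Read) -> Result<u8, ProtocolError> {
    let mut b = [0u8; 1];
    input.read_exact(&mut b)?; // io::Error → ProtocolError via From
    Ok(b[0])
}
```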
8 How AI Accelerated This
What AI Got Right
| Task | AI Contribution |
|---|---|
| Message format | Correct big-endian encoding |
| Extended query flow | Parse → Bind → Execute sequence |
| Type OIDs | Accurate PostgreSQL type OIDs |
| Null handling | -1 length prefix for NULLs |
What AI Got Wrong
| Issue | What Happened |
|---|---|
| Length calculation | First draft didn't count the length field's own 4 bytes |
| Startup message | Tried to add a type byte (startup has none!) |
| Binary format | Suggested little-endian (PostgreSQL uses big-endian) |
| Portal lifetime | Mishandled portal lifetime (a portal lasts until the end of the transaction; the unnamed portal is replaced by the next Bind) |
Pattern: Wire protocol is precise. Off-by-one errors break everything.
Example: Debugging psql Connection
My question to AI:
"psql connects but immediately disconnects. What's wrong?"
What I learned:
- psql expects specific ParameterStatus messages
- Missing server_version causes a silent disconnect
- ReadyForQuery must be sent after authentication
Result: Added required parameters:
stream.write_all(self.builder.parameter_status("server_version", "16.0")).await?;
stream.write_all(self.builder.parameter_status("server_encoding", "UTF8")).await?;
stream.write_all(self.builder.parameter_status("client_encoding", "UTF8")).await?;
stream.write_all(self.builder.ready_for_query(TransactionStatus::Idle)).await?;
Now psql connects successfully!
$ psql -h localhost -p 5432 -U neo vaultgres
psql (16.0, server 16.0 (Vaultgres))
Type "help" for help.
vaultgres=> SELECT 1;
?column?
----------
1
(1 row)
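Summary: Wire Protocol in One Diagram
Startup: StartupMessage → AuthenticationOk → ParameterStatus × N → ReadyForQuery
Simple query: Query → RowDescription → DataRow × N → CommandComplete → ReadyForQuery
Extended query: Parse → Bind → Execute → Sync (replies: ParseComplete, BindComplete, DataRow × N, CommandComplete, ReadyForQuery)
Shutdown: Terminate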
Key Takeaways:
| Concept | Why It Matters |
|---|---|
| Wire protocol | Compatibility with existing PostgreSQL tools |
| Message framing | Length-prefixed binary protocol |
| Simple vs. Extended | Quick queries vs. prepared statements |
| RowDescription | Column metadata for clients |
| DataRow | Actual row data (text or binary) |
| Type OIDs | PostgreSQL type identification |
| Null encoding | -1 length prefix |
Further Reading:
- PostgreSQL source: src/backend/tcop/postgres.c
- PostgreSQL source: src/include/libpq/pqformat.h
- PostgreSQL wire protocol documentation: https://www.postgresql.org/docs/current/protocol.html
- libpq source: src/interfaces/libpq/
- Vaultgres repository: github.com/neoalienson/Vaultgres