A high-level PostgreSQL pgoutput logical replication value decoder for Ruby.
pgoutput-decoder is the companion layer to pgoutput-parser. It accepts immutable protocol messages produced by pgoutput-parser and turns tuple payloads into application-friendly Ruby row-change events.
It does not parse PostgreSQL wire bytes and it does not open replication connections. Those concerns belong to the lower-level parser and to future client layers.
- Ruby 3.4+
- pgoutput-parser ~> 0.1
pgoutput-parser
│
▼
Protocol messages
│
▼
pgoutput-decoder
│
▼
Decoded row-change events
- Decodes PostgreSQL OID-backed tuple values
- Builds Ruby hashes from relation columns and tuple values
- Tracks relation metadata from Relation messages
- Tracks active transaction context from Begin/Commit messages
- Attaches transaction_id to DML events
- Returns immutable, Ractor-shareable event objects
- Supports custom OID decoders
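
As a rough illustration of how a row hash can be built from relation columns and tuple values, the sketch below pairs each column name with the value at the same position. `Column` and `build_row` are hypothetical names used only for this example; they are not part of the gem's API.

```ruby
# Hypothetical stand-in for relation column metadata (not the gem's class).
Column = Struct.new(:name, :type_oid)

# Pair each relation column with the tuple value at the same position and
# return a frozen hash keyed by column name.
def build_row(columns, values)
  unless columns.length == values.length
    raise ArgumentError, "expected #{columns.length} values, got #{values.length}"
  end

  columns.map(&:name).zip(values).to_h.freeze
end

columns = [Column.new("id", 23), Column.new("name", 25), Column.new("active", 16)]
row = build_row(columns, [7, "Alice", true])
p row
```

The length check matters because a tuple that does not line up with the tracked relation usually means the relation metadata is stale.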
This gem intentionally does not:
- Parse PostgreSQL CopyData bytes
- Manage replication slots
- Open replication connections
- Maintain WAL acknowledgements
- Reconnect to PostgreSQL
- Publish events to queues
- Integrate with ActiveRecord
gem "pgoutput-decoder"

Then:

bundle install

Require it with:

require "pgoutput/decoder"

require "pgoutput"
require "pgoutput/decoder"
stream = Pgoutput::RelationTracker.new
decoder = Pgoutput::Decoder.new
protocol_message = stream.process(payload)
event = decoder.decode(protocol_message)

A Relation message updates decoder metadata and returns nil:
decoder.decode(relation_message)
# => nil

An insert message returns a decoded event:
event = decoder.decode(insert_message)
event.transaction_id
# => 789
event.schema
# => "public"
event.table
# => "users"
event.values
# => { "id" => 7, "name" => "Alice", "active" => true }

PostgreSQL pgoutput carries the transaction ID in the Begin (B) message, not on every row-change message.
The decoder remembers the active transaction and attaches it to decoded DML events:
decoder.decode(begin_message)
decoder.decode(relation_message)
insert = decoder.decode(insert_message)
insert.transaction_id
# => 789

The transaction ID is useful for grouping changes, debugging, and CDC processing. It should not be treated as a globally permanent identifier because PostgreSQL transaction IDs can wrap around.
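
The transaction tracking described above can be sketched as a small state machine. This is a minimal sketch, not the gem's implementation: `BeginMsg`, `CommitMsg`, `InsertMsg`, `RowEvent`, and `TransactionContext` are hypothetical names, and for brevity the sketch returns nil for Begin and Commit instead of emitting events for them.

```ruby
# Hypothetical message and event shapes, for illustration only.
BeginMsg  = Struct.new(:xid)
CommitMsg = Struct.new(:commit_lsn)
InsertMsg = Struct.new(:table, :values)
RowEvent  = Struct.new(:transaction_id, :table, :values)

class TransactionContext
  def initialize
    @current_xid = nil
  end

  # Remember the xid from Begin, clear it on Commit, and stamp it onto
  # every row change seen in between.
  def handle(message)
    case message
    when BeginMsg
      @current_xid = message.xid
      nil
    when CommitMsg
      @current_xid = nil
      nil
    when InsertMsg
      RowEvent.new(@current_xid, message.table, message.values)
    end
  end
end

context = TransactionContext.new
context.handle(BeginMsg.new(789))
event = context.handle(InsertMsg.new("users", { "id" => 7 }))
context.handle(CommitMsg.new(0))
p event.transaction_id
```

Because the ID can wrap around, a consumer that needs a durable ordering key would typically combine it with something monotonic, such as the commit LSN.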
Pgoutput::Decoder::Events::Begin
Pgoutput::Decoder::Events::Commit
Pgoutput::Decoder::Events::Insert
Pgoutput::Decoder::Events::Update
Pgoutput::Decoder::Events::Delete

The default registry supports common scalar PostgreSQL OIDs:
| OID | Type |
|---|---|
| 16 | boolean |
| 20 | bigint |
| 21 | smallint |
| 23 | integer |
| 25 | text |
| 114 | json |
| 700 | real |
| 701 | double precision |
| 1043 | varchar |
| 1082 | date |
| 1114 | timestamp |
| 1184 | timestamptz |
| 1700 | numeric |
| 2950 | uuid |
| 3802 | jsonb |
Unsupported OIDs are returned as frozen raw strings.
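
To make the OID-to-decoder mapping concrete, here is a minimal sketch of text-format decoding for a few of the OIDs in the table. `TEXT_DECODERS` and `decode_text` are hypothetical names, not the gem's API, and the sketch covers only a subset of types (no dates, timestamps, floats, or numerics).

```ruby
require "json"

# Hypothetical lookup table: type OID => lambda that parses the text format.
TEXT_DECODERS = {
  16   => ->(raw) { raw == "t" },        # boolean is sent as "t" or "f"
  20   => ->(raw) { Integer(raw, 10) },  # bigint
  21   => ->(raw) { Integer(raw, 10) },  # smallint
  23   => ->(raw) { Integer(raw, 10) },  # integer
  114  => ->(raw) { JSON.parse(raw) },   # json
  3802 => ->(raw) { JSON.parse(raw) }    # jsonb
}.freeze

# Decode a text-format value; unknown OIDs fall back to a frozen copy of
# the raw string, and SQL NULL stays nil.
def decode_text(oid, raw)
  return nil if raw.nil?

  decoder = TEXT_DECODERS[oid]
  decoder ? decoder.call(raw) : raw.dup.freeze
end

p decode_text(23, "42")
p decode_text(16, "t")
p decode_text(600, "(1,2)") # OID 600 (point) is not in the table
```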
Binary decoding is intentionally conservative.
The decoder handles safe fixed-width binary scalar types such as:
- boolean
- int2
- int4
- int8
- float4
- float8
Unsupported binary values are preserved as frozen raw bytes.
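
The fixed-width types listed above use network byte order (big-endian) in PostgreSQL's binary format, so they map directly onto `String#unpack1` directives. The following is a sketch under that assumption; `FIXED_WIDTH` and `decode_binary` are hypothetical names, not the gem's API.

```ruby
# Hypothetical table: type OID => [byte width, String#unpack1 directive].
FIXED_WIDTH = {
  16  => [1, "C"],   # boolean: one byte, non-zero means true
  21  => [2, "s>"],  # int2: signed 16-bit, big-endian
  23  => [4, "l>"],  # int4: signed 32-bit, big-endian
  20  => [8, "q>"],  # int8: signed 64-bit, big-endian
  700 => [4, "g"],   # float4: single precision, big-endian
  701 => [8, "G"]    # float8: double precision, big-endian
}.freeze

# Decode a binary-format value. Anything unknown or of unexpected width is
# preserved as a frozen binary copy of the input instead of being guessed at.
def decode_binary(oid, bytes)
  width, directive = FIXED_WIDTH[oid]
  return bytes.b.freeze unless width && bytes.bytesize == width

  value = bytes.unpack1(directive)
  oid == 16 ? value != 0 : value
end

p decode_binary(23, [42].pack("l>"))
p decode_binary(701, [1.5].pack("G"))
```

Checking the byte width before unpacking is one way to stay conservative: a value that does not match the expected size is returned untouched.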
registry =
  Pgoutput::Decoder::TypeRegistry.default.with_decoder(999_999) do |raw, format|
    format == :text ? "custom:#{raw}" : raw
  end

decoder = Pgoutput::Decoder.new(type_registry: registry)

update = decoder.decode(update_message)
update.old_key
# => { "id" => 7 } or nil
update.old_values
# => { ... } or nil
update.new_values
# => { "id" => 7, "name" => "Bob" }

delete = decoder.decode(delete_message)
delete.old_key
# => { "id" => 7 } or nil
delete.old_values
# => { ... } or nil

Decoded events are deeply shareable:
event = decoder.decode(update_message)
Ractor.shareable?(event)
# => true

The decoder instance itself is stateful and should not be shared across Ractors.
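
For readers unfamiliar with Ractor shareability, the sketch below shows what "deeply shareable" means in plain Ruby. `RowChange` is a hypothetical value class, not one of the gem's event classes; `Ractor.make_shareable` and `Ractor.shareable?` are standard Ruby methods.

```ruby
# Hypothetical event shape built with Data (available since Ruby 3.2).
RowChange = Data.define(:table, :values)

# make_shareable deep-freezes the object graph: the event, the hash it
# holds, and every string inside that hash.
event = Ractor.make_shareable(
  RowChange.new(table: "users", values: { "id" => 7, "name" => "Alice" })
)

p Ractor.shareable?(event)
p event.values.frozen?

begin
  event.values["id"] = 8
rescue FrozenError => e
  puts "mutation rejected: #{e.class}"
end
```

A stateful object such as a decoder cannot be frozen this way without losing its ability to track relations and transactions, which is why one decoder per Ractor is the safer arrangement.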
bundle exec rake test

With coverage:
COVERAGE=true bundle exec rake test

Type-check with:

bundle exec steep check

License: MIT.