Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lightning-transaction-sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ electrum-client = { version = "0.24.0", optional = true, default-features = fals

[dev-dependencies]
lightning = { version = "0.2.0", path = "../lightning", default-features = false, features = ["std", "_test_utils"] }
tokio = { version = "1.35.0", features = ["macros"] }
tokio = { version = "1.35.0", features = ["macros", "rt", "rt-multi-thread", "net", "io-util", "time"] }

[target.'cfg(not(target_os = "windows"))'.dev-dependencies]
electrsd = { version = "0.35.0", default-features = false, features = ["legacy"] }
Expand Down
156 changes: 150 additions & 6 deletions lightning-transaction-sync/src/esplora.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ use esplora_client::Builder;
use core::ops::Deref;
use std::collections::HashSet;

/// Maximum number of concurrent in-flight Esplora requests issued while syncing
/// confirmed/unconfirmed transactions (async client only).
///
/// The Esplora chain sync re-confirms every watched transaction/output on each
/// pass, which is one or more HTTP round-trips each. Against a remote Esplora
/// these run sequentially in the stock client, so sync wall-time scales with
/// `watched_set * round_trip_latency` and easily exceeds an LDK wallet-sync
/// timeout on wallets with real channel history. We fan these out with a bounded
/// concurrency instead. The bound is deliberately small: when many nodes sync
/// against a shared, rate-limited endpoint the effective request rate is the
/// fleet aggregate, so a low per-node concurrency keeps us under the server's
/// per-client limit while still removing the strictly-serial latency floor.
///
/// This value is the limit passed to each `buffer_unordered` fan-out in the
/// async sync path. Every fan-out is collected and awaited before the next one
/// starts, so the bound applies to one fan-out at a time.
#[cfg(feature = "async-interface")]
const ESPLORA_SYNC_CONCURRENCY: usize = 4;

/// Synchronizes LDK with a given [`Esplora`] server.
///
/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
Expand Down Expand Up @@ -298,19 +313,109 @@ where

let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();

// Phase A: resolve the confirmation status of each directly-watched
// transaction. `watched_transactions` is a set (unique txids), so a tx
// resolved here is never a duplicate; the async path fans the lookups
// out with bounded concurrency and merges the results.
#[cfg(feature = "async-interface")]
{
use futures::stream::{self, StreamExt};
let results: Vec<Result<Option<ConfirmedTx>, InternalError>> =
stream::iter(sync_state.watched_transactions.iter().copied())
.map(|txid| async move { self.get_confirmed_tx(txid, None, None).await })
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
.collect()
.await;
for r in results {
if let Some(confirmed_tx) = r? {
if !confirmed_txs.iter().any(|ctx| ctx.txid == confirmed_tx.txid) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: If watched transactions is a set, is it necessary to guard against inserting the same tx twice here? I see it was there before, so it's probably best to keep it; it just looks strange at first glance.

confirmed_txs.push(confirmed_tx);
}
}
}
}
#[cfg(not(feature = "async-interface"))]
for txid in &sync_state.watched_transactions {
if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
continue;
}
if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
if let Some(confirmed_tx) = self.get_confirmed_tx(*txid, None, None)? {
confirmed_txs.push(confirmed_tx);
}
}

// Phase B: for each watched output, fetch its spend status and, if it was
// spent, resolve the spending transaction. Phase A is fully merged into
// `confirmed_txs` before this runs, so the consistency check below sees
// the same state the sequential version did.
#[cfg(feature = "async-interface")]
{
use futures::stream::{self, StreamExt};

// B1: fan out the output-status lookups.
let outpoints: Vec<_> =
sync_state.watched_outputs.values().map(|o| o.outpoint).collect();
let status_results: Vec<Result<Option<esplora_client::OutputStatus>, InternalError>> =
stream::iter(outpoints.into_iter())
.map(|outpoint| async move {
self.client
.get_output_status(&outpoint.txid, outpoint.index as u64)
.await
.map_err(InternalError::from)
})
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
.collect()
.await;

// B2: post-process sequentially, preserving every consistency check
// the sequential version performed, and build the to-fetch list.
let mut to_fetch: Vec<(Txid, Option<BlockHash>, Option<u32>)> = Vec::new();
for status_res in status_results {
let output_status = match status_res? {
Some(s) => s,
None => continue,
};
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
if spending_tx_status.confirmed {
continue;
} else {
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
return Err(InternalError::Inconsistency);
}
}
to_fetch.push((
spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
));
}
}
}
Comment on lines +370 to +395
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// B2: post-process sequentially, preserving every consistency check
// the sequential version performed, and build the to-fetch list.
let mut to_fetch: Vec<(Txid, Option<BlockHash>, Option<u32>)> = Vec::new();
for status_res in status_results {
let output_status = match status_res? {
Some(s) => s,
None => continue,
};
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
if spending_tx_status.confirmed {
continue;
} else {
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
return Err(InternalError::Inconsistency);
}
}
to_fetch.push((
spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
));
}
}
}
// B2: post-process sequentially, preserving every consistency check
// the sequential version performed, and build the to-fetch list.
let phase_a_txids: HashSet<Txid> =
confirmed_txs.iter().map(|ctx| ctx.txid).collect();
let mut to_fetch: Vec<(Txid, Option<BlockHash>, Option<u32>)> = Vec::new();
// `transpose` drops outputs with no status while still surfacing any
// lookup error through the `?` below.
for status_res in status_results.into_iter().filter_map(Result::transpose) {
let output_status = status_res?;
let (Some(spending_txid), Some(spending_tx_status)) =
(output_status.txid, output_status.status)
else {
continue;
};
if phase_a_txids.contains(&spending_txid) {
// Phase A already resolved this spend as confirmed; the server
// flipping it back to unconfirmed is an inconsistency.
if !spending_tx_status.confirmed {
log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
return Err(InternalError::Inconsistency);
}
continue;
}
to_fetch.push((
spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
));
}

nit: readability improvement


// B3: fan out the dependent confirmed-tx lookups.
let dep_results: Vec<Result<Option<ConfirmedTx>, InternalError>> =
stream::iter(to_fetch.into_iter())
.map(|(txid, bh, height)| async move {
self.get_confirmed_tx(txid, bh, height).await
})
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
.collect()
.await;
for r in dep_results {
if let Some(confirmed_tx) = r? {
if !confirmed_txs.iter().any(|ctx| ctx.txid == confirmed_tx.txid) {
confirmed_txs.push(confirmed_tx);
}
}
}
}
#[cfg(not(feature = "async-interface"))]
for (_, output) in &sync_state.watched_outputs {
if let Some(output_status) = maybe_await!(self
if let Some(output_status) = self
.client
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
.get_output_status(&output.outpoint.txid, output.outpoint.index as u64)?
{
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
Expand All @@ -324,11 +429,11 @@ where
}
}

if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
if let Some(confirmed_tx) = self.get_confirmed_tx(
spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
))? {
)? {
confirmed_txs.push(confirmed_tx);
}
}
Expand Down Expand Up @@ -436,9 +541,48 @@ where

let mut unconfirmed_txs = Vec::new();

// The async path fans the per-block status checks out with bounded
// concurrency. The `None` block hash is a hard invariant violation
// (pre-0.0.113 channel), so we screen for it before fanning out rather
// than panicking from inside a concurrent task.
#[cfg(feature = "async-interface")]
{
use futures::stream::{self, StreamExt};
let mut items: Vec<(Txid, BlockHash)> = Vec::with_capacity(relevant_txids.len());
for (txid, _conf_height, block_hash_opt) in relevant_txids {
if let Some(block_hash) = block_hash_opt {
items.push((txid, block_hash));
} else {
log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
}
}
let results: Vec<(Txid, Result<esplora_client::BlockStatus, InternalError>)> =
stream::iter(items.into_iter())
.map(|(txid, block_hash)| async move {
let r = self
.client
.get_block_status(&block_hash)
.await
.map_err(InternalError::from);
(txid, r)
})
.buffer_unordered(ESPLORA_SYNC_CONCURRENCY)
.collect()
.await;
for (txid, status_res) in results {
let block_status = status_res?;
if block_status.in_best_chain {
// Skip if the block in question is still confirmed.
continue;
}
unconfirmed_txs.push(txid);
}
}
#[cfg(not(feature = "async-interface"))]
for (txid, _conf_height, block_hash_opt) in relevant_txids {
if let Some(block_hash) = block_hash_opt {
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
let block_status = self.client.get_block_status(&block_hash)?;
if block_status.in_best_chain {
// Skip if the block in question is still confirmed.
continue;
Expand Down
165 changes: 165 additions & 0 deletions lightning-transaction-sync/tests/parallel_sync_timing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
#![cfg(all(not(target_os = "windows"), feature = "esplora-async"))]

//! Self-contained before/after timing test for the parallelized Esplora tx_sync.
//!
//! Stands up a minimal mock Esplora HTTP server that injects a fixed latency on
//! the per-transaction `/merkleblock-proof` lookups (the fan-out the confirmed
//! sync parallelizes) and returns 404 for them, which esplora-client maps to
//! `Ok(None)` -> no confirmations, so the run is deterministic and needs no
//! bitcoind/electrs.
//!
//! With `N` watched transactions and `DELAY` per merkle lookup:
//! - strictly-serial sync ~= N * DELAY
//! - parallel sync (buffer_unordered(C)) ~= ceil(N / C) * DELAY
//!
//! The test asserts the wall-time is well under the serial estimate, so it
//! PASSES on the parallel implementation and would FAIL on the old serial one.
//! That is the "before/after": run it on the base commit (serial) to see it
//! blow the bound, and on this branch (parallel) to see it pass.
//!
//! Correctness of actual confirmation handling / ordering is covered by the
//! electrs-backed `test_esplora_syncs` integration test (run in CI).

use lightning::chain::transaction::TransactionData;
use lightning::chain::{Confirm, Filter};
use lightning::util::test_utils::TestLogger;
use lightning_transaction_sync::EsploraSyncClient;

use bitcoin::block::Header;
use bitcoin::consensus::encode::serialize_hex;
use bitcoin::constants::genesis_block;
use bitcoin::hashes::Hash;
use bitcoin::network::Network;
use bitcoin::{BlockHash, ScriptBuf, Txid};

use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpListener;

use std::time::{Duration, Instant};

// Number of watched transactions and the per-merkle-lookup latency injected by
// the mock. Chosen so serial (~N*DELAY = 4s) and parallel (~1s) are clearly
// separable with margin for CI jitter. The ~1s figure assumes the sync client
// keeps 4 lookups in flight (ceil(40 / 4) * 100ms); revisit it if that bound
// changes.
//
// `N_TXS` is how many distinct txids the test registers with the sync client.
const N_TXS: usize = 40;
// Milliseconds the mock server sleeps before answering each
// `/merkleblock-proof` request.
const DELAY_MS: u64 = 100;

// A `Confirm` implementation whose notification callbacks do nothing and which
// reports an empty relevant-txid list. With nothing relevant to re-check, the
// sync run is dominated by the confirmed-tx merkle fan-out being timed.
struct NoopConfirmable;

impl Confirm for NoopConfirmable {
	// Confirmation notifications are ignored.
	fn transactions_confirmed(
		&self, _header: &Header, _txdata: &TransactionData, _height: u32,
	) {
	}

	// Unconfirmation notifications are ignored.
	fn transaction_unconfirmed(&self, _txid: &Txid) {}

	// Best-block updates are ignored.
	fn best_block_updated(&self, _header: &Header, _height: u32) {}

	// Always empty: this confirmable tracks no transactions.
	fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
		vec![]
	}
}

// Spawn a mock Esplora server on an ephemeral port; returns the port.
//
// The server runs on a detached tokio task for the rest of the test process.
// Each accepted connection is served on its own task, handles exactly one
// request, and is then closed (`connection: close`). Routing is by request path:
//   - `/blocks/tip/hash`         -> 200, genesis block hash (hex)
//   - `*/header`                 -> 200, genesis block header (hex)
//   - `*/status`                 -> 200, canned "in best chain" JSON
//   - `*/merkleblock-proof*`     -> sleeps `DELAY_MS`, then 404
//   - anything else              -> 404
//
// Panics if the listener cannot be bound or its local address cannot be read.
async fn spawn_mock_esplora() -> u16 {
	let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
	let port = listener.local_addr().unwrap().port();

	// Precompute valid canned responses from the real genesis block.
	let genesis = genesis_block(Network::Bitcoin);
	let tip_hash_hex = genesis.block_hash().to_string(); // 64 hex chars
	let header_hex = serialize_hex(&genesis.header); // 80-byte header, 160 hex chars

	tokio::spawn(async move {
		loop {
			let (mut sock, _) = match listener.accept().await {
				Ok(conn) => conn,
				Err(_) => continue,
			};
			// Per-connection owned copies; the handler below consumes them directly.
			let tip = tip_hash_hex.clone();
			let hdr = header_hex.clone();
			tokio::spawn(async move {
				// Read until end of request headers.
				let mut buf = Vec::new();
				let mut tmp = [0u8; 1024];
				loop {
					match sock.read(&mut tmp).await {
						Ok(0) => break,
						Ok(n) => {
							// The terminator may straddle the previous read by up to
							// three bytes, so only that tail plus the new bytes needs
							// scanning, not the whole buffer again.
							let scan_from = buf.len().saturating_sub(3);
							buf.extend_from_slice(&tmp[..n]);
							if buf[scan_from..].windows(4).any(|w| w == b"\r\n\r\n") {
								break;
							}
						},
						Err(_) => return,
					}
				}
				let req = String::from_utf8_lossy(&buf);
				// Second whitespace-separated token of the request line is the path.
				let path = req
					.lines()
					.next()
					.and_then(|line| line.split_whitespace().nth(1))
					.unwrap_or("/");

				let (status, body): (&str, String) = if path == "/blocks/tip/hash" {
					("200 OK", tip)
				} else if path.ends_with("/header") {
					("200 OK", hdr)
				} else if path.ends_with("/status") {
					("200 OK", "{\"in_best_chain\":true,\"height\":100,\"next_best\":null}".into())
				} else {
					if path.contains("/merkleblock-proof") {
						// The fan-out under test: inject latency, then 404 -> Ok(None).
						tokio::time::sleep(Duration::from_millis(DELAY_MS)).await;
					}
					("404 Not Found", String::new())
				};

				let resp = format!(
					"HTTP/1.1 {}\r\ncontent-type: text/plain\r\ncontent-length: {}\r\nconnection: close\r\n\r\n{}",
					status,
					body.len(),
					body
				);
				// Best-effort: the client may already have gone away.
				let _ = sock.write_all(resp.as_bytes()).await;
				let _ = sock.shutdown().await;
			});
		}
	});

	port
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn parallel_tx_sync_beats_serial_bound() {
let port = spawn_mock_esplora().await;
let url = format!("http://127.0.0.1:{}", port);
let mut logger = TestLogger::new();
let tx_sync = EsploraSyncClient::new(url, &mut logger);

// Register N distinct watched transactions; each resolves to a 404 merkle
// lookup (Ok(None)), so no confirmations are produced.
let script = ScriptBuf::new();
for i in 0..N_TXS {
let txid = Txid::from_byte_array([(i as u8).wrapping_add(1); 32]);
tx_sync.register_tx(&txid, script.as_script());
}

let confirmable = NoopConfirmable;
let confirmables: Vec<&NoopConfirmable> = vec![&confirmable];

let start = Instant::now();
tx_sync.sync(confirmables).await.expect("sync should complete");
let elapsed = start.elapsed();
println!(
"[parallel-timing] N={} delay={}ms -> sync took {}ms (serial estimate ~{}ms)",
N_TXS,
DELAY_MS,
elapsed.as_millis(),
(N_TXS as u64) * DELAY_MS
);

let serial_estimate_ms = (N_TXS as u64) * DELAY_MS;
assert!(
elapsed.as_millis() < (serial_estimate_ms / 2) as u128,
"sync took {}ms; expected well under serial estimate {}ms -- parallel fan-out not happening?",
elapsed.as_millis(),
serial_estimate_ms
);
}
Loading