From bab132703f07b48ce319101612071030a6061830 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 11:55:26 +0200 Subject: [PATCH 01/12] Add shared lock helpers Introduce a small lock helper module so mutex and rwlock access can use a single naming convention instead of repeating direct locking at every call site. Co-Authored-By: HAL 9000 --- src/util/locks.rs | 33 +++++++++++++++++++++++++++++++++ src/util/mod.rs | 8 ++++++++ 2 files changed, 41 insertions(+) create mode 100644 src/util/locks.rs create mode 100644 src/util/mod.rs diff --git a/src/util/locks.rs b/src/util/locks.rs new file mode 100644 index 000000000..4afe2d046 --- /dev/null +++ b/src/util/locks.rs @@ -0,0 +1,33 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; + +pub(crate) trait MutexExt { + fn lck(&self) -> MutexGuard<'_, T>; +} + +impl MutexExt for Mutex { + fn lck(&self) -> MutexGuard<'_, T> { + self.lock().expect("mutex poisoning indicates a broken internal invariant") + } +} + +pub(crate) trait RwLockExt { + fn rlck(&self) -> RwLockReadGuard<'_, T>; + fn wlck(&self) -> RwLockWriteGuard<'_, T>; +} + +impl RwLockExt for RwLock { + fn rlck(&self) -> RwLockReadGuard<'_, T> { + self.read().expect("rwlock poisoning indicates a broken internal invariant") + } + + fn wlck(&self) -> RwLockWriteGuard<'_, T> { + self.write().expect("rwlock poisoning indicates a broken internal invariant") + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 000000000..3cde8e381 --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,8 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +pub(crate) mod locks; From 8ebaab17812320bc97723c39bc3a8d1fcee16d1c Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 11:55:35 +0200 Subject: [PATCH 02/12] Route lock access through the shared helpers Switch the codebase over to the new lock helper names so lock poisoning behavior is centralized and the call sites stay shorter and more consistent. Co-Authored-By: HAL 9000 --- src/builder.rs | 78 ++++++++----------- src/chain/bitcoind.rs | 39 +++++----- src/chain/electrum.rs | 33 ++++---- src/chain/esplora.rs | 18 +++-- src/chain/mod.rs | 5 +- src/connection.rs | 5 +- src/data_store.rs | 15 ++-- src/event.rs | 28 ++++--- src/fee_estimator.rs | 6 +- src/io/sqlite_store/mod.rs | 18 +++-- src/io/vss_store.rs | 40 ++++++---- src/lib.rs | 43 ++++++---- src/liquidity.rs | 44 +++++------ src/logger.rs | 3 +- src/payment/asynchronous/om_mailbox.rs | 15 ++-- .../asynchronous/static_invoice_store.rs | 3 +- src/payment/bolt11.rs | 13 ++-- src/payment/bolt12.rs | 18 +++-- src/payment/onchain.rs | 5 +- src/payment/spontaneous.rs | 5 +- src/peer_store.rs | 9 ++- src/runtime.rs | 15 ++-- src/scoring.rs | 10 ++- src/wallet/mod.rs | 72 ++++++++--------- 24 files changed, 292 insertions(+), 248 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index cd8cc184f..c56f2ed54 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -80,6 +80,7 @@ use crate::types::{ GossipSync, Graph, KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, SyncAndAsyncKVStore, }; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; use crate::{Node, NodeMetrics}; @@ -861,7 +862,7 @@ impl ArcedNodeBuilder { pub fn set_chain_source_esplora( &self, server_url: String, sync_config: Option, ) { - self.inner.write().unwrap().set_chain_source_esplora(server_url, sync_config); + self.inner.wlck().set_chain_source_esplora(server_url, sync_config); } /// Configures the [`Node`] instance to source its chain data from the given Esplora server. @@ -875,11 +876,7 @@ impl ArcedNodeBuilder { &self, server_url: String, headers: HashMap, sync_config: Option, ) { - self.inner.write().unwrap().set_chain_source_esplora_with_headers( - server_url, - headers, - sync_config, - ); + self.inner.wlck().set_chain_source_esplora_with_headers(server_url, headers, sync_config); } /// Configures the [`Node`] instance to source its chain data from the given Electrum server. @@ -889,7 +886,7 @@ impl ArcedNodeBuilder { pub fn set_chain_source_electrum( &self, server_url: String, sync_config: Option, ) { - self.inner.write().unwrap().set_chain_source_electrum(server_url, sync_config); + self.inner.wlck().set_chain_source_electrum(server_url, sync_config); } /// Configures the [`Node`] instance to connect to a Bitcoin Core node via RPC. @@ -903,12 +900,7 @@ impl ArcedNodeBuilder { pub fn set_chain_source_bitcoind_rpc( &self, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, ) { - self.inner.write().unwrap().set_chain_source_bitcoind_rpc( - rpc_host, - rpc_port, - rpc_user, - rpc_password, - ); + self.inner.wlck().set_chain_source_bitcoind_rpc(rpc_host, rpc_port, rpc_user, rpc_password); } /// Configures the [`Node`] instance to synchronize chain data from a Bitcoin Core REST endpoint. @@ -924,7 +916,7 @@ impl ArcedNodeBuilder { &self, rest_host: String, rest_port: u16, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, ) { - self.inner.write().unwrap().set_chain_source_bitcoind_rest( + self.inner.wlck().set_chain_source_bitcoind_rest( rest_host, rest_port, rpc_host, @@ -937,20 +929,20 @@ impl ArcedNodeBuilder { /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer /// network. pub fn set_gossip_source_p2p(&self) { - self.inner.write().unwrap().set_gossip_source_p2p(); + self.inner.wlck().set_gossip_source_p2p(); } /// Configures the [`Node`] instance to source its gossip data from the given RapidGossipSync /// server. pub fn set_gossip_source_rgs(&self, rgs_server_url: String) { - self.inner.write().unwrap().set_gossip_source_rgs(rgs_server_url); + self.inner.wlck().set_gossip_source_rgs(rgs_server_url); } /// Configures the [`Node`] instance to source its external scores from the given URL. /// /// The external scores are merged into the local scoring system to improve routing. pub fn set_pathfinding_scores_source(&self, url: String) { - self.inner.write().unwrap().set_pathfinding_scores_source(url); + self.inner.wlck().set_pathfinding_scores_source(url); } /// Configures the [`Node`] instance to source inbound liquidity from the given @@ -964,7 +956,7 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps1( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps1(node_id, address, token); + self.inner.wlck().set_liquidity_source_lsps1(node_id, address, token); } /// Configures the [`Node`] instance to source just-in-time inbound liquidity from the given @@ -978,7 +970,7 @@ impl ArcedNodeBuilder { pub fn set_liquidity_source_lsps2( &self, node_id: PublicKey, address: SocketAddress, token: Option, ) { - self.inner.write().unwrap().set_liquidity_source_lsps2(node_id, address, token); + self.inner.wlck().set_liquidity_source_lsps2(node_id, address, token); } /// Configures the [`Node`] instance to provide an [LSPS2] service, issuing just-in-time @@ -988,12 +980,12 @@ impl ArcedNodeBuilder { /// /// [LSPS2]: https://github.com/BitcoinAndLightningLayerSpecs/lsp/blob/main/LSPS2/README.md pub fn set_liquidity_provider_lsps2(&self, service_config: LSPS2ServiceConfig) { - self.inner.write().unwrap().set_liquidity_provider_lsps2(service_config); + self.inner.wlck().set_liquidity_provider_lsps2(service_config); } /// Sets the used storage directory path. pub fn set_storage_dir_path(&self, storage_dir_path: String) { - self.inner.write().unwrap().set_storage_dir_path(storage_dir_path); + self.inner.wlck().set_storage_dir_path(storage_dir_path); } /// Configures the [`Node`] instance to write logs to the filesystem. @@ -1012,29 +1004,29 @@ impl ArcedNodeBuilder { pub fn set_filesystem_logger( &self, log_file_path: Option, log_level: Option, ) { - self.inner.write().unwrap().set_filesystem_logger(log_file_path, log_level); + self.inner.wlck().set_filesystem_logger(log_file_path, log_level); } /// Configures the [`Node`] instance to write logs to the [`log`](https://crates.io/crates/log) facade. pub fn set_log_facade_logger(&self) { - self.inner.write().unwrap().set_log_facade_logger(); + self.inner.wlck().set_log_facade_logger(); } /// Configures the [`Node`] instance to write logs to the provided custom [`LogWriter`]. pub fn set_custom_logger(&self, log_writer: Arc) { - self.inner.write().unwrap().set_custom_logger(log_writer); + self.inner.wlck().set_custom_logger(log_writer); } /// Sets the Bitcoin network used. pub fn set_network(&self, network: Network) { - self.inner.write().unwrap().set_network(network); + self.inner.wlck().set_network(network); } /// Sets the IP address and TCP port on which [`Node`] will listen for incoming network connections. pub fn set_listening_addresses( &self, listening_addresses: Vec, ) -> Result<(), BuildError> { - self.inner.write().unwrap().set_listening_addresses(listening_addresses).map(|_| ()) + self.inner.wlck().set_listening_addresses(listening_addresses).map(|_| ()) } /// Sets the IP address and TCP port which [`Node`] will announce to the gossip network that it accepts connections on. @@ -1045,7 +1037,7 @@ impl ArcedNodeBuilder { pub fn set_announcement_addresses( &self, announcement_addresses: Vec, ) -> Result<(), BuildError> { - self.inner.write().unwrap().set_announcement_addresses(announcement_addresses).map(|_| ()) + self.inner.wlck().set_announcement_addresses(announcement_addresses).map(|_| ()) } /// Configures the [`Node`] instance to use a Tor SOCKS proxy for outbound connections to peers with OnionV3 addresses. @@ -1054,7 +1046,7 @@ impl ArcedNodeBuilder { /// /// **Note**: If unset, connecting to peer OnionV3 addresses will fail. pub fn set_tor_config(&self, tor_config: TorConfig) -> Result<(), BuildError> { - self.inner.write().unwrap().set_tor_config(tor_config).map(|_| ()) + self.inner.wlck().set_tor_config(tor_config).map(|_| ()) } /// Sets the node alias that will be used when broadcasting announcements to the gossip @@ -1062,14 +1054,14 @@ impl ArcedNodeBuilder { /// /// The provided alias must be a valid UTF-8 string and no longer than 32 bytes in total. pub fn set_node_alias(&self, node_alias: String) -> Result<(), BuildError> { - self.inner.write().unwrap().set_node_alias(node_alias).map(|_| ()) + self.inner.wlck().set_node_alias(node_alias).map(|_| ()) } /// Sets the role of the node in an asynchronous payments context. pub fn set_async_payments_role( &self, role: Option, ) -> Result<(), BuildError> { - self.inner.write().unwrap().set_async_payments_role(role).map(|_| ()) + self.inner.wlck().set_async_payments_role(role).map(|_| ()) } /// Configures the [`Node`] to resync chain data from genesis on first startup, recovering any @@ -1078,13 +1070,13 @@ impl ArcedNodeBuilder { /// This should only be set on first startup when importing an older wallet from a previously /// used [`NodeEntropy`]. pub fn set_wallet_recovery_mode(&self) { - self.inner.write().unwrap().set_wallet_recovery_mode(); + self.inner.wlck().set_wallet_recovery_mode(); } /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { - self.inner.read().unwrap().build(*node_entropy).map(Arc::new) + self.inner.rlck().build(*node_entropy).map(Arc::new) } /// Builds a [`Node`] instance with a [`FilesystemStore`] backend and according to the options @@ -1092,7 +1084,7 @@ impl ArcedNodeBuilder { pub fn build_with_fs_store( &self, node_entropy: Arc, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_fs_store(*node_entropy).map(Arc::new) + self.inner.rlck().build_with_fs_store(*node_entropy).map(Arc::new) } /// Builds a [`Node`] instance with a [VSS] backend and according to the options @@ -1117,8 +1109,7 @@ impl ArcedNodeBuilder { fixed_headers: HashMap, ) -> Result, BuildError> { self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store(*node_entropy, vss_url, store_id, fixed_headers) .map(Arc::new) } @@ -1150,8 +1141,7 @@ impl ArcedNodeBuilder { lnurl_auth_server_url: String, fixed_headers: HashMap, ) -> Result, BuildError> { self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store_and_lnurl_auth( *node_entropy, vss_url, @@ -1179,8 +1169,7 @@ impl ArcedNodeBuilder { fixed_headers: HashMap, ) -> Result, BuildError> { self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store_and_fixed_headers(*node_entropy, vss_url, store_id, fixed_headers) .map(Arc::new) } @@ -1202,8 +1191,7 @@ impl ArcedNodeBuilder { ) -> Result, BuildError> { let adapter = Arc::new(crate::ffi::VssHeaderProviderAdapter::new(header_provider)); self.inner - .read() - .unwrap() + .rlck() .build_with_vss_store_and_header_provider(*node_entropy, vss_url, store_id, adapter) .map(Arc::new) } @@ -1214,7 +1202,7 @@ impl ArcedNodeBuilder { pub fn build_with_store( &self, node_entropy: Arc, kv_store: S, ) -> Result, BuildError> { - self.inner.read().unwrap().build_with_store(*node_entropy, kv_store).map(Arc::new) + self.inner.rlck().build_with_store(*node_entropy, kv_store).map(Arc::new) } } @@ -1610,7 +1598,7 @@ fn build_with_store_internal( // Restore external pathfinding scores from cache if possible. match external_scores_res { Ok(external_scores) => { - scorer.lock().unwrap().merge(external_scores, cur_time); + scorer.lck().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); }, Err(e) => { @@ -1763,7 +1751,7 @@ fn build_with_store_internal( // Reset the RGS sync timestamp in case we somehow switch gossip sources { - let mut locked_node_metrics = node_metrics.write().unwrap(); + let mut locked_node_metrics = node_metrics.wlck(); locked_node_metrics.latest_rgs_snapshot_timestamp = None; write_node_metrics(&*locked_node_metrics, &*kv_store, Arc::clone(&logger)) .map_err(|e| { @@ -1775,7 +1763,7 @@ fn build_with_store_internal( }, GossipSourceConfig::RapidGossipSync(rgs_server) => { let latest_sync_timestamp = - node_metrics.read().unwrap().latest_rgs_snapshot_timestamp.unwrap_or(0); + node_metrics.rlck().latest_rgs_snapshot_timestamp.unwrap_or(0); Arc::new(GossipSource::new_rgs( rgs_server.clone(), latest_sync_timestamp, diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 26924d8af..64cbf7829 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -42,6 +42,7 @@ use crate::fee_estimator::{ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{Error, NodeMetrics}; const CHAIN_POLLING_INTERVAL_SECS: u64 = 2; @@ -132,7 +133,7 @@ impl BitcoindChainSource { // First register for the wallet polling status to make sure `Node::sync_wallets` calls // wait on the result before proceeding. { - let mut status_lock = self.wallet_polling_status.lock().unwrap(); + let mut status_lock = self.wallet_polling_status.lck(); if status_lock.register_or_subscribe_pending_sync().is_some() { debug_assert!(false, "Sync already in progress. This should never happen."); } @@ -194,15 +195,17 @@ impl BitcoindChainSource { { Ok(chain_tip) => { { + #[allow(clippy::unwrap_used)] + let elapsed_ms = now.elapsed().unwrap().as_millis(); log_info!( self.logger, "Finished synchronizing listeners in {}ms", - now.elapsed().unwrap().as_millis() + elapsed_ms ); - *self.latest_chain_tip.write().unwrap() = Some(chain_tip); + *self.latest_chain_tip.wlck() = Some(chain_tip); let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = @@ -262,7 +265,7 @@ impl BitcoindChainSource { } // Now propagate the initial result to unblock waiting subscribers. - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(Ok(())); + self.wallet_polling_status.lck().propagate_result_to_subscribers(Ok(())); let mut chain_polling_interval = tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); @@ -346,7 +349,7 @@ impl BitcoindChainSource { match validate_res { Ok(tip) => { - *self.latest_chain_tip.write().unwrap() = Some(tip); + *self.latest_chain_tip.wlck() = Some(tip); Ok(tip) }, Err(e) => { @@ -361,7 +364,7 @@ impl BitcoindChainSource { chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.wallet_polling_status.lock().unwrap(); + let mut status_lock = self.wallet_polling_status.lck(); status_lock.register_or_subscribe_pending_sync() }; @@ -383,7 +386,7 @@ impl BitcoindChainSource { ) .await; - self.wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + self.wallet_polling_status.lck().propagate_result_to_subscribers(res); res } @@ -392,7 +395,7 @@ impl BitcoindChainSource { &self, onchain_wallet: Arc, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, ) -> Result<(), Error> { - let latest_chain_tip_opt = self.latest_chain_tip.read().unwrap().clone(); + let latest_chain_tip_opt = self.latest_chain_tip.rlck().clone(); let chain_tip = if let Some(tip) = latest_chain_tip_opt { tip } else { self.poll_chain_tip().await? }; @@ -410,12 +413,10 @@ impl BitcoindChainSource { let now = SystemTime::now(); match spv_client.poll_best_tip().await { Ok((ChainTip::Better(tip), true)) => { - log_trace!( - self.logger, - "Finished polling best tip in {}ms", - now.elapsed().unwrap().as_millis() - ); - *self.latest_chain_tip.write().unwrap() = Some(tip); + #[allow(clippy::unwrap_used)] + let elapsed_ms = now.elapsed().unwrap().as_millis(); + log_trace!(self.logger, "Finished polling best tip in {}ms", elapsed_ms); + *self.latest_chain_tip.wlck() = Some(tip); }, Ok(_) => {}, Err(e) => { @@ -434,12 +435,14 @@ impl BitcoindChainSource { .await { Ok((unconfirmed_txs, evicted_txids)) => { + #[allow(clippy::unwrap_used)] + let elapsed_ms = now.elapsed().unwrap().as_millis(); log_trace!( self.logger, "Finished polling mempool of size {} and {} evicted transactions in {}ms", unconfirmed_txs.len(), evicted_txids.len(), - now.elapsed().unwrap().as_millis() + elapsed_ms ); onchain_wallet.apply_mempool_txs(unconfirmed_txs, evicted_txids).unwrap_or_else( |e| { @@ -455,7 +458,7 @@ impl BitcoindChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; @@ -570,7 +573,7 @@ impl BitcoindChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } diff --git a/src/chain/electrum.rs b/src/chain/electrum.rs index 7b08c3845..d862c7433 100644 --- a/src/chain/electrum.rs +++ b/src/chain/electrum.rs @@ -34,6 +34,7 @@ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::NodeMetrics; const BDK_ELECTRUM_CLIENT_BATCH_SIZE: usize = 5; @@ -76,7 +77,7 @@ impl ElectrumChainSource { } pub(super) fn start(&self, runtime: Arc) -> Result<(), Error> { - self.electrum_runtime_status.write().unwrap().start( + self.electrum_runtime_status.wlck().start( self.server_url.clone(), self.sync_config.clone(), Arc::clone(&runtime), @@ -86,14 +87,14 @@ impl ElectrumChainSource { } pub(super) fn stop(&self) { - self.electrum_runtime_status.write().unwrap().stop(); + self.electrum_runtime_status.wlck().stop(); } pub(crate) async fn sync_onchain_wallet( &self, onchain_wallet: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.onchain_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -107,14 +108,14 @@ impl ElectrumChainSource { let res = self.sync_onchain_wallet_inner(onchain_wallet).await; - self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.onchain_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } async fn sync_onchain_wallet_inner(&self, onchain_wallet: Arc) -> Result<(), Error> { let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + if let Some(client) = self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { debug_assert!( @@ -126,7 +127,7 @@ impl ElectrumChainSource { // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = - self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + self.node_metrics.rlck().latest_onchain_wallet_sync_timestamp.is_some(); let apply_wallet_update = |update_res: Result, now: Instant| match update_res { @@ -141,7 +142,7 @@ impl ElectrumChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics( @@ -184,7 +185,7 @@ impl ElectrumChainSource { output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.lightning_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -199,7 +200,7 @@ impl ElectrumChainSource { let res = self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await; - self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.lightning_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } @@ -218,7 +219,7 @@ impl ElectrumChainSource { ]; let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + if let Some(client) = self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { debug_assert!( @@ -234,7 +235,7 @@ impl ElectrumChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } @@ -245,7 +246,7 @@ impl ElectrumChainSource { pub(crate) async fn update_fee_rate_estimates(&self) -> Result<(), Error> { let electrum_client: Arc = if let Some(client) = - self.electrum_runtime_status.read().unwrap().client().as_ref() + self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { @@ -267,7 +268,7 @@ impl ElectrumChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } @@ -277,7 +278,7 @@ impl ElectrumChainSource { pub(crate) async fn process_broadcast_package(&self, package: Vec) { let electrum_client: Arc = - if let Some(client) = self.electrum_runtime_status.read().unwrap().client().as_ref() { + if let Some(client) = self.electrum_runtime_status.rlck().client().as_ref() { Arc::clone(client) } else { debug_assert!(false, "We should have started the chain source before broadcasting"); @@ -292,10 +293,10 @@ impl ElectrumChainSource { impl Filter for ElectrumChainSource { fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { - self.electrum_runtime_status.write().unwrap().register_tx(txid, script_pubkey) + self.electrum_runtime_status.wlck().register_tx(txid, script_pubkey) } fn register_output(&self, output: lightning::chain::WatchedOutput) { - self.electrum_runtime_status.write().unwrap().register_output(output) + self.electrum_runtime_status.wlck().register_output(output) } } diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index 245db72f6..d93a2fbc6 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -25,6 +25,7 @@ use crate::fee_estimator::{ use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_debug, log_error, log_trace, LdkLogger, Logger}; use crate::types::{ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{Error, NodeMetrics}; pub(super) struct EsploraChainSource { @@ -54,6 +55,7 @@ impl EsploraChainSource { client_builder = client_builder.header(header_name, header_value); } + #[allow(clippy::unwrap_used)] let esplora_client = client_builder.build_async().unwrap(); let tx_sync = Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger))); @@ -78,7 +80,7 @@ impl EsploraChainSource { &self, onchain_wallet: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.onchain_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.onchain_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -92,7 +94,7 @@ impl EsploraChainSource { let res = self.sync_onchain_wallet_inner(onchain_wallet).await; - self.onchain_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.onchain_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } @@ -101,7 +103,7 @@ impl EsploraChainSource { // If this is our first sync, do a full scan with the configured gap limit. // Otherwise just do an incremental sync. let incremental_sync = - self.node_metrics.read().unwrap().latest_onchain_wallet_sync_timestamp.is_some(); + self.node_metrics.rlck().latest_onchain_wallet_sync_timestamp.is_some(); macro_rules! get_and_apply_wallet_update { ($sync_future: expr) => {{ @@ -121,7 +123,7 @@ impl EsploraChainSource { .ok() .map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics( &*locked_node_metrics, @@ -207,7 +209,7 @@ impl EsploraChainSource { output_sweeper: Arc, ) -> Result<(), Error> { let receiver_res = { - let mut status_lock = self.lightning_wallet_sync_status.lock().unwrap(); + let mut status_lock = self.lightning_wallet_sync_status.lck(); status_lock.register_or_subscribe_pending_sync() }; if let Some(mut sync_receiver) = receiver_res { @@ -222,7 +224,7 @@ impl EsploraChainSource { let res = self.sync_lightning_wallet_inner(channel_manager, chain_monitor, output_sweeper).await; - self.lightning_wallet_sync_status.lock().unwrap().propagate_result_to_subscribers(res); + self.lightning_wallet_sync_status.lck().propagate_result_to_subscribers(res); res } @@ -259,7 +261,7 @@ impl EsploraChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; @@ -344,7 +346,7 @@ impl EsploraChainSource { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = self.node_metrics.write().unwrap(); + let mut locked_node_metrics = self.node_metrics.wlck(); locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*self.kv_store, &*self.logger)?; } diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 49c011a78..03df993d0 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -27,6 +27,7 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::logger::{log_debug, log_info, log_trace, LdkLogger, Logger}; use crate::runtime::Runtime; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; +use crate::util::locks::MutexExt; use crate::{Error, NodeMetrics}; pub(crate) enum WalletSyncStatus { @@ -215,7 +216,7 @@ impl ChainSource { } pub(crate) fn registered_txids(&self) -> Vec { - self.registered_txids.lock().unwrap().clone() + self.registered_txids.lck().clone() } pub(crate) fn is_transaction_based(&self) -> bool { @@ -472,7 +473,7 @@ impl ChainSource { impl Filter for ChainSource { fn register_tx(&self, txid: &Txid, script_pubkey: &Script) { - self.registered_txids.lock().unwrap().push(*txid); + self.registered_txids.lck().push(*txid); match &self.kind { ChainSourceKind::Esplora(esplora_chain_source) => { esplora_chain_source.register_tx(txid, script_pubkey) diff --git a/src/connection.rs b/src/connection.rs index a1d24e36d..799dc056d 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -16,6 +16,7 @@ use lightning::ln::msgs::SocketAddress; use crate::config::TorConfig; use crate::logger::{log_debug, log_error, log_info, LdkLogger}; use crate::types::{KeysManager, PeerManager}; +use crate::util::locks::MutexExt; use crate::Error; pub(crate) struct ConnectionManager @@ -238,7 +239,7 @@ where fn register_or_subscribe_pending_connection( &self, node_id: &PublicKey, ) -> Option>> { - let mut pending_connections_lock = self.pending_connections.lock().unwrap(); + let mut pending_connections_lock = self.pending_connections.lck(); match pending_connections_lock.entry(*node_id) { hash_map::Entry::Occupied(mut entry) => { let (tx, rx) = tokio::sync::oneshot::channel(); @@ -254,7 +255,7 @@ where fn propagate_result_to_subscribers(&self, node_id: &PublicKey, res: Result<(), Error>) { // Send the result to any other tasks that might be waiting on it by now. - let mut pending_connections_lock = self.pending_connections.lock().unwrap(); + let mut pending_connections_lock = self.pending_connections.lck(); if let Some(connection_ready_senders) = pending_connections_lock.remove(node_id) { for sender in connection_ready_senders { let _ = sender.send(res).map_err(|e| { diff --git a/src/data_store.rs b/src/data_store.rs index ac5c78fb7..3634571a2 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -14,6 +14,7 @@ use lightning::util::ser::{Readable, Writeable}; use crate::logger::{log_error, LdkLogger}; use crate::types::DynStore; +use crate::util::locks::MutexExt; use crate::Error; pub(crate) trait StorableObject: Clone + Readable + Writeable { @@ -65,7 +66,7 @@ where } pub(crate) fn insert(&self, object: SO) -> Result { - let mut locked_objects = self.objects.lock().unwrap(); + let mut locked_objects = self.objects.lck(); self.persist(&object)?; let updated = locked_objects.insert(object.id(), object).is_some(); @@ -73,7 +74,7 @@ where } pub(crate) fn insert_or_update(&self, object: SO) -> Result { - let mut locked_objects = self.objects.lock().unwrap(); + let mut locked_objects = self.objects.lck(); let updated; match locked_objects.entry(object.id()) { @@ -95,7 +96,7 @@ where } pub(crate) fn remove(&self, id: &SO::Id) -> Result<(), Error> { - let removed = self.objects.lock().unwrap().remove(id).is_some(); + let removed = self.objects.lck().remove(id).is_some(); if removed { let store_key = id.encode_to_hex_str(); KVStoreSync::remove( @@ -121,11 +122,11 @@ where } pub(crate) fn get(&self, id: &SO::Id) -> Option { - self.objects.lock().unwrap().get(id).cloned() + self.objects.lck().get(id).cloned() } pub(crate) fn update(&self, update: SO::Update) -> Result { - let mut locked_objects = self.objects.lock().unwrap(); + let mut locked_objects = self.objects.lck(); if let Some(object) = locked_objects.get_mut(&update.id()) { let updated = object.update(update); @@ -141,7 +142,7 @@ where } pub(crate) fn list_filter bool>(&self, f: F) -> Vec { - self.objects.lock().unwrap().values().filter(f).cloned().collect::>() + self.objects.lck().values().filter(f).cloned().collect::>() } fn persist(&self, object: &SO) -> Result<(), Error> { @@ -169,7 +170,7 @@ where } pub(crate) fn contains_key(&self, id: &SO::Id) -> bool { - self.objects.lock().unwrap().contains_key(id) + self.objects.lck().contains_key(id) } } diff --git a/src/event.rs b/src/event.rs index f06d701bc..e2f953e6a 100644 --- a/src/event.rs +++ b/src/event.rs @@ -56,6 +56,7 @@ use crate::runtime::Runtime; use crate::types::{ CustomTlvRecord, DynStore, KeysManager, OnionMessenger, PaymentStore, Sweeper, Wallet, }; +use crate::util::locks::MutexExt; use crate::{ hex_utils, BumpTransactionEventHandler, ChannelManager, Error, Graph, PeerInfo, PeerStore, UserChannelId, @@ -370,21 +371,21 @@ where pub(crate) async fn add_event(&self, event: Event) -> Result<(), Error> { let data = { - let mut locked_queue = self.queue.lock().unwrap(); + let mut locked_queue = self.queue.lck(); locked_queue.push_back(event); EventQueueSerWrapper(&locked_queue).encode() }; self.persist_queue(data).await?; - if let Some(waker) = self.waker.lock().unwrap().take() { + if let Some(waker) = self.waker.lck().take() { waker.wake(); } Ok(()) } pub(crate) fn next_event(&self) -> Option { - let locked_queue = self.queue.lock().unwrap(); + let locked_queue = self.queue.lck(); locked_queue.front().cloned() } @@ -394,14 +395,14 @@ where pub(crate) async fn event_handled(&self) -> Result<(), Error> { let data = { - let mut locked_queue = self.queue.lock().unwrap(); + let mut locked_queue = self.queue.lck(); locked_queue.pop_front(); EventQueueSerWrapper(&locked_queue).encode() }; self.persist_queue(data).await?; - if let Some(waker) = self.waker.lock().unwrap().take() { + if let Some(waker) = self.waker.lck().take() { waker.wake(); } Ok(()) @@ -485,10 +486,10 @@ impl Future for EventFuture { fn poll( self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> core::task::Poll { - if let Some(event) = self.event_queue.lock().unwrap().front() { + if let Some(event) = self.event_queue.lck().front() { Poll::Ready(event.clone()) } else { - *self.waker.lock().unwrap() = Some(cx.waker().clone()); + *self.waker.lck() = Some(cx.waker().clone()); Poll::Pending } } @@ -1091,11 +1092,13 @@ where }; self.payment_store.get(&payment_id).map(|payment| { + #[allow(clippy::unwrap_used)] + let amount_msat = payment.amount_msat.unwrap(); log_info!( self.logger, "Successfully sent payment of {}msat{} from \ payment hash {:?} with preimage {:?}", - payment.amount_msat.unwrap(), + amount_msat, if let Some(fee) = fee_paid_msat { format!(" (fee {} msat)", fee) } else { @@ -1256,7 +1259,9 @@ where } let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + self.keys_manager.get_secure_random_bytes()[..16] + .try_into() + .expect("a 16-byte slice should convert into a [u8; 16]"), ); let allow_0conf = self.config.trusted_peers_0conf.contains(&counterparty_node_id); let mut channel_override_config = None; @@ -1446,10 +1451,13 @@ where counterparty_node_id, ); + #[allow(clippy::unwrap_used)] + let former_temporary_channel_id = former_temporary_channel_id.unwrap(); + let event = Event::ChannelPending { channel_id, user_channel_id: UserChannelId(user_channel_id), - former_temporary_channel_id: former_temporary_channel_id.unwrap(), + former_temporary_channel_id, counterparty_node_id, funding_txo, }; diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs index b787ecd33..ad73d0d58 100644 --- a/src/fee_estimator.rs +++ b/src/fee_estimator.rs @@ -14,6 +14,8 @@ use lightning::chain::chaininterface::{ FEERATE_FLOOR_SATS_PER_KW, }; +use crate::util::locks::RwLockExt; + #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)] pub(crate) enum ConfirmationTarget { /// The default target for onchain payments. @@ -48,7 +50,7 @@ impl OnchainFeeEstimator { pub(crate) fn set_fee_rate_cache( &self, fee_rate_cache_update: HashMap, ) -> bool { - let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); + let mut locked_fee_rate_cache = self.fee_rate_cache.wlck(); if fee_rate_cache_update != *locked_fee_rate_cache { *locked_fee_rate_cache = fee_rate_cache_update; true @@ -60,7 +62,7 @@ impl OnchainFeeEstimator { impl FeeEstimator for OnchainFeeEstimator { fn estimate_fee_rate(&self, confirmation_target: ConfirmationTarget) -> FeeRate { - let locked_fee_rate_cache = self.fee_rate_cache.read().unwrap(); + let locked_fee_rate_cache = self.fee_rate_cache.rlck(); let fallback_sats_kwu = get_fallback_rate_for_target(confirmation_target); diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 94e8360fc..4d72220c5 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -21,6 +21,7 @@ use lightning_types::string::PrintableString; use rusqlite::{named_params, Connection}; use crate::io::utils::check_namespace_key_validity; +use crate::util::locks::MutexExt; mod migrations; @@ -288,6 +289,7 @@ impl SqliteStoreInner { })?; let sql = format!("SELECT user_version FROM pragma_user_version"); + #[allow(clippy::unwrap_used)] let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap(); if version_res == 0 { @@ -364,7 +366,7 @@ impl SqliteStoreInner { } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { - let mut outer_lock = self.write_version_locks.lock().unwrap(); + let mut outer_lock = self.write_version_locks.lck(); Arc::clone(&outer_lock.entry(locking_key).or_default()) } @@ -373,7 +375,7 @@ impl SqliteStoreInner { ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sql = format!("SELECT value FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); @@ -423,7 +425,7 @@ impl SqliteStoreInner { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; self.execute_locked_write(inner_lock_ref, locking_key, version, || { - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sort_order = self.next_sort_order.fetch_add(1, Ordering::Relaxed); @@ -467,7 +469,7 @@ impl SqliteStoreInner { check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; self.execute_locked_write(inner_lock_ref, locking_key, version, || { - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sql = format!("DELETE FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace AND key=:key;", self.kv_table_name); @@ -500,7 +502,7 @@ impl SqliteStoreInner { ) -> io::Result> { check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); let sql = format!( "SELECT key FROM {} WHERE primary_namespace=:primary_namespace AND secondary_namespace=:secondary_namespace", @@ -546,7 +548,7 @@ impl SqliteStoreInner { "list_paginated", )?; - let locked_conn = self.connection.lock().unwrap(); + let locked_conn = self.connection.lck(); // Fetch one extra row beyond PAGE_SIZE to determine whether a next page exists. let fetch_limit = (PAGE_SIZE + 1) as i64; @@ -644,7 +646,7 @@ impl SqliteStoreInner { &self, inner_lock_ref: Arc>, locking_key: String, version: u64, callback: F, ) -> Result<(), lightning::io::Error> { let res = { - let mut last_written_version = inner_lock_ref.lock().unwrap(); + let mut last_written_version = inner_lock_ref.lck(); // Check if we already have a newer version written/removed. This is used in async contexts to realize eventual // consistency. @@ -670,7 +672,7 @@ impl SqliteStoreInner { // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already // counted. - let mut outer_lock = self.write_version_locks.lock().unwrap(); + let mut outer_lock = self.write_version_locks.lck(); let strong_count = Arc::strong_count(&inner_lock_ref); debug_assert!(strong_count >= 2, "Unexpected SqliteStore strong count"); diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 2f7a689b2..9aefaaa7f 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -45,6 +45,7 @@ use vss_client::util::storable_builder::{EntropySource, StorableBuilder}; use crate::entropy::NodeEntropy; use crate::io::utils::check_namespace_key_validity; use crate::lnurl_auth::LNURL_AUTH_HARDENED_CHILD_INDEX; +use crate::util::locks::MutexExt; type CustomRetryPolicy = FilteredRetryPolicy< JitteredRetryPolicy< @@ -100,17 +101,20 @@ impl VssStore { header_provider: Arc, ) -> io::Result { let next_version = AtomicU64::new(1); - let internal_runtime = tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("ldk-node-vss-runtime-{}", id) - }) - .worker_threads(INTERNAL_RUNTIME_WORKERS) - .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) - .build() - .unwrap(); + let internal_runtime = { + #[allow(clippy::unwrap_used)] + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("ldk-node-vss-runtime-{}", id) + }) + .worker_threads(INTERNAL_RUNTIME_WORKERS) + .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) + .build() + .unwrap() + }; let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); @@ -419,7 +423,7 @@ impl VssStoreInner { } fn get_inner_lock_ref(&self, locking_key: String) -> Arc> { - let mut outer_lock = self.locks.lock().unwrap(); + let mut outer_lock = self.locks.lck(); Arc::clone(&outer_lock.entry(locking_key).or_default()) } @@ -526,7 +530,10 @@ impl VssStoreInner { // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] - let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { + let storable = Storable::decode( + &resp.value.expect("successful VSS reads should include a value payload").value[..], + ) + .map_err(|e| { let msg = format!( "Failed to decode data read from key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -672,7 +679,7 @@ impl VssStoreInner { // to prevent leaking memory. The two arcs that are expected are the one in the map and the one held here in // inner_lock_ref. The outer lock is obtained first, to avoid a new arc being cloned after we've already // counted. - let mut outer_lock = self.locks.lock().unwrap(); + let mut outer_lock = self.locks.lck(); let strong_count = Arc::strong_count(&inner_lock_ref); debug_assert!(strong_count >= 2, "Unexpected VssStore strong count"); @@ -739,7 +746,10 @@ async fn determine_and_write_schema_version( // unwrap safety: resp.value must be always present for a non-erroneous VSS response, otherwise // it is an API-violation which is converted to [`VssError::InternalServerError`] in [`VssClient`] - let storable = Storable::decode(&resp.value.unwrap().value[..]).map_err(|e| { + let storable = Storable::decode( + &resp.value.expect("successful VSS reads should include a value payload").value[..], + ) + .map_err(|e| { let msg = format!("Failed to decode schema version: {}", e); Error::new(ErrorKind::Other, msg) })?; diff --git a/src/lib.rs b/src/lib.rs index 2ac4697e8..da5e13dee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -105,6 +105,7 @@ mod runtime; mod scoring; mod tx_broadcaster; mod types; +mod util; mod wallet; use std::default::Default; @@ -180,6 +181,7 @@ pub use types::{ChannelDetails, CustomTlvRecord, PeerDetails, SyncAndAsyncKVStor pub use vss_client; use crate::scoring::setup_background_pathfinding_scores_sync; +use crate::util::locks::RwLockExt; use crate::wallet::FundingAmount; #[cfg(feature = "uniffi")] @@ -253,7 +255,7 @@ impl Node { /// a thread-safe manner. pub fn start(&self) -> Result<(), Error> { // Acquire a run lock and hold it until we're setup. - let mut is_running_lock = self.is_running.write().unwrap(); + let mut is_running_lock = self.is_running.wlck(); if *is_running_lock { return Err(Error::AlreadyRunning); } @@ -321,7 +323,7 @@ impl Node { now.elapsed().as_millis() ); { - let mut locked_node_metrics = gossip_node_metrics.write().unwrap(); + let mut locked_node_metrics = gossip_node_metrics.wlck(); locked_node_metrics.latest_rgs_snapshot_timestamp = Some(updated_timestamp); write_node_metrics(&*locked_node_metrics, &*gossip_sync_store, Arc::clone(&gossip_sync_logger)) .unwrap_or_else(|e| { @@ -419,13 +421,16 @@ impl Node { break; } res = listener.accept() => { + #[allow(clippy::unwrap_used)] let tcp_stream = res.unwrap().0; let peer_mgr = Arc::clone(&peer_mgr); runtime.spawn_cancellable_background_task(async move { + #[allow(clippy::unwrap_used)] + let tcp_stream = tcp_stream.into_std().unwrap(); lightning_net_tokio::setup_inbound( Arc::clone(&peer_mgr), - tcp_stream.into_std().unwrap(), - ) + tcp_stream, + ) .await; }); } @@ -497,7 +502,7 @@ impl Node { return; } _ = interval.tick() => { - let skip_broadcast = match bcast_node_metrics.read().unwrap().latest_node_announcement_broadcast_timestamp { + let skip_broadcast = match bcast_node_metrics.rlck().latest_node_announcement_broadcast_timestamp { Some(latest_bcast_time_secs) => { // Skip if the time hasn't elapsed yet. let next_bcast_unix_time = SystemTime::UNIX_EPOCH + Duration::from_secs(latest_bcast_time_secs) + NODE_ANN_BCAST_INTERVAL; @@ -538,7 +543,7 @@ impl Node { let unix_time_secs_opt = SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); { - let mut locked_node_metrics = bcast_node_metrics.write().unwrap(); + let mut locked_node_metrics = bcast_node_metrics.wlck(); locked_node_metrics.latest_node_announcement_broadcast_timestamp = unix_time_secs_opt; write_node_metrics(&*locked_node_metrics, &*bcast_store, Arc::clone(&bcast_logger)) .unwrap_or_else(|e| { @@ -645,7 +650,13 @@ impl Node { Some(background_scorer), sleeper, true, - || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()), + || { + Some( + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("current time should not be earlier than the Unix epoch"), + ) + }, ) .await .unwrap_or_else(|e| { @@ -683,7 +694,7 @@ impl Node { /// /// After this returns most API methods will return [`Error::NotRunning`]. pub fn stop(&self) -> Result<(), Error> { - let mut is_running_lock = self.is_running.write().unwrap(); + let mut is_running_lock = self.is_running.wlck(); if !*is_running_lock { return Err(Error::NotRunning); } @@ -747,9 +758,9 @@ impl Node { /// Returns the status of the [`Node`]. pub fn status(&self) -> NodeStatus { - let is_running = *self.is_running.read().unwrap(); + let is_running = *self.is_running.rlck(); let current_best_block = self.channel_manager.current_best_block().into(); - let locked_node_metrics = self.node_metrics.read().unwrap(); + let locked_node_metrics = self.node_metrics.rlck(); let latest_lightning_wallet_sync_timestamp = locked_node_metrics.latest_lightning_wallet_sync_timestamp; let latest_onchain_wallet_sync_timestamp = @@ -1078,7 +1089,7 @@ impl Node { pub fn connect( &self, node_id: PublicKey, address: SocketAddress, persist: bool, ) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -1108,7 +1119,7 @@ impl Node { /// Will also remove the peer from the peer store, i.e., after this has been called we won't /// try to reconnect on restart. pub fn disconnect(&self, counterparty_node_id: PublicKey) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -1130,7 +1141,7 @@ impl Node { push_to_counterparty_msat: Option, channel_config: Option, announce_for_forwarding: bool, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -1193,7 +1204,9 @@ impl Node { let push_msat = push_to_counterparty_msat.unwrap_or(0); let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + self.keys_manager.get_secure_random_bytes()[..16] + .try_into() + .expect("a 16-byte slice should convert into a [u8; 16]"), ); match self.channel_manager.create_channel( @@ -1641,7 +1654,7 @@ impl Node { /// /// [`EsploraSyncConfig::background_sync_config`]: crate::config::EsploraSyncConfig::background_sync_config pub fn sync_wallets(&self) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/liquidity.rs b/src/liquidity.rs index 485da941c..b9c32b315 100644 --- a/src/liquidity.rs +++ b/src/liquidity.rs @@ -45,6 +45,7 @@ use crate::runtime::Runtime; use crate::types::{ Broadcaster, ChannelManager, DynStore, KeysManager, LiquidityManager, PeerManager, Wallet, }; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{total_anchor_channels_reserve_sats, Config, Error}; const LIQUIDITY_REQUEST_TIMEOUT_SECS: u64 = 5; @@ -302,7 +303,7 @@ where L::Target: LdkLogger, { pub(crate) fn set_peer_manager(&self, peer_manager: Weak) { - *self.peer_manager.write().unwrap() = Some(peer_manager); + *self.peer_manager.wlck() = Some(peer_manager); } pub(crate) fn liquidity_manager(&self) -> Arc { @@ -404,11 +405,8 @@ where return; } - if let Some(sender) = lsps1_client - .pending_opening_params_requests - .lock() - .unwrap() - .remove(&request_id) + if let Some(sender) = + lsps1_client.pending_opening_params_requests.lck().remove(&request_id) { let response = LSPS1OpeningParamsResponse { supported_options }; @@ -460,11 +458,8 @@ where return; } - if let Some(sender) = lsps1_client - .pending_create_order_requests - .lock() - .unwrap() - .remove(&request_id) + if let Some(sender) = + lsps1_client.pending_create_order_requests.lck().remove(&request_id) { let response = LSPS1OrderStatus { order_id, @@ -518,11 +513,8 @@ where return; } - if let Some(sender) = lsps1_client - .pending_check_order_status_requests - .lock() - .unwrap() - .remove(&request_id) + if let Some(sender) = + lsps1_client.pending_check_order_status_requests.lck().remove(&request_id) { let response = LSPS1OrderStatus { order_id, @@ -642,7 +634,9 @@ where }; let user_channel_id: u128 = u128::from_ne_bytes( - self.keys_manager.get_secure_random_bytes()[..16].try_into().unwrap(), + self.keys_manager.get_secure_random_bytes()[..16] + .try_into() + .expect("a 16-byte slice should convert into a [u8; 16]"), ); let intercept_scid = self.channel_manager.get_intercept_scid(); @@ -717,7 +711,7 @@ where }; let init_features = if let Some(Some(peer_manager)) = - self.peer_manager.read().unwrap().as_ref().map(|weak| weak.upgrade()) + self.peer_manager.rlck().as_ref().map(|weak| weak.upgrade()) { // Fail if we're not connected to the prospective channel partner. if let Some(peer) = peer_manager.peer_by_node_id(&their_network_key) { @@ -828,7 +822,7 @@ where } if let Some(sender) = - lsps2_client.pending_fee_requests.lock().unwrap().remove(&request_id) + lsps2_client.pending_fee_requests.lck().remove(&request_id) { let response = LSPS2FeeResponse { opening_fee_params_menu }; @@ -880,7 +874,7 @@ where } if let Some(sender) = - lsps2_client.pending_buy_requests.lock().unwrap().remove(&request_id) + lsps2_client.pending_buy_requests.lck().remove(&request_id) { let response = LSPS2BuyResponse { intercept_scid, cltv_expiry_delta }; @@ -930,7 +924,7 @@ where let (request_sender, request_receiver) = oneshot::channel(); { let mut pending_opening_params_requests_lock = - lsps1_client.pending_opening_params_requests.lock().unwrap(); + lsps1_client.pending_opening_params_requests.lck(); let request_id = client_handler.request_supported_options(lsps1_client.lsp_node_id); pending_opening_params_requests_lock.insert(request_id, request_sender); } @@ -1013,7 +1007,7 @@ where let request_id; { let mut pending_create_order_requests_lock = - lsps1_client.pending_create_order_requests.lock().unwrap(); + lsps1_client.pending_create_order_requests.lck(); request_id = client_handler.create_order( &lsps1_client.lsp_node_id, order_params.clone(), @@ -1059,7 +1053,7 @@ where let (request_sender, request_receiver) = oneshot::channel(); { let mut pending_check_order_status_requests_lock = - lsps1_client.pending_check_order_status_requests.lock().unwrap(); + lsps1_client.pending_check_order_status_requests.lck(); let request_id = client_handler.check_order_status(&lsps1_client.lsp_node_id, order_id); pending_check_order_status_requests_lock.insert(request_id, request_sender); } @@ -1200,7 +1194,7 @@ where let (fee_request_sender, fee_request_receiver) = oneshot::channel(); { - let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lock().unwrap(); + let mut pending_fee_requests_lock = lsps2_client.pending_fee_requests.lck(); let request_id = client_handler .request_opening_params(lsps2_client.lsp_node_id, lsps2_client.token.clone()); pending_fee_requests_lock.insert(request_id, fee_request_sender); @@ -1233,7 +1227,7 @@ where let (buy_request_sender, buy_request_receiver) = oneshot::channel(); { - let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lock().unwrap(); + let mut pending_buy_requests_lock = lsps2_client.pending_buy_requests.lck(); let request_id = client_handler .select_opening_params(lsps2_client.lsp_node_id, amount_msat, opening_fee_params) .map_err(|e| { diff --git a/src/logger.rs b/src/logger.rs index fed64d7a5..af17d5842 100644 --- a/src/logger.rs +++ b/src/logger.rs @@ -308,6 +308,7 @@ mod tests { use std::sync::Mutex; use super::*; + use crate::util::locks::MutexExt; /// A minimal log facade logger that captures log output for testing. struct TestLogger { @@ -320,7 +321,7 @@ mod tests { } fn log(&self, record: &log::Record) { - *self.log.lock().unwrap() = record.args().to_string(); + *self.log.lck() = record.args().to_string(); } fn flush(&self) {} diff --git a/src/payment/asynchronous/om_mailbox.rs b/src/payment/asynchronous/om_mailbox.rs index 9a7478706..5da916765 100644 --- a/src/payment/asynchronous/om_mailbox.rs +++ b/src/payment/asynchronous/om_mailbox.rs @@ -4,6 +4,8 @@ use std::sync::Mutex; use bitcoin::secp256k1::PublicKey; use lightning::ln::msgs::OnionMessage; +use crate::util::locks::MutexExt; + pub(crate) struct OnionMessageMailbox { map: Mutex>>, } @@ -17,7 +19,7 @@ impl OnionMessageMailbox { } pub(crate) fn onion_message_intercepted(&self, peer_node_id: PublicKey, message: OnionMessage) { - let mut map = self.map.lock().unwrap(); + let mut map = self.map.lck(); let queue = map.entry(peer_node_id).or_insert_with(VecDeque::new); if queue.len() >= Self::MAX_MESSAGES_PER_PEER { @@ -27,8 +29,11 @@ impl OnionMessageMailbox { // Enforce a peers limit. If exceeded, evict the peer with the longest queue. if map.len() > Self::MAX_PEERS { - let peer_to_remove = - map.iter().max_by_key(|(_, queue)| queue.len()).map(|(peer, _)| *peer).unwrap(); + let peer_to_remove = map + .iter() + .max_by_key(|(_, queue)| queue.len()) + .map(|(peer, _)| *peer) + .expect("a peer must exist when the mailbox exceeds its peer limit"); map.remove(&peer_to_remove); } @@ -37,7 +42,7 @@ impl OnionMessageMailbox { pub(crate) fn onion_message_peer_connected( &self, peer_node_id: PublicKey, ) -> Vec { - let mut map = self.map.lock().unwrap(); + let mut map = self.map.lck(); if let Some(queue) = map.remove(&peer_node_id) { queue.into() @@ -48,7 +53,7 @@ impl OnionMessageMailbox { #[cfg(test)] pub(crate) fn is_empty(&self) -> bool { - let map = self.map.lock().unwrap(); + let map = self.map.lck(); map.is_empty() } } diff --git a/src/payment/asynchronous/static_invoice_store.rs b/src/payment/asynchronous/static_invoice_store.rs index cd0e2ebd2..0e9753db8 100644 --- a/src/payment/asynchronous/static_invoice_store.rs +++ b/src/payment/asynchronous/static_invoice_store.rs @@ -22,6 +22,7 @@ use crate::hex_utils; use crate::io::STATIC_INVOICE_STORE_PRIMARY_NAMESPACE; use crate::payment::asynchronous::rate_limiter::RateLimiter; use crate::types::DynStore; +use crate::util::locks::MutexExt; struct PersistedStaticInvoice { invoice: StaticInvoice, @@ -63,7 +64,7 @@ impl StaticInvoiceStore { fn check_rate_limit( limiter: &Mutex, recipient_id: &[u8], ) -> Result<(), lightning::io::Error> { - let mut limiter = limiter.lock().unwrap(); + let mut limiter = limiter.lck(); if !limiter.allow(recipient_id) { Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, "Rate limit exceeded")) } else { diff --git a/src/payment/bolt11.rs b/src/payment/bolt11.rs index f2857e814..7c79685eb 100644 --- a/src/payment/bolt11.rs +++ b/src/payment/bolt11.rs @@ -37,6 +37,7 @@ use crate::payment::store::{ use crate::peer_store::{PeerInfo, PeerStore}; use crate::runtime::Runtime; use crate::types::{ChannelManager, PaymentStore}; +use crate::util::locks::RwLockExt; #[cfg(not(feature = "uniffi"))] type Bolt11Invoice = LdkBolt11Invoice; @@ -241,7 +242,7 @@ impl Bolt11Payment { pub fn send( &self, invoice: &Bolt11Invoice, route_parameters: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -275,7 +276,9 @@ impl Bolt11Payment { ) { Ok(()) => { let payee_pubkey = invoice.recover_payee_pub_key(); - let amt_msat = invoice.amount_milli_satoshis().unwrap(); + let amt_msat = invoice + .amount_milli_satoshis() + .expect("zero-amount invoices should be rejected before initiating payment"); log_info!(self.logger, "Initiated sending {}msat to {}", amt_msat, payee_pubkey); let kind = PaymentKind::Bolt11 { @@ -342,7 +345,7 @@ impl Bolt11Payment { &self, invoice: &Bolt11Invoice, amount_msat: u64, route_parameters: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -776,7 +779,7 @@ impl Bolt11Payment { pub fn send_probes( &self, invoice: &Bolt11Invoice, route_parameters: Option, ) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -831,7 +834,7 @@ impl Bolt11Payment { &self, invoice: &Bolt11Invoice, amount_msat: u64, route_parameters: Option, ) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/payment/bolt12.rs b/src/payment/bolt12.rs index 980e20696..677abed83 100644 --- a/src/payment/bolt12.rs +++ b/src/payment/bolt12.rs @@ -30,6 +30,7 @@ use crate::ffi::{maybe_deref, maybe_wrap}; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::types::{ChannelManager, KeysManager, PaymentStore}; +use crate::util::locks::RwLockExt; #[cfg(not(feature = "uniffi"))] type Bolt12Invoice = lightning::offers::invoice::Bolt12Invoice; @@ -89,7 +90,7 @@ impl Bolt12Payment { &self, offer: &Offer, amount_msat: u64, quantity: Option, payer_note: Option, route_parameters: Option, hrn: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -207,7 +208,7 @@ impl Bolt12Payment { if let Some(expiry_secs) = expiry_secs { let absolute_expiry = (SystemTime::now() + Duration::from_secs(expiry_secs as u64)) .duration_since(UNIX_EPOCH) - .unwrap(); + .expect("a future expiry should not be earlier than the Unix epoch"); offer_builder = offer_builder.absolute_expiry(absolute_expiry); } @@ -219,7 +220,10 @@ impl Bolt12Payment { log_error!(self.logger, "Failed to create offer: quantity can't be zero."); return Err(Error::InvalidQuantity); } else { - offer = offer.supported_quantity(Quantity::Bounded(NonZeroU64::new(qty).unwrap())) + offer = offer.supported_quantity(Quantity::Bounded( + NonZeroU64::new(qty) + .expect("qty == 0 was rejected before constructing NonZeroU64"), + )) }; }; @@ -262,7 +266,7 @@ impl Bolt12Payment { &self, offer: &Offer, quantity: Option, payer_note: Option, route_parameters: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -405,7 +409,7 @@ impl Bolt12Payment { if let Some(expiry_secs) = expiry_secs { let absolute_expiry = (SystemTime::now() + Duration::from_secs(expiry_secs as u64)) .duration_since(UNIX_EPOCH) - .unwrap(); + .expect("a future expiry should not be earlier than the Unix epoch"); offer_builder = offer_builder.absolute_expiry(absolute_expiry); } @@ -425,7 +429,7 @@ impl Bolt12Payment { /// [`Refund`]: lightning::offers::refund::Refund /// [`Bolt12Invoice`]: lightning::offers::invoice::Bolt12Invoice pub fn request_refund_payment(&self, refund: &Refund) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -474,7 +478,7 @@ impl Bolt12Payment { let absolute_expiry = (SystemTime::now() + Duration::from_secs(expiry_secs as u64)) .duration_since(UNIX_EPOCH) - .unwrap(); + .expect("a future expiry should not be earlier than the Unix epoch"); let retry_strategy = Retry::Timeout(LDK_PAYMENT_RETRY_TIMEOUT); let route_parameters = route_parameters.or(self.config.route_parameters).unwrap_or_default(); diff --git a/src/payment/onchain.rs b/src/payment/onchain.rs index cc16690e2..e711353ec 100644 --- a/src/payment/onchain.rs +++ b/src/payment/onchain.rs @@ -16,6 +16,7 @@ use crate::config::Config; use crate::error::Error; use crate::logger::{log_info, LdkLogger, Logger}; use crate::types::{ChannelManager, Wallet}; +use crate::util::locks::RwLockExt; use crate::wallet::OnchainSendAmount; #[cfg(not(feature = "uniffi"))] @@ -80,7 +81,7 @@ impl OnchainPayment { pub fn send_to_address( &self, address: &bitcoin::Address, amount_sats: u64, fee_rate: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -110,7 +111,7 @@ impl OnchainPayment { pub fn send_all_to_address( &self, address: &bitcoin::Address, retain_reserves: bool, fee_rate: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/payment/spontaneous.rs b/src/payment/spontaneous.rs index 74fa84c0e..0f3f41184 100644 --- a/src/payment/spontaneous.rs +++ b/src/payment/spontaneous.rs @@ -23,6 +23,7 @@ use crate::error::Error; use crate::logger::{log_error, log_info, LdkLogger, Logger}; use crate::payment::store::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::types::{ChannelManager, CustomTlvRecord, KeysManager, PaymentStore}; +use crate::util::locks::RwLockExt; // The default `final_cltv_expiry_delta` we apply when not set. const LDK_DEFAULT_FINAL_CLTV_EXPIRY_DELTA: u32 = 144; @@ -56,7 +57,7 @@ impl SpontaneousPayment { route_parameters: Option, custom_tlvs: Option>, preimage: Option, ) -> Result { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } @@ -206,7 +207,7 @@ impl SpontaneousPayment { /// /// [`Bolt11Payment::send_probes`]: crate::payment::Bolt11Payment pub fn send_probes(&self, amount_msat: u64, node_id: PublicKey) -> Result<(), Error> { - if !*self.is_running.read().unwrap() { + if !*self.is_running.rlck() { return Err(Error::NotRunning); } diff --git a/src/peer_store.rs b/src/peer_store.rs index ce8a9810e..5b83f25ab 100644 --- a/src/peer_store.rs +++ b/src/peer_store.rs @@ -20,6 +20,7 @@ use crate::io::{ }; use crate::logger::{log_error, LdkLogger}; use crate::types::DynStore; +use crate::util::locks::RwLockExt; use crate::{Error, SocketAddress}; pub struct PeerStore @@ -41,7 +42,7 @@ where } pub(crate) fn add_peer(&self, peer_info: PeerInfo) -> Result<(), Error> { - let mut locked_peers = self.peers.write().unwrap(); + let mut locked_peers = self.peers.wlck(); if locked_peers.contains_key(&peer_info.node_id) { return Ok(()); @@ -52,18 +53,18 @@ where } pub(crate) fn remove_peer(&self, node_id: &PublicKey) -> Result<(), Error> { - let mut locked_peers = self.peers.write().unwrap(); + let mut locked_peers = self.peers.wlck(); locked_peers.remove(node_id); self.persist_peers(&*locked_peers) } pub(crate) fn list_peers(&self) -> Vec { - self.peers.read().unwrap().values().cloned().collect() + self.peers.rlck().values().cloned().collect() } pub(crate) fn get_peer(&self, node_id: &PublicKey) -> Option { - self.peers.read().unwrap().get(node_id).cloned() + self.peers.rlck().get(node_id).cloned() } fn persist_peers(&self, locked_peers: &HashMap) -> Result<(), Error> { diff --git a/src/runtime.rs b/src/runtime.rs index 39a34ddfe..ddfb38cfd 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -16,6 +16,7 @@ use crate::config::{ BACKGROUND_TASK_SHUTDOWN_TIMEOUT_SECS, LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS, }; use crate::logger::{log_debug, log_error, log_trace, LdkLogger, Logger}; +use crate::util::locks::MutexExt; pub(crate) struct Runtime { mode: RuntimeMode, @@ -66,7 +67,7 @@ impl Runtime { where F: Future + Send + 'static, { - let mut background_tasks = self.background_tasks.lock().unwrap(); + let mut background_tasks = self.background_tasks.lck(); let runtime_handle = self.handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures @@ -78,7 +79,7 @@ impl Runtime { where F: Future + Send + 'static, { - let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap(); + let mut cancellable_background_tasks = self.cancellable_background_tasks.lck(); let runtime_handle = self.handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures @@ -90,7 +91,7 @@ impl Runtime { where F: Future + Send + 'static, { - let mut background_processor_task = self.background_processor_task.lock().unwrap(); + let mut background_processor_task = self.background_processor_task.lck(); debug_assert!(background_processor_task.is_none(), "Expected no background processor_task"); let runtime_handle = self.handle(); @@ -121,14 +122,14 @@ impl Runtime { } pub fn abort_cancellable_background_tasks(&self) { - let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lock().unwrap()); + let mut tasks = core::mem::take(&mut *self.cancellable_background_tasks.lck()); debug_assert!(tasks.len() > 0, "Expected some cancellable background_tasks"); tasks.abort_all(); self.block_on(async { while let Some(_) = tasks.join_next().await {} }) } pub fn wait_on_background_tasks(&self) { - let mut tasks = core::mem::take(&mut *self.background_tasks.lock().unwrap()); + let mut tasks = core::mem::take(&mut *self.background_tasks.lck()); debug_assert!(tasks.len() > 0, "Expected some background_tasks"); self.block_on(async { loop { @@ -160,9 +161,7 @@ impl Runtime { } pub fn wait_on_background_processor_task(&self) { - if let Some(background_processor_task) = - self.background_processor_task.lock().unwrap().take() - { + if let Some(background_processor_task) = self.background_processor_task.lck().take() { let abort_handle = background_processor_task.abort_handle(); // Since it seems to make a difference to `tokio` (see // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures diff --git a/src/scoring.rs b/src/scoring.rs index 3ed7b9d1e..89c314558 100644 --- a/src/scoring.rs +++ b/src/scoring.rs @@ -13,6 +13,7 @@ use crate::io::utils::write_external_pathfinding_scores_to_cache; use crate::logger::LdkLogger; use crate::runtime::Runtime; use crate::types::DynStore; +use crate::util::locks::{MutexExt, RwLockExt}; use crate::{write_node_metrics, Logger, NodeMetrics, Scorer}; /// Start a background task that periodically downloads scores via an external url and merges them into the local @@ -82,10 +83,11 @@ async fn sync_external_scores( log_error!(logger, "Failed to persist external scores to cache: {}", e); } - let duration_since_epoch = - SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); - scorer.lock().unwrap().merge(liquidities, duration_since_epoch); - let mut locked_node_metrics = node_metrics.write().unwrap(); + let duration_since_epoch = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("current time should not be earlier than the Unix epoch"); + scorer.lck().merge(liquidities, duration_since_epoch); + let mut locked_node_metrics = node_metrics.wlck(); locked_node_metrics.latest_pathfinding_scores_sync_timestamp = Some(duration_since_epoch.as_secs()); write_node_metrics(&*locked_node_metrics, &*kv_store, logger).unwrap_or_else(|e| { diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0e80a46db..fbaac8a3b 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -60,6 +60,7 @@ use crate::payment::{ PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, }; use crate::types::{Broadcaster, PaymentStore, PendingPaymentStore}; +use crate::util::locks::MutexExt; use crate::{ChainSource, Error}; pub(crate) enum OnchainSendAmount { @@ -115,21 +116,20 @@ impl Wallet { } pub(crate) fn get_full_scan_request(&self) -> FullScanRequest { - self.inner.lock().unwrap().start_full_scan().build() + self.inner.lck().start_full_scan().build() } pub(crate) fn get_incremental_sync_request(&self) -> SyncRequest<(KeychainKind, u32)> { - self.inner.lock().unwrap().start_sync_with_revealed_spks().build() + self.inner.lck().start_sync_with_revealed_spks().build() } pub(crate) fn get_cached_txs(&self) -> Vec> { - self.inner.lock().unwrap().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() + self.inner.lck().tx_graph().full_txs().map(|tx_node| tx_node.tx).collect() } pub(crate) fn get_unconfirmed_txids(&self) -> Vec { self.inner - .lock() - .unwrap() + .lck() .transactions() .filter(|t| t.chain_position.is_unconfirmed()) .map(|t| t.tx_node.txid) @@ -137,12 +137,12 @@ impl Wallet { } pub(crate) fn current_best_block(&self) -> BestBlock { - let checkpoint = self.inner.lock().unwrap().latest_checkpoint(); + let checkpoint = self.inner.lck().latest_checkpoint(); BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() } } pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); match locked_wallet.apply_update_events(update) { Ok(events) => { self.update_payment_store(&mut *locked_wallet, events).map_err(|e| { @@ -150,7 +150,7 @@ impl Wallet { Error::PersistenceFailed })?; - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -172,7 +172,7 @@ impl Wallet { return Ok(()); } - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); let chain_tip1 = locked_wallet.latest_checkpoint().block_id(); let wallet_txs1 = locked_wallet @@ -203,7 +203,7 @@ impl Wallet { Error::PersistenceFailed })?; - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -426,7 +426,7 @@ impl Wallet { ) -> Result { let fee_rate = self.fee_estimator.estimate_fee_rate(confirmation_target); - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); let mut tx_builder = locked_wallet.build_tx(); tx_builder.add_recipient(output_script, amount).fee_rate(fee_rate).nlocktime(locktime); @@ -454,7 +454,7 @@ impl Wallet { }, } - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -469,8 +469,8 @@ impl Wallet { } pub(crate) fn get_new_address(&self) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); let address_info = locked_wallet.reveal_next_address(KeychainKind::External); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -481,8 +481,8 @@ impl Wallet { } pub(crate) fn get_new_internal_address(&self) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); let address_info = locked_wallet.next_unused_address(KeychainKind::Internal); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -493,8 +493,8 @@ impl Wallet { } pub(crate) fn cancel_tx(&self, tx: &Transaction) -> Result<(), Error> { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); locked_wallet.cancel_tx(tx); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -508,7 +508,7 @@ impl Wallet { pub(crate) fn get_balances( &self, total_anchor_channels_reserve_sats: u64, ) -> Result<(u64, u64), Error> { - let balance = self.inner.lock().unwrap().balance(); + let balance = self.inner.lck().balance(); // Make sure `list_confirmed_utxos` returns at least one `Utxo` we could use to spend/bump // Anchors if we have any confirmed amounts. @@ -644,7 +644,7 @@ impl Wallet { pub(crate) fn get_max_funding_amount( &self, cur_anchor_reserve_sats: u64, fee_rate: FeeRate, ) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); // Use a dummy P2WSH script (34 bytes) to match the size of a real funding output. let dummy_p2wsh_script = ScriptBuf::new().to_p2wsh(); @@ -668,7 +668,7 @@ impl Wallet { &self, shared_input: Input, shared_output_script: ScriptBuf, cur_anchor_reserve_sats: u64, fee_rate: FeeRate, ) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); debug_assert!(matches!( locked_wallet.public_descriptor(KeychainKind::External), @@ -712,7 +712,7 @@ impl Wallet { fee_rate.unwrap_or_else(|| self.fee_estimator.estimate_fee_rate(confirmation_target)); let tx = { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); // Prepare the tx_builder. We properly check the reserve requirements (again) further down. let tx_builder = match send_amount { @@ -834,7 +834,7 @@ impl Wallet { }, } - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed @@ -888,8 +888,8 @@ impl Wallet { pub(crate) fn select_confirmed_utxos( &self, must_spend: Vec, must_pay_to: &[TxOut], fee_rate: FeeRate, ) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); debug_assert!(matches!( locked_wallet.public_descriptor(KeychainKind::External), @@ -964,7 +964,7 @@ impl Wallet { } fn list_confirmed_utxos_inner(&self) -> Result, ()> { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); let mut utxos = Vec::new(); let confirmed_txs: Vec = locked_wallet .transactions() @@ -1058,8 +1058,8 @@ impl Wallet { #[allow(deprecated)] fn get_change_script_inner(&self) -> Result { - let mut locked_wallet = self.inner.lock().unwrap(); - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); + let mut locked_persister = self.persister.lck(); let address_info = locked_wallet.next_unused_address(KeychainKind::Internal); locked_wallet.persist(&mut locked_persister).map_err(|e| { @@ -1071,7 +1071,7 @@ impl Wallet { #[allow(deprecated)] pub(crate) fn sign_owned_inputs(&self, unsigned_tx: Transaction) -> Result { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); let mut psbt = Psbt::from_unsigned_tx(unsigned_tx).map_err(|e| { log_error!(self.logger, "Failed to construct PSBT: {}", e); @@ -1108,7 +1108,7 @@ impl Wallet { #[allow(deprecated)] fn sign_psbt_inner(&self, mut psbt: Psbt) -> Result { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); // While BDK populates both `witness_utxo` and `non_witness_utxo` fields, LDK does not. As // BDK by default doesn't trust the witness UTXO to account for the Segwit bug, we must @@ -1256,7 +1256,7 @@ impl Wallet { }, }; - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); debug_assert!( locked_wallet.tx_details(txid).is_some(), @@ -1319,7 +1319,7 @@ impl Wallet { log_error!( self.logger, "Provided fee rate {} is too low for RBF fee bump of txid {}, required minimum fee rate: {}", - fee_rate.unwrap(), + fee_rate.expect("fee_rate.is_some() was checked above"), txid, required_fee_rate ); @@ -1380,7 +1380,7 @@ impl Wallet { }, } - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet after fee bump of {}: {}", txid, e); Error::PersistenceFailed @@ -1431,7 +1431,7 @@ impl Listen for Wallet { } fn block_connected(&self, block: &bitcoin::Block, height: u32) { - let mut locked_wallet = self.inner.lock().unwrap(); + let mut locked_wallet = self.inner.lck(); let pre_checkpoint = locked_wallet.latest_checkpoint(); if pre_checkpoint.height() != height - 1 @@ -1481,7 +1481,7 @@ impl Listen for Wallet { }, }; - let mut locked_persister = self.persister.lock().unwrap(); + let mut locked_persister = self.persister.lck(); match locked_wallet.persist(&mut locked_persister) { Ok(_) => (), Err(e) => { @@ -1513,7 +1513,7 @@ impl WalletSource for Wallet { &'a self, outpoint: OutPoint, ) -> impl Future> + Send + 'a { async move { - let locked_wallet = self.inner.lock().unwrap(); + let locked_wallet = self.inner.lck(); locked_wallet .tx_details(outpoint.txid) .map(|tx_details| tx_details.tx.deref().clone()) From e972486f9752b515ca56326319df089a93dbef3a Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 11:55:50 +0200 Subject: [PATCH 03/12] Replace remaining unreachable unwraps with expects Document the internal invariants behind the remaining non-lock unwrap sites so panic paths explain why they should be unreachable, while keeping the known reachable cases explicit for later handling. Co-Authored-By: HAL 9000 --- build.rs | 3 ++- src/balance.rs | 4 +++- src/ffi/types.rs | 4 +++- src/lnurl_auth.rs | 4 +++- src/types.rs | 1 + src/wallet/ser.rs | 49 +++++++++++++++++++++++++++++++++-------------- 6 files changed, 47 insertions(+), 18 deletions(-) diff --git a/build.rs b/build.rs index f011148e7..2e080ddcd 100644 --- a/build.rs +++ b/build.rs @@ -7,5 +7,6 @@ fn main() { #[cfg(feature = "uniffi")] - uniffi::generate_scaffolding("bindings/ldk_node.udl").unwrap(); + uniffi::generate_scaffolding("bindings/ldk_node.udl") + .expect("the checked-in UniFFI UDL should always generate scaffolding"); } diff --git a/src/balance.rs b/src/balance.rs index 6c6ad946d..2339c83e1 100644 --- a/src/balance.rs +++ b/src/balance.rs @@ -232,7 +232,9 @@ impl LightningBalance { inbound_htlc_rounded_msat, } => { // unwrap safety: confirmed_balance_candidate_index is guaranteed to index into balance_candidates - let balance = balance_candidates.get(confirmed_balance_candidate_index).unwrap(); + let balance = balance_candidates + .get(confirmed_balance_candidate_index) + .expect("LDK should provide a valid confirmed balance candidate index"); Self::ClaimableOnChannelClose { channel_id, diff --git a/src/ffi/types.rs b/src/ffi/types.rs index 5a1420882..4a0fbfa32 100644 --- a/src/ffi/types.rs +++ b/src/ffi/types.rs @@ -917,7 +917,9 @@ uniffi::custom_type!(PaymentHash, String, { } }, lower: |obj| { - Sha256::from_slice(&obj.0).unwrap().to_string() + Sha256::from_slice(&obj.0) + .expect("PaymentHash should always contain exactly 32 bytes") + .to_string() }, }); diff --git a/src/lnurl_auth.rs b/src/lnurl_auth.rs index 1a0def47c..0fef8dd72 100644 --- a/src/lnurl_auth.rs +++ b/src/lnurl_auth.rs @@ -182,7 +182,9 @@ fn linking_key_path(hashing_key: &[u8; 32], domain_name: &str) -> Vec for ChannelDetails { + #[allow(clippy::unwrap_used)] fn from(value: LdkChannelDetails) -> Self { ChannelDetails { channel_id: value.channel_id, diff --git a/src/wallet/ser.rs b/src/wallet/ser.rs index c1ad984e6..c6a707bcd 100644 --- a/src/wallet/ser.rs +++ b/src/wallet/ser.rs @@ -94,7 +94,9 @@ impl Readable for ChangeSetDeserWrapper { decode_tlv_stream!(reader, { (0, blocks, required), }); - Ok(Self(BdkLocalChainChangeSet { blocks: blocks.0.unwrap() })) + Ok(Self(BdkLocalChainChangeSet { + blocks: blocks.0.expect("required blocks TLV field should be present"), + })) } } @@ -141,10 +143,10 @@ impl Readable for ChangeSetDeserWrapper> (0, time, required), (2, txid, required), }); - set.insert((time.0.unwrap().0, txid.0.unwrap())); + set.insert(( + time.0.expect("required confirmation time TLV field should be present").0, + txid.0.expect("required txid TLV field should be present"), + )); } Ok(Self(set)) } @@ -205,7 +210,7 @@ impl Readable for ChangeSetDeserWrapper>> { read_tlv_fields!(reader, { (0, tx, required), }); - set.insert(Arc::new(tx.0.unwrap())); + set.insert(Arc::new(tx.0.expect("required transaction TLV field should be present"))); } Ok(Self(set)) } @@ -232,8 +237,10 @@ impl Readable for ChangeSetDeserWrapper { }); Ok(Self(ConfirmationBlockTime { - block_id: block_id.0.unwrap().0, - confirmation_time: confirmation_time.0.unwrap(), + block_id: block_id.0.expect("required block_id TLV field should be present").0, + confirmation_time: confirmation_time + .0 + .expect("required confirmation_time TLV field should be present"), })) } } @@ -257,7 +264,10 @@ impl Readable for ChangeSetDeserWrapper { (2, hash, required), }); - Ok(Self(BlockId { height: height.0.unwrap(), hash: hash.0.unwrap() })) + Ok(Self(BlockId { + height: height.0.expect("required height TLV field should be present"), + hash: hash.0.expect("required hash TLV field should be present"), + })) } } @@ -285,7 +295,10 @@ impl Readable for ChangeSetDeserWrapper { decode_tlv_stream!(reader, { (0, last_revealed, required) }); Ok(Self(BdkIndexerChangeSet { - last_revealed: last_revealed.0.unwrap().0, + last_revealed: last_revealed + .0 + .expect("required last_revealed TLV field should be present") + .0, spk_cache: Default::default(), })) } @@ -317,7 +330,10 @@ impl Readable for ChangeSetDeserWrapper> { (0, descriptor_id, required), (2, last_index, required), }); - set.insert(descriptor_id.0.unwrap().0, last_index.0.unwrap()); + set.insert( + descriptor_id.0.expect("required descriptor_id TLV field should be present").0, + last_index.0.expect("required last_index TLV field should be present"), + ); } Ok(Self(set)) } @@ -336,7 +352,9 @@ impl Readable for ChangeSetDeserWrapper { decode_tlv_stream!(reader, { (0, hash, required) }); - Ok(Self(DescriptorId(hash.0.unwrap().0))) + Ok(Self(DescriptorId( + hash.0.expect("required descriptor hash TLV field should be present").0, + ))) } } @@ -351,6 +369,9 @@ impl Readable for ChangeSetDeserWrapper { use bitcoin::hashes::Hash; let buf: [u8; 32] = Readable::read(reader)?; - Ok(Self(Sha256Hash::from_slice(&buf[..]).unwrap())) + Ok(Self( + Sha256Hash::from_slice(&buf[..]) + .expect("a 32-byte buffer should decode into a sha256 hash"), + )) } } From acc1cdca2440d1c32d312b3f4ad7933d1d9ac984 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:02:03 +0200 Subject: [PATCH 04/12] Handle inbound connection setup errors Log and skip runtime listener failures instead of panicking when accepting inbound connections or converting accepted sockets. These errors can happen in normal operation, so keeping the node running is safer than treating them as unreachable. Co-Authored-By: HAL 9000 --- src/lib.rs | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index da5e13dee..c13a9bdd3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -421,12 +421,23 @@ impl Node { break; } res = listener.accept() => { - #[allow(clippy::unwrap_used)] - let tcp_stream = res.unwrap().0; + let tcp_stream = match res { + Ok((tcp_stream, _)) => tcp_stream, + Err(e) => { + log_error!(logger, "Failed to accept inbound connection: {}", e); + continue; + }, + }; let peer_mgr = Arc::clone(&peer_mgr); + let logger = Arc::clone(&logger); runtime.spawn_cancellable_background_task(async move { - #[allow(clippy::unwrap_used)] - let tcp_stream = tcp_stream.into_std().unwrap(); + let tcp_stream = match tcp_stream.into_std() { + Ok(tcp_stream) => tcp_stream, + Err(e) => { + log_error!(logger, "Failed to convert inbound connection: {}", e); + return; + }, + }; lightning_net_tokio::setup_inbound( Arc::clone(&peer_mgr), tcp_stream, From b116c852d6ac489dadd8149a7b242d9ae841ac8f Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:04:20 +0200 Subject: [PATCH 05/12] Document outbound payment amount invariant Replace the success-path unwrap on payment amounts with an expect that explains why outbound payments must already have a recorded amount by the time LDK reports them as sent. Co-Authored-By: HAL 9000 --- src/event.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/event.rs b/src/event.rs index e2f953e6a..d6c9d41d6 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1092,8 +1092,9 @@ where }; self.payment_store.get(&payment_id).map(|payment| { - #[allow(clippy::unwrap_used)] - let amount_msat = payment.amount_msat.unwrap(); + let amount_msat = payment.amount_msat.expect( + "outbound payments should record their amount before they can succeed", + ); log_info!( self.logger, "Successfully sent payment of {}msat{} from \ From 07156ce6f62a5b95d7dd3250b1b2ea64fd775369 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:08:38 +0200 Subject: [PATCH 06/12] Document ChannelPending temporary id invariant Replace the pending-channel unwrap with an expect that records why supported LDK Node state should always include the former temporary channel id. Older rust-lightning state could omit it, but LDK Node never shipped before that field existed. Co-Authored-By: HAL 9000 --- src/event.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/event.rs b/src/event.rs index d6c9d41d6..65920775b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1452,8 +1452,9 @@ where counterparty_node_id, ); - #[allow(clippy::unwrap_used)] - let former_temporary_channel_id = former_temporary_channel_id.unwrap(); + let former_temporary_channel_id = former_temporary_channel_id.expect( + "LDK Node has only ever persisted ChannelPending events from rust-lightning 0.0.115 or later", + ); let event = Event::ChannelPending { channel_id, From 185efd28f6c53fa1eb3d16d3b7e1acbda6b7b513 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:12:43 +0200 Subject: [PATCH 07/12] Document supported channel details invariants Replace the remaining channel-details unwraps with expects that point to the supported LDK Node upgrade boundary. The missing fields only occur in rust-lightning versions older than the ldk-node v0.1.0 baseline. Co-Authored-By: HAL 9000 --- src/types.rs | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/types.rs b/src/types.rs index 1aa8c3357..d90b978d2 100644 --- a/src/types.rs +++ b/src/types.rs @@ -561,7 +561,6 @@ pub struct ChannelDetails { } impl From for ChannelDetails { - #[allow(clippy::unwrap_used)] fn from(value: LdkChannelDetails) -> Self { ChannelDetails { channel_id: value.channel_id, @@ -574,9 +573,9 @@ impl From for ChannelDetails { channel_value_sats: value.channel_value_satoshis, unspendable_punishment_reserve: value.unspendable_punishment_reserve, user_channel_id: UserChannelId(value.user_channel_id), - // unwrap safety: This value will be `None` for objects serialized with LDK versions - // prior to 0.0.115. - feerate_sat_per_1000_weight: value.feerate_sat_per_1000_weight.unwrap(), + feerate_sat_per_1000_weight: value + .feerate_sat_per_1000_weight + .expect("ldk-node v0.1.0 already required rust-lightning 0.0.115 feerate details"), outbound_capacity_msat: value.outbound_capacity_msat, inbound_capacity_msat: value.inbound_capacity_msat, confirmations_required: value.confirmations_required, @@ -609,11 +608,14 @@ impl From for ChannelDetails { next_outbound_htlc_limit_msat: value.next_outbound_htlc_limit_msat, next_outbound_htlc_minimum_msat: value.next_outbound_htlc_minimum_msat, force_close_spend_delay: value.force_close_spend_delay, - // unwrap safety: This field is only `None` for objects serialized prior to LDK 0.0.107 - inbound_htlc_minimum_msat: value.inbound_htlc_minimum_msat.unwrap_or(0), + inbound_htlc_minimum_msat: value.inbound_htlc_minimum_msat.expect( + "ldk-node v0.1.0 already required rust-lightning 0.0.115 inbound HTLC minimums", + ), inbound_htlc_maximum_msat: value.inbound_htlc_maximum_msat, - // unwrap safety: `config` is only `None` for LDK objects serialized prior to 0.0.109. - config: value.config.map(|c| c.into()).unwrap(), + config: value + .config + .map(|c| c.into()) + .expect("ldk-node v0.1.0 already required rust-lightning 0.0.115 channel config"), } } } From 0e5e906edc4be6b8771450d0bbb8938fed0b2e28 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:15:19 +0200 Subject: [PATCH 08/12] Propagate sqlite schema version read errors Replace the user_version query unwrap with normal io::Error propagation so database initialization failures are reported cleanly instead of panicking. Co-Authored-By: HAL 9000 --- src/io/sqlite_store/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 4d72220c5..c3990a07a 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -289,8 +289,10 @@ impl SqliteStoreInner { })?; let sql = format!("SELECT user_version FROM pragma_user_version"); - #[allow(clippy::unwrap_used)] - let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).unwrap(); + let version_res: u16 = connection.query_row(&sql, [], |row| row.get(0)).map_err(|e| { + let msg = format!("Failed to read PRAGMA user_version: {}", e); + io::Error::new(io::ErrorKind::Other, msg) + })?; if version_res == 0 { // New database, set our SCHEMA_USER_VERSION and continue From dcca9f489a91e59d3f95811a1d92d13f9d778111 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:17:26 +0200 Subject: [PATCH 09/12] Propagate VSS runtime construction errors Replace the Tokio runtime builder unwrap with io::Error propagation so VSS startup failures surface through the constructor instead of panicking. Co-Authored-By: HAL 9000 --- src/io/vss_store.rs | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 9aefaaa7f..527cf10bc 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -101,20 +101,19 @@ impl VssStore { header_provider: Arc, ) -> io::Result { let next_version = AtomicU64::new(1); - let internal_runtime = { - #[allow(clippy::unwrap_used)] - tokio::runtime::Builder::new_multi_thread() - .enable_all() - .thread_name_fn(|| { - static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); - let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); - format!("ldk-node-vss-runtime-{}", id) - }) - .worker_threads(INTERNAL_RUNTIME_WORKERS) - .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) - .build() - .unwrap() - }; + let internal_runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("ldk-node-vss-runtime-{}", id) + }) + .worker_threads(INTERNAL_RUNTIME_WORKERS) + .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) + .build() + .map_err(|e| { + io::Error::new(io::ErrorKind::Other, format!("Failed to build VSS runtime: {}", e)) + })?; let (data_encryption_key, obfuscation_master_key) = derive_data_encryption_and_obfuscation_keys(&vss_seed); From 13c8540233b382f0dd27d6980c5696332d6c21ba Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:19:48 +0200 Subject: [PATCH 10/12] Tolerate clock skew in bitcoind timing logs Use a zero-millisecond fallback for elapsed-time logging so clock adjustments do not panic the chain polling loop. Co-Authored-By: HAL 9000 --- src/chain/bitcoind.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/chain/bitcoind.rs b/src/chain/bitcoind.rs index 64cbf7829..06a316055 100644 --- a/src/chain/bitcoind.rs +++ b/src/chain/bitcoind.rs @@ -195,8 +195,7 @@ impl BitcoindChainSource { { Ok(chain_tip) => { { - #[allow(clippy::unwrap_used)] - let elapsed_ms = now.elapsed().unwrap().as_millis(); + let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); log_info!( self.logger, "Finished synchronizing listeners in {}ms", @@ -413,8 +412,7 @@ impl BitcoindChainSource { let now = SystemTime::now(); match spv_client.poll_best_tip().await { Ok((ChainTip::Better(tip), true)) => { - #[allow(clippy::unwrap_used)] - let elapsed_ms = now.elapsed().unwrap().as_millis(); + let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); log_trace!(self.logger, "Finished polling best tip in {}ms", elapsed_ms); *self.latest_chain_tip.wlck() = Some(tip); }, @@ -435,8 +433,7 @@ impl BitcoindChainSource { .await { Ok((unconfirmed_txs, evicted_txids)) => { - #[allow(clippy::unwrap_used)] - let elapsed_ms = now.elapsed().unwrap().as_millis(); + let elapsed_ms = now.elapsed().map(|d| d.as_millis()).unwrap_or(0); log_trace!( self.logger, "Finished polling mempool of size {} and {} evicted transactions in {}ms", From 3aafe0f56072faa7c82b7324b651b08c3280d0e1 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 12:31:02 +0200 Subject: [PATCH 11/12] Propagate Esplora client setup failures Return Esplora client construction failures through build-time error handling instead of panicking so invalid headers or reqwest setup errors fail node construction cleanly. Co-Authored-By: HAL 9000 --- src/builder.rs | 5 +++++ src/chain/esplora.rs | 11 ++++++----- src/chain/mod.rs | 6 +++--- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index c56f2ed54..350dbfcc8 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -190,6 +190,8 @@ pub enum BuildError { WalletSetupFailed, /// We failed to setup the logger. LoggerSetupFailed, + /// We failed to setup the configured chain source. + ChainSourceSetupFailed, /// The given network does not match the node's previously configured network. NetworkMismatch, /// The role of the node in an asynchronous payments context is not compatible with the current configuration. @@ -217,6 +219,7 @@ impl fmt::Display for BuildError { Self::KVStoreSetupFailed => write!(f, "Failed to setup KVStore."), Self::WalletSetupFailed => write!(f, "Failed to setup onchain wallet."), Self::LoggerSetupFailed => write!(f, "Failed to setup the logger."), + Self::ChainSourceSetupFailed => write!(f, "Failed to setup the chain source."), Self::InvalidNodeAlias => write!(f, "Given node alias is invalid."), Self::NetworkMismatch => { write!(f, "Given network does not match the node's previously configured network.") @@ -1298,6 +1301,7 @@ fn build_with_store_internal( Arc::clone(&logger), Arc::clone(&node_metrics), ) + .map_err(|()| BuildError::ChainSourceSetupFailed)? }, Some(ChainDataSourceConfig::Electrum { server_url, sync_config }) => { let sync_config = sync_config.unwrap_or(ElectrumSyncConfig::default()); @@ -1367,6 +1371,7 @@ fn build_with_store_internal( Arc::clone(&logger), Arc::clone(&node_metrics), ) + .map_err(|()| BuildError::ChainSourceSetupFailed)? }, }; let chain_source = Arc::new(chain_source); diff --git a/src/chain/esplora.rs b/src/chain/esplora.rs index d93a2fbc6..649f2c8c1 100644 --- a/src/chain/esplora.rs +++ b/src/chain/esplora.rs @@ -46,7 +46,7 @@ impl EsploraChainSource { server_url: String, headers: HashMap, sync_config: EsploraSyncConfig, fee_estimator: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> Self { + ) -> Result { let mut client_builder = esplora_client::Builder::new(&server_url); client_builder = client_builder.timeout(sync_config.timeouts_config.per_request_timeout_secs as u64); @@ -55,14 +55,15 @@ impl EsploraChainSource { client_builder = client_builder.header(header_name, header_value); } - #[allow(clippy::unwrap_used)] - let esplora_client = client_builder.build_async().unwrap(); + let esplora_client = client_builder.build_async().map_err(|e| { + log_error!(logger, "Failed to build Esplora client: {}", e); + })?; let tx_sync = Arc::new(EsploraSyncClient::from_client(esplora_client.clone(), Arc::clone(&logger))); let onchain_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); let lightning_wallet_sync_status = Mutex::new(WalletSyncStatus::Completed); - Self { + Ok(Self { sync_config, esplora_client, onchain_wallet_sync_status, @@ -73,7 +74,7 @@ impl EsploraChainSource { config, logger, node_metrics, - } + }) } pub(super) async fn sync_onchain_wallet( diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 03df993d0..3f0843d4c 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -102,7 +102,7 @@ impl ChainSource { fee_estimator: Arc, tx_broadcaster: Arc, kv_store: Arc, config: Arc, logger: Arc, node_metrics: Arc>, - ) -> (Self, Option) { + ) -> Result<(Self, Option), ()> { let esplora_chain_source = EsploraChainSource::new( server_url, headers, @@ -112,10 +112,10 @@ impl ChainSource { config, Arc::clone(&logger), node_metrics, - ); + )?; let kind = ChainSourceKind::Esplora(esplora_chain_source); let registered_txids = Mutex::new(Vec::new()); - (Self { kind, registered_txids, tx_broadcaster, logger }, None) + Ok((Self { kind, registered_txids, tx_broadcaster, logger }, None)) } pub(crate) fn new_electrum( From 2eb9f091c2a75f395c46ffc80f3887d4045d82d2 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Fri, 3 Apr 2026 11:55:56 +0200 Subject: [PATCH 12/12] Ban new library unwraps in CI Fail library clippy runs when new unwrap calls are introduced so the unwrap policy stays enforced without pulling tests, benches, or docs into the restriction. Co-Authored-By: HAL 9000 --- .github/workflows/rust.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 188bee166..baac8ec8b 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -81,6 +81,14 @@ jobs: - name: Check release build with UniFFI support on Rust ${{ matrix.toolchain }} if: matrix.build-uniffi run: cargo check --release --features uniffi --verbose --color always + - name: Ban unwrap in library code on Rust ${{ matrix.toolchain }} + if: matrix.build-uniffi + env: + RUSTFLAGS: "" + run: | + rustup component add clippy + cargo clippy --lib --verbose --color always -- -A warnings -D clippy::unwrap_used -A clippy::tabs_in_doc_comments + cargo clippy --lib --features uniffi --verbose --color always -- -A warnings -D clippy::unwrap_used -A clippy::tabs_in_doc_comments - name: Test on Rust ${{ matrix.toolchain }} if: "matrix.platform != 'windows-latest'" run: |