From 8ad8775405093957f8927b4b0b97b941a82c00a6 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 11:27:40 +0100 Subject: [PATCH 1/6] Implement tiered storage This commit: Adds `TierStore`, a tiered `KVStore`/`KVStoreSync` implementation that routes node persistence across three storage roles: - a primary store for durable, authoritative data - an optional backup store for a second durable copy of primary-backed data - an optional ephemeral store for rebuildable cached data such as the network graph and scorer TierStore routes ephemeral cache data to the ephemeral store when configured, while durable data remains primary+backup. Reads and lists do not consult the backup store during normal operation. For primary+backup writes and removals, this implementation treats the backup store as part of the persistence success path rather than as a best-effort background mirror. Earlier designs used asynchronous backup queueing to avoid blocking the primary path, but that weakens the durability contract by allowing primary success to be reported before backup persistence has completed. TierStore now issues primary and backup operations together and only returns success once both complete. This gives callers a clearer persistence guarantee when a backup store is configured: acknowledged primary+backup mutations have been attempted against both durable stores. The tradeoff is that dual-store operations are not atomic across stores, so an error may still be returned after one store has already been updated. TierStore also implements `KVStoreSync` in terms of dedicated synchronous helpers that call the wrapped stores' sync interfaces directly. This preserves the inner stores' synchronous semantics instead of routing sync operations through a previously held async runtime. Additionally, adds unit coverage for the current contract, including: - basic read/write/remove/list persistence - routing of ephemeral data away from the primary store - backup participation in the foreground success path for writes and removals --- src/io/mod.rs | 1 + src/io/tier_store.rs | 906 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 907 insertions(+) create mode 100644 src/io/tier_store.rs diff --git a/src/io/mod.rs b/src/io/mod.rs index e16a99975..c7dfe7af0 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -12,6 +12,7 @@ pub mod postgres_store; pub mod sqlite_store; #[cfg(test)] pub(crate) mod test_utils; +pub(crate) mod tier_store; pub(crate) mod utils; pub mod vss_store; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs new file mode 100644 index 000000000..13f17862f --- /dev/null +++ b/src/io/tier_store.rs @@ -0,0 +1,906 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. +#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. + +use crate::io::utils::check_namespace_key_validity; +use crate::logger::{LdkLogger, Logger}; +use crate::types::DynStore; + +use lightning::util::persist::{ + KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, +}; +use lightning::{io, log_error}; + +use std::future::Future; +use std::sync::Arc; + +/// A 3-tiered [`KVStore`]/[`KVStoreSync`] implementation that routes data across +/// storage backends that may be local or remote: +/// - a primary store for durable, authoritative persistence, +/// - an optional backup store that maintains an additional durable copy of +/// primary-backed data, and +/// - an optional ephemeral store for non-critical, rebuildable cached data. +/// +/// When a backup store is configured, writes and removals for primary-backed data +/// are issued to the primary and backup stores concurrently and only succeed once +/// both stores complete successfully. +/// +/// Reads and lists do not consult the backup store during normal operation. +/// Ephemeral data is read from and written to the ephemeral store when configured. +/// +/// Note that dual-store writes and removals are not atomic across the primary and +/// backup stores. If one store succeeds and the other fails, the operation +/// returns an error even though one store may already reflect the change. +pub(crate) struct TierStore { + inner: Arc, +} + +impl TierStore { + pub fn new(primary_store: Arc, logger: Arc) -> Self { + let inner = Arc::new(TierStoreInner::new(primary_store, Arc::clone(&logger))); + + Self { inner } + } + + /// Configures a backup store for primary-backed data. + /// + /// Once set, writes and removals targeting the primary tier succeed only if both + /// the primary and backup stores succeed. The two operations are issued + /// concurrently, and any failure is returned to the caller. + /// + /// Note: dual-store writes/removals are not atomic. An error may be returned + /// after the primary store has already been updated if the backup store fails. + /// + /// The backup store is not consulted for normal reads or lists. + pub fn set_backup_store(&mut self, backup: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.backup_store = Some(backup); + } + + /// Configures the ephemeral store for non-critical, rebuildable data. + /// + /// When configured, selected cache-like data is routed to this store instead of + /// the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral: Arc) { + debug_assert_eq!(Arc::strong_count(&self.inner), 1); + + let inner = Arc::get_mut(&mut self.inner).expect( + "TierStore should not be shared during configuration. No other references should exist", + ); + + inner.ephemeral_store = Some(ephemeral); + } +} + +impl KVStore for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await } + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.write_internal(primary_namespace, secondary_namespace, key, buf).await } + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> impl Future> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + + async move { inner.remove_internal(primary_namespace, secondary_namespace, key, lazy).await } + } + + fn list( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> impl Future, io::Error>> + 'static + Send { + let inner = Arc::clone(&self.inner); + + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + + async move { inner.list_internal(primary_namespace, secondary_namespace).await } + } +} + +impl KVStoreSync for TierStore { + fn read( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + self.inner.read_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + ) + } + + fn write( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + self.inner.write_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + buf, + ) + } + + fn remove( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + self.inner.remove_internal_sync( + primary_namespace.to_string(), + secondary_namespace.to_string(), + key.to_string(), + lazy, + ) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { + self.inner + .list_internal_sync(primary_namespace.to_string(), secondary_namespace.to_string()) + } +} + +struct TierStoreInner { + /// The authoritative store for durable data. + primary_store: Arc, + /// The store used for non-critical, rebuildable cached data. + ephemeral_store: Option>, + /// An optional second durable store for primary-backed data. + backup_store: Option>, + logger: Arc, +} + +impl TierStoreInner { + /// Creates a tier store with the primary data store. + pub fn new(primary_store: Arc, logger: Arc) -> Self { + Self { primary_store, ephemeral_store: None, backup_store: None, logger } + } + + /// Reads from the primary data store. + async fn read_primary( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStore::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) + .await + { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + fn read_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> io::Result> { + match KVStoreSync::read( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + ) { + Ok(data) => Ok(data), + Err(e) => { + log_error!( + self.logger, + "Failed to read from primary store for key {}/{}/{}: {}.", + primary_namespace, + secondary_namespace, + key, + e + ); + Err(e) + }, + } + } + + /// Lists keys from the primary data store. + async fn list_primary( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStore::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + .await + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list from primary store for namespace {}/{}: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + fn list_primary_sync( + &self, primary_namespace: &str, secondary_namespace: &str, + ) -> io::Result> { + match KVStoreSync::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) + { + Ok(keys) => Ok(keys), + Err(e) => { + log_error!( + self.logger, + "Failed to list keys in namespace {}/{} from primary store: {}.", + primary_namespace, + secondary_namespace, + e + ); + Err(e) + }, + } + } + + async fn write_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + let primary_fut = KVStore::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn write_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf.clone(), + ); + let backup_res = KVStoreSync::write( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ); + + self.handle_primary_backup_results( + "write", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::write( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + buf, + ) + } + } + + async fn remove_primary_backup_async( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + let primary_fut = KVStore::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + if let Some(backup_store) = self.backup_store.as_ref() { + let backup_fut = KVStore::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + let (primary_res, backup_res) = tokio::join!(primary_fut, backup_fut); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + primary_fut.await + } + } + + fn remove_primary_backup_sync( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + ) -> io::Result<()> { + if let Some(backup_store) = self.backup_store.as_ref() { + let primary_res = KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + let backup_res = KVStoreSync::remove( + backup_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ); + + self.handle_primary_backup_results( + "removal", + primary_namespace, + secondary_namespace, + key, + primary_res, + backup_res, + ) + } else { + KVStoreSync::remove( + self.primary_store.as_ref(), + primary_namespace, + secondary_namespace, + key, + lazy, + ) + } + } + + async fn read_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await + } else { + self.read_primary(&primary_namespace, &secondary_namespace, &key).await + } + } + + fn read_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "read", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key) + } else { + self.read_primary_sync(&primary_namespace, &secondary_namespace, &key) + } + } + + async fn write_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } else { + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + } + } + + fn write_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "write", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::write( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } else { + self.write_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + } + } + + async fn remove_internal( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(eph_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } else { + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + } + } + + fn remove_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, + ) -> io::Result<()> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + Some(key.as_str()), + "remove", + )?; + + if let Some(ephemeral_store) = + self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) + { + KVStoreSync::remove( + ephemeral_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } else { + self.remove_primary_backup_sync( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + } + } + + async fn list_internal( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store lists here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + KVStore::list(eph_store.as_ref(), &primary_namespace, &secondary_namespace) + .await + } else { + self.list_primary(&primary_namespace, &secondary_namespace).await + } + }, + _ => self.list_primary(&primary_namespace, &secondary_namespace).await, + } + } + + fn list_internal_sync( + &self, primary_namespace: String, secondary_namespace: String, + ) -> io::Result> { + check_namespace_key_validity( + primary_namespace.as_str(), + secondary_namespace.as_str(), + None, + "list", + )?; + + match (primary_namespace.as_str(), secondary_namespace.as_str()) { + ( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + ) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { + if let Some(ephemeral_store) = self.ephemeral_store.as_ref() { + KVStoreSync::list( + ephemeral_store.as_ref(), + &primary_namespace, + &secondary_namespace, + ) + } else { + self.list_primary_sync(&primary_namespace, &secondary_namespace) + } + }, + _ => self.list_primary_sync(&primary_namespace, &secondary_namespace), + } + } + + fn ephemeral_store( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> Option<&Arc> { + self.ephemeral_store + .as_ref() + .filter(|_s| is_ephemeral_cached_key(primary_namespace, secondary_namespace, key)) + } + + fn handle_primary_backup_results( + &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_res: io::Result<()>, backup_res: io::Result<()>, + ) -> io::Result<()> { + match (primary_res, backup_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(primary_err), Ok(())) => Err(primary_err), + (Ok(()), Err(backup_err)) => Err(backup_err), + (Err(primary_err), Err(backup_err)) => { + log_error!( + self.logger, + "Primary and backup {}s both failed for key {}/{}/{}: primary={}, backup={}", + op, + primary_namespace, + secondary_namespace, + key, + primary_err, + backup_err + ); + Err(primary_err) + }, + } + } +} + +fn is_ephemeral_cached_key(pn: &str, sn: &str, key: &str) -> bool { + matches!( + (pn, sn, key), + (NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, _, NETWORK_GRAPH_PERSISTENCE_KEY) + | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _, SCORER_PERSISTENCE_KEY) + ) +} + +#[cfg(test)] +mod tests { + use std::panic::RefUnwindSafe; + use std::path::PathBuf; + use std::sync::Arc; + + use lightning::util::logger::Level; + use lightning::util::persist::{ + CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + }; + use lightning_persister::fs_store::v1::FilesystemStore; + + use crate::io::test_utils::{do_read_write_remove_list_persist, random_storage_path}; + use crate::io::tier_store::TierStore; + use crate::logger::Logger; + use crate::types::DynStore; + use crate::types::DynStoreWrapper; + + use super::*; + + impl RefUnwindSafe for TierStore {} + + struct CleanupDir(PathBuf); + impl Drop for CleanupDir { + fn drop(&mut self) { + let _ = std::fs::remove_dir_all(&self.0); + } + } + + fn setup_tier_store(primary_store: Arc, logger: Arc) -> TierStore { + TierStore::new(primary_store, logger) + } + + #[test] + fn write_read_list_remove() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let tier = setup_tier_store(primary_store, logger); + + do_read_write_remove_list_persist(&tier); + } + + #[test] + fn ephemeral_routing() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let ephemeral_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("ephemeral")))); + tier.set_ephemeral_store(Arc::clone(&ephemeral_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read_ng = KVStoreSync::read( + &*primary_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + let ephemeral_read_ng = KVStoreSync::read( + &*ephemeral_store, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ); + + let primary_read_cm = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let ephemeral_read_cm = KVStoreSync::read( + &*ephemeral_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert!(primary_read_ng.is_err()); + assert_eq!(ephemeral_read_ng.unwrap(), data); + + assert!(ephemeral_read_cm.is_err()); + assert_eq!(primary_read_cm.unwrap(), data); + } + + #[test] + fn backup_write_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + data.clone(), + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ); + + assert_eq!(primary_read.unwrap(), data); + assert_eq!(backup_read.unwrap(), data); + } + + #[test] + fn backup_remove_is_part_of_success_path() { + let base_dir = random_storage_path(); + let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); + let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); + + let _cleanup = CleanupDir(base_dir.clone()); + + let primary_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); + let mut tier = setup_tier_store(Arc::clone(&primary_store), logger); + + let backup_store: Arc = + Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("backup")))); + tier.set_backup_store(Arc::clone(&backup_store)); + + let data = vec![42u8; 32]; + let key = CHANNEL_MANAGER_PERSISTENCE_KEY; + + KVStoreSync::write( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + data, + ) + .unwrap(); + + KVStoreSync::remove( + &tier, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + true, + ) + .unwrap(); + + let primary_read = KVStoreSync::read( + &*primary_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + let backup_read = KVStoreSync::read( + &*backup_store, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ); + + assert!(primary_read.is_err()); + assert!(backup_read.is_err()); + } +} From 7e6606c8202e8c20c1d14a2eecfc96a795e131b1 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Tue, 7 Apr 2026 19:17:22 +0100 Subject: [PATCH 2/6] Integrate TierStore into NodeBuilder Add native builder support for tiered storage by introducing `TierStoreConfig` and builder methods for configuring ephemeral storage and a local SQLite backup mirror. During node construction, wrap the configured primary store in `TierStore` and attach secondary tiers for cache-like ephemeral data and mirrored durable backup writes. The builder constructs the backup store internally using a dedicated SQLite database file and rejects configurations where the backup path conflicts with the primary storage path. Add test coverage for full-cycle backup mirroring, same-path rejection, and UniFFI-backed builder configuration. Update `setup_builder!` so FFI-backed builder tests can use mutable configuration helpers. --- src/builder.rs | 139 ++++++++++- src/io/sqlite_store/mod.rs | 2 + src/io/tier_store.rs | 418 ++++++-------------------------- src/types.rs | 4 +- tests/common/mod.rs | 20 +- tests/integration_tests_rust.rs | 74 ++++++ 6 files changed, 307 insertions(+), 350 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index c88c867cc..fb8a11c2c 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -57,6 +57,7 @@ use crate::event::EventQueue; use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; +use crate::io::tier_store::TierStore; use crate::io::utils::{ open_or_migrate_fs_store, read_all_objects, read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, read_node_metrics, @@ -154,6 +155,21 @@ impl std::fmt::Debug for LogWriterConfig { } } +#[derive(Default)] +struct TierStoreConfig { + ephemeral: Option>, + backup_storage_dir_path: Option, +} + +impl std::fmt::Debug for TierStoreConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TierStoreConfig") + .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) + .field("backup_storage_dir_path", &self.backup_storage_dir_path) + .finish() + } +} + /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -200,6 +216,11 @@ pub enum BuildError { AsyncPaymentsConfigMismatch, /// An attempt to setup a DNS Resolver failed. DNSResolverSetupFailed, + /// The configured backup storage path conflicts with the primary storage path. + /// + /// Backup storage must use a distinct local directory so that the primary and + /// backup stores do not point to the same SQLite database. + BackupStorePathConflict, } impl fmt::Display for BuildError { @@ -237,6 +258,12 @@ impl fmt::Display for BuildError { Self::DNSResolverSetupFailed => { write!(f, "An attempt to setup a DNS resolver has failed.") }, + Self::BackupStorePathConflict => { + write!( + f, + "The configured backup storage path conflicts with the primary storage path." + ) + }, } } } @@ -289,6 +316,7 @@ pub struct NodeBuilder { liquidity_source_config: Option, log_writer_config: Option, async_payments_role: Option, + tier_store_config: Option, runtime_handle: Option, pathfinding_scores_sync_config: Option, recovery_mode: bool, @@ -307,6 +335,7 @@ impl NodeBuilder { let gossip_source_config = None; let liquidity_source_config = None; let log_writer_config = None; + let tier_store_config = None; let runtime_handle = None; let pathfinding_scores_sync_config = None; let recovery_mode = false; @@ -316,6 +345,7 @@ impl NodeBuilder { gossip_source_config, liquidity_source_config, log_writer_config, + tier_store_config, runtime_handle, async_payments_role: None, pathfinding_scores_sync_config, @@ -625,6 +655,42 @@ impl NodeBuilder { self } + /// Configures a local SQLite backup store for disaster recovery. + /// + /// When building with tiered storage, a SQLite store will be created at the + /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database + /// file name. It receives a second durable copy of data written to the + /// primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup SQLite stores complete successfully. + /// + /// The configured path must point to a distinct local directory from the + /// primary storage path. If the backup path equals the primary storage path, + /// building will fail with [`BuildError::BackupStorePathConflict`]. + /// + /// If not set, durable data will be stored only in the primary store. + /// + /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME + pub fn set_backup_storage_dir_path(&mut self, backup_storage_dir_path: String) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.backup_storage_dir_path = Some(backup_storage_dir_path.into()); + self + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); + tier_store_config.ephemeral = Some(ephemeral_store); + self + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: NodeEntropy) -> Result { @@ -826,11 +892,18 @@ impl NodeBuilder { } /// Builds a [`Node`] instance according to the options previously configured. + /// + /// The provided `kv_store` will be used as the primary storage backend. Optionally, + /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) + /// and a local SQLite backup store for disaster recovery can be configured via + /// [`set_ephemeral_store`] and [`set_backup_storage_dir_path`]. + /// + /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, ) -> Result { let logger = setup_logger(&self.log_writer_config, &self.config)?; - self.build_with_store_and_logger(node_entropy, kv_store, logger) } @@ -855,6 +928,36 @@ impl NodeBuilder { fn build_with_store_runtime_and_logger( &self, node_entropy: NodeEntropy, kv_store: S, runtime: Arc, logger: Arc, ) -> Result { + let ts_config = self.tier_store_config.as_ref(); + let primary_store = Arc::new(DynStoreWrapper(kv_store)); + let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); + if let Some(config) = ts_config { + config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { + let primary_storage_dir_path = PathBuf::from(&self.config.storage_dir_path); + if primary_storage_dir_path == *backup_storage_dir_path { + log_error!( + logger, + "Backup storage path must differ from primary storage path: {}", + backup_storage_dir_path.display() + ); + return Err(BuildError::BackupStorePathConflict); + } + + let backup_store = SqliteStore::new( + backup_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup backup SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let backup_store: Arc = Arc::new(DynStoreWrapper(backup_store)); + tier_store.set_backup_store(backup_store); + } + } + let seed_bytes = node_entropy.to_seed_bytes(); let config = Arc::new(self.config.clone()); @@ -869,7 +972,7 @@ impl NodeBuilder { seed_bytes, runtime, logger, - Arc::new(DynStoreWrapper(kv_store)), + Arc::new(DynStoreWrapper(tier_store)), ) } } @@ -1164,6 +1267,38 @@ impl ArcedNodeBuilder { self.inner.write().expect("lock").set_wallet_recovery_mode(); } + /// Configures a local SQLite backup store for disaster recovery. + /// + /// When building with tiered storage, a SQLite store will be created at the + /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database + /// file name. It receives a second durable copy of data written to the + /// primary store. + /// + /// Writes and removals for primary-backed data only succeed once both the + /// primary and backup SQLite stores complete successfully. + /// + /// The configured path must point to a distinct local directory from the + /// primary storage path. If the backup path equals the primary storage path, + /// building will fail with [`BuildError::BackupStorePathConflict`]. + /// + /// If not set, durable data will be stored only in the primary store. + /// + /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME + pub fn set_backup_storage_dir_path(&self, backup_storage_dir_path: String) { + self.inner.write().expect("lock").set_backup_storage_dir_path(backup_storage_dir_path); + } + + /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// + /// When building with tiered storage, this store is used for ephemeral data like + /// the network graph and scorer data to reduce latency for reads. Data stored here + /// can be rebuilt if lost. + /// + /// If not set, non-critical data will be stored in the primary store. + pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { + self.inner.write().expect("lock").set_ephemeral_store(ephemeral_store); + } + /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 076aeef9b..136676799 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -24,6 +24,8 @@ mod migrations; /// LDK Node's database file name. pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite"; +/// LDK Node's backup database file name. +pub const SQLITE_BACKUP_DB_FILE_NAME: &str = "ldk_node_data_backup.sqlite"; /// LDK Node's table in which we store all data. pub const KV_TABLE_NAME: &str = "ldk_node_data"; diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 13f17862f..775c7e588 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -4,23 +4,22 @@ // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license , at your option. You may not use this file except in // accordance with one or both of these licenses. -#![allow(dead_code)] // TODO: Temporal warning silencer. Will be removed in later commit. use crate::io::utils::check_namespace_key_validity; use crate::logger::{LdkLogger, Logger}; use crate::types::DynStore; use lightning::util::persist::{ - KVStore, KVStoreSync, NETWORK_GRAPH_PERSISTENCE_KEY, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStore, NETWORK_GRAPH_PERSISTENCE_KEY, NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, }; use lightning::{io, log_error}; use std::future::Future; use std::sync::Arc; -/// A 3-tiered [`KVStore`]/[`KVStoreSync`] implementation that routes data across +/// A 3-tiered [`KVStore`] implementation that routes data across /// storage backends that may be local or remote: /// - a primary store for durable, authoritative persistence, /// - an optional backup store that maintains an additional durable copy of @@ -132,45 +131,6 @@ impl KVStore for TierStore { } } -impl KVStoreSync for TierStore { - fn read( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> io::Result> { - self.inner.read_internal_sync( - primary_namespace.to_string(), - secondary_namespace.to_string(), - key.to_string(), - ) - } - - fn write( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> io::Result<()> { - self.inner.write_internal_sync( - primary_namespace.to_string(), - secondary_namespace.to_string(), - key.to_string(), - buf, - ) - } - - fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> io::Result<()> { - self.inner.remove_internal_sync( - primary_namespace.to_string(), - secondary_namespace.to_string(), - key.to_string(), - lazy, - ) - } - - fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { - self.inner - .list_internal_sync(primary_namespace.to_string(), secondary_namespace.to_string()) - } -} - struct TierStoreInner { /// The authoritative store for durable data. primary_store: Arc, @@ -214,30 +174,6 @@ impl TierStoreInner { } } - fn read_primary_sync( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> io::Result> { - match KVStoreSync::read( - self.primary_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - ) { - Ok(data) => Ok(data), - Err(e) => { - log_error!( - self.logger, - "Failed to read from primary store for key {}/{}/{}: {}.", - primary_namespace, - secondary_namespace, - key, - e - ); - Err(e) - }, - } - } - /// Lists keys from the primary data store. async fn list_primary( &self, primary_namespace: &str, secondary_namespace: &str, @@ -259,25 +195,6 @@ impl TierStoreInner { } } - fn list_primary_sync( - &self, primary_namespace: &str, secondary_namespace: &str, - ) -> io::Result> { - match KVStoreSync::list(self.primary_store.as_ref(), primary_namespace, secondary_namespace) - { - Ok(keys) => Ok(keys), - Err(e) => { - log_error!( - self.logger, - "Failed to list keys in namespace {}/{} from primary store: {}.", - primary_namespace, - secondary_namespace, - e - ); - Err(e) - }, - } - } - async fn write_primary_backup_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { @@ -313,44 +230,6 @@ impl TierStoreInner { } } - fn write_primary_backup_sync( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, - ) -> io::Result<()> { - if let Some(backup_store) = self.backup_store.as_ref() { - let primary_res = KVStoreSync::write( - self.primary_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - buf.clone(), - ); - let backup_res = KVStoreSync::write( - backup_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - buf, - ); - - self.handle_primary_backup_results( - "write", - primary_namespace, - secondary_namespace, - key, - primary_res, - backup_res, - ) - } else { - KVStoreSync::write( - self.primary_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - buf, - ) - } - } - async fn remove_primary_backup_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, ) -> io::Result<()> { @@ -386,44 +265,6 @@ impl TierStoreInner { } } - fn remove_primary_backup_sync( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, - ) -> io::Result<()> { - if let Some(backup_store) = self.backup_store.as_ref() { - let primary_res = KVStoreSync::remove( - self.primary_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - lazy, - ); - let backup_res = KVStoreSync::remove( - backup_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - lazy, - ); - - self.handle_primary_backup_results( - "removal", - primary_namespace, - secondary_namespace, - key, - primary_res, - backup_res, - ) - } else { - KVStoreSync::remove( - self.primary_store.as_ref(), - primary_namespace, - secondary_namespace, - key, - lazy, - ) - } - } - async fn read_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, ) -> io::Result> { @@ -445,25 +286,6 @@ impl TierStoreInner { } } - fn read_internal_sync( - &self, primary_namespace: String, secondary_namespace: String, key: String, - ) -> io::Result> { - check_namespace_key_validity( - primary_namespace.as_str(), - secondary_namespace.as_str(), - Some(key.as_str()), - "read", - )?; - - if let Some(eph_store) = - self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) - { - KVStoreSync::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key) - } else { - self.read_primary_sync(&primary_namespace, &secondary_namespace, &key) - } - } - async fn write_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, ) -> io::Result<()> { @@ -496,36 +318,6 @@ impl TierStoreInner { } } - fn write_internal_sync( - &self, primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, - ) -> io::Result<()> { - check_namespace_key_validity( - primary_namespace.as_str(), - secondary_namespace.as_str(), - Some(key.as_str()), - "write", - )?; - - if let Some(ephemeral_store) = - self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) - { - KVStoreSync::write( - ephemeral_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - } else { - self.write_primary_backup_sync( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - } - } - async fn remove_internal( &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, ) -> io::Result<()> { @@ -558,36 +350,6 @@ impl TierStoreInner { } } - fn remove_internal_sync( - &self, primary_namespace: String, secondary_namespace: String, key: String, lazy: bool, - ) -> io::Result<()> { - check_namespace_key_validity( - primary_namespace.as_str(), - secondary_namespace.as_str(), - Some(key.as_str()), - "remove", - )?; - - if let Some(ephemeral_store) = - self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) - { - KVStoreSync::remove( - ephemeral_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - } else { - self.remove_primary_backup_sync( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - } - } - async fn list_internal( &self, primary_namespace: String, secondary_namespace: String, ) -> io::Result> { @@ -617,36 +379,6 @@ impl TierStoreInner { } } - fn list_internal_sync( - &self, primary_namespace: String, secondary_namespace: String, - ) -> io::Result> { - check_namespace_key_validity( - primary_namespace.as_str(), - secondary_namespace.as_str(), - None, - "list", - )?; - - match (primary_namespace.as_str(), secondary_namespace.as_str()) { - ( - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - ) - | (SCORER_PERSISTENCE_PRIMARY_NAMESPACE, _) => { - if let Some(ephemeral_store) = self.ephemeral_store.as_ref() { - KVStoreSync::list( - ephemeral_store.as_ref(), - &primary_namespace, - &secondary_namespace, - ) - } else { - self.list_primary_sync(&primary_namespace, &secondary_namespace) - } - }, - _ => self.list_primary_sync(&primary_namespace, &secondary_namespace), - } - } - fn ephemeral_store( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Option<&Arc> { @@ -722,8 +454,8 @@ mod tests { TierStore::new(primary_store, logger) } - #[test] - fn write_read_list_remove() { + #[tokio::test] + async fn write_read_list_remove() { let base_dir = random_storage_path(); let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); @@ -734,11 +466,11 @@ mod tests { Arc::new(DynStoreWrapper(FilesystemStore::new(base_dir.join("primary")))); let tier = setup_tier_store(primary_store, logger); - do_read_write_remove_list_persist(&tier); + do_read_write_remove_list_persist(&tier).await; } - #[test] - fn ephemeral_routing() { + #[tokio::test] + async fn ephemeral_routing() { let base_dir = random_storage_path(); let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); @@ -755,49 +487,53 @@ mod tests { let data = vec![42u8; 32]; - KVStoreSync::write( - &tier, + tier.write( NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, data.clone(), ) + .await .unwrap(); - KVStoreSync::write( - &tier, + tier.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, data.clone(), ) + .await .unwrap(); - let primary_read_ng = KVStoreSync::read( - &*primary_store, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - ); - let ephemeral_read_ng = KVStoreSync::read( - &*ephemeral_store, - NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, - NETWORK_GRAPH_PERSISTENCE_KEY, - ); + let primary_read_ng = primary_store + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await; + let ephemeral_read_ng = ephemeral_store + .read( + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, + NETWORK_GRAPH_PERSISTENCE_KEY, + ) + .await; - let primary_read_cm = KVStoreSync::read( - &*primary_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ); - let ephemeral_read_cm = KVStoreSync::read( - &*ephemeral_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ); + let primary_read_cm = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + let ephemeral_read_cm = ephemeral_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; assert!(primary_read_ng.is_err()); assert_eq!(ephemeral_read_ng.unwrap(), data); @@ -806,8 +542,8 @@ mod tests { assert_eq!(primary_read_cm.unwrap(), data); } - #[test] - fn backup_write_is_part_of_success_path() { + #[tokio::test] + async fn backup_write_is_part_of_success_path() { let base_dir = random_storage_path(); let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); @@ -824,34 +560,36 @@ mod tests { let data = vec![42u8; 32]; - KVStoreSync::write( - &tier, + tier.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, data.clone(), ) + .await .unwrap(); - let primary_read = KVStoreSync::read( - &*primary_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ); - let backup_read = KVStoreSync::read( - &*backup_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ); + let primary_read = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; + let backup_read = backup_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ) + .await; assert_eq!(primary_read.unwrap(), data); assert_eq!(backup_read.unwrap(), data); } - #[test] - fn backup_remove_is_part_of_success_path() { + #[tokio::test] + async fn backup_remove_is_part_of_success_path() { let base_dir = random_storage_path(); let log_path = base_dir.join("tier_store_test.log").to_string_lossy().into_owned(); let logger = Arc::new(Logger::new_fs_writer(log_path, Level::Trace).unwrap()); @@ -869,36 +607,38 @@ mod tests { let data = vec![42u8; 32]; let key = CHANNEL_MANAGER_PERSISTENCE_KEY; - KVStoreSync::write( - &tier, + tier.write( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, key, data, ) + .await .unwrap(); - KVStoreSync::remove( - &tier, + tier.remove( CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, key, true, ) + .await .unwrap(); - let primary_read = KVStoreSync::read( - &*primary_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - key, - ); - let backup_read = KVStoreSync::read( - &*backup_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - key, - ); + let primary_read = primary_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .await; + let backup_read = backup_store + .read( + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + key, + ) + .await; assert!(primary_read.is_err()); assert!(backup_read.is_err()); diff --git a/src/types.rs b/src/types.rs index 64209430b..1f3f84751 100644 --- a/src/types.rs +++ b/src/types.rs @@ -46,7 +46,7 @@ use crate::message_handler::NodeCustomMessageHandler; use crate::payment::{PaymentDetails, PendingPaymentDetails}; use crate::runtime::RuntimeSpawner; -pub(crate) trait DynStoreTrait: Send + Sync { +pub trait DynStoreTrait: Send + Sync { fn read_async( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin, bitcoin::io::Error>> + Send + 'static>>; @@ -87,7 +87,7 @@ impl<'a> KVStore for dyn DynStoreTrait + 'a { } } -pub(crate) type DynStore = dyn DynStoreTrait; +pub type DynStore = dyn DynStoreTrait; // Newtype wrapper that implements `KVStore` for `Arc`. This is needed because `KVStore` // methods return `impl Future`, which is not object-safe. `DynStoreTrait` works around this by diff --git a/tests/common/mod.rs b/tests/common/mod.rs index adeb327bf..2aa12e4b0 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -461,9 +461,7 @@ impl Default for TestConfig { macro_rules! setup_builder { ($builder:ident, $config:expr) => { - #[cfg(feature = "uniffi")] - let $builder = Builder::from_config($config.clone()); - #[cfg(not(feature = "uniffi"))] + #[allow(unused_mut)] let mut $builder = Builder::from_config($config.clone()); }; } @@ -531,7 +529,17 @@ pub(crate) fn setup_two_nodes_with_store( } pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestNode { + setup_node_with_builder(chain_source, config, |_| {}) +} + +pub(crate) fn setup_node_with_builder( + chain_source: &TestChainSource, config: TestConfig, configure_builder: F, +) -> TestNode +where + F: FnOnce(&mut Builder), +{ setup_builder!(builder, config.node_config); + match chain_source { TestChainSource::Esplora(electrsd) => { let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -590,6 +598,8 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> builder.set_wallet_recovery_mode(); } + configure_builder(&mut builder); + let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); @@ -601,10 +611,6 @@ pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> }, }; - if config.recovery_mode { - builder.set_wallet_recovery_mode(); - } - node.start().unwrap(); assert!(node.status().is_running); assert!(node.status().latest_fee_rate_cache_update_timestamp.is_some()); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 309d5bf4d..f419fb27e 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -30,6 +30,7 @@ use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; +use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::liquidity::LSPS2ServiceConfig; use ldk_node::payment::{ ConfirmationStatus, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, @@ -39,6 +40,7 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +use lightning::util::persist::KVStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; @@ -3033,3 +3035,75 @@ async fn splice_in_with_all_balance() { node_a.stop().unwrap(); node_b.stop().unwrap(); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn builder_configures_sqlite_backup_store() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = random_chain_source(&bitcoind, &electrsd); + + let mut config_a = random_config(true); + config_a.store_type = TestStoreType::Sqlite; + let primary_dir = config_a.node_config.storage_dir_path.clone(); + let backup_dir = common::random_storage_path(); + let node_a = common::setup_node_with_builder(&chain_source, config_a.clone(), |builder| { + builder.set_backup_storage_dir_path(backup_dir.to_str().unwrap().to_owned()); + }); + + let config_b = random_config(true); + let node_b = setup_node(&chain_source, config_b); + + do_channel_full_cycle( + node_a, + node_b, + &bitcoind.client, + &electrsd.client, + false, + true, + true, + false, + ) + .await; + + let primary_store = SqliteStore::new( + primary_dir.into(), + Some(ldk_node::io::sqlite_store::SQLITE_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + let backup_store = SqliteStore::new( + backup_dir, + Some(ldk_node::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), + Some(ldk_node::io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .unwrap(); + + for (pn, sn, key) in [ + ("bdk_wallet", "", "descriptor"), + ("bdk_wallet", "", "change_descriptor"), + ("bdk_wallet", "", "network"), + ("", "", "node_metrics"), + ("", "", "events"), + ("", "", "peers"), + ] { + let primary = primary_store.read(pn, sn, key).await.unwrap(); + let backup = backup_store.read(pn, sn, key).await.unwrap(); + + assert_eq!(backup, primary, "backup mismatch for {pn}/{sn}/{key}"); + } +} + +#[test] +fn sqlite_backup_rejects_primary_storage_path() { + let mut config = random_config(false); + config.store_type = TestStoreType::Sqlite; + + let primary_dir = config.node_config.storage_dir_path.clone(); + + setup_builder!(builder, config.node_config); + builder.set_backup_storage_dir_path(primary_dir); + + let res = builder.build(config.node_entropy.into()); + + assert!(matches!(res, Err(ldk_node::BuildError::BackupStorePathConflict))); +} From 1ef68f2147e830934263e4572aac022dec3b9e8e Mon Sep 17 00:00:00 2001 From: Enigbe Date: Wed, 10 Jun 2026 22:01:15 +0100 Subject: [PATCH 3/6] fixup! Integrate TierStore into NodeBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Remove set_backup_storage_dir_path and set_ephemeral_store from ArcedNodeBuilder — these tiered-storage configuration methods belong to the FFI bindings PR (#871) and should not be introduced here. - Gate builder_configures_sqlite_backup_store and sqlite_backup_rejects_primary_storage_path tests behind not exposed through the arced wrapper. - Revert the setup_node_with_builder helper added to test utils: the backup-related test now configures its builder inline, which also removes the &mut Builder requirement that forced setup_builder! to emit mut unconditionally. It's now correctly immutable when uniffi is enabled. --- src/builder.rs | 32 -------------------------------- tests/common/mod.rs | 15 +++------------ tests/integration_tests_rust.rs | 23 +++++++++++++++++++---- 3 files changed, 22 insertions(+), 48 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index fb8a11c2c..e2d015e9e 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1267,38 +1267,6 @@ impl ArcedNodeBuilder { self.inner.write().expect("lock").set_wallet_recovery_mode(); } - /// Configures a local SQLite backup store for disaster recovery. - /// - /// When building with tiered storage, a SQLite store will be created at the - /// given directory path using [`SQLITE_BACKUP_DB_FILE_NAME`] as its database - /// file name. It receives a second durable copy of data written to the - /// primary store. - /// - /// Writes and removals for primary-backed data only succeed once both the - /// primary and backup SQLite stores complete successfully. - /// - /// The configured path must point to a distinct local directory from the - /// primary storage path. If the backup path equals the primary storage path, - /// building will fail with [`BuildError::BackupStorePathConflict`]. - /// - /// If not set, durable data will be stored only in the primary store. - /// - /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME - pub fn set_backup_storage_dir_path(&self, backup_storage_dir_path: String) { - self.inner.write().expect("lock").set_backup_storage_dir_path(backup_storage_dir_path); - } - - /// Configures the ephemeral store for non-critical, frequently-accessed data. - /// - /// When building with tiered storage, this store is used for ephemeral data like - /// the network graph and scorer data to reduce latency for reads. Data stored here - /// can be rebuilt if lost. - /// - /// If not set, non-critical data will be stored in the primary store. - pub fn set_ephemeral_store(&self, ephemeral_store: Arc) { - self.inner.write().expect("lock").set_ephemeral_store(ephemeral_store); - } - /// Builds a [`Node`] instance with a [`SqliteStore`] backend and according to the options /// previously configured. pub fn build(&self, node_entropy: Arc) -> Result, BuildError> { diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 2aa12e4b0..4998d6a99 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -461,7 +461,9 @@ impl Default for TestConfig { macro_rules! setup_builder { ($builder:ident, $config:expr) => { - #[allow(unused_mut)] + #[cfg(feature = "uniffi")] + let $builder = Builder::from_config($config.clone()); + #[cfg(not(feature = "uniffi"))] let mut $builder = Builder::from_config($config.clone()); }; } @@ -529,15 +531,6 @@ pub(crate) fn setup_two_nodes_with_store( } pub(crate) fn setup_node(chain_source: &TestChainSource, config: TestConfig) -> TestNode { - setup_node_with_builder(chain_source, config, |_| {}) -} - -pub(crate) fn setup_node_with_builder( - chain_source: &TestChainSource, config: TestConfig, configure_builder: F, -) -> TestNode -where - F: FnOnce(&mut Builder), -{ setup_builder!(builder, config.node_config); match chain_source { @@ -598,8 +591,6 @@ where builder.set_wallet_recovery_mode(); } - configure_builder(&mut builder); - let node = match config.store_type { TestStoreType::TestSyncStore => { let kv_store = TestSyncStore::new(config.node_config.storage_dir_path.into()); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index f419fb27e..83f0196b7 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -30,6 +30,7 @@ use electrsd::corepc_node::Node as BitcoinD; use electrsd::ElectrsD; use ldk_node::config::{AsyncPaymentsRole, EsploraSyncConfig}; use ldk_node::entropy::NodeEntropy; +#[cfg(not(feature = "uniffi"))] use ldk_node::io::sqlite_store::SqliteStore; use ldk_node::liquidity::LSPS2ServiceConfig; use ldk_node::payment::{ @@ -40,6 +41,7 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::routing::gossip::{NodeAlias, NodeId}; use lightning::routing::router::RouteParametersConfig; +#[cfg(not(feature = "uniffi"))] use lightning::util::persist::KVStore; use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; @@ -3036,6 +3038,7 @@ async fn splice_in_with_all_balance() { node_b.stop().unwrap(); } +#[cfg(not(feature = "uniffi"))] #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn builder_configures_sqlite_backup_store() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -3045,9 +3048,20 @@ async fn builder_configures_sqlite_backup_store() { config_a.store_type = TestStoreType::Sqlite; let primary_dir = config_a.node_config.storage_dir_path.clone(); let backup_dir = common::random_storage_path(); - let node_a = common::setup_node_with_builder(&chain_source, config_a.clone(), |builder| { - builder.set_backup_storage_dir_path(backup_dir.to_str().unwrap().to_owned()); - }); + + // Build node_a with backup storage configured + setup_builder!(builder_a, config_a.node_config.clone()); + builder_a.set_chain_source_esplora( + format!("http://{}", electrsd.esplora_url.as_ref().unwrap()), + None, + ); + builder_a.set_filesystem_logger(None, None); + builder_a.set_backup_storage_dir_path(backup_dir.to_str().unwrap().to_owned()); + + let node_a = builder_a.build(config_a.node_entropy.into()).unwrap(); + node_a.start().unwrap(); + assert!(node_a.status().is_running); + assert!(node_a.status().latest_fee_rate_cache_update_timestamp.is_some()); let config_b = random_config(true); let node_b = setup_node(&chain_source, config_b); @@ -3093,6 +3107,7 @@ async fn builder_configures_sqlite_backup_store() { } } +#[cfg(not(feature = "uniffi"))] #[test] fn sqlite_backup_rejects_primary_storage_path() { let mut config = random_config(false); @@ -3100,7 +3115,7 @@ fn sqlite_backup_rejects_primary_storage_path() { let primary_dir = config.node_config.storage_dir_path.clone(); - setup_builder!(builder, config.node_config); + setup_builder!(builder, config.node_config.clone()); builder.set_backup_storage_dir_path(primary_dir); let res = builder.build(config.node_entropy.into()); From 6aea057680e2d18b05765bb45d893aca1a35d343 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Wed, 10 Jun 2026 22:31:54 +0100 Subject: [PATCH 4/6] fixup! Implement tiered storage Replace the ephemeral_store() method with direct is_ephemeral_cached_key() checks followed by self.ephemeral_store.as_ref() at each callsite, making the routing decision explicit. --- src/io/tier_store.rs | 106 +++++++++++++++++++++---------------------- 1 file changed, 52 insertions(+), 54 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 775c7e588..68b3ef729 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -275,15 +275,21 @@ impl TierStoreInner { "read", )?; - if let Some(eph_store) = - self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) - { - // We don't retry ephemeral-store reads here. Local failures are treated as - // terminal for this access path rather than falling back to another store. - KVStore::read(eph_store.as_ref(), &primary_namespace, &secondary_namespace, &key).await - } else { - self.read_primary(&primary_namespace, &secondary_namespace, &key).await + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + // We don't retry ephemeral-store reads here. Local failures are treated as + // terminal for this access path rather than falling back to another store. + return KVStore::read( + eph_store.as_ref(), + &primary_namespace, + &secondary_namespace, + &key, + ) + .await; + } } + + self.read_primary(&primary_namespace, &secondary_namespace, &key).await } async fn write_internal( @@ -296,26 +302,26 @@ impl TierStoreInner { "write", )?; - if let Some(eph_store) = - self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) - { - KVStore::write( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await - } else { - self.write_primary_backup_async( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - buf, - ) - .await + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + return KVStore::write( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await; + } } + + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await } async fn remove_internal( @@ -328,26 +334,26 @@ impl TierStoreInner { "remove", )?; - if let Some(eph_store) = - self.ephemeral_store(&primary_namespace, &secondary_namespace, &key) - { - KVStore::remove( - eph_store.as_ref(), - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await - } else { - self.remove_primary_backup_async( - primary_namespace.as_str(), - secondary_namespace.as_str(), - key.as_str(), - lazy, - ) - .await + if is_ephemeral_cached_key(&primary_namespace, &secondary_namespace, &key) { + if let Some(eph_store) = self.ephemeral_store.as_ref() { + return KVStore::remove( + eph_store.as_ref(), + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await; + } } + + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await } async fn list_internal( @@ -379,14 +385,6 @@ impl TierStoreInner { } } - fn ephemeral_store( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, - ) -> Option<&Arc> { - self.ephemeral_store - .as_ref() - .filter(|_s| is_ephemeral_cached_key(primary_namespace, secondary_namespace, key)) - } - fn handle_primary_backup_results( &self, op: &str, primary_namespace: &str, secondary_namespace: &str, key: &str, primary_res: io::Result<()>, backup_res: io::Result<()>, From 0e166a382e1a3976c7f781e726035b6011ca3364 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Thu, 11 Jun 2026 08:22:16 +0100 Subject: [PATCH 5/6] fixup! Implement tiered storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Here we serialize per-key writes in TierStore to prevent out-of-order backup updates. Without serialization, concurrent writes to the same key can interleave across the primary and backup stores (e.g. [w1.primary] → [w2.primary] → [w2.backup] → [w1.backup]), leaving the backup with stale data. By adding a per-key TokioMutex that serializes write and remove operations for a given (namespace, key) tuple, we ensure both the primary and backup stores reflect the same final value. The lock map is cleaned up after each operation when no other in-flight operations hold a reference, following the same pattern used in VssStore. --- src/io/tier_store.rs | 80 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 70 insertions(+), 10 deletions(-) diff --git a/src/io/tier_store.rs b/src/io/tier_store.rs index 68b3ef729..0f0f9b804 100644 --- a/src/io/tier_store.rs +++ b/src/io/tier_store.rs @@ -16,8 +16,10 @@ use lightning::util::persist::{ }; use lightning::{io, log_error}; +use std::collections::HashMap; use std::future::Future; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; +use tokio::sync::Mutex as TokioMutex; /// A 3-tiered [`KVStore`] implementation that routes data across /// storage backends that may be local or remote: @@ -138,13 +140,45 @@ struct TierStoreInner { ephemeral_store: Option>, /// An optional second durable store for primary-backed data. backup_store: Option>, + /// Per-key locks for serializing primary+backup operations. + locks: Mutex>>>, logger: Arc, } impl TierStoreInner { /// Creates a tier store with the primary data store. pub fn new(primary_store: Arc, logger: Arc) -> Self { - Self { primary_store, ephemeral_store: None, backup_store: None, logger } + Self { + primary_store, + ephemeral_store: None, + backup_store: None, + locks: Mutex::new(HashMap::new()), + logger, + } + } + + fn get_key_lock(&self, locking_key: String) -> Arc> { + let mut locks = self.locks.lock().expect("lock"); + Arc::clone(locks.entry(locking_key).or_default()) + } + + fn clean_locks(&self, lock_ref: &Arc>, locking_key: String) { + let mut locks = self.locks.lock().expect("lock"); + let strong_count = Arc::strong_count(lock_ref); + debug_assert!(strong_count >= 2, "Unexpected TierStore lock strong count"); + if strong_count == 2 { + locks.remove(&locking_key); + } + } + + fn build_locking_key( + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + ) -> String { + if primary_namespace.is_empty() { + key.to_owned() + } else { + format!("{}#{}#{}", primary_namespace, secondary_namespace, key) + } } /// Reads from the primary data store. @@ -315,13 +349,26 @@ impl TierStoreInner { } } - self.write_primary_backup_async( + let locking_key = self.build_locking_key( primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str(), - buf, - ) - .await + ); + let key_lock = self.get_key_lock(locking_key.clone()); + + let res = { + let _guard = key_lock.lock().await; + self.write_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + buf, + ) + .await + }; + + self.clean_locks(&key_lock, locking_key); + res } async fn remove_internal( @@ -347,13 +394,26 @@ impl TierStoreInner { } } - self.remove_primary_backup_async( + let locking_key = self.build_locking_key( primary_namespace.as_str(), secondary_namespace.as_str(), key.as_str(), - lazy, - ) - .await + ); + let key_lock = self.get_key_lock(locking_key.clone()); + + let res = { + let _guard = key_lock.lock().await; + self.remove_primary_backup_async( + primary_namespace.as_str(), + secondary_namespace.as_str(), + key.as_str(), + lazy, + ) + .await + }; + + self.clean_locks(&key_lock, locking_key); + res } async fn list_internal( From e53deb6efd04f6158811cf958c2844dc35d512a6 Mon Sep 17 00:00:00 2001 From: Enigbe Date: Thu, 11 Jun 2026 09:30:06 +0100 Subject: [PATCH 6/6] fixup! Integrate TierStore into NodeBuilder Replace Builder::set_ephemeral_store(Arc) with set_ephemeral_storage_dir_path(String), constructing the SQLite store internally to mirror the backup store pattern. Remove BackupStorePathConflict since all three stores use distinct DB file names and cannot collide even in the same directory. --- src/builder.rs | 72 +++++++++++++-------------------- src/io/sqlite_store/mod.rs | 2 + tests/integration_tests_rust.rs | 14 ------- 3 files changed, 29 insertions(+), 59 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index e2d015e9e..1f1af76d9 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -155,21 +155,12 @@ impl std::fmt::Debug for LogWriterConfig { } } -#[derive(Default)] +#[derive(Default, Debug)] struct TierStoreConfig { - ephemeral: Option>, + ephemeral_storage_dir_path: Option, backup_storage_dir_path: Option, } -impl std::fmt::Debug for TierStoreConfig { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("TierStoreConfig") - .field("ephemeral", &self.ephemeral.as_ref().map(|_| "Arc")) - .field("backup_storage_dir_path", &self.backup_storage_dir_path) - .finish() - } -} - /// An error encountered during building a [`Node`]. /// /// [`Node`]: crate::Node @@ -216,11 +207,7 @@ pub enum BuildError { AsyncPaymentsConfigMismatch, /// An attempt to setup a DNS Resolver failed. DNSResolverSetupFailed, - /// The configured backup storage path conflicts with the primary storage path. - /// - /// Backup storage must use a distinct local directory so that the primary and - /// backup stores do not point to the same SQLite database. - BackupStorePathConflict, + } impl fmt::Display for BuildError { @@ -258,12 +245,7 @@ impl fmt::Display for BuildError { Self::DNSResolverSetupFailed => { write!(f, "An attempt to setup a DNS resolver has failed.") }, - Self::BackupStorePathConflict => { - write!( - f, - "The configured backup storage path conflicts with the primary storage path." - ) - }, + } } } @@ -665,10 +647,6 @@ impl NodeBuilder { /// Writes and removals for primary-backed data only succeed once both the /// primary and backup SQLite stores complete successfully. /// - /// The configured path must point to a distinct local directory from the - /// primary storage path. If the backup path equals the primary storage path, - /// building will fail with [`BuildError::BackupStorePathConflict`]. - /// /// If not set, durable data will be stored only in the primary store. /// /// [`SQLITE_BACKUP_DB_FILE_NAME`]: crate::io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME @@ -678,16 +656,17 @@ impl NodeBuilder { self } - /// Configures the ephemeral store for non-critical, frequently-accessed data. + /// Configures the ephemeral storage directory path for non-critical, frequently-accessed data. /// - /// When building with tiered storage, this store is used for ephemeral data like - /// the network graph and scorer data to reduce latency for reads. Data stored here - /// can be rebuilt if lost. + /// When set, a local SQLite store is created at this path for ephemeral data like + /// the network graph and scorer. Data stored here can be rebuilt if lost. /// /// If not set, non-critical data will be stored in the primary store. - pub fn set_ephemeral_store(&mut self, ephemeral_store: Arc) -> &mut Self { + pub fn set_ephemeral_storage_dir_path( + &mut self, ephemeral_storage_dir_path: String, + ) -> &mut Self { let tier_store_config = self.tier_store_config.get_or_insert(TierStoreConfig::default()); - tier_store_config.ephemeral = Some(ephemeral_store); + tier_store_config.ephemeral_storage_dir_path = Some(ephemeral_storage_dir_path.into()); self } @@ -896,9 +875,9 @@ impl NodeBuilder { /// The provided `kv_store` will be used as the primary storage backend. Optionally, /// an ephemeral store for frequently-accessed non-critical data (e.g., network graph, scorer) /// and a local SQLite backup store for disaster recovery can be configured via - /// [`set_ephemeral_store`] and [`set_backup_storage_dir_path`]. + /// [`set_ephemeral_storage_dir_path`] and [`set_backup_storage_dir_path`]. /// - /// [`set_ephemeral_store`]: Self::set_ephemeral_store + /// [`set_ephemeral_storage_dir_path`]: Self::set_ephemeral_storage_dir_path /// [`set_backup_storage_dir_path`]: Self::set_backup_storage_dir_path pub fn build_with_store( &self, node_entropy: NodeEntropy, kv_store: S, @@ -932,18 +911,21 @@ impl NodeBuilder { let primary_store = Arc::new(DynStoreWrapper(kv_store)); let mut tier_store = TierStore::new(primary_store, Arc::clone(&logger)); if let Some(config) = ts_config { - config.ephemeral.as_ref().map(|s| tier_store.set_ephemeral_store(Arc::clone(s))); - if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { - let primary_storage_dir_path = PathBuf::from(&self.config.storage_dir_path); - if primary_storage_dir_path == *backup_storage_dir_path { - log_error!( - logger, - "Backup storage path must differ from primary storage path: {}", - backup_storage_dir_path.display() - ); - return Err(BuildError::BackupStorePathConflict); - } + if let Some(ephemeral_storage_dir_path) = config.ephemeral_storage_dir_path.as_ref() { + let ephemeral_store = SqliteStore::new( + ephemeral_storage_dir_path.clone(), + Some(io::sqlite_store::SQLITE_EPHEMERAL_DB_FILE_NAME.to_string()), + Some(io::sqlite_store::KV_TABLE_NAME.to_string()), + ) + .map_err(|e| { + log_error!(logger, "Failed to setup ephemeral SQLite store: {}", e); + BuildError::KVStoreSetupFailed + })?; + let ephemeral_store: Arc = Arc::new(DynStoreWrapper(ephemeral_store)); + tier_store.set_ephemeral_store(ephemeral_store); + } + if let Some(backup_storage_dir_path) = config.backup_storage_dir_path.as_ref() { let backup_store = SqliteStore::new( backup_storage_dir_path.clone(), Some(io::sqlite_store::SQLITE_BACKUP_DB_FILE_NAME.to_string()), diff --git a/src/io/sqlite_store/mod.rs b/src/io/sqlite_store/mod.rs index 136676799..ddafc3748 100644 --- a/src/io/sqlite_store/mod.rs +++ b/src/io/sqlite_store/mod.rs @@ -26,6 +26,8 @@ mod migrations; pub const SQLITE_DB_FILE_NAME: &str = "ldk_node_data.sqlite"; /// LDK Node's backup database file name. pub const SQLITE_BACKUP_DB_FILE_NAME: &str = "ldk_node_data_backup.sqlite"; +/// LDK Node's ephemeral database file name. +pub const SQLITE_EPHEMERAL_DB_FILE_NAME: &str = "ldk_node_data_ephemeral.sqlite"; /// LDK Node's table in which we store all data. pub const KV_TABLE_NAME: &str = "ldk_node_data"; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 83f0196b7..8e30a0ab5 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -3107,18 +3107,4 @@ async fn builder_configures_sqlite_backup_store() { } } -#[cfg(not(feature = "uniffi"))] -#[test] -fn sqlite_backup_rejects_primary_storage_path() { - let mut config = random_config(false); - config.store_type = TestStoreType::Sqlite; - - let primary_dir = config.node_config.storage_dir_path.clone(); - setup_builder!(builder, config.node_config.clone()); - builder.set_backup_storage_dir_path(primary_dir); - - let res = builder.build(config.node_entropy.into()); - - assert!(matches!(res, Err(ldk_node::BuildError::BackupStorePathConflict))); -}