Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions bindings/ldk_node.udl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ typedef dictionary ElectrumSyncConfig;

typedef dictionary TorConfig;

typedef enum ForwardedPaymentTrackingMode;

typedef interface NodeEntropy;

typedef enum WordCount;
Expand Down Expand Up @@ -137,6 +139,19 @@ interface Node {
void remove_payment([ByRef]PaymentId payment_id);
BalanceDetails list_balances();
sequence<PaymentDetails> list_payments();
ForwardedPaymentDetails? forwarded_payment([ByRef]ForwardedPaymentId forwarded_payment_id);
[Throws=NodeError]
ForwardedPaymentDetailsPage list_forwarded_payments(PageToken? page_token);
ForwardedPaymentTrackingMode forwarded_payment_tracking_mode();
ChannelForwardingStats? channel_forwarding_stats([ByRef]ChannelId channel_id);
[Throws=NodeError]
ChannelForwardingStatsPage list_channel_forwarding_stats(PageToken? page_token);
[Throws=NodeError]
ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats(PageToken? page_token);
[Throws=NodeError]
ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats_in_range(u64 start_timestamp, u64 end_timestamp, PageToken? page_token);
[Throws=NodeError]
ChannelPairForwardingStatsPage list_channel_pair_forwarding_stats_for_pair(ChannelId prev_channel_id, ChannelId next_channel_id, PageToken? page_token);
sequence<PeerDetails> list_peers();
sequence<ChannelDetails> list_channels();
NetworkGraph network_graph();
Expand Down Expand Up @@ -379,6 +394,9 @@ typedef string OfferId;
[Custom]
typedef string PaymentId;

[Custom]
typedef string ForwardedPaymentId;

[Custom]
typedef string PaymentHash;

Expand All @@ -391,6 +409,12 @@ typedef string PaymentSecret;
[Custom]
typedef string ChannelId;

[Custom]
typedef string ChannelPairStatsId;

[Custom]
typedef string PageToken;

[Custom]
typedef string UserChannelId;

Expand All @@ -417,3 +441,15 @@ typedef enum Event;
typedef interface HRNResolverConfig;

typedef dictionary HumanReadableNamesConfig;

typedef dictionary ForwardedPaymentDetails;

typedef dictionary ChannelForwardingStats;

typedef dictionary ChannelPairForwardingStats;

typedef dictionary ForwardedPaymentDetailsPage;

typedef dictionary ChannelForwardingStatsPage;

typedef dictionary ChannelPairForwardingStatsPage;
114 changes: 95 additions & 19 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,13 @@ use crate::io::utils::{
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
self, CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE,
CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE,
FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
};
Expand All @@ -77,7 +83,8 @@ use crate::peer_store::PeerStore;
use crate::runtime::{Runtime, RuntimeSpawner};
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreRef, DynStoreWrapper,
AsyncPersister, ChainMonitor, ChannelForwardingStatsStore, ChannelManager,
ChannelPairForwardingStatsStore, DynStore, DynStoreRef, DynStoreWrapper, ForwardedPaymentStore,
GossipSync, Graph, HRNResolver, KeysManager, MessageRouter, OnionMessenger, PaymentStore,
PeerManager, PendingPaymentStore,
};
Expand Down Expand Up @@ -1397,24 +1404,48 @@ fn build_with_store_internal(

let kv_store_ref = Arc::clone(&kv_store);
let logger_ref = Arc::clone(&logger);
let (payment_store_res, node_metris_res, pending_payment_store_res) =
runtime.block_on(async move {
tokio::join!(
read_all_objects(
&*kv_store_ref,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
),
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
read_all_objects(
&*kv_store_ref,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
)
let (
payment_store_res,
forwarded_payment_store_res,
channel_forwarding_stats_res,
channel_pair_forwarding_stats_res,
node_metris_res,
pending_payment_store_res,
) = runtime.block_on(async move {
tokio::join!(
read_all_objects(
&*kv_store_ref,
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
),
read_all_objects(
&*kv_store_ref,
FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
),
read_all_objects(
&*kv_store_ref,
CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
),
read_all_objects(
&*kv_store_ref,
CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE,
CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
),
read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)),
read_all_objects(
&*kv_store_ref,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
Arc::clone(&logger_ref),
)
});
)
});

// Initialize the status fields.
let node_metrics = match node_metris_res {
Expand Down Expand Up @@ -1443,6 +1474,48 @@ fn build_with_store_internal(
},
};

let forwarded_payment_store = match forwarded_payment_store_res {
Ok(forwarded_payments) => Arc::new(ForwardedPaymentStore::new(
forwarded_payments,
FORWARDED_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
FORWARDED_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read forwarded payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let channel_forwarding_stats_store = match channel_forwarding_stats_res {
Ok(stats) => Arc::new(ChannelForwardingStatsStore::new(
stats,
CHANNEL_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
CHANNEL_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read channel forwarding stats from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let channel_pair_forwarding_stats_store = match channel_pair_forwarding_stats_res {
Ok(stats) => Arc::new(ChannelPairForwardingStatsStore::new(
stats,
CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
CHANNEL_PAIR_FORWARDING_STATS_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read channel pair forwarding stats from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let (chain_source, chain_tip_opt) = match chain_data_source_config {
Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => {
let sync_config = sync_config.unwrap_or(EsploraSyncConfig::default());
Expand Down Expand Up @@ -2245,6 +2318,9 @@ fn build_with_store_internal(
scorer,
peer_store,
payment_store,
forwarded_payment_store,
channel_forwarding_stats_store,
channel_pair_forwarding_stats_store,
lnurl_auth,
is_running,
node_metrics,
Expand Down
27 changes: 25 additions & 2 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,25 @@ pub(crate) const HRN_RESOLUTION_TIMEOUT_SECS: u64 = 5;
// The timeout after which we abort an LNURL-auth operation.
pub(crate) const LNURL_AUTH_TIMEOUT_SECS: u64 = 15;

/// The mode used for tracking forwarded payments.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Enum))]
pub enum ForwardedPaymentTrackingMode {
/// Store individual forwarded payments until they are aggregated into channel-pair buckets.
Detailed {
/// Number of minutes to retain individual forwarded payments before aggregation.
retention_minutes: u64,
},
/// Track only per-channel aggregate statistics.
Stats,
}

impl Default for ForwardedPaymentTrackingMode {
fn default() -> Self {
Self::Stats
}
}

#[derive(Debug, Clone)]
#[cfg_attr(feature = "uniffi", derive(uniffi::Record))]
/// Represents the configuration of an [`Node`] instance.
Expand All @@ -130,9 +149,10 @@ pub(crate) const LNURL_AUTH_TIMEOUT_SECS: u64 = 15;
/// | `route_parameters` | None |
/// | `tor_config` | None |
/// | `hrn_config` | HumanReadableNamesConfig::default() |
/// | `forwarded_payment_tracking_mode` | Stats |
///
/// See [`AnchorChannelsConfig`] and [`RouteParametersConfig`] for more information regarding their
/// respective default values.
/// See [`AnchorChannelsConfig`], [`RouteParametersConfig`], and
/// [`ForwardedPaymentTrackingMode`] for more information regarding their respective default values.
///
/// [`Node`]: crate::Node
pub struct Config {
Expand Down Expand Up @@ -205,6 +225,8 @@ pub struct Config {
///
/// [BIP 353]: https://github.com/bitcoin/bips/blob/master/bip-0353.mediawiki
pub hrn_config: HumanReadableNamesConfig,
/// The mode used for tracking forwarded payments.
pub forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode,
}

impl Default for Config {
Expand All @@ -221,6 +243,7 @@ impl Default for Config {
route_parameters: None,
node_alias: None,
hrn_config: HumanReadableNamesConfig::default(),
forwarded_payment_tracking_mode: ForwardedPaymentTrackingMode::default(),
}
}
}
Expand Down
63 changes: 61 additions & 2 deletions src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Mutex};

use lightning::util::persist::KVStore;
use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore};
use lightning::util::ser::{Readable, Writeable};

use crate::logger::{log_error, LdkLogger};
use crate::types::DynStore;
use crate::types::{DynStore, DynStoreRef};
use crate::Error;

pub(crate) trait StorableObject: Clone + Readable + Writeable {
Expand Down Expand Up @@ -179,6 +179,65 @@ where
self.objects.lock().expect("lock").values().filter(f).cloned().collect::<Vec<SO>>()
}

pub(crate) async fn list_page(
&self, page_token: Option<PageToken>,
) -> Result<(Vec<SO>, Option<PageToken>), Error> {
let response = PaginatedKVStore::list_paginated(
&DynStoreRef(Arc::clone(&self.kv_store)),
&self.primary_namespace,
&self.secondary_namespace,
page_token,
)
.await
.map_err(|e| {
log_error!(
self.logger,
"Listing object data under {}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
e
);
Error::PersistenceFailed
})?;

let mut objects = Vec::with_capacity(response.keys.len());
for key in response.keys {
let data = KVStore::read(
&DynStoreRef(Arc::clone(&self.kv_store)),
&self.primary_namespace,
&self.secondary_namespace,
&key,
)
.await
.map_err(|e| {
log_error!(
self.logger,
"Reading object data for key {}/{}/{} failed due to: {}",
&self.primary_namespace,
&self.secondary_namespace,
key,
e
);
Error::PersistenceFailed
})?;

let object = SO::read(&mut &data[..]).map_err(|e| {
log_error!(
self.logger,
"Failed to deserialize object data for key {}/{}/{}: {}",
&self.primary_namespace,
&self.secondary_namespace,
key,
e
);
Error::PersistenceFailed
})?;
objects.push(object);
}

Ok((objects, response.next_page_token))
}

async fn persist(&self, object: &SO) -> Result<(), Error> {
let (store_key, data) = Self::encode_object(object);
self.persist_encoded(store_key, data).await
Expand Down
Loading
Loading