Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(db): Fine-tune state keeper cache performance / RAM usage #1804

Merged
4 changes: 3 additions & 1 deletion .github/workflows/protobuf.yaml
Expand Up @@ -73,4 +73,6 @@ jobs:
with:
github_token: ${{ github.token }}
- name: buf breaking
run: buf breaking './after.binpb' --against './before.binpb' --config '{"version":"v1","breaking":{"use":["WIRE_JSON","WIRE"]}}' --error-format 'github-actions'
run: >
buf breaking './after.binpb' --against './before.binpb' --exclude-path 'zksync/config/experimental.proto'
--config '{"version":"v1","breaking":{"use":["WIRE_JSON","WIRE"]}}' --error-format 'github-actions'
100 changes: 69 additions & 31 deletions core/bin/external_node/src/config/mod.rs
Expand Up @@ -400,19 +400,6 @@ pub(crate) struct OptionalENConfig {
pruning_data_retention_sec: u64,
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct ApiComponentConfig {
/// Address of the tree API used by this EN in case it does not have a
/// local tree component running and in this case needs to send requests
/// to some external tree API.
pub tree_api_remote_url: Option<String>,
}

#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct TreeComponentConfig {
pub api_port: Option<u16>,
}

impl OptionalENConfig {
const fn default_filters_limit() -> usize {
10_000
Expand Down Expand Up @@ -725,8 +712,41 @@ impl PostgresConfig {
}
}

/// Experimental part of the external node config. All parameters in this group can change or
/// disappear without notice. Eventually, parameters from this group generally end up in the
/// optional group.
#[derive(Debug, Clone, Deserialize)]
pub(crate) struct ExperimentalENConfig {
    // State keeper cache config
    /// Block cache capacity of the state keeper RocksDB cache, in megabytes.
    /// The default value is 128 MB.
    // Kept private: consumers should use `state_keeper_db_block_cache_capacity()`,
    // which converts this value to bytes.
    #[serde(default = "ExperimentalENConfig::default_state_keeper_db_block_cache_capacity_mb")]
    state_keeper_db_block_cache_capacity_mb: usize,
    /// Maximum number of files concurrently opened by state keeper cache RocksDB. Useful to fit
    /// into OS limits; can be used as a rudimentary way to control RAM usage of the cache.
    // NOTE(review): `None` presumably means "no explicit limit" (RocksDB default) — confirm
    // against how `RocksdbStorageOptions` interprets it.
    pub state_keeper_db_max_open_files: Option<NonZeroU32>,
}

impl ExperimentalENConfig {
    // Default used by the `serde(default = ...)` attribute on the struct field;
    // the name is referenced by path from that attribute, so it must not change.
    const fn default_state_keeper_db_block_cache_capacity_mb() -> usize {
        128
    }

    /// Returns the size of block cache for the state keeper RocksDB cache in bytes.
    pub fn state_keeper_db_block_cache_capacity(&self) -> usize {
        BYTES_IN_MEGABYTE * self.state_keeper_db_block_cache_capacity_mb
    }

    /// Config with every parameter set to its default; used in tests.
    #[cfg(test)]
    fn mock() -> Self {
        Self {
            state_keeper_db_max_open_files: None,
            state_keeper_db_block_cache_capacity_mb:
                Self::default_state_keeper_db_block_cache_capacity_mb(),
        }
    }
}

pub(crate) fn read_consensus_secrets() -> anyhow::Result<Option<ConsensusSecrets>> {
let Ok(path) = std::env::var("EN_CONSENSUS_SECRETS_PATH") else {
let Ok(path) = env::var("EN_CONSENSUS_SECRETS_PATH") else {
return Ok(None);
};
let cfg = std::fs::read_to_string(&path).context(path)?;
Expand All @@ -736,7 +756,7 @@ pub(crate) fn read_consensus_secrets() -> anyhow::Result<Option<ConsensusSecrets
}

pub(crate) fn read_consensus_config() -> anyhow::Result<Option<ConsensusConfig>> {
let Ok(path) = std::env::var("EN_CONSENSUS_CONFIG_PATH") else {
let Ok(path) = env::var("EN_CONSENSUS_CONFIG_PATH") else {
return Ok(None);
};
let cfg = std::fs::read_to_string(&path).context(path)?;
Expand All @@ -745,20 +765,34 @@ pub(crate) fn read_consensus_config() -> anyhow::Result<Option<ConsensusConfig>>
))
}

/// Configuration for snapshot recovery. Loaded optionally, only if the corresponding command-line argument
/// is supplied to the EN binary.
#[derive(Debug, Clone)]
/// Configuration for snapshot recovery. Loaded optionally, only if snapshot recovery is enabled.
#[derive(Debug)]
pub(crate) struct SnapshotsRecoveryConfig {
pub snapshots_object_store: ObjectStoreConfig,
}

pub(crate) fn read_snapshots_recovery_config() -> anyhow::Result<SnapshotsRecoveryConfig> {
let snapshots_object_store = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
.from_env::<ObjectStoreConfig>()
.context("failed loading snapshot object store config from env variables")?;
Ok(SnapshotsRecoveryConfig {
snapshots_object_store,
})
impl SnapshotsRecoveryConfig {
    /// Loads the recovery config from `EN_SNAPSHOTS_OBJECT_STORE_*` environment variables.
    ///
    /// # Errors
    ///
    /// Returns an error if the object store config cannot be deserialized from the environment.
    pub fn new() -> anyhow::Result<Self> {
        let store_config = envy::prefixed("EN_SNAPSHOTS_OBJECT_STORE_")
            .from_env::<ObjectStoreConfig>()
            .context("failed loading snapshot object store config from env variables")?;
        Ok(Self {
            snapshots_object_store: store_config,
        })
    }
}

/// Configuration of the API component of the external node.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct ApiComponentConfig {
    /// Address of the tree API used by this EN in case it does not have a
    /// local tree component running and in this case needs to send requests
    /// to some external tree API.
    pub tree_api_remote_url: Option<String>,
}

/// Configuration of the tree component of the external node.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct TreeComponentConfig {
    // NOTE(review): presumably the port on which the local tree API server is exposed,
    // with `None` meaning "do not expose" — confirm against the tree component startup code.
    pub api_port: Option<u16>,
}

/// External Node Config contains all the configuration required for the EN operation.
Expand All @@ -769,6 +803,7 @@ pub(crate) struct ExternalNodeConfig {
pub postgres: PostgresConfig,
pub optional: OptionalENConfig,
pub remote: RemoteENConfig,
pub experimental: ExperimentalENConfig,
pub consensus: Option<ConsensusConfig>,
pub api_component: ApiComponentConfig,
pub tree_component: TreeComponentConfig,
Expand All @@ -780,19 +815,20 @@ impl ExternalNodeConfig {
pub async fn collect() -> anyhow::Result<Self> {
let required = envy::prefixed("EN_")
.from_env::<RequiredENConfig>()
.context("could not load external node config")?;

.context("could not load external node config (required params)")?;
let optional = envy::prefixed("EN_")
.from_env::<OptionalENConfig>()
.context("could not load external node config")?;
.context("could not load external node config (optional params)")?;
let experimental = envy::prefixed("EN_EXPERIMENTAL_")
.from_env::<ExperimentalENConfig>()
.context("could not load external node config (experimental params)")?;

let api_component_config = envy::prefixed("EN_API_")
.from_env::<ApiComponentConfig>()
.context("could not load external node config")?;

.context("could not load external node config (API component params)")?;
let tree_component_config = envy::prefixed("EN_TREE_")
.from_env::<TreeComponentConfig>()
.context("could not load external node config")?;
.context("could not load external node config (tree component params)")?;

let client = L2Client::http(&required.main_node_url()?)
.context("Unable to build HTTP client for main node")?
Expand Down Expand Up @@ -844,6 +880,7 @@ impl ExternalNodeConfig {
postgres,
required,
optional,
experimental,
consensus: read_consensus_config().context("read_consensus_config()")?,
tree_component: tree_component_config,
api_component: api_component_config,
Expand All @@ -857,6 +894,7 @@ impl ExternalNodeConfig {
postgres: PostgresConfig::mock(test_pool),
optional: OptionalENConfig::mock(),
remote: RemoteENConfig::mock(),
experimental: ExperimentalENConfig::mock(),
consensus: None,
api_component: ApiComponentConfig {
tree_api_remote_url: None,
Expand Down
27 changes: 27 additions & 0 deletions core/bin/external_node/src/config/tests.rs
Expand Up @@ -107,3 +107,30 @@ fn parsing_optional_config_from_env() {
L1BatchCommitDataGeneratorMode::Validium
);
}

#[test]
fn parsing_experimental_config_from_empty_env() {
    // With no env vars set, all parameters must fall back to their defaults.
    let no_vars = Vec::<(String, String)>::new();
    let config: ExperimentalENConfig = envy::prefixed("EN_EXPERIMENTAL_")
        .from_iter(no_vars)
        .unwrap();
    assert_eq!(
        config.state_keeper_db_block_cache_capacity(),
        128 * 1024 * 1024
    );
    assert_eq!(config.state_keeper_db_max_open_files, None);
}

#[test]
fn parsing_experimental_config_from_env() {
    // Both experimental params are supplied; neither default should be used.
    let env_vars = vec![
        (
            "EN_EXPERIMENTAL_STATE_KEEPER_DB_BLOCK_CACHE_CAPACITY_MB".to_owned(),
            "64".to_owned(),
        ),
        (
            "EN_EXPERIMENTAL_STATE_KEEPER_DB_MAX_OPEN_FILES".to_owned(),
            "100".to_owned(),
        ),
    ];

    let config: ExperimentalENConfig = envy::prefixed("EN_EXPERIMENTAL_")
        .from_iter(env_vars)
        .unwrap();
    assert_eq!(
        config.state_keeper_db_block_cache_capacity(),
        64 * 1024 * 1024
    );
    assert_eq!(config.state_keeper_db_max_open_files, NonZeroU32::new(100));
}
4 changes: 2 additions & 2 deletions core/bin/external_node/src/init.rs
Expand Up @@ -12,7 +12,7 @@ use zksync_shared_metrics::{SnapshotRecoveryStage, APP_METRICS};
use zksync_snapshots_applier::{SnapshotsApplierConfig, SnapshotsApplierTask};
use zksync_web3_decl::client::BoxedL2Client;

use crate::config::read_snapshots_recovery_config;
use crate::config::SnapshotsRecoveryConfig;

#[derive(Debug)]
enum InitDecision {
Expand Down Expand Up @@ -85,7 +85,7 @@ pub(crate) async fn ensure_storage_initialized(
);

tracing::warn!("Proceeding with snapshot recovery. This is an experimental feature; use at your own risk");
let recovery_config = read_snapshots_recovery_config()?;
let recovery_config = SnapshotsRecoveryConfig::new()?;
let blob_store = ObjectStoreFactory::new(recovery_config.snapshots_object_store)
.create_store()
.await;
Expand Down
12 changes: 8 additions & 4 deletions core/bin/external_node/src/main.rs
Expand Up @@ -48,7 +48,7 @@ use zksync_db_connection::{
};
use zksync_eth_client::{clients::QueryClient, EthInterface};
use zksync_health_check::{AppHealthCheck, HealthStatus, ReactiveHealthCheck};
use zksync_state::PostgresStorageCaches;
use zksync_state::{PostgresStorageCaches, RocksdbStorageOptions};
use zksync_storage::RocksDB;
use zksync_types::L2ChainId;
use zksync_utils::wait_for_tasks::ManagedTasks;
Expand Down Expand Up @@ -85,15 +85,19 @@ async fn build_state_keeper(
output_handler: OutputHandler,
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
task_handles: &mut Vec<task::JoinHandle<anyhow::Result<()>>>,
task_handles: &mut Vec<JoinHandle<anyhow::Result<()>>>,
) -> anyhow::Result<ZkSyncStateKeeper> {
// We only need call traces on the external node if the `debug_` namespace is enabled.
let save_call_traces = config.optional.api_namespaces().contains(&Namespace::Debug);

let cache_options = RocksdbStorageOptions {
block_cache_capacity: config.experimental.state_keeper_db_block_cache_capacity(),
max_open_files: config.experimental.state_keeper_db_max_open_files,
};
let (storage_factory, task) =
AsyncRocksdbCache::new(connection_pool.clone(), state_keeper_db_path);
AsyncRocksdbCache::new(connection_pool.clone(), state_keeper_db_path, cache_options);
let mut stop_receiver_clone = stop_receiver.clone();
task_handles.push(tokio::task::spawn(async move {
task_handles.push(tokio::spawn(async move {
let result = task.run(stop_receiver_clone.clone()).await;
stop_receiver_clone.changed().await?;
result
Expand Down
7 changes: 6 additions & 1 deletion core/lib/config/src/configs/database.rs
Expand Up @@ -3,6 +3,8 @@ use std::time::Duration;
use anyhow::Context as _;
use serde::{Deserialize, Serialize};

use crate::configs::ExperimentalDBConfig;

/// Mode of operation for the Merkle tree.
///
/// The mode does not influence how tree data is stored; i.e., a mode can be switched on the fly.
Expand Down Expand Up @@ -110,8 +112,11 @@ pub struct DBConfig {
/// Merkle tree configuration.
#[serde(skip)]
// ^ Filled in separately in `Self::from_env()`. We cannot use `serde(flatten)` because it
// doesn't work with 'envy`.
// doesn't work with `envy`.
pub merkle_tree: MerkleTreeConfig,
/// Experimental parts of the config.
#[serde(skip)] // same reasoning as for `merkle_tree`
pub experimental: ExperimentalDBConfig,
}

impl DBConfig {
Expand Down
35 changes: 35 additions & 0 deletions core/lib/config/src/configs/experimental.rs
@@ -0,0 +1,35 @@
//! Experimental part of configuration.

use std::num::NonZeroU32;

use serde::Deserialize;

/// Experimental part of the database config. Mirrors the experimental state keeper cache
/// parameters of the external node config.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct ExperimentalDBConfig {
    /// Block cache capacity of the state keeper RocksDB cache, in megabytes.
    /// The default value is 128 MB.
    // Prefer `state_keeper_db_block_cache_capacity()` for the value converted to bytes.
    #[serde(default = "ExperimentalDBConfig::default_state_keeper_db_block_cache_capacity_mb")]
    pub state_keeper_db_block_cache_capacity_mb: usize,
    /// Maximum number of files concurrently opened by state keeper cache RocksDB. Useful to fit
    /// into OS limits; can be used as a rudimentary way to control RAM usage of the cache.
    // NOTE(review): `None` presumably means "no explicit limit" — confirm with the consumer.
    pub state_keeper_db_max_open_files: Option<NonZeroU32>,
}

impl Default for ExperimentalDBConfig {
    /// All parameters default to the values used when the corresponding env vars are unset.
    fn default() -> Self {
        Self {
            state_keeper_db_max_open_files: None,
            state_keeper_db_block_cache_capacity_mb:
                Self::default_state_keeper_db_block_cache_capacity_mb(),
        }
    }
}

impl ExperimentalDBConfig {
    // Default used by the `serde(default = ...)` attribute on the struct field;
    // referenced by path from that attribute, so the name must not change.
    const fn default_state_keeper_db_block_cache_capacity_mb() -> usize {
        128
    }

    /// Returns the size of the block cache for the state keeper RocksDB cache, in bytes.
    pub fn state_keeper_db_block_cache_capacity(&self) -> usize {
        super::BYTES_IN_MEGABYTE * self.state_keeper_db_block_cache_capacity_mb
    }
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/mod.rs
Expand Up @@ -6,6 +6,7 @@ pub use self::{
database::{DBConfig, PostgresConfig},
eth_sender::{EthConfig, GasAdjusterConfig},
eth_watch::EthWatchConfig,
experimental::ExperimentalDBConfig,
fri_proof_compressor::FriProofCompressorConfig,
fri_prover::FriProverConfig,
fri_prover_gateway::FriProverGatewayConfig,
Expand All @@ -28,6 +29,7 @@ pub mod contracts;
pub mod database;
pub mod eth_sender;
pub mod eth_watch;
mod experimental;
pub mod fri_proof_compressor;
pub mod fri_prover;
pub mod fri_prover_gateway;
Expand Down
10 changes: 10 additions & 0 deletions core/lib/config/src/testonly.rs
Expand Up @@ -284,11 +284,21 @@ impl Distribution<configs::database::MerkleTreeConfig> for EncodeDist {
}
}

impl Distribution<configs::ExperimentalDBConfig> for EncodeDist {
    /// Samples a random `ExperimentalDBConfig` for round-trip encoding tests.
    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::ExperimentalDBConfig {
        // Sampling order matches the original field-literal evaluation order,
        // so RNG consumption is unchanged.
        let block_cache_capacity_mb = self.sample(rng);
        let max_open_files = self.sample(rng);
        configs::ExperimentalDBConfig {
            state_keeper_db_block_cache_capacity_mb: block_cache_capacity_mb,
            state_keeper_db_max_open_files: max_open_files,
        }
    }
}

impl Distribution<configs::database::DBConfig> for EncodeDist {
    /// Samples a random `DBConfig` for round-trip encoding tests.
    fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::database::DBConfig {
        // Sampling order matches the original field-literal evaluation order,
        // so RNG consumption is unchanged.
        let db_path = self.sample(rng);
        let merkle_tree_config = self.sample(rng);
        let experimental_config = self.sample(rng);
        configs::database::DBConfig {
            state_keeper_db_path: db_path,
            merkle_tree: merkle_tree_config,
            experimental: experimental_config,
        }
    }
}
Expand Down