Skip to content

Commit

Permalink
feat(state-keeper): Refactor persistence in StateKeeper (#1411)
Browse files Browse the repository at this point in the history
## What ❔

- Splits off miniblock and L1 batch persistence from `StateKeeperIO`
into a separate trait.
- Moves tracking current miniblock / L1 batch number from
`StateKeeperIO` to the state keeper.

## Why ❔

- Simplifies further changes to the persistence logic, e.g. toggling
saving protective reads.
- Can be used as a building block for generic retrospective L1 batch
execution.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli committed Mar 26, 2024
1 parent 62deb59 commit e26091a
Show file tree
Hide file tree
Showing 36 changed files with 1,663 additions and 1,488 deletions.
31 changes: 10 additions & 21 deletions core/bin/external_node/src/main.rs
Expand Up @@ -5,7 +5,7 @@ use clap::Parser;
use metrics::EN_METRICS;
use prometheus_exporter::PrometheusExporterConfig;
use tokio::{sync::watch, task};
use zksync_basic_types::{Address, L2ChainId};
use zksync_basic_types::L2ChainId;
use zksync_concurrency::{ctx, limiter, scope, time};
use zksync_config::configs::database::MerkleTreeMode;
use zksync_core::{
Expand All @@ -25,8 +25,8 @@ use zksync_core::{
reorg_detector::ReorgDetector,
setup_sigint_handler,
state_keeper::{
seal_criteria::NoopSealer, BatchExecutor, MainBatchExecutor, MiniblockSealer,
MiniblockSealerHandle, ZkSyncStateKeeper,
seal_criteria::NoopSealer, BatchExecutor, MainBatchExecutor, OutputHandler,
StateKeeperPersistence, ZkSyncStateKeeper,
},
sync_layer::{
batch_status_updater::BatchStatusUpdater, external_io::ExternalIO, ActionQueue,
Expand Down Expand Up @@ -62,25 +62,16 @@ async fn build_state_keeper(
state_keeper_db_path: String,
config: &ExternalNodeConfig,
connection_pool: ConnectionPool<Core>,
sync_state: SyncState,
l2_erc20_bridge_addr: Address,
miniblock_sealer_handle: MiniblockSealerHandle,
output_handler: OutputHandler,
stop_receiver: watch::Receiver<bool>,
chain_id: L2ChainId,
) -> anyhow::Result<ZkSyncStateKeeper> {
// These config values are used on the main node, and depending on these values certain transactions can
// be *rejected* (that is, not included into the block). However, external node only mirrors what the main
// node has already executed, so we can safely set these values to the maximum possible values - if the main
// node has already executed the transaction, then the external node must execute it too.
let max_allowed_l2_tx_gas_limit = u32::MAX.into();
let validation_computational_gas_limit = u32::MAX;
// We only need call traces on the external node if the `debug_` namespace is enabled.
let save_call_traces = config.optional.api_namespaces().contains(&Namespace::Debug);

let batch_executor_base: Box<dyn BatchExecutor> = Box::new(MainBatchExecutor::new(
state_keeper_db_path,
connection_pool.clone(),
max_allowed_l2_tx_gas_limit,
save_call_traces,
false,
config.optional.enum_index_migration_chunk_size,
Expand All @@ -91,13 +82,9 @@ async fn build_state_keeper(
let main_node_client = <dyn MainNodeClient>::json_rpc(&main_node_url)
.context("Failed creating JSON-RPC client for main node")?;
let io = ExternalIO::new(
miniblock_sealer_handle,
connection_pool,
action_queue,
sync_state,
Box::new(main_node_client),
l2_erc20_bridge_addr,
validation_computational_gas_limit,
chain_id,
)
.await
Expand All @@ -107,6 +94,7 @@ async fn build_state_keeper(
stop_receiver,
Box::new(io),
batch_executor_base,
output_handler,
Arc::new(NoopSealer),
))
}
Expand Down Expand Up @@ -134,8 +122,9 @@ async fn init_tasks(
app_health.insert_custom_component(Arc::new(sync_state.clone()));
let (action_queue_sender, action_queue) = ActionQueue::new();

let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(
let (persistence, miniblock_sealer) = StateKeeperPersistence::new(
connection_pool.clone(),
config.remote.l2_erc20_bridge_addr,
config.optional.miniblock_seal_queue_capacity,
);
task_handles.push(tokio::spawn(miniblock_sealer.run()));
Expand All @@ -157,14 +146,14 @@ async fn init_tasks(
}
}));

let output_handler = OutputHandler::new(Box::new(persistence.with_tx_insertion()))
.with_handler(Box::new(sync_state.clone()));
let state_keeper = build_state_keeper(
action_queue,
config.required.state_cache_path.clone(),
config,
connection_pool.clone(),
sync_state.clone(),
config.remote.l2_erc20_bridge_addr,
miniblock_sealer_handle,
output_handler,
stop_receiver.clone(),
config.remote.l2_chain_id,
)
Expand Down

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 4 additions & 18 deletions core/lib/dal/src/protocol_versions_dal.rs
Expand Up @@ -138,15 +138,13 @@ impl ProtocolVersionsDal<'_, '_> {
db_transaction.commit().await.unwrap();
}

pub async fn base_system_contracts_by_timestamp(
pub async fn protocol_version_id_by_timestamp(
&mut self,
current_timestamp: u64,
) -> anyhow::Result<(BaseSystemContracts, ProtocolVersionId)> {
) -> sqlx::Result<ProtocolVersionId> {
let row = sqlx::query!(
r#"
SELECT
bootloader_code_hash,
default_account_code_hash,
id
FROM
protocol_versions
Expand All @@ -160,21 +158,9 @@ impl ProtocolVersionsDal<'_, '_> {
current_timestamp as i64
)
.fetch_one(self.storage.conn())
.await
.context("cannot fetch system contract hashes")?;
.await?;

let protocol_version = (row.id as u16)
.try_into()
.context("bogus protocol version ID")?;
let contracts = self
.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await?;
Ok((contracts, protocol_version))
ProtocolVersionId::try_from(row.id as u16).map_err(|err| sqlx::Error::Decode(err.into()))
}

pub async fn load_base_system_contracts_by_version_id(
Expand Down
51 changes: 26 additions & 25 deletions core/lib/zksync_core/src/consensus/testonly.rs
@@ -1,4 +1,5 @@
//! Utilities for testing the consensus module.

use std::{collections::HashMap, sync::Arc};

use anyhow::Context as _;
Expand All @@ -22,8 +23,10 @@ use crate::{
consensus::{fetcher::P2PConfig, Fetcher, Store},
genesis::{mock_genesis_config, GenesisParams},
state_keeper::{
io::common::IoCursor, seal_criteria::NoopSealer, tests::MockBatchExecutor, MiniblockSealer,
ZkSyncStateKeeper,
io::{IoCursor, L1BatchParams, MiniblockParams},
seal_criteria::NoopSealer,
tests::MockBatchExecutor,
OutputHandler, StateKeeperPersistence, ZkSyncStateKeeper,
},
sync_layer::{
fetcher::FetchedTransaction,
Expand Down Expand Up @@ -154,7 +157,6 @@ pub(super) struct StateKeeper {
fee_per_gas: u64,
gas_per_pubdata: u64,

sync_state: SyncState,
actions_sender: ActionQueueSender,
addr: sync::watch::Receiver<Option<std::net::SocketAddr>>,
store: Store,
Expand All @@ -163,7 +165,6 @@ pub(super) struct StateKeeper {
/// Fake StateKeeper task to be executed in the background.
pub(super) struct StateKeeperRunner {
actions_queue: ActionQueue,
sync_state: SyncState,
store: Store,
addr: sync::watch::Sender<Option<std::net::SocketAddr>>,
}
Expand Down Expand Up @@ -193,7 +194,6 @@ impl StateKeeper {
.await?
.context("pending_batch_exists()")?;
let (actions_sender, actions_queue) = ActionQueue::new();
let sync_state = SyncState::default();
let addr = sync::watch::channel(None).0;
Ok((
Self {
Expand All @@ -204,12 +204,10 @@ impl StateKeeper {
fee_per_gas: 10,
gas_per_pubdata: 100,
actions_sender,
sync_state: sync_state.clone(),
addr: addr.subscribe(),
store: store.clone(),
},
StateKeeperRunner {
sync_state,
actions_queue,
store: store.clone(),
addr,
Expand All @@ -224,22 +222,28 @@ impl StateKeeper {
self.last_timestamp += 5;
self.batch_sealed = false;
SyncAction::OpenBatch {
params: L1BatchParams {
protocol_version: ProtocolVersionId::latest(),
validation_computational_gas_limit: u32::MAX,
operator_address: GenesisParams::mock().config().fee_account,
fee_input: Default::default(),
first_miniblock: MiniblockParams {
timestamp: self.last_timestamp,
virtual_blocks: 1,
},
},
number: self.last_batch,
timestamp: self.last_timestamp,
l1_gas_price: 2,
l2_fair_gas_price: 3,
fair_pubdata_price: Some(24),
operator_address: GenesisParams::mock().config().fee_account,
protocol_version: ProtocolVersionId::latest(),
first_miniblock_info: (self.last_block, 1),
first_miniblock_number: self.last_block,
}
} else {
self.last_block += 1;
self.last_timestamp += 2;
SyncAction::Miniblock {
params: MiniblockParams {
timestamp: self.last_timestamp,
virtual_blocks: 0,
},
number: self.last_block,
timestamp: self.last_timestamp,
virtual_blocks: 0,
}
}
}
Expand All @@ -260,7 +264,7 @@ impl StateKeeper {
pub async fn seal_batch(&mut self) {
// Each batch ends with an empty block (aka fictive block).
let mut actions = vec![self.open_block()];
actions.push(SyncAction::SealBatch { virtual_blocks: 0 });
actions.push(SyncAction::SealBatch);
self.actions_sender.push_actions(actions).await;
self.batch_sealed = true;
}
Expand Down Expand Up @@ -302,7 +306,7 @@ impl StateKeeper {
Fetcher {
store: self.store,
client: Box::new(client),
sync_state: self.sync_state,
sync_state: SyncState::default(),
limiter: unbounded_limiter(ctx),
}
.run_centralized(ctx, self.actions_sender)
Expand All @@ -319,7 +323,7 @@ impl StateKeeper {
Fetcher {
store: self.store,
client: Box::new(client),
sync_state: self.sync_state,
sync_state: SyncState::default(),
limiter: unbounded_limiter(ctx),
}
.run_p2p(ctx, self.actions_sender, cfg)
Expand Down Expand Up @@ -367,17 +371,13 @@ impl StateKeeperRunner {
pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
let res = scope::run!(ctx, |ctx, s| async {
let (stop_send, stop_recv) = sync::watch::channel(false);
let (miniblock_sealer, miniblock_sealer_handle) =
MiniblockSealer::new(self.store.0.clone(), 5);
let (persistence, miniblock_sealer) =
StateKeeperPersistence::new(self.store.0.clone(), Address::repeat_byte(11), 5);

let io = ExternalIO::new(
miniblock_sealer_handle,
self.store.0.clone(),
self.actions_queue,
self.sync_state,
Box::<MockMainNodeClient>::default(),
Address::repeat_byte(11),
u32::MAX,
L2ChainId::default(),
)
.await?;
Expand All @@ -402,6 +402,7 @@ impl StateKeeperRunner {
stop_recv,
Box::new(io),
Box::new(MockBatchExecutor),
OutputHandler::new(Box::new(persistence.with_tx_insertion())),
Arc::new(NoopSealer),
)
.run()
Expand Down
11 changes: 6 additions & 5 deletions core/lib/zksync_core/src/lib.rs
Expand Up @@ -76,7 +76,8 @@ use crate::{
metadata_calculator::{MetadataCalculator, MetadataCalculatorConfig},
metrics::{InitStage, APP_METRICS},
state_keeper::{
create_state_keeper, MempoolFetcher, MempoolGuard, MiniblockSealer, SequencerSealer,
create_state_keeper, MempoolFetcher, MempoolGuard, OutputHandler, SequencerSealer,
StateKeeperPersistence,
},
};

Expand Down Expand Up @@ -765,23 +766,23 @@ async fn add_state_keeper_to_task_futures(
.build()
.await
.context("failed to build miniblock_sealer_pool")?;
let (miniblock_sealer, miniblock_sealer_handle) = MiniblockSealer::new(
let (persistence, miniblock_sealer) = StateKeeperPersistence::new(
miniblock_sealer_pool,
contracts_config.l2_erc20_bridge_addr,
state_keeper_config.miniblock_seal_queue_capacity,
);
let persistence = persistence.with_object_store(object_store);
task_futures.push(tokio::spawn(miniblock_sealer.run()));

let state_keeper = create_state_keeper(
contracts_config,
state_keeper_config,
db_config,
network_config,
mempool_config,
state_keeper_pool.clone(),
mempool.clone(),
batch_fee_input_provider.clone(),
miniblock_sealer_handle,
object_store,
OutputHandler::new(Box::new(persistence)),
stop_receiver.clone(),
)
.await;
Expand Down

0 comments on commit e26091a

Please sign in to comment.