Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (txsink) add tx sink deny list implementation and add new server component for it #1782

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/bin/zksync_server/src/main.rs
Expand Up @@ -11,6 +11,7 @@ use zksync_config::{
},
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
tx_sink::TxSinkConfig,
ContractsConfig, FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig,
FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, ObservabilityConfig,
PrometheusConfig, ProofDataHandlerConfig,
Expand Down Expand Up @@ -272,5 +273,6 @@ fn load_env_config() -> anyhow::Result<TempConfigStore> {
object_store_config: ObjectStoreConfig::from_env().ok(),
observability: ObservabilityConfig::from_env().ok(),
snapshot_creator: SnapshotsCreatorConfig::from_env().ok(),
tx_sink_config: TxSinkConfig::from_env().ok(),
})
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/general.rs
Expand Up @@ -3,6 +3,7 @@ use crate::{
chain::{CircuitBreakerConfig, MempoolConfig, OperationsManagerConfig, StateKeeperConfig},
fri_prover_group::FriProverGroupConfig,
house_keeper::HouseKeeperConfig,
tx_sink::TxSinkConfig,
FriProofCompressorConfig, FriProverConfig, FriProverGatewayConfig,
FriWitnessGeneratorConfig, FriWitnessVectorGeneratorConfig, ObservabilityConfig,
PrometheusConfig, ProofDataHandlerConfig,
Expand Down Expand Up @@ -32,4 +33,5 @@ pub struct GeneralConfig {
pub eth: Option<EthConfig>,
pub snapshot_creator: Option<SnapshotsCreatorConfig>,
pub observability: Option<ObservabilityConfig>,
pub tx_sink_config: Option<TxSinkConfig>,
}
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/mod.rs
Expand Up @@ -17,6 +17,7 @@ pub use self::{
observability::{ObservabilityConfig, OpentelemetryConfig},
proof_data_handler::ProofDataHandlerConfig,
snapshots_creator::SnapshotsCreatorConfig,
tx_sink::TxSinkConfig,
utils::PrometheusConfig,
};

Expand All @@ -41,6 +42,7 @@ pub mod object_store;
pub mod observability;
pub mod proof_data_handler;
pub mod snapshots_creator;
pub mod tx_sink;
pub mod utils;
pub mod wallets;

Expand Down
19 changes: 19 additions & 0 deletions core/lib/config/src/configs/tx_sink.rs
@@ -0,0 +1,19 @@
use std::{collections::HashSet, str::FromStr};

use serde::Deserialize;
use zksync_basic_types::Address;

#[derive(Debug, Deserialize, Clone, PartialEq)]
pub struct TxSinkConfig {
pub deny_list: Option<String>,
}

impl TxSinkConfig {
pub fn deny_list(&self) -> Option<HashSet<Address>> {
self.deny_list.as_ref().map(|list| {
list.split(',')
.map(|element| Address::from_str(element).unwrap())
.collect()
})
}
}
8 changes: 8 additions & 0 deletions core/lib/config/src/testonly.rs
Expand Up @@ -723,3 +723,11 @@ impl Distribution<configs::consensus::ConsensusSecrets> for EncodeDist {
}
}
}

impl Distribution<configs::TxSinkConfig> for EncodeDist {
fn sample<R: Rng + ?Sized>(&self, rng: &mut R) -> configs::TxSinkConfig {
configs::TxSinkConfig {
deny_list: self.sample(rng),
}
}
}
1 change: 1 addition & 0 deletions core/lib/env_config/src/lib.rs
Expand Up @@ -19,6 +19,7 @@ pub mod object_store;
mod observability;
mod proof_data_handler;
mod snapshots_creator;
mod tx_sink;
mod utils;

mod genesis;
Expand Down
35 changes: 35 additions & 0 deletions core/lib/env_config/src/tx_sink.rs
@@ -0,0 +1,35 @@
use zksync_config::configs::TxSinkConfig;

use crate::{envy_load, FromEnv};

impl FromEnv for TxSinkConfig {
fn from_env() -> anyhow::Result<Self> {
envy_load("tx_sink", "TX_SINK_")
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::EnvMutex;

static MUTEX: EnvMutex = EnvMutex::new();

fn expected_config() -> TxSinkConfig {
TxSinkConfig {
deny_list: Some("0x1234567890abcdef".to_string()),
}
}

#[test]
fn from_env() {
let mut lock = MUTEX.lock();
let config = r#"
TX_SINK_DENY_LIST="0x1234567890abcdef"
"#;
lock.set_env(config);

let actual = TxSinkConfig::from_env().unwrap();
assert_eq!(actual, expected_config());
}
}
2 changes: 2 additions & 0 deletions core/lib/protobuf_config/src/general.rs
Expand Up @@ -37,6 +37,7 @@ impl ProtoRepr for proto::GeneralConfig {
snapshot_creator: read_optional_repr(&self.snapshot_creator)
.context("snapshot_creator")?,
observability: read_optional_repr(&self.observability).context("observability")?,
tx_sink_config: read_optional_repr(&self.tx_sink).context("tx_sink")?,
})
}

Expand Down Expand Up @@ -68,6 +69,7 @@ impl ProtoRepr for proto::GeneralConfig {
eth: this.eth.as_ref().map(ProtoRepr::build),
snapshot_creator: this.snapshot_creator.as_ref().map(ProtoRepr::build),
observability: this.observability.as_ref().map(ProtoRepr::build),
tx_sink: this.tx_sink_config.as_ref().map(ProtoRepr::build),
}
}
}
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/lib.rs
Expand Up @@ -24,6 +24,7 @@ mod snapshots_creator;
pub mod testonly;
#[cfg(test)]
mod tests;
mod tx_sink;
mod utils;
mod wallets;

Expand Down
6 changes: 4 additions & 2 deletions core/lib/protobuf_config/src/proto/config/general.proto
Expand Up @@ -2,16 +2,17 @@ syntax = "proto3";

package zksync.config.general;

import "zksync/config/prover.proto";
import "zksync/config/api.proto";
import "zksync/config/chain.proto";
import "zksync/config/circuit_breaker.proto";
import "zksync/config/contract_verifier.proto";
import "zksync/config/database.proto";
import "zksync/config/circuit_breaker.proto";
import "zksync/config/eth_sender.proto";
import "zksync/config/house_keeper.proto";
import "zksync/config/observability.proto";
import "zksync/config/prover.proto";
import "zksync/config/snapshots_creator.proto";
import "zksync/config/tx_sink.proto";
import "zksync/config/utils.proto";

message GeneralConfig {
Expand All @@ -35,4 +36,5 @@ message GeneralConfig {
optional config.prover.ProverGateway prover_gateway = 30;
optional config.snapshot_creator.SnapshotsCreator snapshot_creator = 31;
optional config.observability.Observability observability = 32;
optional config.tx_sink.TxSink tx_sink = 33;
}
7 changes: 7 additions & 0 deletions core/lib/protobuf_config/src/proto/config/tx_sink.proto
@@ -0,0 +1,7 @@
syntax = "proto3";

package zksync.config.tx_sink;

message TxSink {
optional string deny_list = 1; // optional
}
1 change: 1 addition & 0 deletions core/lib/protobuf_config/src/tests.rs
Expand Up @@ -39,6 +39,7 @@ fn test_encoding() {
test_encode_all_formats::<ReprConv<proto::prover::ProofDataHandler>>(rng);
test_encode_all_formats::<ReprConv<proto::snapshot_creator::SnapshotsCreator>>(rng);
test_encode_all_formats::<ReprConv<proto::observability::Observability>>(rng);
test_encode_all_formats::<ReprConv<proto::tx_sink::TxSink>>(rng);
}

pub fn decode_yaml_repr<T: ProtoRepr>(
Expand Down
19 changes: 19 additions & 0 deletions core/lib/protobuf_config/src/tx_sink.rs
@@ -0,0 +1,19 @@
use zksync_config::configs;
use zksync_protobuf::repr::ProtoRepr;

use crate::proto::tx_sink as proto;

impl ProtoRepr for proto::TxSink {
type Type = configs::tx_sink::TxSinkConfig;
fn read(&self) -> anyhow::Result<Self::Type> {
Ok(Self::Type {
deny_list: self.deny_list.clone(),
})
}

fn build(this: &Self::Type) -> Self {
Self {
deny_list: this.deny_list.clone(),
}
}
}
@@ -0,0 +1,83 @@
use std::collections::{
hash_map::{Entry, HashMap},
HashSet,
};

use tokio::sync::Mutex;
use zksync_dal::{transactions_dal::L2TxSubmissionResult, ConnectionPool, Core, CoreDal};
use zksync_shared_metrics::{TxStage, APP_METRICS};
use zksync_types::{fee::TransactionExecutionMetrics, l2::L2Tx, Address, Nonce, H256};

use super::{tx_sink::TxSink, SubmitTxError};
use crate::api_server::web3::metrics::API_METRICS;

/// Wrapper for the master DB pool that allows to submit transactions to the mempool.
#[derive(Debug)]
pub struct DenyListPoolSink {
deny_list_pool: ConnectionPool<Core>,
deny_list: HashSet<Address>,
inflight_requests: Mutex<HashMap<(Address, Nonce), H256>>,
}

impl DenyListPoolSink {
pub fn new(deny_list_pool: ConnectionPool<Core>, deny_list: HashSet<Address>) -> Self {
Self {
deny_list_pool,
deny_list,
inflight_requests: Mutex::new(HashMap::new()),
}
}
}

#[async_trait::async_trait]
impl TxSink for DenyListPoolSink {
async fn submit_tx(
&self,
tx: &L2Tx,
execution_metrics: TransactionExecutionMetrics,
) -> Result<L2TxSubmissionResult, SubmitTxError> {
let address_and_nonce = (tx.initiator_account(), tx.nonce());
if self.deny_list.contains(&address_and_nonce.0) {
return Err(SubmitTxError::SenderInDenyList);
}

let mut lock = self.inflight_requests.lock().await;
match lock.entry(address_and_nonce) {
Entry::Occupied(entry) => {
let submission_res_handle = if entry.get() == &tx.hash() {
L2TxSubmissionResult::Duplicate
} else {
L2TxSubmissionResult::InsertionInProgress
};
APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
return Ok(submission_res_handle);
}
Entry::Vacant(entry) => {
entry.insert(tx.hash());
API_METRICS.inflight_tx_submissions.inc_by(1);
}
};
drop(lock);

let result = match self.deny_list_pool.connection_tagged("api").await {
Ok(mut connection) => connection
.transactions_dal()
.insert_transaction_l2(tx, execution_metrics)
.await
.map(|submission_res_handle| {
APP_METRICS.processed_txs[&TxStage::Mempool(submission_res_handle)].inc();
submission_res_handle
})
.map_err(|err| err.generalize().into()),
Err(err) => Err(err.generalize().into()),
};

self.inflight_requests
.lock()
.await
.remove(&address_and_nonce);
API_METRICS.inflight_tx_submissions.dec_by(1);

result
}
}
1 change: 1 addition & 0 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
Expand Up @@ -45,6 +45,7 @@ use crate::{
utils::pending_protocol_version,
};

pub mod deny_list_pool_sink;
pub mod master_pool_sink;
pub mod proxy;
mod result;
Expand Down
3 changes: 3 additions & 0 deletions core/lib/zksync_core/src/api_server/tx_sender/result.rs
Expand Up @@ -75,6 +75,8 @@ pub enum SubmitTxError {
/// Catch-all internal error (e.g., database error) that should not be exposed to the caller.
#[error("internal error")]
Internal(#[from] anyhow::Error),
#[error("sender is in deny list")]
SenderInDenyList,
}

impl SubmitTxError {
Expand Down Expand Up @@ -108,6 +110,7 @@ impl SubmitTxError {
Self::ProxyError(_) => "proxy-error",
Self::FailedToPublishCompressedBytecodes => "failed-to-publish-compressed-bytecodes",
Self::Internal(_) => "internal",
Self::SenderInDenyList => "sender-in-deny-list",
}
}

Expand Down
13 changes: 9 additions & 4 deletions core/lib/zksync_core/src/api_server/tx_sender/tests.rs
@@ -1,5 +1,4 @@
//! Tests for the transaction sender.

use zksync_config::configs::wallets::Wallets;
use zksync_types::{get_nonce_key, L1BatchNumber, L2BlockNumber, StorageLog};

Expand Down Expand Up @@ -27,10 +26,16 @@ pub(crate) async fn create_test_tx_sender(

let storage_caches = PostgresStorageCaches::new(1, 1);
let batch_fee_model_input_provider = Arc::new(MockBatchFeeParamsProvider::default());

let config = crate::TxSenderBuilderConfigs {
tx_sender_config: tx_sender_config.clone(),
web3_json_config: web3_config.clone(),
state_keeper_config: state_keeper_config.clone(),
tx_sink_config: None,
};

let (mut tx_sender, vm_barrier) = crate::build_tx_sender(
&tx_sender_config,
&web3_config,
&state_keeper_config,
config,
pool.clone(),
pool,
batch_fee_model_input_provider,
Expand Down