feat(api): Improve logging for API server #1792

Merged · 9 commits · Apr 29, 2024
@@ -1,16 +1,21 @@
use std::time::Duration;

use multivm::interface::{VmExecutionResultAndLogs, VmMemoryMetrics};
use vise::{Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, Metrics};
use vise::{
Buckets, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram, LatencyObserver, Metrics,
};
use zksync_shared_metrics::InteractionType;
use zksync_state::StorageViewMetrics;
use zksync_types::{
event::{extract_long_l2_to_l1_messages, extract_published_bytecodes},
fee::TransactionExecutionMetrics,
storage_writes_deduplicator::StorageWritesDeduplicator,
H256,
};
use zksync_utils::bytecode::bytecode_len_in_bytes;

use crate::api_server::utils::ReportFilter;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EncodeLabelValue, EncodeLabelSet)]
#[metrics(label = "type", rename_all = "snake_case")]
enum SizeType {
@@ -95,6 +100,49 @@ pub(in crate::api_server) enum SubmitTxStage {
DbInsert,
}

#[must_use = "should be `observe()`d"]
#[derive(Debug)]
pub(in crate::api_server) struct SubmitTxLatencyObserver<'a> {
inner: Option<LatencyObserver<'a>>,
tx_hash: H256,
stage: SubmitTxStage,
}

impl SubmitTxLatencyObserver<'_> {
pub fn set_stage(&mut self, stage: SubmitTxStage) {
self.stage = stage;
}

pub fn observe(mut self) {
static FILTER: ReportFilter = report_filter!(Duration::from_secs(10));
const MIN_LOGGED_LATENCY: Duration = Duration::from_secs(1);

let latency = self.inner.take().unwrap().observe();
// ^ `unwrap()` is safe: `LatencyObserver` is only taken out in this method.
if latency > MIN_LOGGED_LATENCY && FILTER.should_report() {
tracing::info!(
"Transaction {:?} submission stage {:?} has high latency: {latency:?}",
self.tx_hash,
self.stage
);
}
}
}

impl Drop for SubmitTxLatencyObserver<'_> {
fn drop(&mut self) {
static FILTER: ReportFilter = report_filter!(Duration::from_secs(10));

if self.inner.is_some() && FILTER.should_report() {
tracing::info!(
"Transaction {:?} submission was dropped at stage {:?} due to error or client disconnecting",
self.tx_hash,
self.stage
);
}
}
}

#[derive(Debug, Metrics)]
#[metrics(prefix = "api_web3")]
pub(in crate::api_server) struct SandboxMetrics {
@@ -103,11 +151,25 @@ pub(in crate::api_server) struct SandboxMetrics {
#[metrics(buckets = Buckets::linear(0.0..=2_000.0, 200.0))]
pub(super) sandbox_execution_permits: Histogram<usize>,
#[metrics(buckets = Buckets::LATENCIES)]
pub submit_tx: Family<SubmitTxStage, Histogram<Duration>>,
submit_tx: Family<SubmitTxStage, Histogram<Duration>>,
#[metrics(buckets = Buckets::linear(0.0..=30.0, 3.0))]
pub estimate_gas_binary_search_iterations: Histogram<usize>,
}

impl SandboxMetrics {
pub fn start_tx_submit_stage(
&self,
tx_hash: H256,
stage: SubmitTxStage,
) -> SubmitTxLatencyObserver<'_> {
SubmitTxLatencyObserver {
inner: Some(self.submit_tx[&stage].start()),
tx_hash,
stage,
}
}
}

#[vise::register]
pub(in crate::api_server) static SANDBOX_METRICS: vise::Global<SandboxMetrics> =
vise::Global::new();
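For context, here is a minimal sketch of how the new `SubmitTxLatencyObserver` API is meant to be used (the real call sites are in `tx_sender/mod.rs` below; `validate_stage` and `persist_stage` are hypothetical helpers standing in for the actual submission steps):

```rust
// Hypothetical usage sketch; only the `SANDBOX_METRICS` calls mirror this PR.
async fn submit_sketch(tx_hash: H256) -> Result<(), SubmitTxError> {
    // Start timing the `Validate` stage; the observer remembers the tx hash and stage.
    let stage_latency = SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::Validate);
    validate_stage(tx_hash).await?;
    // ^ If we return early here (error or client disconnect), the observer is dropped
    //   without `observe()`, and its `Drop` impl logs the abandoned stage (rate-limited).
    stage_latency.observe(); // records latency; also logs it if it exceeds 1 s (rate-limited)

    let mut stage_latency = SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::DbInsert);
    if persist_stage(tx_hash).await? {
        // The stage label can be switched before observing, as done for proxied transactions.
        stage_latency.set_stage(SubmitTxStage::TxProxy);
    }
    stage_latency.observe();
    Ok(())
}
```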
2 changes: 2 additions & 0 deletions core/lib/zksync_core/src/api_server/mod.rs
@@ -1,5 +1,7 @@
// Everywhere in this module the word "block" actually means "L2 block".

#[macro_use]
mod utils;
pub mod contract_verification;
pub mod execution_sandbox;
pub mod healthcheck;
53 changes: 22 additions & 31 deletions core/lib/zksync_core/src/api_server/tx_sender/mod.rs
@@ -305,16 +305,17 @@ impl TxSender {
.context("failed acquiring connection to replica DB")
}

#[tracing::instrument(skip(self, tx))]
#[tracing::instrument(level = "debug", skip_all, fields(tx.hash = ?tx.hash()))]
pub async fn submit_tx(&self, tx: L2Tx) -> Result<L2TxSubmissionResult, SubmitTxError> {
let stage_latency = SANDBOX_METRICS.submit_tx[&SubmitTxStage::Validate].start();
let tx_hash = tx.hash();
let stage_latency = SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::Validate);
let mut connection = self.acquire_replica_connection().await?;
let protocol_version = pending_protocol_version(&mut connection).await?;
drop(connection);
self.validate_tx(&tx, protocol_version).await?;
stage_latency.observe();

let stage_latency = SANDBOX_METRICS.submit_tx[&SubmitTxStage::DryRun].start();
let stage_latency = SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::DryRun);
let shared_args = self.shared_args().await?;
let vm_permit = self.0.vm_concurrency_limiter.acquire().await;
let vm_permit = vm_permit.ok_or(SubmitTxError::ServerShuttingDown)?;
@@ -336,15 +337,14 @@ impl TxSender {
vec![],
)
.await?;

tracing::info!(
"Submit tx {:?} with execution metrics {:?}",
tx.hash(),
"Submit tx {tx_hash:?} with execution metrics {:?}",
execution_output.metrics
);
stage_latency.observe();

let stage_latency = SANDBOX_METRICS.submit_tx[&SubmitTxStage::VerifyExecute].start();
let stage_latency =
SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::VerifyExecute);
let computational_gas_limit = self.0.sender_config.validation_computational_gas_limit;
let validation_result = self
.0
@@ -367,9 +367,9 @@
return Err(SubmitTxError::FailedToPublishCompressedBytecodes);
}

let stage_started_at = Instant::now();
let mut stage_latency =
SANDBOX_METRICS.start_tx_submit_stage(tx_hash, SubmitTxStage::DbInsert);
self.ensure_tx_executable(&tx.clone().into(), &execution_output.metrics, true)?;

let submission_res_handle = self
.0
.tx_sink
@@ -396,13 +396,12 @@
}
L2TxSubmissionResult::InsertionInProgress => Err(SubmitTxError::InsertionInProgress),
L2TxSubmissionResult::Proxied => {
SANDBOX_METRICS.submit_tx[&SubmitTxStage::TxProxy]
.observe(stage_started_at.elapsed());
stage_latency.set_stage(SubmitTxStage::TxProxy);
stage_latency.observe();
Ok(submission_res_handle)
}
_ => {
SANDBOX_METRICS.submit_tx[&SubmitTxStage::DbInsert]
.observe(stage_started_at.elapsed());
stage_latency.observe();
Ok(submission_res_handle)
}
}
@@ -674,6 +673,10 @@ impl TxSender {
}
}

#[tracing::instrument(level = "debug", skip_all, fields(
initiator = ?tx.initiator_account(),
nonce = ?tx.nonce(),
))]
pub async fn get_txs_fee_in_wei(
&self,
mut tx: Transaction,
@@ -792,15 +795,9 @@ impl TxSender {
// the transaction succeeds
let mut lower_bound = 0;
let mut upper_bound = MAX_L2_TX_GAS_LIMIT;
let tx_id = format!(
"{:?}-{}",
tx.initiator_account(),
tx.nonce().unwrap_or(Nonce(0))
);
tracing::trace!(
"fee estimation tx {:?}: preparation took {:?}, starting binary search",
tx_id,
estimation_started_at.elapsed(),
"preparation took {:?}, starting binary search",
estimation_started_at.elapsed()
);

let mut number_of_iterations = 0usize;
@@ -832,12 +829,8 @@
}

tracing::trace!(
"fee estimation tx {:?}: iteration {} took {:?}. lower_bound: {}, upper_bound: {}",
tx_id,
number_of_iterations,
iteration_started_at.elapsed(),
lower_bound,
upper_bound,
"iteration {number_of_iterations} took {:?}. lower_bound: {lower_bound}, upper_bound: {upper_bound}",
iteration_started_at.elapsed()
);
number_of_iterations += 1;
}
@@ -896,10 +889,8 @@
let estimated_gas_for_pubdata =
(gas_for_pubdata as f64 * estimated_fee_scale_factor) as u64;

tracing::info!(
initiator = ?tx.initiator_account(),
nonce = %tx.nonce().unwrap_or(Nonce(0)),
"fee estimation: gas for pubdata: {estimated_gas_for_pubdata}, computational gas: {}, overhead gas: {overhead} \
tracing::debug!(
"gas for pubdata: {estimated_gas_for_pubdata}, computational gas: {}, overhead gas: {overhead} \
(with params base_fee: {base_fee}, gas_per_pubdata_byte: {gas_per_pubdata_byte}) \
estimated_fee_scale_factor: {estimated_fee_scale_factor}",
suggested_gas_limit - estimated_gas_for_pubdata,
53 changes: 53 additions & 0 deletions core/lib/zksync_core/src/api_server/utils.rs
@@ -0,0 +1,53 @@
//! Utils specific to the API server.

use std::{
cell::Cell,
thread,
time::{Duration, Instant},
};

/// Allows filtering events (e.g., for logging) so that they are reported no more frequently than with a configurable interval.
///
/// Current implementation uses thread-local vars in order to not rely on mutexes or other cross-thread primitives.
/// I.e., it only really works if the number of threads accessing it is limited (which is the case for the API server;
/// the number of worker threads is congruent to the CPU count).
#[derive(Debug)]
pub(super) struct ReportFilter {
interval: Duration,
last_timestamp: &'static thread::LocalKey<Cell<Option<Instant>>>,
}

impl ReportFilter {
// Should only be used from the `report_filter!` macro.
pub const fn new(
interval: Duration,
last_timestamp: &'static thread::LocalKey<Cell<Option<Instant>>>,
) -> Self {
Self {
interval,
last_timestamp,
}
}

/// Should be called sparingly, since it involves moderately heavy operations (getting current time).
pub fn should_report(&self) -> bool {
let timestamp = self.last_timestamp.get();
let now = Instant::now();
if timestamp.map_or(true, |ts| now - ts > self.interval) {
self.last_timestamp.set(Some(now));
true
} else {
false
}
}
}

/// Creates a new filter with the specified reporting interval *per thread*.
macro_rules! report_filter {
($interval:expr) => {{
thread_local! {
static LAST_TIMESTAMP: std::cell::Cell<Option<std::time::Instant>> = std::cell::Cell::new(None);
}
ReportFilter::new($interval, &LAST_TIMESTAMP)
}};
}
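As a usage illustration (a sketch under the assumption that the caller lives inside the API server, where `ReportFilter` is in scope and `report_filter!` is available via `#[macro_use]`), a log line can be rate-limited per worker thread like this:

```rust
use std::time::Duration;

fn report_slow_request(latency: Duration) {
    // At most one such log line per 10 seconds per worker thread.
    static FILTER: ReportFilter = report_filter!(Duration::from_secs(10));
    if FILTER.should_report() {
        tracing::info!("request took {latency:?}");
    }
}
```

This is the same pattern `SubmitTxLatencyObserver::observe()` and its `Drop` impl use above.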
@@ -249,10 +249,13 @@ where
static CORRELATION_ID_RNG: RefCell<SmallRng> = RefCell::new(SmallRng::from_entropy());
}

// Unlike `MetadataMiddleware`, we don't need to extend the method lifetime to `'static`;
// `tracing` span instantiation allocates a `String` for supplied `&str`s in any case.
let method = request.method_name();
// Wrap a call into a span with unique correlation ID, so that events occurring in the span can be easily filtered.
// This works as a cheap alternative to Open Telemetry tracing with its trace / span IDs.
let correlation_id = CORRELATION_ID_RNG.with(|rng| rng.borrow_mut().next_u64());
let call_span = tracing::debug_span!("rpc_call", correlation_id);
let call_span = tracing::debug_span!("rpc_call", method, correlation_id);
self.inner.call(request).instrument(call_span)
}
}
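For reference, the same span-per-call pattern in isolation (generic `tracing` + `rand` usage, not code from this PR; the middleware itself reuses a thread-local RNG):

```rust
use rand::{rngs::SmallRng, RngCore, SeedableRng};
use tracing::Instrument;

async fn handle_rpc_call(method: &str) {
    let correlation_id = SmallRng::from_entropy().next_u64();
    // Every event emitted inside the instrumented future carries `method` and
    // `correlation_id`, so all logs of one RPC call can be grepped by the ID.
    let call_span = tracing::debug_span!("rpc_call", method, correlation_id);
    async {
        tracing::debug!("handling request");
    }
    .instrument(call_span)
    .await;
}
```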
48 changes: 3 additions & 45 deletions core/lib/zksync_core/src/api_server/web3/metrics.rs
@@ -1,11 +1,6 @@
//! Metrics for the JSON-RPC server.

use std::{
borrow::Cow,
cell::Cell,
fmt, thread,
time::{Duration, Instant},
};
use std::{borrow::Cow, fmt, time::Duration};

use vise::{
Buckets, Counter, DurationAsSecs, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Histogram,
@@ -18,44 +13,7 @@ use super::{
backend_jsonrpsee::MethodMetadata, ApiTransport, InternalApiConfig, OptionalApiParams,
TypedFilter,
};

/// Allows filtering events (e.g., for logging) so that they are reported no more frequently than with a configurable interval.
///
/// Current implementation uses thread-local vars in order to not rely on mutexes or other cross-thread primitives.
/// I.e., it only really works if the number of threads accessing it is limited (which is the case for the API server;
/// the number of worker threads is congruent to the CPU count).
#[derive(Debug)]
struct ReportFilter {
interval: Duration,
last_timestamp: &'static thread::LocalKey<Cell<Option<Instant>>>,
}

impl ReportFilter {
/// Should be called sparingly, since it involves moderately heavy operations (getting current time).
fn should_report(&self) -> bool {
let timestamp = self.last_timestamp.get();
let now = Instant::now();
if timestamp.map_or(true, |ts| now - ts > self.interval) {
self.last_timestamp.set(Some(now));
true
} else {
false
}
}
}

/// Creates a new filter with the specified reporting interval *per thread*.
macro_rules! report_filter {
($interval:expr) => {{
thread_local! {
static LAST_TIMESTAMP: Cell<Option<Instant>> = Cell::new(None);
}
ReportFilter {
interval: $interval,
last_timestamp: &LAST_TIMESTAMP,
}
}};
}
use crate::api_server::utils::ReportFilter;

/// Observed version of RPC parameters. Have a bounded upper-limit size (256 bytes), so that we don't over-allocate.
#[derive(Debug)]
@@ -429,7 +387,7 @@ impl ApiMetrics {
// Log internal error details.
match err {
Web3Error::InternalError(err) => {
tracing::error!("Internal error in method `{method}`: {err}");
tracing::error!("Internal error in method `{method}`: {err:#}");
}
Web3Error::ProxyError(err) => {
tracing::warn!("Error proxying call to main node in method `{method}`: {err}");
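One note on the last hunk: switching `{err}` to `{err:#}` changes Display to alternate formatting. Assuming the wrapped error is an `anyhow::Error` (as `Web3Error::InternalError` appears to hold here), this logs the whole error chain instead of just the outermost context. A minimal illustration:

```rust
fn main() {
    let io_err = std::io::Error::new(std::io::ErrorKind::Other, "connection reset");
    let err = anyhow::Error::new(io_err).context("failed loading block");
    println!("{err}");   // prints: failed loading block
    println!("{err:#}"); // prints: failed loading block: connection reset
}
```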