Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix logging stack overflow #4423

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/router/src/routes/health.rs
Expand Up @@ -16,6 +16,7 @@ use crate::{
pub async fn health() -> impl actix_web::Responder {
metrics::HEALTH_METRIC.add(&metrics::CONTEXT, 1, &[]);
logger::info!("Health was called");

actix_web::HttpResponse::Ok().body("health is good")
}

Expand Down
107 changes: 53 additions & 54 deletions crates/router_env/src/logger/formatter.rs
Expand Up @@ -58,7 +58,6 @@ const SESSION_ID: &str = "session_id";
pub static IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(|| {
let mut set = rustc_hash::FxHashSet::default();

set.insert(MESSAGE);
set.insert(HOSTNAME);
set.insert(PID);
set.insert(ENV);
Expand All @@ -81,6 +80,7 @@ pub static IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(|| {
pub static EXTRA_IMPLICIT_KEYS: Lazy<rustc_hash::FxHashSet<&str>> = Lazy::new(|| {
let mut set = rustc_hash::FxHashSet::default();

set.insert(MESSAGE);
set.insert(FLOW);
set.insert(MERCHANT_AUTH);
set.insert(MERCHANT_ID);
Expand Down Expand Up @@ -163,7 +163,7 @@ where
pub fn new_with_implicit_entries(
service: &str,
dst_writer: W,
default_fields: HashMap<String, Value>,
mut default_fields: HashMap<String, Value>,
formatter: F,
) -> Self {
let pid = std::process::id();
Expand All @@ -174,6 +174,21 @@ where
#[cfg(feature = "vergen")]
let build = crate::build!().to_string();
let env = crate::env::which().to_string();
default_fields = default_fields
.into_iter()
.filter(|(key, value)| {
if !IMPLICIT_KEYS.contains(key.as_str()) {
true
} else {
tracing::warn!(
?key,
?value,
"Attempting to log a reserved entry. It won't be added to the logs"
);
false
}
})
.collect();

Self {
dst_writer,
Expand All @@ -196,17 +211,15 @@ where
map_serializer: &mut impl SerializeMap<Error = serde_json::Error>,
metadata: &Metadata<'_>,
span: Option<&SpanRef<'_, S>>,
storage: Option<&Storage<'_>>,
storage: &Storage<'_>,
name: &str,
message: &str,
) -> Result<(), std::io::Error>
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let is_extra = |s: &str| !IMPLICIT_KEYS.contains(s);
let is_extra_implicit = |s: &str| is_extra(s) && EXTRA_IMPLICIT_KEYS.contains(s);

map_serializer.serialize_entry(MESSAGE, &message)?;
map_serializer.serialize_entry(HOSTNAME, &self.hostname)?;
map_serializer.serialize_entry(PID, &self.pid)?;
map_serializer.serialize_entry(ENV, &self.env)?;
Expand All @@ -228,30 +241,30 @@ where

// Write down implicit default entries.
for (key, value) in self.default_fields.iter() {
if !IMPLICIT_KEYS.contains(key.as_str()) {
map_serializer.serialize_entry(key, value)?;
} else {
tracing::warn!("{} is a reserved field. Skipping it.", key);
}
map_serializer.serialize_entry(key, value)?;
}

#[cfg(feature = "log_custom_entries_to_extra")]
let mut extra = serde_json::Map::default();
let mut explicit_entries_set: HashSet<&str> = HashSet::default();
// Write down explicit event's entries.
if let Some(storage) = storage {
for (key, value) in storage.values.iter() {
if is_extra_implicit(key) {
#[cfg(feature = "log_extra_implicit_fields")]
map_serializer.serialize_entry(key, value)?;
explicit_entries_set.insert(key);
} else if is_extra(key) {
#[cfg(feature = "log_custom_entries_to_extra")]
extra.insert(key.to_string(), value.clone());
#[cfg(not(feature = "log_custom_entries_to_extra"))]
map_serializer.serialize_entry(key, value)?;
explicit_entries_set.insert(key);
}
for (key, value) in storage.values.iter() {
if is_extra_implicit(key) {
#[cfg(feature = "log_extra_implicit_fields")]
map_serializer.serialize_entry(key, value)?;
explicit_entries_set.insert(key);
} else if is_extra(key) {
#[cfg(feature = "log_custom_entries_to_extra")]
extra.insert(key.to_string(), value.clone());
#[cfg(not(feature = "log_custom_entries_to_extra"))]
map_serializer.serialize_entry(key, value)?;
explicit_entries_set.insert(key);
} else {
tracing::warn!(
?key,
?value,
"Attempting to log a reserved entry. It won't be added to the logs"
);
}
}

Expand All @@ -269,7 +282,11 @@ where
#[cfg(not(feature = "log_custom_entries_to_extra"))]
map_serializer.serialize_entry(key, value)?;
} else {
tracing::debug!("{} is a reserved entry. Skipping it.", key);
tracing::warn!(
?key,
?value,
"Attempting to log a reserved entry. It won't be added to the logs"
);
}
}
}
Expand Down Expand Up @@ -305,14 +322,15 @@ where
serde_json::Serializer::with_formatter(&mut buffer, self.formatter.clone());
let mut map_serializer = serializer.serialize_map(None)?;
let message = Self::span_message(span, ty);
let mut storage = Storage::default();
storage.record_value("message", message.into());

self.common_serialize(
&mut map_serializer,
span.metadata(),
Some(span),
None,
&storage,
span.name(),
&message,
)?;

map_serializer.end()?;
Expand All @@ -337,16 +355,9 @@ where
event.record(&mut storage);

let name = span.map_or("?", SpanRef::name);
let message = Self::event_message(span, event, &storage);
Self::event_message(span, event, &mut storage);

self.common_serialize(
&mut map_serializer,
event.metadata(),
*span,
Some(&storage),
name,
&message,
)?;
self.common_serialize(&mut map_serializer, event.metadata(), *span, &storage, name)?;

map_serializer.end()?;
Ok(buffer)
Expand Down Expand Up @@ -374,32 +385,20 @@ where
fn event_message<S>(
span: &Option<&SpanRef<'_, S>>,
event: &Event<'_>,
storage: &Storage<'_>,
) -> String
where
storage: &mut Storage<'_>,
) where
S: Subscriber + for<'a> LookupSpan<'a>,
{
// Get value of kept "message" or "target" if does not exist.
let mut message = storage
let message = storage
.values
.get("message")
.and_then(|v| match v {
Value::String(s) => Some(s.as_str()),
_ => None,
})
.unwrap_or_else(|| event.metadata().target())
.to_owned();
.entry("message")
.or_insert_with(|| event.metadata().target().into());

// Prepend the span name to the message if span exists.
if let Some(span) = span {
message = format!(
"{} {}",
Self::span_message(span, RecordType::Event),
message,
);
if let (Some(span), Value::String(a)) = (span, message) {
*a = format!("{} {}", Self::span_message(span, RecordType::Event), a,).into();
}

message
}
}

Expand Down
32 changes: 17 additions & 15 deletions crates/router_env/src/logger/storage.rs
Expand Up @@ -28,6 +28,14 @@ impl<'a> Storage<'a> {
pub fn new() -> Self {
Self::default()
}

pub fn record_value(&mut self, key: &'a str, value: serde_json::Value) {
if super::formatter::IMPLICIT_KEYS.contains(key) {
tracing::warn!(value =? value, "{} is a reserved entry. Skipping it.", key);
} else {
self.values.insert(key, value);
}
}
}

/// Default constructor.
Expand All @@ -43,32 +51,27 @@ impl Default for Storage<'_> {
impl Visit for Storage<'_> {
/// A i64.
fn record_i64(&mut self, field: &Field, value: i64) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A u64.
fn record_u64(&mut self, field: &Field, value: u64) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A 64-bit floating point.
fn record_f64(&mut self, field: &Field, value: f64) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A boolean.
fn record_bool(&mut self, field: &Field, value: bool) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// A string.
fn record_str(&mut self, field: &Field, value: &str) {
self.values
.insert(field.name(), serde_json::Value::from(value));
self.record_value(field.name(), serde_json::Value::from(value));
}

/// Otherwise.
Expand All @@ -77,16 +80,15 @@ impl Visit for Storage<'_> {
// Skip fields which are already handled
name if name.starts_with("log.") => (),
name if name.starts_with("r#") => {
self.values.insert(
self.record_value(
#[allow(clippy::expect_used)]
name.get(2..)
.expect("field name must have a minimum of two characters"),
serde_json::Value::from(format!("{value:?}")),
);
}
name => {
self.values
.insert(name, serde_json::Value::from(format!("{value:?}")));
self.record_value(name, serde_json::Value::from(format!("{value:?}")));
}
};
}
Expand Down Expand Up @@ -165,7 +167,7 @@ impl<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>> Layer
span.parent().and_then(|p| {
p.extensions_mut()
.get_mut::<Storage<'_>>()
.map(|s| s.values.insert(k, v.to_owned()))
.map(|s| s.record_value(k, v.to_owned()))
});
}
})
Expand All @@ -178,7 +180,7 @@ impl<S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>> Layer
.expect("No visitor in extensions");

if let Ok(elapsed) = serde_json::to_value(elapsed_milliseconds) {
visitor.values.insert("elapsed_milliseconds", elapsed);
visitor.record_value("elapsed_milliseconds", elapsed);
}
}
}