Skip to content

Commit

Permalink
feat:authentication analytics - separated from sdk events
Browse files Browse the repository at this point in the history
  • Loading branch information
vsrivatsa-juspay committed Apr 22, 2024
1 parent 493ac36 commit b241cb5
Show file tree
Hide file tree
Showing 24 changed files with 1,252 additions and 9 deletions.
7 changes: 7 additions & 0 deletions crates/analytics/src/auth_events.rs
@@ -0,0 +1,7 @@
pub mod accumulator;
mod core;
pub mod metrics;
pub use accumulator::{AuthEventMetricAccumulator, AuthEventMetricsAccumulator};
pub trait AuthEventAnalytics: metrics::AuthEventMetricAnalytics {}

pub use self::core::get_metrics;
96 changes: 96 additions & 0 deletions crates/analytics/src/auth_events/accumulator.rs
@@ -0,0 +1,96 @@
use api_models::analytics::auth_events::AuthEventMetricsBucketValue;
use router_env::logger;

use super::metrics::AuthEventMetricRow;

#[derive(Debug, Default)]
pub struct AuthEventMetricsAccumulator {
pub three_ds_sdk_count: CountAccumulator,
pub authentication_attempt_count: CountAccumulator,
pub authentication_success_count: CountAccumulator,
pub challenge_flow_count: CountAccumulator,
pub challenge_attempt_count: CountAccumulator,
pub challenge_success_count: CountAccumulator,
pub frictionless_flow_count: CountAccumulator,
}

#[derive(Debug, Default)]
#[repr(transparent)]
pub struct CountAccumulator {
pub count: Option<i64>,
}

#[derive(Debug, Default)]
pub struct AverageAccumulator {
pub total: u32,
pub count: u32,
}

pub trait AuthEventMetricAccumulator {
type MetricOutput;

fn add_metrics_bucket(&mut self, metrics: &AuthEventMetricRow);

fn collect(self) -> Self::MetricOutput;
}

impl AuthEventMetricAccumulator for CountAccumulator {
type MetricOutput = Option<u64>;
#[inline]
fn add_metrics_bucket(&mut self, metrics: &AuthEventMetricRow) {
self.count = match (self.count, metrics.count) {
(None, None) => None,
(None, i @ Some(_)) | (i @ Some(_), None) => i,
(Some(a), Some(b)) => Some(a + b),
}
}
#[inline]
fn collect(self) -> Self::MetricOutput {
self.count.and_then(|i| u64::try_from(i).ok())
}
}

impl AuthEventMetricAccumulator for AverageAccumulator {
type MetricOutput = Option<f64>;

fn add_metrics_bucket(&mut self, metrics: &AuthEventMetricRow) {
let total = metrics
.total
.as_ref()
.and_then(bigdecimal::ToPrimitive::to_u32);
let count = metrics.count.and_then(|total| u32::try_from(total).ok());

match (total, count) {
(Some(total), Some(count)) => {
self.total += total;
self.count += count;
}
_ => {
logger::error!(message="Dropping metrics for average accumulator", metric=?metrics);
}
}
}

fn collect(self) -> Self::MetricOutput {
if self.count == 0 {
None
} else {
Some(f64::from(self.total) / f64::from(self.count))
}
}
}

impl AuthEventMetricsAccumulator {
#[allow(dead_code)]
pub fn collect(self) -> AuthEventMetricsBucketValue {
AuthEventMetricsBucketValue {
three_ds_sdk_count: self.three_ds_sdk_count.collect(),
authentication_attempt_count: self.authentication_attempt_count.collect(),
authentication_success_count: self.authentication_success_count.collect(),
challenge_flow_count: self.challenge_flow_count.collect(),
challenge_attempt_count: self.challenge_attempt_count.collect(),
challenge_success_count: self.challenge_success_count.collect(),
frictionless_flow_count: self.frictionless_flow_count.collect(),
}
}
}
115 changes: 115 additions & 0 deletions crates/analytics/src/auth_events/core.rs
@@ -0,0 +1,115 @@
use std::collections::HashMap;

use api_models::analytics::{
auth_events::{AuthEventMetrics, AuthEventMetricsBucketIdentifier, MetricsBucketResponse},
AnalyticsMetadata, GetAuthEventMetricRequest, MetricsResponse,
};
use error_stack::ResultExt;
use router_env::{instrument, logger, tracing};

use super::AuthEventMetricsAccumulator;
use crate::{
auth_events::AuthEventMetricAccumulator,
errors::{AnalyticsError, AnalyticsResult},
AnalyticsProvider,
};

#[instrument(skip_all)]
pub async fn get_metrics(
pool: &AnalyticsProvider,
merchant_id: &String,
publishable_key: Option<&String>,
req: GetAuthEventMetricRequest,
) -> AnalyticsResult<MetricsResponse<MetricsBucketResponse>> {
let mut metrics_accumulator: HashMap<
AuthEventMetricsBucketIdentifier,
AuthEventMetricsAccumulator,
> = HashMap::new();

if let Some(publishable_key) = publishable_key {
let mut set = tokio::task::JoinSet::new();
for metric_type in req.metrics.iter().cloned() {
let req = req.clone();
let merchant_id_scoped = merchant_id.to_owned();
let publishable_key_scoped = publishable_key.to_owned();
let pool = pool.clone();
set.spawn(async move {
let data = pool
.get_auth_event_metrics(
&metric_type,
&merchant_id_scoped,
&publishable_key_scoped,
&req.time_series.map(|t| t.granularity),
&req.time_range,
)
.await
.change_context(AnalyticsError::UnknownError);
(metric_type, data)
});
}

while let Some((metric, data)) = set
.join_next()
.await
.transpose()
.change_context(AnalyticsError::UnknownError)?
{
logger::info!("Logging Result {:?}", data);
for (id, value) in data? {
let metrics_builder = metrics_accumulator.entry(id).or_default();
match metric {
AuthEventMetrics::ThreeDsSdkCount => metrics_builder
.three_ds_sdk_count
.add_metrics_bucket(&value),
AuthEventMetrics::AuthenticationAttemptCount => metrics_builder
.authentication_attempt_count
.add_metrics_bucket(&value),
AuthEventMetrics::AuthenticationSuccessCount => metrics_builder
.authentication_success_count
.add_metrics_bucket(&value),
AuthEventMetrics::ChallengeFlowCount => metrics_builder
.challenge_flow_count
.add_metrics_bucket(&value),
AuthEventMetrics::ChallengeAttemptCount => metrics_builder
.challenge_attempt_count
.add_metrics_bucket(&value),
AuthEventMetrics::ChallengeSuccessCount => metrics_builder
.challenge_success_count
.add_metrics_bucket(&value),
AuthEventMetrics::FrictionlessFlowCount => metrics_builder
.frictionless_flow_count
.add_metrics_bucket(&value),
}
}

logger::debug!(
"Analytics Accumulated Results: metric: {}, results: {:#?}",
metric,
metrics_accumulator
);
}

let query_data: Vec<MetricsBucketResponse> = metrics_accumulator
.into_iter()
.map(|(id, val)| MetricsBucketResponse {
values: val.collect(),
dimensions: id,
})
.collect();

Ok(MetricsResponse {
query_data,
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
} else {
logger::error!("Publishable key not present for merchant ID");
Ok(MetricsResponse {
query_data: vec![],
meta_data: [AnalyticsMetadata {
current_time_range: req.time_range,
}],
})
}
}
114 changes: 114 additions & 0 deletions crates/analytics/src/auth_events/metrics.rs
@@ -0,0 +1,114 @@
use api_models::analytics::{
auth_events::{AuthEventMetrics, AuthEventMetricsBucketIdentifier},
Granularity, TimeRange,
};
use time::PrimitiveDateTime;

use crate::{
query::{Aggregate, GroupByClause, ToSql, Window},
types::{AnalyticsCollection, AnalyticsDataSource, LoadRow, MetricsResult},
};

mod authentication_attempt_count;
mod authentication_success_count;
mod challenge_attempt_count;
mod challenge_flow_count;
mod challenge_success_count;
mod frictionless_flow_count;
mod three_ds_sdk_count;

use authentication_attempt_count::AuthenticationAttemptCount;
use authentication_success_count::AuthenticationSuccessCount;
use challenge_attempt_count::ChallengeAttemptCount;
use challenge_flow_count::ChallengeFlowCount;
use challenge_success_count::ChallengeSuccessCount;
use frictionless_flow_count::FrictionlessFlowCount;
use three_ds_sdk_count::ThreeDsSdkCount;

#[derive(Debug, PartialEq, Eq, serde::Deserialize)]
pub struct AuthEventMetricRow {
pub total: Option<bigdecimal::BigDecimal>,
pub count: Option<i64>,
pub time_bucket: Option<String>,
pub payment_method: Option<String>,
pub platform: Option<String>,
pub browser_name: Option<String>,
pub source: Option<String>,
pub component: Option<String>,
pub payment_experience: Option<String>,
}

pub trait AuthEventMetricAnalytics: LoadRow<AuthEventMetricRow> {}

#[async_trait::async_trait]
pub trait AuthEventMetric<T>
where
T: AnalyticsDataSource + AuthEventMetricAnalytics,
{
async fn load_metrics(
&self,
merchant_id: &str,
publishable_key: &str,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>>;
}

#[async_trait::async_trait]
impl<T> AuthEventMetric<T> for AuthEventMetrics
where
T: AnalyticsDataSource + AuthEventMetricAnalytics,
PrimitiveDateTime: ToSql<T>,
AnalyticsCollection: ToSql<T>,
Granularity: GroupByClause<T>,
Aggregate<&'static str>: ToSql<T>,
Window<&'static str>: ToSql<T>,
{
async fn load_metrics(
&self,
merchant_id: &str,
publishable_key: &str,
granularity: &Option<Granularity>,
time_range: &TimeRange,
pool: &T,
) -> MetricsResult<Vec<(AuthEventMetricsBucketIdentifier, AuthEventMetricRow)>> {
match self {
Self::ThreeDsSdkCount => {
ThreeDsSdkCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
Self::AuthenticationAttemptCount => {
AuthenticationAttemptCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
Self::AuthenticationSuccessCount => {
AuthenticationSuccessCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
Self::ChallengeFlowCount => {
ChallengeFlowCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
Self::ChallengeAttemptCount => {
ChallengeAttemptCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
Self::ChallengeSuccessCount => {
ChallengeSuccessCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
Self::FrictionlessFlowCount => {
FrictionlessFlowCount
.load_metrics(merchant_id, publishable_key, granularity, time_range, pool)
.await
}
}
}
}

0 comments on commit b241cb5

Please sign in to comment.