Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: add profiles to in mem #4441

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
105 changes: 84 additions & 21 deletions crates/router/src/db/business_profile.rs
@@ -1,5 +1,7 @@
use error_stack::report;
use router_env::{instrument, tracing};
#[cfg(feature = "accounts_cache")]
use storage_impl::redis::cache::{CacheKind, ACCOUNTS_CACHE};

use super::Store;
use crate::{
Expand Down Expand Up @@ -64,10 +66,21 @@ impl BusinessProfileInterface for Store {
&self,
profile_id: &str,
) -> CustomResult<business_profile::BusinessProfile, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::business_profile::BusinessProfile::find_by_profile_id(&conn, profile_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
let db_func = || async {
let conn = connection::pg_connection_read(self).await?;
storage::business_profile::BusinessProfile::find_by_profile_id(&conn, profile_id)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
};
#[cfg(not(feature = "accounts_cache"))]
{
db_func().await
}
#[cfg(feature = "accounts_cache")]
{
super::cache::get_or_populate_in_memory(self, profile_id, db_func, &ACCOUNTS_CACHE)
.await
}
}

#[instrument(skip_all)]
Expand All @@ -76,14 +89,27 @@ impl BusinessProfileInterface for Store {
profile_name: &str,
merchant_id: &str,
) -> CustomResult<business_profile::BusinessProfile, errors::StorageError> {
let conn = connection::pg_connection_read(self).await?;
storage::business_profile::BusinessProfile::find_by_profile_name_merchant_id(
&conn,
profile_name,
merchant_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
let db_func = || async {
let conn = connection::pg_connection_read(self).await?;
storage::business_profile::BusinessProfile::find_by_profile_name_merchant_id(
&conn,
profile_name,
merchant_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
};

#[cfg(feature = "accounts_cache")]
{
let key = format!("{}_{}", profile_name, merchant_id);
super::cache::get_or_populate_in_memory(self, &key, db_func, &ACCOUNTS_CACHE).await
}

#[cfg(not(feature = "accounts_cache"))]
{
db_func().await
}
}

#[instrument(skip_all)]
Expand All @@ -93,13 +119,19 @@ impl BusinessProfileInterface for Store {
business_profile_update: business_profile::BusinessProfileUpdateInternal,
) -> CustomResult<business_profile::BusinessProfile, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::business_profile::BusinessProfile::update_by_profile_id(
let updated_profile = storage::business_profile::BusinessProfile::update_by_profile_id(
current_state,
&conn,
business_profile_update,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
.map_err(|error| report!(errors::StorageError::from(error)))?;

#[cfg(feature = "accounts_cache")]
{
publish_and_redact_business_profile_cache(self, &updated_profile).await?;
}
Ok(updated_profile)
}

#[instrument(skip_all)]
Expand All @@ -109,13 +141,27 @@ impl BusinessProfileInterface for Store {
merchant_id: &str,
) -> CustomResult<bool, errors::StorageError> {
let conn = connection::pg_connection_write(self).await?;
storage::business_profile::BusinessProfile::delete_by_profile_id_merchant_id(
&conn,
profile_id,
merchant_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
let db_func = || async {
storage::business_profile::BusinessProfile::delete_by_profile_id_merchant_id(
&conn,
profile_id,
merchant_id,
)
.await
.map_err(|error| report!(errors::StorageError::from(error)))
};

#[cfg(not(feature = "accounts_cache"))]
{
db_func().await
}

#[cfg(feature = "accounts_cache")]
{
let business_profile = self.find_business_profile_by_profile_id(profile_id).await?;
publish_and_redact_business_profile_cache(self, &business_profile).await?;
db_func().await
}
}

#[instrument(skip_all)]
Expand All @@ -133,6 +179,23 @@ impl BusinessProfileInterface for Store {
}
}

#[cfg(feature = "accounts_cache")]
async fn publish_and_redact_business_profile_cache(
store: &dyn super::StorageInterface,
business_profile: &business_profile::BusinessProfile,
) -> CustomResult<(), errors::StorageError> {
let key1 = CacheKind::Accounts(business_profile.profile_id.as_str().into());
let str_key = format!(
"{}_{}",
business_profile.profile_name.as_str(),
business_profile.merchant_id.as_str()
);
let key2 = CacheKind::Accounts(str_key.as_str().into());
let keys = vec![key1, key2];
super::cache::publish_into_redact_channel(store, keys).await?;
Ok(())
}

#[async_trait::async_trait]
impl BusinessProfileInterface for MockDb {
async fn insert_business_profile(
Expand Down