(feat) balance

This commit is contained in:
Rorik Star Platinum 2025-11-07 21:22:52 +03:00
parent 779ae4d498
commit aeb9514aa3
15 changed files with 461 additions and 26 deletions

View file

@ -0,0 +1,22 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT consent_id\n FROM user_consents\n WHERE bank_code = $1 AND expires_at > NOW() AND status = 'active'\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "consent_id",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false
]
},
"hash": "c76f312865fd8d9dd42b09448be7a81c218f77f35fd20e28bf178bc2d8c92d49"
}

View file

@ -0,0 +1,34 @@
{
"db_name": "PostgreSQL",
"query": "\n SELECT amount, currency, balance_type\n FROM balances\n WHERE account_id = $1\n ORDER BY date_time DESC\n LIMIT 1\n ",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "amount",
"type_info": "Text"
},
{
"ordinal": 1,
"name": "currency",
"type_info": "Varchar"
},
{
"ordinal": 2,
"name": "balance_type",
"type_info": "Varchar"
}
],
"parameters": {
"Left": [
"Text"
]
},
"nullable": [
false,
false,
false
]
},
"hash": "ec6e46778012d093e99364fc0e46cbbe03f9a54e22f02837ccd6332ebe66d4cc"
}

View file

@ -0,0 +1,24 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO transactions \n (transaction_id, account_id, bank_code, amount, currency, credit_debit_indicator, \n status, booking_date_time, value_date_time, transaction_information, bank_transaction_code)\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)\n ON CONFLICT (transaction_id, bank_code)\n DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Varchar",
"Varchar",
"Text",
"Varchar",
"Varchar",
"Varchar",
"Timestamptz",
"Timestamptz",
"Text",
"Varchar"
]
},
"nullable": []
},
"hash": "ef51346809b942caa815860988e496f4ad93973efd79c8fd0ddd9861b26a6357"
}

View file

@ -0,0 +1,18 @@
{
"db_name": "PostgreSQL",
"query": "\n INSERT INTO balances \n (account_id, balance_type, amount, currency, date_time)\n VALUES ($1, $2, $3, $4, $5)\n ON CONFLICT (account_id, balance_type)\n DO UPDATE SET amount = $3, date_time = $5\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Varchar",
"Varchar",
"Text",
"Varchar",
"Timestamptz"
]
},
"nullable": []
},
"hash": "f486b903acc5ff7d9adbb0be711fb66876a3190ab7c8e382fb453a8fbf4db77e"
}

View file

@ -51,7 +51,7 @@ CREATE TABLE IF NOT EXISTS transactions (
account_id VARCHAR(50) NOT NULL,
bank_code VARCHAR(20) NOT NULL,
amount NUMERIC(15, 2) NOT NULL,
amount TEXT NOT NULL,
currency VARCHAR(3) NOT NULL,
credit_debit_indicator VARCHAR(10) NOT NULL,
status VARCHAR(20) NOT NULL,

View file

@ -0,0 +1,15 @@
-- migrations/XXXXXX_add_balances_table.sql
CREATE TABLE IF NOT EXISTS balances (
id SERIAL PRIMARY KEY,
account_id VARCHAR(50) NOT NULL,
balance_type VARCHAR(50) NOT NULL,
amount TEXT NOT NULL,
currency VARCHAR(3) NOT NULL,
date_time TIMESTAMPTZ NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
UNIQUE(account_id, balance_type)
);
CREATE INDEX IF NOT EXISTS idx_balances_account_id ON balances(account_id);

View file

@ -1,7 +1,9 @@
// src/api/accounts.rs
// Account data retrieval
use super::{client::{BankClient, BankingError}, models::{ApiResponse, AccountData, TransactionData}};
use super::{client::{BankClient, BankingError}, models::{ApiResponse, AccountData, BalanceData, BalanceResponse}};
use tracing::{info, error, debug};
impl BankClient {
pub async fn get_accounts(
@ -9,7 +11,10 @@ impl BankClient {
client_id: &str,
consent_id: &str,
) -> Result<ApiResponse<AccountData>, BankingError> {
info!("📋 Fetching accounts for client_id: {}", client_id);
let token = self.get_token().await?;
debug!("✅ Got bank token");
let response = self.http_client
.get(self.base_url.join("/accounts")?)
@ -20,13 +25,83 @@ impl BankClient {
.send()
.await?;
let status = response.status();
debug!("📥 Response status: {}", status);
match response.status().is_success() {
true => response.json().await.map_err(Into::into),
false => Err(BankingError::ApiError {
status: response.status().as_u16(),
body: response.text().await.unwrap_or_default(),
}),
true => {
let text = response.text().await?;
info!("✅ Accounts response received");
serde_json::from_str::<ApiResponse<AccountData>>(&text)
.map_err(|e| {
error!("❌ Failed to deserialize AccountData: {}", e);
BankingError::ApiError {
status: 500,
body: format!("Deserialization error: {}", e),
}
})
},
false => {
let error_body = response.text().await.unwrap_or_default();
error!("❌ Bank API error status {}: {}", status, error_body);
Err(BankingError::ApiError {
status: status.as_u16(),
body: error_body,
})
}
}
}
pub async fn get_balances(
&self,
account_id: &str,
consent_id: &str,
) -> Result<BalanceResponse, BankingError> { // ← Changed return type!
info!("📊 Fetching balances for account: {}", account_id);
let token = self.get_token().await?;
debug!("✅ Got bank token");
let url = self.base_url.join(&format!("/accounts/{}/balances", account_id))?;
debug!("📍 Calling: {}", url);
let response = self.http_client
.get(url)
.bearer_auth(token)
.header("x-consent-id", consent_id)
.header("x-requesting-bank", self.client_id.as_str())
.send()
.await?;
let status = response.status();
debug!("📥 Response status: {}", status);
match response.status().is_success() {
true => {
let text = response.text().await?;
info!("✅ Balance response received");
serde_json::from_str::<BalanceResponse>(&text) // ← Parse as BalanceResponse
.map_err(|e| {
error!("❌ Failed to deserialize BalanceResponse: {}", e);
error!("Raw: {}", text);
BankingError::ApiError {
status: 500,
body: format!("Deserialization error: {}", e),
}
})
},
false => {
let error_body = response.text().await.unwrap_or_default();
error!("❌ Bank API error status {}: {}", status, error_body);
Err(BankingError::ApiError {
status: status.as_u16(),
body: error_body,
})
}
}
}
}

View file

@ -81,6 +81,11 @@ pub struct AccountIdentification {
pub name: String,
}
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct BalanceResponse {
pub data: BalanceData,
}
#[derive(Debug, Deserialize, Serialize, Clone)] // Added Serialize here
pub struct BalanceData {
pub balance: Vec<Balance>,

View file

@ -4,6 +4,7 @@ pub mod consents;
pub mod accounts;
pub mod transactions;
pub mod users;
pub mod balances;
use sqlx::postgres::PgPoolOptions;
use sqlx::PgPool;

52
src/db/balances.rs Normal file
View file

@ -0,0 +1,52 @@
// src/db/balances.rs
use sqlx::PgPool;
use chrono::{DateTime, Utc};
pub async fn store_balance(
pool: &PgPool,
account_id: &str,
balance_type: &str,
amount: &str,
currency: &str,
date_time: DateTime<Utc>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO balances
(account_id, balance_type, amount, currency, date_time)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (account_id, balance_type)
DO UPDATE SET amount = $3, date_time = $5
"#,
account_id,
balance_type,
amount,
currency,
date_time,
)
.execute(pool)
.await?;
Ok(())
}
pub async fn get_balance(
pool: &PgPool,
account_id: &str,
) -> Result<Option<(String, String, String)>, sqlx::Error> {
// Returns (amount, currency, balance_type)
sqlx::query!(
r#"
SELECT amount, currency, balance_type
FROM balances
WHERE account_id = $1
ORDER BY date_time DESC
LIMIT 1
"#,
account_id
)
.fetch_optional(pool)
.await
.map(|row| row.map(|r| (r.amount, r.currency, r.balance_type)))
}

View file

@ -57,6 +57,25 @@ pub async fn get_valid_consent(
Ok(result.map(|r| r.consent_id))
}
pub async fn get_first_valid_consent(
pool: &PgPool,
bank_code: &str,
) -> Result<Option<String>, sqlx::Error> {
let result = sqlx::query!(
r#"
SELECT consent_id
FROM user_consents
WHERE bank_code = $1 AND expires_at > NOW() AND status = 'active'
LIMIT 1
"#,
bank_code
)
.fetch_optional(pool)
.await?;
Ok(result.map(|r| r.consent_id))
}
pub async fn delete_consent(
pool: &PgPool,
consent_id: &str,

View file

@ -0,0 +1,70 @@
// src/db/transactions.rs
use sqlx::PgPool;
use chrono::{DateTime, Utc};
use tracing::info;
pub async fn store_transaction(
pool: &PgPool,
transaction_id: &str,
account_id: &str,
bank_code: &str,
amount: &str,
currency: &str,
credit_debit_indicator: &str,
status: &str,
booking_date_time: DateTime<Utc>,
value_date_time: Option<DateTime<Utc>>,
transaction_information: &str,
bank_transaction_code: Option<&str>,
) -> Result<(), sqlx::Error> {
sqlx::query!(
r#"
INSERT INTO transactions
(transaction_id, account_id, bank_code, amount, currency, credit_debit_indicator,
status, booking_date_time, value_date_time, transaction_information, bank_transaction_code)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (transaction_id, bank_code)
DO NOTHING
"#,
transaction_id,
account_id,
bank_code,
amount,
currency,
credit_debit_indicator,
status,
booking_date_time,
value_date_time,
transaction_information,
bank_transaction_code,
)
.execute(pool)
.await?;
info!("✅ Stored transaction: {}", transaction_id);
Ok(())
}
pub async fn get_cached_transactions(
pool: &PgPool,
account_id: &str,
page: u32,
limit: u32,
) -> Result<Vec<String>, sqlx::Error> {
sqlx::query_scalar::<_, String>(
r#"
SELECT transaction_id
FROM transactions
WHERE account_id = $1
ORDER BY booking_date_time DESC
LIMIT $2 OFFSET $3
"#,
)
.bind(account_id)
.bind(limit as i64)
.bind(((page - 1) * limit) as i64)
.fetch_all(pool)
.await
}

View file

@ -26,6 +26,9 @@ pub fn router(app_state: AppState) -> Router {
.route("/api/transactions/{bank}/{user_id}/{account_id}",
get(handlers::get_transactions_handler)
)
.route("/api/balances/{bank}/{account_id}",
get(handlers::get_balances_handler)
)
.layer(middleware::from_fn(auth::auth_middleware));
// Merge both

View file

@ -15,6 +15,8 @@ use crate::{
db,
};
use tracing::info;
// --- Health Check ---
pub async fn health_handler(
@ -185,12 +187,33 @@ pub async fn get_transactions_handler(
let client = state.banking_clients.get_client(bank);
client.get_transactions(&account_id, &consent_id, params.page, params.limit)
let transactions_response = client.get_transactions(&account_id, &consent_id, params.page, params.limit)
.await
.map(|transactions| Json(serde_json::to_value(transactions).unwrap()))
.map_err(map_banking_error)
.map_err(map_banking_error)?;
// ✨ NEW: Save all transactions to cache
for tx in &transactions_response.data.transaction {
let _ = db::transactions::store_transaction(
&state.db_pool,
&tx.transaction_id,
&tx.account_id,
bank.code(),
&tx.amount.amount,
&tx.amount.currency,
&tx.credit_debit_indicator,
&tx.status,
tx.booking_date_time, // Already DateTime<Utc>
tx.value_date_time, // Already Option<DateTime<Utc>>
&tx.transaction_information,
tx.bank_transaction_code.as_ref().map(|b| b.code.as_str()),
).await;
}
Ok(Json(serde_json::to_value(transactions_response).unwrap()))
}
pub async fn delete_consent_handler(
State(state): State<AppState>,
Path((bank_code, user_id)): Path<(String, String)>,
@ -218,6 +241,42 @@ pub async fn delete_consent_handler(
Ok::<_, (StatusCode, Json<serde_json::Value>)>((StatusCode::OK, Json(json!({ "status": "deleted" }))))
}
pub async fn get_balances_handler(
State(state): State<AppState>,
Path((bank_code, account_id)): Path<(String, String)>, // ← account_id, not user_id!
) -> Result<Json<serde_json::Value>, (StatusCode, Json<serde_json::Value>)> {
let bank = bank_code.parse::<Bank>()
.map_err(|_| (StatusCode::BAD_REQUEST, Json(json!({ "error": "Invalid bank code" }))))?;
// We need to get consent from the account's user
// For now, just get the first valid consent for this bank
let consent_id = db::consents::get_first_valid_consent(&state.db_pool, bank.code())
.await
.map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, Json(json!({ "error": e.to_string() }))))?
.ok_or_else(|| (StatusCode::FORBIDDEN, Json(json!({ "error": "No valid consent" }))))?;
let client = state.banking_clients.get_client(bank);
info!("📊 Fetching balances for account: {} from bank: {}", account_id, bank.code());
let response = client.get_balances(&account_id, &consent_id).await.map_err(map_banking_error)?;
// ✨ Cache balances
for balance in &response.data.balance {
let _ = db::balances::store_balance(
&state.db_pool,
&balance.account_id,
&balance.balance_type,
&balance.amount.amount,
&balance.amount.currency,
balance.date_time,
).await;
}
Ok(Json(serde_json::to_value(response).unwrap()))
}
// --- Error Mapping ---

View file

@ -1,5 +1,6 @@
#!/bin/bash
# Save as test_multiberry.sh
# Extended comprehensive test with DB verification
BASE_URL="http://localhost:3000/api"
@ -8,7 +9,7 @@ echo "1⃣ REGISTER USER"
echo "=========================================="
REGISTER=$(curl -s -X POST $BASE_URL/auth/register \
-H 'Content-Type: application/json' \
-d '{"bank_user_number": 8, "password": "testpass123"}')
-d '{"bank_user_number": 1, "password": "testpass123"}')
echo "$REGISTER" | jq .
TOKEN=$(echo "$REGISTER" | jq -r '.token')
@ -21,11 +22,9 @@ echo ""
echo "=========================================="
echo "2⃣ LOGIN (verify token works)"
echo "=========================================="
LOGIN=$(curl -s -X POST $BASE_URL/auth/login \
curl -s -X POST $BASE_URL/auth/login \
-H 'Content-Type: application/json' \
-d "{\"bank_user_id\": \"$BANK_USER_ID\", \"password\": \"testpass123\"}")
echo "$LOGIN" | jq .
-d "{\"bank_user_id\": \"$BANK_USER_ID\", \"password\": \"testpass123\"}" | jq .
echo ""
echo "=========================================="
@ -58,17 +57,25 @@ echo "✅ Account ID: $ACCOUNT_ID"
echo ""
echo "=========================================="
echo "6⃣ GET BALANCES"
echo "6⃣ GET BALANCES (auto-saved to DB)"
echo "=========================================="
curl -s $BASE_URL/balances/vbank/$BANK_USER_ID \
-H "Authorization: Bearer $TOKEN" | jq .
BALANCES=$(curl -s $BASE_URL/balances/vbank/$ACCOUNT_ID \
-H "Authorization: Bearer $TOKEN")
echo "$BALANCES" | jq .
BALANCE_AMOUNT=$(echo "$BALANCES" | jq -r '.data.balance[0].amount.amount')
echo "✅ Current Balance: $BALANCE_AMOUNT RUB"
echo ""
echo "=========================================="
echo "7⃣ GET TRANSACTIONS (page 1, limit 6)"
echo "7⃣ GET TRANSACTIONS (page 1, limit 6 - auto-saved)"
echo "=========================================="
curl -s "$BASE_URL/transactions/vbank/$BANK_USER_ID/$ACCOUNT_ID?page=1&limit=6" \
-H "Authorization: Bearer $TOKEN" | jq .
TRANS_PAGE1=$(curl -s "$BASE_URL/transactions/vbank/$BANK_USER_ID/$ACCOUNT_ID?page=1&limit=6" \
-H "Authorization: Bearer $TOKEN")
echo "$TRANS_PAGE1" | jq .
TOTAL_RECORDS=$(echo "$TRANS_PAGE1" | jq -r '.meta.totalRecords')
echo "✅ Total Records: $TOTAL_RECORDS"
echo ""
echo "=========================================="
@ -86,11 +93,42 @@ curl -s -X DELETE $BASE_URL/consent/vbank/$BANK_USER_ID \
echo ""
echo "=========================================="
echo "🔟 VERIFY DB (from another terminal)"
echo "🔟 VERIFY DATABASE CACHE"
echo "=========================================="
echo ""
echo "Run in another terminal:"
echo "just psql-exec"
echo "SELECT * FROM users;"
echo "SELECT * FROM user_consents;"
echo "SELECT * FROM accounts;"
echo "SELECT * FROM transactions;"
echo ""
echo "Then execute these queries:"
echo ""
echo "-- Check users registered:"
echo "SELECT bank_user_id, created_at FROM users ORDER BY created_at DESC LIMIT 5;"
echo ""
echo "-- Check consents granted:"
echo "SELECT user_id, bank_code, consent_id, status, expires_at FROM user_consents;"
echo ""
echo "-- Check accounts cached:"
echo "SELECT account_id, user_id, bank_code, nickname, currency FROM accounts;"
echo ""
echo "-- Check balances cached:"
echo "SELECT account_id, balance_type, amount, currency, date_time FROM balances;"
echo ""
echo "-- Check transactions cached (show count by account):"
echo "SELECT account_id, COUNT(*) as tx_count, MIN(booking_date_time) as oldest, MAX(booking_date_time) as newest FROM transactions GROUP BY account_id;"
echo ""
echo "-- Show recent transactions:"
echo "SELECT transaction_id, account_id, amount, currency, credit_debit_indicator, transaction_information, booking_date_time FROM transactions ORDER BY booking_date_time DESC LIMIT 10;"
echo ""
echo "=========================================="
echo "✅ FULL TEST COMPLETE!"
echo "=========================================="
echo ""
echo "Summary:"
echo "✅ Authentication (register/login/auth middleware)"
echo "✅ Consent Management (request consent)"
echo "✅ Account Aggregation (fetch & cache accounts)"
echo "✅ Balance Retrieval (fetch & cache balances)"
echo "✅ Transaction History (fetch & cache transactions)"
echo "✅ Data Persistence (all cached in PostgreSQL)"
echo ""