Skip to content

Commit

Permalink
Return textual error message for mempool status
Browse files Browse the repository at this point in the history
Add an error message field to MempoolAddTransactionStatus proto
  • Loading branch information
Jiabao Wu authored and calibra-opensource committed Jul 8, 2019
1 parent fdbda1d commit 779ebbe
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 100 deletions.
2 changes: 1 addition & 1 deletion admission_control/admission_control_proto/src/lib.rs
Expand Up @@ -89,7 +89,7 @@ impl FromProto for SubmitTransactionResponse {
};
let mempool_error = if object.has_mempool_status() {
Some(MempoolAddTransactionStatus::from_proto(
object.get_mempool_status(),
object.take_mempool_status(),
)?)
} else {
None
Expand Down
Expand Up @@ -20,7 +20,10 @@ use logger::prelude::*;
use mempool::proto::{
mempool::{AddTransactionWithValidationRequest, HealthCheckRequest},
mempool_client::MempoolClientTrait,
shared::mempool_status::MempoolAddTransactionStatus::{self, MempoolIsFull},
shared::mempool_status::{
MempoolAddTransactionStatus,
MempoolAddTransactionStatusCode::{self, MempoolIsFull},
},
};
use metrics::counters::SVC_COUNTERS;
use proto_conv::{FromProto, IntoProto};
Expand Down Expand Up @@ -81,7 +84,10 @@ where
debug!("Mempool is full");
OP_COUNTERS.inc_by("submit_txn.rejected.mempool_full", 1);
let mut response = SubmitTransactionResponse::new();
response.set_mempool_status(MempoolIsFull);
let mut status = MempoolAddTransactionStatus::new();
status.set_code(MempoolIsFull);
status.set_message("Mempool is full".to_string());
response.set_mempool_status(status);
return Ok(response);
}

Expand Down Expand Up @@ -151,13 +157,13 @@ where
&self,
add_transaction_request: AddTransactionWithValidationRequest,
) -> Result<SubmitTransactionResponse> {
let mempool_result = self
let mut mempool_result = self
.mempool_client
.add_transaction_with_validation(&add_transaction_request)?;

debug!("[GRPC] Done with transaction submission request");
let mut response = SubmitTransactionResponse::new();
if mempool_result.get_status() == MempoolAddTransactionStatus::Valid {
if mempool_result.get_status().get_code() == MempoolAddTransactionStatusCode::Valid {
OP_COUNTERS.inc_by("submit_txn.txn_accepted", 1);
response.set_ac_status(AdmissionControlStatus::Accepted);
} else {
Expand All @@ -167,7 +173,7 @@ where
add_transaction_request.get_signed_txn()
);
OP_COUNTERS.inc_by("submit_txn.mempool.failure", 1);
response.set_mempool_status(mempool_result.get_status());
response.set_mempool_status(mempool_result.take_status());
}
Ok(response)
}
Expand Down
Expand Up @@ -13,7 +13,7 @@ use crypto::{
hash::CryptoHash,
signing::{generate_keypair, sign_message},
};
use mempool::MempoolAddTransactionStatus;
use mempool::proto::shared::mempool_status::MempoolAddTransactionStatusCode;
use proto_conv::FromProto;
use protobuf::{Message, UnknownFields};
use std::sync::Arc;
Expand Down Expand Up @@ -224,8 +224,8 @@ fn test_submit_txn_inner_mempool() {
)
.unwrap();
assert_eq!(
response.mempool_error.unwrap(),
MempoolAddTransactionStatus::InsufficientBalance,
response.mempool_error.unwrap().code,
MempoolAddTransactionStatusCode::InsufficientBalance
);
let invalid_seq_add = AccountAddress::new([101; ADDRESS_LENGTH]);
req.set_signed_txn(get_test_signed_txn(
Expand All @@ -240,8 +240,8 @@ fn test_submit_txn_inner_mempool() {
)
.unwrap();
assert_eq!(
response.mempool_error.unwrap(),
MempoolAddTransactionStatus::InvalidSeqNumber,
response.mempool_error.unwrap().code,
MempoolAddTransactionStatusCode::InvalidSeqNumber
);
let sys_error_add = AccountAddress::new([102; ADDRESS_LENGTH]);
req.set_signed_txn(get_test_signed_txn(
Expand All @@ -256,8 +256,8 @@ fn test_submit_txn_inner_mempool() {
)
.unwrap();
assert_eq!(
response.mempool_error.unwrap(),
MempoolAddTransactionStatus::InvalidUpdate,
response.mempool_error.unwrap().code,
MempoolAddTransactionStatusCode::InvalidUpdate
);
let accepted_add = AccountAddress::new([103; ADDRESS_LENGTH]);
req.set_signed_txn(get_test_signed_txn(
Expand Down
Expand Up @@ -7,7 +7,7 @@ use mempool::proto::{
HealthCheckRequest, HealthCheckResponse,
},
mempool_client::MempoolClientTrait,
shared::mempool_status::MempoolAddTransactionStatus,
shared::mempool_status::{MempoolAddTransactionStatus, MempoolAddTransactionStatusCode},
};
use proto_conv::FromProto;
use std::time::SystemTime;
Expand All @@ -33,6 +33,7 @@ impl MempoolClientTrait for LocalMockMempool {
req: &AddTransactionWithValidationRequest,
) -> ::grpcio::Result<AddTransactionWithValidationResponse> {
let mut resp = AddTransactionWithValidationResponse::new();
let mut status = MempoolAddTransactionStatus::new();
let insufficient_balance_add = [100_u8; ADDRESS_LENGTH];
let invalid_seq_add = [101_u8; ADDRESS_LENGTH];
let sys_error_add = [102_u8; ADDRESS_LENGTH];
Expand All @@ -41,16 +42,17 @@ impl MempoolClientTrait for LocalMockMempool {
let signed_txn = SignedTransaction::from_proto(req.get_signed_txn().clone()).unwrap();
let sender = signed_txn.sender();
if sender.as_ref() == insufficient_balance_add {
resp.set_status(MempoolAddTransactionStatus::InsufficientBalance);
status.set_code(MempoolAddTransactionStatusCode::InsufficientBalance);
} else if sender.as_ref() == invalid_seq_add {
resp.set_status(MempoolAddTransactionStatus::InvalidSeqNumber);
status.set_code(MempoolAddTransactionStatusCode::InvalidSeqNumber);
} else if sender.as_ref() == sys_error_add {
resp.set_status(MempoolAddTransactionStatus::InvalidUpdate);
status.set_code(MempoolAddTransactionStatusCode::InvalidUpdate);
} else if sender.as_ref() == accepted_add {
resp.set_status(MempoolAddTransactionStatus::Valid);
status.set_code(MempoolAddTransactionStatusCode::Valid);
} else if sender.as_ref() == mempool_full {
resp.set_status(MempoolAddTransactionStatus::MempoolIsFull);
status.set_code(MempoolAddTransactionStatusCode::MempoolIsFull);
}
resp.set_status(status);
Ok(resp)
}
fn health_check(&self, _req: &HealthCheckRequest) -> ::grpcio::Result<HealthCheckResponse> {
Expand Down
28 changes: 21 additions & 7 deletions mempool/src/core_mempool/mempool.rs
Expand Up @@ -11,6 +11,7 @@ use crate::{
transaction::{MempoolAddTransactionStatus, MempoolTransaction, TimelineState},
transaction_store::TransactionStore,
},
proto::shared::mempool_status::MempoolAddTransactionStatusCode,
OP_COUNTERS,
};
use chrono::Utc;
Expand Down Expand Up @@ -90,10 +91,8 @@ impl Mempool {
}
}

fn check_balance(&mut self, txn: &SignedTransaction, balance: u64, gas_amount: u64) -> bool {
let required_balance = txn.gas_unit_price() * gas_amount
+ self.transactions.get_required_balance(&txn.sender());
balance >= required_balance
fn get_required_balance(&mut self, txn: &SignedTransaction, gas_amount: u64) -> u64 {
txn.gas_unit_price() * gas_amount + self.transactions.get_required_balance(&txn.sender())
}

/// Used to add a transaction to the Mempool
Expand All @@ -111,8 +110,16 @@ impl Mempool {
&txn.sender(),
db_sequence_number
);
if !self.check_balance(&txn, balance, gas_amount) {
return MempoolAddTransactionStatus::InsufficientBalance;

let required_balance = self.get_required_balance(&txn, gas_amount);
if balance < required_balance {
return MempoolAddTransactionStatus::new(
MempoolAddTransactionStatusCode::InsufficientBalance,
format!(
"balance: {}, required_balance: {}, gas_amount: {}",
balance, required_balance, gas_amount
),
);
}

let cached_value = self.sequence_number_cache.get_mut(&txn.sender());
Expand All @@ -125,7 +132,14 @@ impl Mempool {

// don't accept old transactions (e.g. seq is less than account's current seq_number)
if txn.sequence_number() < sequence_number {
return MempoolAddTransactionStatus::InvalidSeqNumber;
return MempoolAddTransactionStatus::new(
MempoolAddTransactionStatusCode::InvalidSeqNumber,
format!(
"transaction sequence number is {}, current sequence number is {}",
txn.sequence_number(),
sequence_number,
),
);
}

let expiration_time = SystemTime::now()
Expand Down
77 changes: 25 additions & 52 deletions mempool/src/core_mempool/transaction.rs
@@ -1,7 +1,7 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::proto::shared::mempool_status::MempoolAddTransactionStatus as ProtoMempoolAddTransactionStatus;
use crate::proto::shared::mempool_status::MempoolAddTransactionStatusCode;
use failure::prelude::*;
use proto_conv::{FromProto, IntoProto};
use std::time::Duration;
Expand Down Expand Up @@ -56,68 +56,41 @@ pub enum TimelineState {

/// Status of transaction insertion operation
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum MempoolAddTransactionStatus {
/// Transaction was successfully sent to Mempool
Valid,
/// The sender does not have enough balance for the transaction
InsufficientBalance,
/// Transaction sequence number is invalid(e.g. too old)
InvalidSeqNumber,
/// Mempool is full (reached max global capacity)
MempoolIsFull,
/// Account reached max capacity per account
TooManyTransactions,
/// Invalid update. Only gas price increase is allowed
InvalidUpdate,
pub struct MempoolAddTransactionStatus {
/// Status code of the transaction insertion operation
pub code: MempoolAddTransactionStatusCode,
/// Message to give more details about the transaction insertion operation
pub message: String,
}

impl MempoolAddTransactionStatus {
/// Create a new MempoolAddTransactionStatus
pub fn new(code: MempoolAddTransactionStatusCode, message: String) -> Self {
Self { code, message }
}
}

//***********************************
// Decoding/Encoding to Protobuffers
//***********************************
impl IntoProto for MempoolAddTransactionStatus {
type ProtoType = crate::proto::shared::mempool_status::MempoolAddTransactionStatus;

fn into_proto(self) -> Self::ProtoType {
match self {
MempoolAddTransactionStatus::Valid => ProtoMempoolAddTransactionStatus::Valid,
MempoolAddTransactionStatus::InsufficientBalance => {
ProtoMempoolAddTransactionStatus::InsufficientBalance
}
MempoolAddTransactionStatus::InvalidSeqNumber => {
ProtoMempoolAddTransactionStatus::InvalidSeqNumber
}
MempoolAddTransactionStatus::InvalidUpdate => {
ProtoMempoolAddTransactionStatus::InvalidUpdate
}
MempoolAddTransactionStatus::MempoolIsFull => {
ProtoMempoolAddTransactionStatus::MempoolIsFull
}
MempoolAddTransactionStatus::TooManyTransactions => {
ProtoMempoolAddTransactionStatus::TooManyTransactions
}
}
let mut mempool_add_transaction_status = Self::ProtoType::new();
mempool_add_transaction_status.set_message(self.message);
mempool_add_transaction_status.set_code(self.code);
mempool_add_transaction_status
}
}

impl FromProto for MempoolAddTransactionStatus {
type ProtoType = crate::proto::shared::mempool_status::MempoolAddTransactionStatus;

fn from_proto(object: Self::ProtoType) -> Result<Self> {
let ret = match object {
ProtoMempoolAddTransactionStatus::Valid => MempoolAddTransactionStatus::Valid,
ProtoMempoolAddTransactionStatus::InsufficientBalance => {
MempoolAddTransactionStatus::InsufficientBalance
}
ProtoMempoolAddTransactionStatus::InvalidSeqNumber => {
MempoolAddTransactionStatus::InvalidSeqNumber
}
ProtoMempoolAddTransactionStatus::InvalidUpdate => {
MempoolAddTransactionStatus::InvalidUpdate
}
ProtoMempoolAddTransactionStatus::MempoolIsFull => {
MempoolAddTransactionStatus::MempoolIsFull
}
ProtoMempoolAddTransactionStatus::TooManyTransactions => {
MempoolAddTransactionStatus::TooManyTransactions
}
};
Ok(ret)
fn from_proto(proto: Self::ProtoType) -> Result<Self> {
Ok(MempoolAddTransactionStatus::new(
proto.get_code(),
proto.get_message().to_string(),
))
}
}
35 changes: 30 additions & 5 deletions mempool/src/core_mempool/transaction_store.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{
},
transaction::{MempoolAddTransactionStatus, MempoolTransaction, TimelineState},
},
proto::shared::mempool_status::MempoolAddTransactionStatusCode,
OP_COUNTERS,
};
use config::config::MempoolConfig;
Expand Down Expand Up @@ -88,7 +89,14 @@ impl TransactionStore {
return status;
}
if self.check_if_full() {
return MempoolAddTransactionStatus::MempoolIsFull;
return MempoolAddTransactionStatus::new(
MempoolAddTransactionStatusCode::MempoolIsFull,
format!(
"mempool size: {}, capacity: {}",
self.system_ttl_index.size(),
self.capacity,
),
);
}

let address = txn.get_sender();
Expand All @@ -101,7 +109,14 @@ impl TransactionStore {
if let Some(txns) = self.transactions.get_mut(&address) {
// capacity check
if txns.len() >= self.capacity_per_user {
return MempoolAddTransactionStatus::TooManyTransactions;
return MempoolAddTransactionStatus::new(
MempoolAddTransactionStatusCode::TooManyTransactions,
format!(
"txns length: {} capacity per user: {}",
txns.len(),
self.capacity_per_user,
),
);
}

// insert into storage and other indexes
Expand All @@ -111,7 +126,7 @@ impl TransactionStore {
OP_COUNTERS.set("txn.system_ttl_index", self.system_ttl_index.size());
}
self.process_ready_transactions(&address, current_sequence_number);
MempoolAddTransactionStatus::Valid
MempoolAddTransactionStatus::new(MempoolAddTransactionStatusCode::Valid, "".to_string())
}

/// Check whether the queue size >= threshold in config.
Expand Down Expand Up @@ -143,14 +158,24 @@ impl TransactionStore {
txn: &MempoolTransaction,
) -> (bool, MempoolAddTransactionStatus) {
let mut is_update = false;
let mut status = MempoolAddTransactionStatus::Valid;
let mut status = MempoolAddTransactionStatus::new(
MempoolAddTransactionStatusCode::Valid,
"".to_string(),
);

if let Some(txns) = self.transactions.get_mut(&txn.get_sender()) {
if let Some(current_version) = txns.get_mut(&txn.get_sequence_number()) {
is_update = true;
// TODO: do we need to ensure the rest of content hasn't changed
if txn.get_gas_price() <= current_version.get_gas_price() {
status = MempoolAddTransactionStatus::InvalidUpdate;
status = MempoolAddTransactionStatus::new(
MempoolAddTransactionStatusCode::InvalidUpdate,
format!(
"txn gas price: {}, current_version gas price: {}",
txn.get_gas_price(),
current_version.get_gas_price(),
),
);
} else {
self.priority_index.remove(&current_version);
current_version.txn = txn.txn.clone();
Expand Down
12 changes: 9 additions & 3 deletions mempool/src/core_mempool/unit_tests/common.rs
@@ -1,7 +1,10 @@
// Copyright (c) The Libra Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::core_mempool::{CoreMempool, MempoolAddTransactionStatus, TimelineState, TxnPointer};
use crate::{
core_mempool::{CoreMempool, TimelineState, TxnPointer},
proto::shared::mempool_status::MempoolAddTransactionStatusCode,
};
use config::config::NodeConfigHelpers;
use crypto::signing::generate_keypair_for_testing;
use failure::prelude::*;
Expand Down Expand Up @@ -106,8 +109,11 @@ pub(crate) fn add_txns_to_mempool(

pub(crate) fn add_txn(pool: &mut CoreMempool, transaction: TestTransaction) -> Result<()> {
let txn = transaction.make_signed_transaction();
match pool.add_txn(txn.clone(), 0, 0, 1000, TimelineState::NotReady) {
MempoolAddTransactionStatus::Valid => Ok(()),
match pool
.add_txn(txn.clone(), 0, 0, 1000, TimelineState::NotReady)
.code
{
MempoolAddTransactionStatusCode::Valid => Ok(()),
_ => Err(format_err!("insertion failure")),
}
}
Expand Down

0 comments on commit 779ebbe

Please sign in to comment.