WIP enable periodic tracing_call in AVM

This commit is contained in:
Roman Nozdrin 2023-11-13 14:27:40 +00:00
parent 973e8aeea5
commit b8283b1b0d
2 changed files with 119 additions and 17 deletions

View File

@ -30,6 +30,8 @@ use fluence_keypair::KeyPair;
use std::ops::Deref; use std::ops::Deref;
use std::ops::DerefMut; use std::ops::DerefMut;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::time::Duration; use std::time::Duration;
use std::time::Instant; use std::time::Instant;
@ -55,6 +57,7 @@ impl DerefMut for SendSafeRunner {
pub struct AVM<E> { pub struct AVM<E> {
runner: SendSafeRunner, runner: SendSafeRunner,
data_store: AVMDataStore<E>, data_store: AVMDataStore<E>,
counter: AtomicUsize,
} }
impl<E> AVM<E> { impl<E> AVM<E> {
@ -73,12 +76,16 @@ impl<E> AVM<E> {
let runner = AVMRunner::new(air_wasm_path, max_heap_size, logging_mask) let runner = AVMRunner::new(air_wasm_path, max_heap_size, logging_mask)
.map_err(AVMError::RunnerError)?; .map_err(AVMError::RunnerError)?;
let runner = SendSafeRunner(runner); let runner = SendSafeRunner(runner);
let avm = Self { runner, data_store }; let counter = AtomicUsize::new(0);
let avm = Self {
runner,
data_store,
counter,
};
Ok(avm) Ok(avm)
} }
#[allow(clippy::result_large_err)]
pub fn call( pub fn call(
&mut self, &mut self,
air: impl Into<String>, air: impl Into<String>,
@ -86,6 +93,27 @@ impl<E> AVM<E> {
particle_parameters: ParticleParameters<'_>, particle_parameters: ParticleParameters<'_>,
call_results: CallResults, call_results: CallResults,
keypair: &KeyPair, keypair: &KeyPair,
) -> AVMResult<AVMOutcome, E> {
let enable_tracing = self.counter.load(Ordering::Relaxed) % 200 == 0;
self.call_(
air,
data,
particle_parameters,
call_results,
keypair,
enable_tracing,
)
}
#[allow(clippy::result_large_err)]
pub fn call_(
&mut self,
air: impl Into<String>,
data: impl Into<Vec<u8>>,
particle_parameters: ParticleParameters<'_>,
call_results: CallResults,
keypair: &KeyPair,
enable_tracing: bool,
) -> AVMResult<AVMOutcome, E> { ) -> AVMResult<AVMOutcome, E> {
let air = air.into(); let air = air.into();
let prev_data = self.data_store.read_data( let prev_data = self.data_store.read_data(
@ -94,23 +122,43 @@ impl<E> AVM<E> {
)?; )?;
let current_data = data.into(); let current_data = data.into();
self.counter.fetch_add(1, Ordering::Relaxed);
let execution_start_time = Instant::now(); let execution_start_time = Instant::now();
let memory_size_before = self.memory_stats().memory_size; let memory_size_before = self.memory_stats().memory_size;
let outcome = self let outcome = if !enable_tracing {
.runner self.runner
.call( .call(
air.clone(), air.clone(),
prev_data, prev_data,
current_data.clone(), current_data.clone(),
particle_parameters.init_peer_id.clone().into_owned(), particle_parameters.init_peer_id.clone().into_owned(),
particle_parameters.timestamp, particle_parameters.timestamp,
particle_parameters.ttl, particle_parameters.ttl,
particle_parameters.current_peer_id.clone(), particle_parameters.current_peer_id.clone(),
call_results.clone(), call_results.clone(),
keypair, keypair,
particle_parameters.particle_id.to_string(), particle_parameters.particle_id.to_string(),
) )
.map_err(AVMError::RunnerError)?; .map_err(AVMError::RunnerError)?
} else {
self.runner
.call_tracing_(
air.clone(),
prev_data,
current_data.clone(),
particle_parameters.init_peer_id.clone().into_owned(),
particle_parameters.timestamp,
particle_parameters.ttl,
particle_parameters.current_peer_id.clone(),
call_results.clone(),
keypair,
"info".into(),
0,
particle_parameters.particle_id.to_string(),
)
.map_err(AVMError::RunnerError)?
};
let execution_time = execution_start_time.elapsed(); let execution_time = execution_start_time.elapsed();
let memory_delta = self.memory_stats().memory_size - memory_size_before; let memory_delta = self.memory_stats().memory_size - memory_size_before;

View File

@ -169,6 +169,60 @@ impl AVMRunner {
Ok(outcome) Ok(outcome)
} }
pub fn call_tracing_(
&mut self,
air: impl Into<String>,
prev_data: impl Into<Vec<u8>>,
data: impl Into<Vec<u8>>,
init_peer_id: impl Into<String>,
timestamp: u64,
ttl: u32,
current_peer_id: impl Into<String>,
call_results: CallResults,
keypair: &KeyPair,
tracing_params: String,
tracing_output_mode: u8,
particle_id: String,
) -> RunnerResult<RawAVMOutcome> {
let key_format = keypair.key_format();
let secret_key_bytes: Vec<u8> = keypair.secret().map_err(RunnerError::KeyError)?;
let mut args = prepare_args(
air,
prev_data,
data,
current_peer_id.into(),
init_peer_id.into(),
timestamp,
ttl,
call_results,
key_format.into(),
secret_key_bytes,
particle_id,
);
args.push(IValue::String(tracing_params));
args.push(IValue::U8(tracing_output_mode));
let result = measure!(
self.marine.call_with_ivalues(
&self.wasm_filename,
"invoke_tracing",
&args,
<_>::default(),
)?,
tracing::Level::INFO,
"marine.call_with_ivalues",
method = "invoke_tracing",
);
let result = try_as_one_value_vec(result)?;
let outcome = InterpreterOutcome::from_ivalue(result)
.map_err(RunnerError::InterpreterResultDeError)?;
let outcome = RawAVMOutcome::from_interpreter_outcome(outcome)?;
Ok(outcome)
}
pub fn memory_stats(&self) -> AVMMemoryStats { pub fn memory_stats(&self) -> AVMMemoryStats {
let stats = self.marine.module_memory_stats(); let stats = self.marine.module_memory_stats();