mirror of
https://github.com/fluencelabs/aquavm
synced 2025-04-25 07:12:16 +00:00
refactor(aquavm): getting rid of CallOutputValue
in call merger (#353)
This PR refactors call merger of `TraceHandler`. Previously it requires `CallOutputValue` to determine a type of call output value (stream or scalar). And internally it checked correspondence between data result and call output type and return a error if they are not equal. Although execution engine component also had a match over these values and does nothing if they are not matched since `TraceHandler` did this job. This PR eliminate such behaviour and improve isolation of AquaVM modules.
This commit is contained in:
parent
c3aa8efa04
commit
a8b227caf5
@ -32,7 +32,7 @@ use utils::*;
|
||||
|
||||
use air_parser::ast::Ap;
|
||||
use air_parser::ast::ApResult;
|
||||
use air_trace_handler::MergerApResult;
|
||||
use air_trace_handler::merger::MergerApResult;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
|
@ -20,7 +20,7 @@ use crate::execution_step::Generation;
|
||||
use air_interpreter_data::ApResult;
|
||||
use air_parser::ast;
|
||||
use air_parser::ast::Ap;
|
||||
use air_trace_handler::MergerApResult;
|
||||
use air_trace_handler::merger::MergerApResult;
|
||||
|
||||
pub(super) fn ap_result_to_generation(ap_result: &MergerApResult) -> Generation {
|
||||
match ap_result {
|
||||
|
@ -18,17 +18,16 @@ use super::*;
|
||||
use crate::execution_step::execution_context::*;
|
||||
use crate::execution_step::Generation;
|
||||
use crate::execution_step::ValueAggregate;
|
||||
use crate::UncatchableError;
|
||||
|
||||
use air_interpreter_data::CallResult;
|
||||
use air_interpreter_data::TracePos;
|
||||
use air_interpreter_data::Value;
|
||||
use air_parser::ast::CallOutputValue;
|
||||
use air_trace_handler::PreparationScheme;
|
||||
use air_trace_handler::merger::ValueSource;
|
||||
use air_trace_handler::TraceHandler;
|
||||
|
||||
/// Writes result of a local `Call` instruction to `ExecutionCtx` at `output`.
|
||||
/// Returns call result.
|
||||
pub(crate) fn set_local_result<'i>(
|
||||
pub(crate) fn populate_context_from_peer_service_result<'i>(
|
||||
executed_result: ValueAggregate,
|
||||
output: &CallOutputValue<'i>,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
@ -46,64 +45,61 @@ pub(crate) fn set_local_result<'i>(
|
||||
.add_stream_value(executed_result, Generation::Last, stream.name, stream.position)?;
|
||||
Ok(CallResult::executed_stream(result_value, generation))
|
||||
}
|
||||
// by the internal conventions if call has no output value,
|
||||
// corresponding data should have scalar type
|
||||
CallOutputValue::None => Ok(CallResult::executed_scalar(result_value)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn set_result_from_value<'i>(
|
||||
value: &mut Value,
|
||||
pub(crate) fn populate_context_from_data<'i>(
|
||||
value: Value,
|
||||
tetraplet: RcSecurityTetraplet,
|
||||
trace_pos: TracePos,
|
||||
scheme: PreparationScheme,
|
||||
value_source: ValueSource,
|
||||
output: &CallOutputValue<'i>,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
) -> ExecutionResult<()> {
|
||||
) -> ExecutionResult<Value> {
|
||||
match (output, value) {
|
||||
(CallOutputValue::Scalar(scalar), Value::Scalar(value)) => {
|
||||
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
|
||||
exec_ctx.scalars.set_scalar_value(scalar.name, result)?;
|
||||
Ok(Value::Scalar(value))
|
||||
}
|
||||
(
|
||||
CallOutputValue::Stream(stream),
|
||||
Value::Stream {
|
||||
value,
|
||||
generation: stream_generation,
|
||||
},
|
||||
) => {
|
||||
(CallOutputValue::Stream(stream), Value::Stream { value, generation }) => {
|
||||
let result = ValueAggregate::new(value.clone(), tetraplet, trace_pos);
|
||||
let generation = match scheme {
|
||||
PreparationScheme::Both | PreparationScheme::Previous => {
|
||||
assert_ne!(*stream_generation, u32::MAX, "Should be valid");
|
||||
Generation::Nth(*stream_generation)
|
||||
}
|
||||
PreparationScheme::Current => {
|
||||
assert_eq!(*stream_generation, u32::MAX, "Shouldn't be valid");
|
||||
Generation::Last
|
||||
}
|
||||
};
|
||||
let generation = exec_ctx
|
||||
let adjusted_generation = maybe_adjust_generation(generation, value_source);
|
||||
let resulted_generation =
|
||||
exec_ctx
|
||||
.streams
|
||||
.add_stream_value(result, generation, stream.name, stream.position)?;
|
||||
// Update value's generation
|
||||
*stream_generation = generation;
|
||||
}
|
||||
// it isn't needed to check there that output and value matches because
|
||||
// it's been already checked in trace handler
|
||||
_ => {}
|
||||
};
|
||||
.add_stream_value(result, adjusted_generation, stream.name, stream.position)?;
|
||||
|
||||
Ok(())
|
||||
let result = Value::Stream {
|
||||
value,
|
||||
generation: resulted_generation,
|
||||
};
|
||||
Ok(result)
|
||||
}
|
||||
// by the internal conventions if call has no output value,
|
||||
// corresponding data should have scalar type
|
||||
(CallOutputValue::None, value @ Value::Scalar(_)) => Ok(value),
|
||||
(_, value) => Err(ExecutionError::Uncatchable(
|
||||
UncatchableError::CallResultNotCorrespondToInstr(value),
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes an executed state of a particle being sent to remote node.
|
||||
pub(crate) fn set_remote_call_result<'i>(
|
||||
peer_pk: String,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) {
|
||||
pub(crate) fn handle_remote_call<'i>(peer_pk: String, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) {
|
||||
exec_ctx.next_peer_pks.push(peer_pk);
|
||||
exec_ctx.subgraph_complete = false;
|
||||
|
||||
let new_call_result = CallResult::sent_peer_id(exec_ctx.run_parameters.current_peer_id.clone());
|
||||
trace_ctx.meet_call_end(new_call_result);
|
||||
}
|
||||
|
||||
fn maybe_adjust_generation(prev_stream_generation: u32, value_source: ValueSource) -> Generation {
|
||||
match value_source {
|
||||
ValueSource::PreviousData => Generation::Nth(prev_stream_generation),
|
||||
ValueSource::CurrentData => Generation::Last,
|
||||
}
|
||||
}
|
||||
|
@ -15,16 +15,15 @@
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use crate::execution_step::air::call::call_result_setter::set_result_from_value;
|
||||
use crate::execution_step::air::call::call_result_setter::populate_context_from_data;
|
||||
use crate::execution_step::CatchableError;
|
||||
use crate::execution_step::RcSecurityTetraplet;
|
||||
|
||||
use air_interpreter_data::CallResult;
|
||||
use air_interpreter_data::Sender;
|
||||
use air_interpreter_data::TracePos;
|
||||
use air_interpreter_interface::CallServiceResult;
|
||||
use air_parser::ast::CallOutputValue;
|
||||
use air_trace_handler::PreparationScheme;
|
||||
use air_trace_handler::merger::MetResult;
|
||||
use air_trace_handler::TraceHandler;
|
||||
|
||||
use fstrings::f;
|
||||
@ -39,31 +38,28 @@ pub(crate) struct StateDescriptor {
|
||||
/// This function looks at the existing call state, validates it,
|
||||
/// and returns Ok(true) if the call should be executed further.
|
||||
pub(super) fn handle_prev_state<'i>(
|
||||
met_result: MetResult,
|
||||
tetraplet: &RcSecurityTetraplet,
|
||||
output: &CallOutputValue<'i>,
|
||||
mut prev_result: CallResult,
|
||||
trace_pos: TracePos,
|
||||
scheme: PreparationScheme,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<StateDescriptor> {
|
||||
use CallResult::*;
|
||||
|
||||
match &mut prev_result {
|
||||
match met_result.result {
|
||||
// this call was failed on one of the previous executions,
|
||||
// here it's needed to bubble this special error up
|
||||
CallServiceFailed(ret_code, err_msg) => {
|
||||
CallServiceFailed(ret_code, ref err_msg) => {
|
||||
exec_ctx.subgraph_complete = false;
|
||||
let ret_code = *ret_code;
|
||||
let err_msg = err_msg.clone();
|
||||
trace_ctx.meet_call_end(prev_result);
|
||||
trace_ctx.meet_call_end(met_result.result);
|
||||
Err(CatchableError::LocalServiceError(ret_code, err_msg).into())
|
||||
}
|
||||
RequestSentBy(Sender::PeerIdWithCallId { peer_id, call_id })
|
||||
RequestSentBy(Sender::PeerIdWithCallId { ref peer_id, call_id })
|
||||
if peer_id.as_str() == exec_ctx.run_parameters.current_peer_id.as_str() =>
|
||||
{
|
||||
// call results are identified by call_id that is saved in data
|
||||
match exec_ctx.call_results.remove(call_id) {
|
||||
match exec_ctx.call_results.remove(&call_id) {
|
||||
Some(call_result) => {
|
||||
update_state_with_service_result(tetraplet.clone(), output, call_result, exec_ctx, trace_ctx)?;
|
||||
Ok(StateDescriptor::executed())
|
||||
@ -71,7 +67,7 @@ pub(super) fn handle_prev_state<'i>(
|
||||
// result hasn't been prepared yet
|
||||
None => {
|
||||
exec_ctx.subgraph_complete = false;
|
||||
Ok(StateDescriptor::not_ready(prev_result))
|
||||
Ok(StateDescriptor::not_ready(met_result.result))
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -79,16 +75,24 @@ pub(super) fn handle_prev_state<'i>(
|
||||
// check whether current node can execute this call
|
||||
let is_current_peer = tetraplet.peer_pk.as_str() == exec_ctx.run_parameters.current_peer_id.as_str();
|
||||
if is_current_peer {
|
||||
return Ok(StateDescriptor::can_execute_now(prev_result));
|
||||
return Ok(StateDescriptor::can_execute_now(met_result.result));
|
||||
}
|
||||
|
||||
exec_ctx.subgraph_complete = false;
|
||||
Ok(StateDescriptor::cant_execute_now(prev_result))
|
||||
Ok(StateDescriptor::cant_execute_now(met_result.result))
|
||||
}
|
||||
// this instruction's been already executed
|
||||
Executed(ref mut value) => {
|
||||
set_result_from_value(&mut *value, tetraplet.clone(), trace_pos, scheme, output, exec_ctx)?;
|
||||
trace_ctx.meet_call_end(prev_result);
|
||||
Executed(value) => {
|
||||
let resulted_value = populate_context_from_data(
|
||||
value,
|
||||
tetraplet.clone(),
|
||||
met_result.trace_pos,
|
||||
met_result.source,
|
||||
output,
|
||||
exec_ctx,
|
||||
)?;
|
||||
let call_result = CallResult::Executed(resulted_value);
|
||||
trace_ctx.meet_call_end(call_result);
|
||||
|
||||
Ok(StateDescriptor::executed())
|
||||
}
|
||||
@ -114,7 +118,7 @@ fn update_state_with_service_result<'i>(
|
||||
let trace_pos = trace_ctx.trace_pos();
|
||||
|
||||
let executed_result = ValueAggregate::new(result, tetraplet, trace_pos);
|
||||
let new_call_result = set_local_result(executed_result, output, exec_ctx)?;
|
||||
let new_call_result = populate_context_from_peer_service_result(executed_result, output, exec_ctx)?;
|
||||
trace_ctx.meet_call_end(new_call_result);
|
||||
|
||||
Ok(())
|
||||
|
@ -30,7 +30,7 @@ use crate::SecurityTetraplet;
|
||||
use air_interpreter_data::CallResult;
|
||||
use air_interpreter_interface::CallRequestParams;
|
||||
use air_parser::ast;
|
||||
use air_trace_handler::MergerCallResult;
|
||||
use air_trace_handler::merger::MergerCallResult;
|
||||
use air_trace_handler::TraceHandler;
|
||||
use air_utils::measure;
|
||||
|
||||
@ -91,7 +91,7 @@ impl<'i> ResolvedCall<'i> {
|
||||
// call can be executed only on peers with such peer_id
|
||||
let tetraplet = &self.tetraplet;
|
||||
if tetraplet.peer_pk.as_str() != exec_ctx.run_parameters.current_peer_id.as_str() {
|
||||
set_remote_call_result(tetraplet.peer_pk.clone(), exec_ctx, trace_ctx);
|
||||
handle_remote_call(tetraplet.peer_pk.clone(), exec_ctx, trace_ctx);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
@ -156,25 +156,13 @@ impl<'i> ResolvedCall<'i> {
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<StateDescriptor> {
|
||||
let prev_result = trace_ctx.meet_call_start(&self.output);
|
||||
let (call_result, trace_pos, scheme) = match trace_to_exec_err!(prev_result, raw_call)? {
|
||||
MergerCallResult::CallResult {
|
||||
value,
|
||||
trace_pos,
|
||||
scheme,
|
||||
} => (value, trace_pos, scheme),
|
||||
MergerCallResult::Empty => return Ok(StateDescriptor::no_previous_state()),
|
||||
};
|
||||
|
||||
handle_prev_state(
|
||||
&self.tetraplet,
|
||||
&self.output,
|
||||
call_result,
|
||||
trace_pos,
|
||||
scheme,
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
)
|
||||
let prev_result = trace_ctx.meet_call_start();
|
||||
match trace_to_exec_err!(prev_result, raw_call)? {
|
||||
MergerCallResult::Met(call_result) => {
|
||||
handle_prev_state(call_result, &self.tetraplet, &self.output, exec_ctx, trace_ctx)
|
||||
}
|
||||
MergerCallResult::NotMet => Ok(StateDescriptor::no_previous_state()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Prepare arguments of this call instruction by resolving and preparing their security tetraplets.
|
||||
|
@ -28,7 +28,7 @@ use crate::UncatchableError;
|
||||
use air_interpreter_data::CanonResult;
|
||||
use air_interpreter_data::TracePos;
|
||||
use air_parser::ast;
|
||||
use air_trace_handler::MergerCanonResult;
|
||||
use air_trace_handler::merger::MergerCanonResult;
|
||||
|
||||
use std::borrow::Cow;
|
||||
|
||||
|
@ -17,7 +17,8 @@
|
||||
use crate::ToErrorCode;
|
||||
|
||||
use air_interpreter_data::TracePos;
|
||||
use air_trace_handler::MergerApResult;
|
||||
use air_interpreter_data::Value;
|
||||
use air_trace_handler::merger::MergerApResult;
|
||||
use air_trace_handler::TraceHandlerError;
|
||||
use strum::IntoEnumIterator;
|
||||
use strum_macros::EnumDiscriminants;
|
||||
@ -51,11 +52,16 @@ pub enum UncatchableError {
|
||||
#[error("multiple iterable values found for iterable name '{0}'")]
|
||||
MultipleIterableValues(String),
|
||||
|
||||
/// Errors occurred when result from data doesn't match to a instruction, f.e. an instruction
|
||||
/// Errors occurred when result from data doesn't match to an ap instruction, f.e. an ap
|
||||
/// could be applied to a stream, but result doesn't contain generation in a source position.
|
||||
#[error("ap result {0:?} doesn't match with corresponding instruction")]
|
||||
ApResultNotCorrespondToInstr(MergerApResult),
|
||||
|
||||
/// Errors occurred when result from data doesn't match to a call instruction, f.e. a call
|
||||
/// could be applied to a stream, but result doesn't contain generation in a source position.
|
||||
#[error("call result value {0:?} doesn't match with corresponding instruction")]
|
||||
CallResultNotCorrespondToInstr(Value),
|
||||
|
||||
/// Variable shadowing is not allowed, usually it's thrown when a AIR tries to assign value
|
||||
/// for a variable not in a fold block or in a global scope but not right after new.
|
||||
#[error("trying to shadow variable '{0}', but shadowing is allowed only inside fold blocks")]
|
||||
|
@ -16,8 +16,9 @@
|
||||
|
||||
use air::UncatchableError;
|
||||
use air_test_utils::prelude::*;
|
||||
use air_trace_handler::merger::CallResultError;
|
||||
use air_trace_handler::merger::MergeError;
|
||||
use air_trace_handler::TraceHandlerError;
|
||||
use air_trace_handler::{CallResultError, MergeError};
|
||||
|
||||
#[test]
|
||||
fn par_early_exit() {
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
use air::UncatchableError;
|
||||
use air_test_utils::prelude::*;
|
||||
use air_trace_handler::MergeError;
|
||||
use air_trace_handler::merger::MergeError;
|
||||
use air_trace_handler::TraceHandlerError;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
@ -18,7 +18,6 @@ use super::*;
|
||||
use merger::*;
|
||||
|
||||
use air_interpreter_data::InterpreterData;
|
||||
use air_parser::ast::CallOutputValue;
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub struct TraceHandler {
|
||||
@ -60,8 +59,8 @@ impl TraceHandler {
|
||||
|
||||
impl TraceHandler {
|
||||
/// Should be called at the beginning of a call execution.
|
||||
pub fn meet_call_start(&mut self, output_value: &CallOutputValue<'_>) -> TraceHandlerResult<MergerCallResult> {
|
||||
try_merge_next_state_as_call(&mut self.data_keeper, output_value).map_err(Into::into)
|
||||
pub fn meet_call_start(&mut self) -> TraceHandlerResult<MergerCallResult> {
|
||||
try_merge_next_state_as_call(&mut self.data_keeper).map_err(Into::into)
|
||||
}
|
||||
|
||||
/// Should be called when a call instruction was executed successfully. It adds the supplied
|
||||
|
@ -29,20 +29,11 @@
|
||||
mod data_keeper;
|
||||
mod errors;
|
||||
mod handler;
|
||||
mod merger;
|
||||
pub mod merger;
|
||||
mod state_automata;
|
||||
|
||||
pub use errors::TraceHandlerError;
|
||||
pub use handler::TraceHandler;
|
||||
pub use merger::ApResultError;
|
||||
pub use merger::CallResultError;
|
||||
pub use merger::FoldResultError;
|
||||
pub use merger::MergeCtxType;
|
||||
pub use merger::MergeError;
|
||||
pub use merger::MergerApResult;
|
||||
pub use merger::MergerCallResult;
|
||||
pub use merger::MergerCanonResult;
|
||||
pub use merger::PreparationScheme;
|
||||
pub use state_automata::SubgraphType;
|
||||
|
||||
pub type TraceHandlerResult<T> = std::result::Result<T, TraceHandlerError>;
|
||||
|
@ -18,7 +18,6 @@ mod utils;
|
||||
|
||||
use super::*;
|
||||
use crate::TracePos;
|
||||
use air_parser::ast::CallOutputValue;
|
||||
use utils::*;
|
||||
|
||||
const EXPECTED_STATE_NAME: &str = "call";
|
||||
@ -26,58 +25,50 @@ const EXPECTED_STATE_NAME: &str = "call";
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum MergerCallResult {
|
||||
/// There is no corresponding state in a trace for this call.
|
||||
Empty,
|
||||
NotMet,
|
||||
|
||||
/// There was a state in at least one of the contexts. If there were two states in
|
||||
/// both contexts, they were successfully merged.
|
||||
CallResult {
|
||||
value: CallResult,
|
||||
trace_pos: TracePos,
|
||||
scheme: PreparationScheme,
|
||||
},
|
||||
Met(MetResult),
|
||||
}
|
||||
|
||||
pub(crate) fn try_merge_next_state_as_call(
|
||||
data_keeper: &mut DataKeeper,
|
||||
output_value: &CallOutputValue<'_>,
|
||||
) -> MergeResult<MergerCallResult> {
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MetResult {
|
||||
pub result: CallResult,
|
||||
pub trace_pos: TracePos,
|
||||
pub source: ValueSource,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
pub enum ValueSource {
|
||||
PreviousData,
|
||||
CurrentData,
|
||||
}
|
||||
|
||||
pub(crate) fn try_merge_next_state_as_call(data_keeper: &mut DataKeeper) -> MergeResult<MergerCallResult> {
|
||||
use ExecutedState::Call;
|
||||
use PreparationScheme::*;
|
||||
|
||||
let prev_state = data_keeper.prev_slider_mut().next_state();
|
||||
let current_state = data_keeper.current_slider_mut().next_state();
|
||||
let value_type = ValueType::from_output_value(output_value);
|
||||
|
||||
let (prev_call, current_call) = match (prev_state, current_state) {
|
||||
(Some(Call(prev_call)), Some(Call(current_call))) => (prev_call, current_call),
|
||||
// this special case is needed to merge stream generation in a right way
|
||||
(None, Some(Call(CallResult::Executed(value)))) => {
|
||||
let call_result = merge_current_executed(value, value_type)?;
|
||||
return Ok(prepare_call_result(call_result, Current, data_keeper));
|
||||
match (prev_state, current_state) {
|
||||
(Some(Call(prev_call)), Some(Call(current_call))) => {
|
||||
let (merged_call, scheme) = merge_call_results(prev_call, current_call)?;
|
||||
Ok(prepare_call_result(merged_call, scheme, data_keeper))
|
||||
}
|
||||
(None, Some(Call(current_call))) => return Ok(prepare_call_result(current_call, Current, data_keeper)),
|
||||
(Some(Call(prev_call)), None) => return Ok(prepare_call_result(prev_call, Previous, data_keeper)),
|
||||
(None, None) => return Ok(MergerCallResult::Empty),
|
||||
(prev_state, current_state) => {
|
||||
return Err(MergeError::incompatible_states(
|
||||
(None, Some(Call(current_call))) => Ok(prepare_call_result(current_call, Current, data_keeper)),
|
||||
(Some(Call(prev_call)), None) => Ok(prepare_call_result(prev_call, Previous, data_keeper)),
|
||||
(None, None) => Ok(MergerCallResult::NotMet),
|
||||
(prev_state, current_state) => Err(MergeError::incompatible_states(
|
||||
prev_state,
|
||||
current_state,
|
||||
EXPECTED_STATE_NAME,
|
||||
))
|
||||
)),
|
||||
}
|
||||
};
|
||||
|
||||
let (merged_call, scheme) = merge_call_result(prev_call, current_call, value_type)?;
|
||||
let call_result = prepare_call_result(merged_call, scheme, data_keeper);
|
||||
try_match_value_type(&call_result, value_type)?;
|
||||
|
||||
Ok(call_result)
|
||||
}
|
||||
|
||||
fn merge_call_result(
|
||||
prev_call: CallResult,
|
||||
current_call: CallResult,
|
||||
value_type: ValueType<'_>,
|
||||
) -> MergeResult<(CallResult, PreparationScheme)> {
|
||||
fn merge_call_results(prev_call: CallResult, current_call: CallResult) -> MergeResult<(CallResult, PreparationScheme)> {
|
||||
use CallResult::*;
|
||||
use PreparationScheme::*;
|
||||
|
||||
@ -90,10 +81,9 @@ fn merge_call_result(
|
||||
(prev @ CallServiceFailed(..), RequestSentBy(_)) => (prev, Previous),
|
||||
// senders shouldn't be checked for equality, for more info please look at
|
||||
// github.com/fluencelabs/aquavm/issues/137
|
||||
(prev @ RequestSentBy(_), RequestSentBy(_)) => (prev, Previous),
|
||||
// this special case is needed to merge stream generation in a right way
|
||||
(RequestSentBy(_), Executed(value)) => (merge_current_executed(value, value_type)?, Current),
|
||||
(prev @ Executed(..), RequestSentBy(_)) => (prev, Previous),
|
||||
(previous @ RequestSentBy(_), RequestSentBy(_)) => (previous, Previous),
|
||||
(RequestSentBy(_), current @ Executed(_)) => (current, Current),
|
||||
(previous @ Executed(..), RequestSentBy(_)) => (previous, Previous),
|
||||
(Executed(prev_value), Executed(current_value)) => (merge_executed(prev_value, current_value)?, Both),
|
||||
(prev_call, current_call) => return Err(CallResultError::incompatible_calls(prev_call, current_call)),
|
||||
};
|
||||
@ -102,41 +92,32 @@ fn merge_call_result(
|
||||
}
|
||||
|
||||
pub(super) fn prepare_call_result(
|
||||
value: CallResult,
|
||||
call_result: CallResult,
|
||||
scheme: PreparationScheme,
|
||||
data_keeper: &mut DataKeeper,
|
||||
) -> MergerCallResult {
|
||||
let trace_pos = data_keeper.result_trace_next_pos();
|
||||
prepare_positions_mapping(scheme, data_keeper);
|
||||
|
||||
MergerCallResult::CallResult {
|
||||
value,
|
||||
let met_result = MetResult::new(call_result, trace_pos, scheme.into());
|
||||
MergerCallResult::Met(met_result)
|
||||
}
|
||||
|
||||
impl From<PreparationScheme> for ValueSource {
|
||||
fn from(scheme: PreparationScheme) -> Self {
|
||||
match scheme {
|
||||
PreparationScheme::Previous | PreparationScheme::Both => ValueSource::PreviousData,
|
||||
PreparationScheme::Current => ValueSource::CurrentData,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl MetResult {
|
||||
pub fn new(result: CallResult, trace_pos: TracePos, source: ValueSource) -> Self {
|
||||
Self {
|
||||
result,
|
||||
trace_pos,
|
||||
scheme,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Copy, Clone)]
|
||||
pub(crate) enum ValueType<'i> {
|
||||
Scalar,
|
||||
Stream(&'i str),
|
||||
}
|
||||
|
||||
impl<'i> ValueType<'i> {
|
||||
pub(self) fn from_output_value(output_value: &'i CallOutputValue<'_>) -> Self {
|
||||
match output_value {
|
||||
CallOutputValue::Stream(stream) => ValueType::Stream(stream.name),
|
||||
_ => ValueType::Scalar,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
use std::fmt;
|
||||
impl fmt::Display for ValueType<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
ValueType::Scalar => write!(f, "scalar"),
|
||||
ValueType::Stream(stream_name) => write!(f, "${}", stream_name),
|
||||
source,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -62,25 +62,6 @@ fn are_streams_equal(
|
||||
))
|
||||
}
|
||||
|
||||
/// Merging of value from only current data to a stream is a something special, because it's
|
||||
/// needed to choose generation not from current data, but a maximum from streams on a current peer.
|
||||
/// Maximum versions are tracked in data in a special field called streams.
|
||||
pub(super) fn merge_current_executed(value: Value, value_type: ValueType<'_>) -> MergeResult<CallResult> {
|
||||
match (value, value_type) {
|
||||
(scalar @ Value::Scalar(_), ValueType::Scalar) => Ok(CallResult::Executed(scalar)),
|
||||
(Value::Stream { value, .. }, ValueType::Stream(_)) => {
|
||||
// it is checked by an assertion
|
||||
let canary_generation = u32::MAX;
|
||||
let stream = Value::Stream {
|
||||
value,
|
||||
generation: canary_generation,
|
||||
};
|
||||
Ok(CallResult::Executed(stream))
|
||||
}
|
||||
(value, value_type) => Err(CallResultError::data_not_match(value, value_type)),
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn check_equal(prev_call: &CallResult, current_call: &CallResult) -> MergeResult<()> {
|
||||
if prev_call != current_call {
|
||||
Err(CallResultError::incompatible_calls(
|
||||
@ -91,19 +72,3 @@ pub(super) fn check_equal(prev_call: &CallResult, current_call: &CallResult) ->
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn try_match_value_type(merged_call: &MergerCallResult, value_type: ValueType<'_>) -> MergeResult<()> {
|
||||
if let MergerCallResult::CallResult { value, .. } = merged_call {
|
||||
return match (value, value_type) {
|
||||
(CallResult::Executed(value @ Value::Scalar(_)), ValueType::Stream(_)) => {
|
||||
Err(CallResultError::data_not_match(value.clone(), value_type))
|
||||
}
|
||||
(CallResult::Executed(value @ Value::Stream { .. }), ValueType::Scalar) => {
|
||||
Err(CallResultError::data_not_match(value.clone(), value_type))
|
||||
}
|
||||
_ => Ok(()),
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -14,7 +14,6 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::call_merger::ValueType;
|
||||
use super::ApResult;
|
||||
use super::CallResult;
|
||||
use super::ExecutedState;
|
||||
@ -72,9 +71,6 @@ pub enum CallResultError {
|
||||
prev_call: CallResult,
|
||||
current_call: CallResult,
|
||||
},
|
||||
|
||||
#[error("air scripts has the following value type '{air_type}' while data other '{data_value:?}'")]
|
||||
DataNotMatchAIR { air_type: String, data_value: Value },
|
||||
}
|
||||
|
||||
#[derive(ThisError, Debug)]
|
||||
@ -156,14 +152,6 @@ impl CallResultError {
|
||||
|
||||
MergeError::IncorrectCallResult(call_result_error)
|
||||
}
|
||||
|
||||
pub(crate) fn data_not_match(data_value: Value, air_type: ValueType<'_>) -> MergeError {
|
||||
let air_type = air_type.to_string();
|
||||
|
||||
let call_result_error = CallResultError::DataNotMatchAIR { air_type, data_value };
|
||||
|
||||
MergeError::IncorrectCallResult(call_result_error)
|
||||
}
|
||||
}
|
||||
|
||||
impl CanonResultError {
|
||||
|
@ -24,6 +24,8 @@ mod position_mapping;
|
||||
|
||||
pub use ap_merger::MergerApResult;
|
||||
pub use call_merger::MergerCallResult;
|
||||
pub use call_merger::MetResult;
|
||||
pub use call_merger::ValueSource;
|
||||
pub use canon_merger::MergerCanonResult;
|
||||
pub use fold_merger::MergerFoldResult;
|
||||
pub use par_merger::MergerParResult;
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
use super::KeeperError;
|
||||
use super::ParResult;
|
||||
use crate::MergeCtxType;
|
||||
use crate::merger::MergeCtxType;
|
||||
use crate::ResolvedFold;
|
||||
use crate::TracePos;
|
||||
|
||||
|
@ -15,7 +15,7 @@
|
||||
*/
|
||||
|
||||
use super::*;
|
||||
use crate::MergeCtxType;
|
||||
use crate::merger::MergeCtxType;
|
||||
use crate::ResolvedFold;
|
||||
|
||||
/// This state updater manage to do the same thing as CtxStateHandler in ParFSM,
|
||||
|
@ -31,13 +31,13 @@ pub(super) use fsm_queue::FSMKeeper;
|
||||
pub(super) use par_fsm::ParFSM;
|
||||
|
||||
use super::data_keeper::KeeperError;
|
||||
use super::merger::MergeCtxType;
|
||||
use super::merger::MergerParResult;
|
||||
use super::DataKeeper;
|
||||
use super::ExecutedState;
|
||||
use super::FoldResult;
|
||||
use super::FoldSubTraceLore;
|
||||
use super::MergeCtx;
|
||||
use super::MergeCtxType;
|
||||
use super::MergerFoldResult;
|
||||
use super::ParResult;
|
||||
use super::ResolvedFold;
|
||||
|
@ -1,3 +1,4 @@
|
||||
## Next hardfork changes:
|
||||
- computing subtrace lengths in `FoldFSM` (for more details see [PR 138](https://github.com/fluencelabs/aquavm/pull/138))
|
||||
- change `Sender` struct serialization way in `CallResult::RequestSentBy`
|
||||
- add a separate (empty?) state in `air_interpreter_data::CallResult` for `CallOutputValue::None` for hardening
|
||||
|
Loading…
x
Reference in New Issue
Block a user