From bf8aee7f1525bdd33b3440f09dc292ee1ce0aece Mon Sep 17 00:00:00 2001 From: Mike Voronov Date: Tue, 11 Oct 2022 01:41:22 +0300 Subject: [PATCH] fix(execution-engine): fix invalid iteration over stream (#362) This PR is mostly a revertion of #357, that is needed to make stream work correctly in fold itrerations. Closes #363. --- CHANGELOG.md | 11 ++ Cargo.lock | 6 +- air-interpreter/Cargo.toml | 2 +- air/Cargo.toml | 2 +- air/src/execution_step/air/new.rs | 8 +- air/src/execution_step/boxed_value/stream.rs | 45 +------ .../errors/uncatchable_errors.rs | 5 - .../execution_context/context.rs | 14 +- .../execution_context/streams_variables.rs | 61 ++------- .../streams_variables/utils.rs | 22 +-- air/src/farewell_step/outcome.rs | 18 +-- air/src/preparation_step/preparation.rs | 5 +- .../features/data_merging/data_merge.rs | 4 +- .../test_module/features/streams/merging.rs | 127 ++++++++++++++++++ air/tests/test_module/features/streams/mod.rs | 1 + .../features/streams/streams_early_exit.rs | 38 +++--- air/tests/test_module/instructions/canon.rs | 2 +- air/tests/test_module/issues/issue_302.rs | 2 +- air/tests/test_module/issues/issue_363.rs | 112 +++++++++++++++ air/tests/test_module/issues/mod.rs | 1 + crates/air-lib/interpreter-data/CHANGELOG.md | 5 + crates/air-lib/interpreter-data/Cargo.toml | 2 +- crates/air-lib/trace-handler/src/errors.rs | 24 ---- crates/air-lib/trace-handler/src/handler.rs | 28 ---- crates/air-lib/trace-handler/src/lib.rs | 1 - 25 files changed, 319 insertions(+), 227 deletions(-) create mode 100644 air/tests/test_module/features/streams/merging.rs create mode 100644 air/tests/test_module/issues/issue_363.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 22c869eb..b3fccfef 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +## Version 0.31.0 (2022-10-11) + +[PR 362](https://github.com/fluencelabs/aquavm/pull/362): +Partially revert [PR 340](https://github.com/fluencelabs/aquavm/pull/340) due to fixes in [PR 358](https://github.com/fluencelabs/aquavm/pull/358) + +[PR 360](https://github.com/fluencelabs/aquavm/pull/360): +Allow using stream without insertion + +[PR 358](https://github.com/fluencelabs/aquavm/pull/358): +Implement a temporary fix for fold with canon + ## Version 0.30.0 (2022-09-28) [PR 340](https://github.com/fluencelabs/aquavm/pull/340): diff --git a/Cargo.lock b/Cargo.lock index 01a08ddd..af6e9537 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,7 +13,7 @@ dependencies = [ [[package]] name = "air" -version = "0.30.0" +version = "0.31.0" dependencies = [ "air-execution-info-collector", "air-interpreter-data", @@ -75,7 +75,7 @@ dependencies = [ [[package]] name = "air-interpreter" -version = "0.30.0" +version = "0.31.0" dependencies = [ "air", "air-interpreter-interface", @@ -91,7 +91,7 @@ dependencies = [ [[package]] name = "air-interpreter-data" -version = "0.3.0" +version = "0.4.0" dependencies = [ "air-parser", "air-utils", diff --git a/air-interpreter/Cargo.toml b/air-interpreter/Cargo.toml index eed3d366..26549e63 100644 --- a/air-interpreter/Cargo.toml +++ b/air-interpreter/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "air-interpreter" -version = "0.30.0" +version = "0.31.0" description = "Crate-wrapper for air" authors = ["Fluence Labs"] edition = "2018" diff --git a/air/Cargo.toml b/air/Cargo.toml index bcbfeac2..68ce7cfd 100644 --- a/air/Cargo.toml +++ b/air/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "air" -version = "0.30.0" +version = "0.31.0" description = "Interpreter of AIR scripts intended to coordinate request flow in the Fluence network" authors = ["Fluence Labs"] edition = "2018" diff --git a/air/src/execution_step/air/new.rs b/air/src/execution_step/air/new.rs index 0e3f3793..e7d175d5 100644 --- a/air/src/execution_step/air/new.rs +++ b/air/src/execution_step/air/new.rs @@ -32,7 +32,7 @@ impl<'i> super::ExecutableInstruction<'i> for New<'i> { // any error. It's highly important to distinguish between global and restricted streams // at the end of execution to make a correct data. let instruction_result = self.instruction.execute(exec_ctx, trace_ctx); - let epilog_result = epilog(self, exec_ctx, trace_ctx); + let epilog_result = epilog(self, exec_ctx); match (instruction_result, epilog_result) { (Ok(()), Ok(())) => Ok(()), @@ -62,13 +62,11 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) { exec_ctx.tracker.meet_new(position); } -fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { +fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) -> ExecutionResult<()> { let position = new.span.left; match &new.argument { NewArgument::Stream(stream) => { - exec_ctx - .streams - .meet_scope_end(stream.name.to_string(), position, trace_ctx)?; + exec_ctx.streams.meet_scope_end(stream.name.to_string(), position)?; Ok(()) } NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name), diff --git a/air/src/execution_step/boxed_value/stream.rs b/air/src/execution_step/boxed_value/stream.rs index b0ed04c6..51793d3b 100644 --- a/air/src/execution_step/boxed_value/stream.rs +++ b/air/src/execution_step/boxed_value/stream.rs @@ -17,13 +17,10 @@ use super::ExecutionResult; use super::ValueAggregate; use crate::execution_step::CatchableError; -use crate::ExecutionError; use crate::JValue; -use crate::UncatchableError; use air_interpreter_data::TracePos; use air_trace_handler::merger::ValueSource; -use air_trace_handler::TraceHandler; use std::collections::HashMap; use std::fmt::Formatter; @@ -38,25 +35,18 @@ pub struct Stream { /// obtained values from a current_data that were not present in prev_data becomes a new generation. values: Vec>, - /// Count of values from previous data. - previous_gens_count: usize, - /// This map is intended to support canonicalized stream creation, such streams has /// corresponding value positions in a data and this field are used to create such streams. values_by_pos: HashMap, } impl Stream { - pub(crate) fn from_generations_count(previous_count: usize, current_count: usize) -> Self { + pub(crate) fn from_generations_count(previous_count: usize) -> Self { let last_generation_count = 1; // TODO: bubble up an overflow error instead of expect - let overall_count = previous_count - .checked_add(current_count) - .and_then(|value| value.checked_add(last_generation_count)) - .expect("it shouldn't overflow"); + let overall_gens_count = previous_count + last_generation_count; Self { - values: vec![vec![]; overall_count], - previous_gens_count: previous_count, + values: vec![vec![]; overall_gens_count], values_by_pos: HashMap::new(), } } @@ -69,7 +59,6 @@ impl Stream { }; Self { values: vec![vec![value]], - previous_gens_count: 0, values_by_pos, } } @@ -85,7 +74,7 @@ impl Stream { let generation = match (generation, source) { (Generation::Last, _) => self.values.len() - 1, (Generation::Nth(previous_gen), ValueSource::PreviousData) => previous_gen as usize, - (Generation::Nth(current_gen), ValueSource::CurrentData) => self.previous_gens_count + current_gen as usize, + (_, ValueSource::CurrentData) => self.values.len() - 1, }; if generation >= self.values.len() { @@ -206,26 +195,6 @@ impl Stream { Some(iter) } - - /// Removes empty generations updating data and returns final generation count. - pub(crate) fn compactify(mut self, trace_ctx: &mut TraceHandler) -> ExecutionResult { - self.remove_empty_generations(); - - for (generation, values) in self.values.iter().enumerate() { - for value in values.iter() { - trace_ctx - .update_generation(value.trace_pos, generation as u32) - .map_err(|e| ExecutionError::Uncatchable(UncatchableError::GenerationCompatificationError(e)))?; - } - } - - Ok(self.values.len()) - } - - /// Removes empty generations from current values. - fn remove_empty_generations(&mut self) { - self.values.retain(|values| !values.is_empty()); - } } #[derive(Clone, Copy, Debug, Eq, PartialEq)] @@ -327,7 +296,7 @@ mod test { fn test_slice_iter() { let value_1 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into()); let value_2 = ValueAggregate::new(Rc::new(json!("value")), <_>::default(), 1.into()); - let mut stream = Stream::from_generations_count(2, 0); + let mut stream = Stream::from_generations_count(2); stream .add_value(value_1, Generation::Nth(0), ValueSource::PreviousData) @@ -351,7 +320,7 @@ mod test { #[test] fn test_slice_on_empty_stream() { - let stream = Stream::from_generations_count(2, 0); + let stream = Stream::from_generations_count(2); let slice = stream.slice_iter(Generation::Nth(0), Generation::Nth(1)); assert!(slice.is_none()); @@ -370,7 +339,7 @@ mod test { fn generation_from_current_data() { let value_1 = ValueAggregate::new(Rc::new(json!("value_1")), <_>::default(), 1.into()); let value_2 = ValueAggregate::new(Rc::new(json!("value_2")), <_>::default(), 2.into()); - let mut stream = Stream::from_generations_count(5, 5); + let mut stream = Stream::from_generations_count(5); stream .add_value(value_1.clone(), Generation::Nth(2), ValueSource::CurrentData) diff --git a/air/src/execution_step/errors/uncatchable_errors.rs b/air/src/execution_step/errors/uncatchable_errors.rs index f2c23e52..8293d654 100644 --- a/air/src/execution_step/errors/uncatchable_errors.rs +++ b/air/src/execution_step/errors/uncatchable_errors.rs @@ -20,7 +20,6 @@ use crate::ToErrorCode; use air_interpreter_data::TracePos; use air_interpreter_data::Value; use air_trace_handler::merger::MergerApResult; -use air_trace_handler::GenerationCompatificationError; use air_trace_handler::TraceHandlerError; use strum::IntoEnumIterator; use strum_macros::EnumDiscriminants; @@ -42,10 +41,6 @@ pub enum UncatchableError { instruction: String, }, - /// These errors are related to internal bug in the interpreter when result trace is corrupted. - #[error(transparent)] - GenerationCompatificationError(#[from] GenerationCompatificationError), - /// Fold state wasn't found for such iterator name. #[error("fold state not found for this iterable '{0}'")] FoldStateNotFound(String), diff --git a/air/src/execution_step/execution_context/context.rs b/air/src/execution_step/execution_context/context.rs index 5652e5ac..9642ad7f 100644 --- a/air/src/execution_step/execution_context/context.rs +++ b/air/src/execution_step/execution_context/context.rs @@ -66,19 +66,9 @@ pub(crate) struct ExecutionCtx<'i> { } impl<'i> ExecutionCtx<'i> { - pub(crate) fn new( - prev_data: &InterpreterData, - current_data: &InterpreterData, - call_results: CallResults, - run_parameters: RunParameters, - ) -> Self { + pub(crate) fn new(prev_data: &InterpreterData, call_results: CallResults, run_parameters: RunParameters) -> Self { let run_parameters = RcRunParameters::from_run_parameters(run_parameters); - let streams = Streams::from_data( - &prev_data.global_streams, - ¤t_data.global_streams, - prev_data.restricted_streams.clone(), - current_data.restricted_streams.clone(), - ); + let streams = Streams::from_data(&prev_data.global_streams, prev_data.restricted_streams.clone()); Self { run_parameters, diff --git a/air/src/execution_step/execution_context/streams_variables.rs b/air/src/execution_step/execution_context/streams_variables.rs index f6d34558..f0b06fa0 100644 --- a/air/src/execution_step/execution_context/streams_variables.rs +++ b/air/src/execution_step/execution_context/streams_variables.rs @@ -20,7 +20,6 @@ mod utils; use crate::execution_step::ExecutionResult; use crate::execution_step::Stream; -use crate::ExecutionError; use stream_descriptor::*; pub(crate) use stream_value_descriptor::StreamValueDescriptor; @@ -28,7 +27,6 @@ use air_interpreter_data::GlobalStreamGens; use air_interpreter_data::RestrictedStreamGens; use air_parser::ast::Span; use air_parser::AirPos; -use air_trace_handler::TraceHandler; use std::collections::hash_map::Entry::{Occupied, Vacant}; use std::collections::HashMap; @@ -44,10 +42,6 @@ pub(crate) struct Streams { /// should have at the scope start. previous_restricted_stream_gens: RestrictedStreamGens, - /// Contains stream generations from current data that a restricted stream - /// should have at the scope start. - current_restricted_stream_gens: RestrictedStreamGens, - /// Contains stream generations that each private stream had at the scope end. /// Then it's placed into data new_restricted_stream_gens: RestrictedStreamGens, @@ -56,16 +50,13 @@ pub(crate) struct Streams { impl Streams { pub(crate) fn from_data( previous_global_streams: &GlobalStreamGens, - current_global_streams: &GlobalStreamGens, previous_restricted_stream_gens: RestrictedStreamGens, - current_restricted_stream_gens: RestrictedStreamGens, ) -> Self { - let streams = utils::merge_global_streams(previous_global_streams, current_global_streams); + let streams = utils::prepare_global_streams(previous_global_streams); Self { streams, previous_restricted_stream_gens, - current_restricted_stream_gens, new_restricted_stream_gens: <_>::default(), } } @@ -112,10 +103,9 @@ impl Streams { pub(crate) fn meet_scope_start(&mut self, name: impl Into, span: Span, iteration: u32) { let name = name.into(); - let (prev_gens_count, current_gens_count) = - self.stream_generation_from_data(&name, span.left, iteration as usize); + let prev_gens_count = self.stream_generation_from_data(&name, span.left, iteration as usize); - let new_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize); + let new_stream = Stream::from_generations_count(prev_gens_count as usize); let new_descriptor = StreamDescriptor::restricted(new_stream, span); match self.streams.entry(name) { Occupied(mut entry) => { @@ -127,12 +117,7 @@ impl Streams { } } - pub(crate) fn meet_scope_end( - &mut self, - name: String, - position: AirPos, - trace_ctx: &mut TraceHandler, - ) -> ExecutionResult<()> { + pub(crate) fn meet_scope_end(&mut self, name: String, position: AirPos) -> ExecutionResult<()> { // unwraps are safe here because met_scope_end must be called after met_scope_start let stream_descriptors = self.streams.get_mut(&name).unwrap(); // delete a stream after exit from a scope @@ -141,57 +126,37 @@ impl Streams { // streams should contain only non-empty stream embodiments self.streams.remove(&name); } - let gens_count = last_descriptor.stream.compactify(trace_ctx)?; - self.collect_stream_generation(name, position, gens_count as u32); + self.collect_stream_generation(name, position, last_descriptor.stream.generations_count() as u32); Ok(()) } /// This method must be called at the end of execution, because it contains logic to collect /// all global streams depending on their presence in a streams field. - pub(crate) fn into_streams_data( - self, - trace_ctx: &mut TraceHandler, - ) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> { + pub(crate) fn into_streams_data(self) -> (GlobalStreamGens, RestrictedStreamGens) { // since it's called at the end of execution, streams contains only global ones, // because all private's been deleted after exiting a scope let global_streams = self .streams .into_iter() - .map(|(name, mut descriptors)| -> Result<_, ExecutionError> { + .map(|(name, mut descriptors)| { // unwrap is safe here because of invariant that streams contains non-empty vectors, // moreover it must contain only one value, because this method is called at the end // of the execution let stream = descriptors.pop().unwrap().stream; - let gens_count = stream.compactify(trace_ctx)?; - Ok((name, gens_count as u32)) + (name, stream.generations_count() as u32) }) - .collect::>()?; + .collect::(); - Ok((global_streams, self.new_restricted_stream_gens)) + (global_streams, self.new_restricted_stream_gens) } - fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> (u32, u32) { - let previous_generation = - Self::restricted_stream_generation(&self.previous_restricted_stream_gens, name, position, iteration) - .unwrap_or_default(); - let current_generation = - Self::restricted_stream_generation(&self.current_restricted_stream_gens, name, position, iteration) - .unwrap_or_default(); - - (previous_generation, current_generation) - } - - fn restricted_stream_generation( - restricted_stream_gens: &RestrictedStreamGens, - name: &str, - position: AirPos, - iteration: usize, - ) -> Option { - restricted_stream_gens + fn stream_generation_from_data(&self, name: &str, position: AirPos, iteration: usize) -> u32 { + self.previous_restricted_stream_gens .get(name) .and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration))) .copied() + .unwrap_or_default() } fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: u32) { diff --git a/air/src/execution_step/execution_context/streams_variables/utils.rs b/air/src/execution_step/execution_context/streams_variables/utils.rs index 9755dddf..8bf60798 100644 --- a/air/src/execution_step/execution_context/streams_variables/utils.rs +++ b/air/src/execution_step/execution_context/streams_variables/utils.rs @@ -21,29 +21,15 @@ use air_interpreter_data::GlobalStreamGens; use std::collections::HashMap; -pub(super) fn merge_global_streams( +pub(super) fn prepare_global_streams( previous_global_streams: &GlobalStreamGens, - current_global_streams: &GlobalStreamGens, ) -> HashMap> { - let mut global_streams = previous_global_streams + previous_global_streams .iter() .map(|(stream_name, &prev_gens_count)| { - let current_gens_count = current_global_streams.get(stream_name).cloned().unwrap_or_default(); - let global_stream = Stream::from_generations_count(prev_gens_count as usize, current_gens_count as usize); + let global_stream = Stream::from_generations_count(prev_gens_count as usize); let descriptor = StreamDescriptor::global(global_stream); (stream_name.to_string(), vec![descriptor]) }) - .collect::>(); - - for (stream_name, ¤t_gens_count) in current_global_streams { - if previous_global_streams.contains_key(stream_name) { - continue; - } - - let global_stream = Stream::from_generations_count(0, current_gens_count as usize); - let descriptor = StreamDescriptor::global(global_stream); - global_streams.insert(stream_name.clone(), vec![descriptor]); - } - - global_streams + .collect::>() } diff --git a/air/src/farewell_step/outcome.rs b/air/src/farewell_step/outcome.rs index b99df4d5..49a63286 100644 --- a/air/src/farewell_step/outcome.rs +++ b/air/src/farewell_step/outcome.rs @@ -17,7 +17,6 @@ use super::FarewellError; use crate::execution_step::ExecutionCtx; use crate::execution_step::TraceHandler; -use crate::ExecutionError; use crate::InterpreterOutcome; use crate::ToErrorCode; use crate::INTERPRETER_SUCCESS; @@ -82,18 +81,11 @@ pub(crate) fn from_execution_error( #[tracing::instrument(skip(exec_ctx, trace_handler), level = "info")] fn populate_outcome_from_contexts( exec_ctx: ExecutionCtx<'_>, - mut trace_handler: TraceHandler, + trace_handler: TraceHandler, ret_code: i64, error_message: String, ) -> InterpreterOutcome { - let maybe_gens = exec_ctx - .streams - .into_streams_data(&mut trace_handler) - .map_err(execution_error_into_outcome); - let (global_streams, restricted_streams) = match maybe_gens { - Ok(gens) => gens, - Err(outcome) => return outcome, - }; + let (global_streams, restricted_streams) = exec_ctx.streams.into_streams_data(); let data = InterpreterData::from_execution_result( trace_handler.into_result_trace(), @@ -116,12 +108,6 @@ fn populate_outcome_from_contexts( InterpreterOutcome::new(ret_code, error_message, data, next_peer_pks, call_requests) } -// this method is called only if there is an internal error in the interpreter and -// new execution trace was corrupted -fn execution_error_into_outcome(error: ExecutionError) -> InterpreterOutcome { - InterpreterOutcome::new(error.to_error_code(), error.to_string(), vec![], vec![], vec![]) -} - /// Deduplicate values in a supplied vector. fn dedup(mut vec: Vec) -> Vec { use std::collections::HashSet; diff --git a/air/src/preparation_step/preparation.rs b/air/src/preparation_step/preparation.rs index 1bafc70d..0f4c7108 100644 --- a/air/src/preparation_step/preparation.rs +++ b/air/src/preparation_step/preparation.rs @@ -45,7 +45,7 @@ pub(crate) fn prepare<'i>( let air: Instruction<'i> = *air_parser::parse(raw_air).map_err(PreparationError::AIRParseError)?; - let exec_ctx = make_exec_ctx(&prev_data, ¤t_data, call_results, run_parameters)?; + let exec_ctx = make_exec_ctx(&prev_data, call_results, run_parameters)?; let trace_handler = TraceHandler::from_data(prev_data, current_data); let result = PreparationDescriptor { @@ -66,13 +66,12 @@ fn try_to_data(raw_data: &[u8]) -> PreparationResult { #[tracing::instrument(skip_all)] fn make_exec_ctx( prev_data: &InterpreterData, - current_data: &InterpreterData, call_results: &[u8], run_parameters: RunParameters, ) -> PreparationResult> { let call_results = serde_json::from_slice(call_results) .map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?; - let ctx = ExecutionCtx::new(prev_data, current_data, call_results, run_parameters); + let ctx = ExecutionCtx::new(prev_data, call_results, run_parameters); Ok(ctx) } diff --git a/air/tests/test_module/features/data_merging/data_merge.rs b/air/tests/test_module/features/data_merging/data_merge.rs index 292254dc..cbde0ab6 100644 --- a/air/tests/test_module/features/data_merging/data_merge.rs +++ b/air/tests/test_module/features/data_merging/data_merge.rs @@ -290,8 +290,8 @@ fn fold_merge() { .get("$stream_2") .expect("$stream_2 should be present in data"); - assert_eq!(*stream_1_generations, 8); - assert_eq!(*stream_2_generations, 6); + assert_eq!(*stream_1_generations, 4); + assert_eq!(*stream_2_generations, 3); let mut fold_states_count = 0; let mut calls_count = HashMap::new(); diff --git a/air/tests/test_module/features/streams/merging.rs b/air/tests/test_module/features/streams/merging.rs new file mode 100644 index 00000000..b022250f --- /dev/null +++ b/air/tests/test_module/features/streams/merging.rs @@ -0,0 +1,127 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use air_test_framework::TestExecutor; +use air_test_utils::prelude::*; + +#[test] +fn merging_fold_iterations_extensively() { + let script = r#" + (seq + (seq + (call "client" ("get" "data") [] permutations) ; ok = [["p1",[[["p1",1],["p2",2],["p3",3]],[["p1",4],["p3",5],["p2",6]]]],["p2",[[["p2",7],["p1",8],["p3",9]],[["p2",10],["p3",11],["p1",12]]]],["p3",[[["p3",13],["p1",14],["p2",15]],[["p3",16],["p2",17],["p1",18]]]]] + (seq + (fold permutations pair + (seq + (fold pair.$.[1] peer_ids + (seq + (seq + (call pair.$.[0] ("op" "noop") []) ; ok = null + (ap peer_ids $inner) + ) + (next peer_ids) + ) + ) + (next pair) + ) + ) + (seq + (canon "relay" $inner #inner) + (fold $inner ns + (par + (fold ns pair + (seq + (seq + (call pair.$.[0] ("op" "noop") []) ; ok = null + (ap pair.$.[1] $result) + ) + (next pair) + ) + ) + (next ns) + ) + ) + ) + ) + ) + (seq + (new $monotonic_stream + (seq + (fold $result elem + (seq + (ap elem $monotonic_stream) + (seq + (canon "relay" $monotonic_stream #canon_stream) + (xor + (match #canon_stream.length 18 + (null) + ) + (next elem) + ) + ) + ) + ) + (canon "relay" $result #joined_result) + ) + ) + (call "client" ("return" "") [#inner #joined_result]) ; ok = null + ) + ) + "#; + + let engine = TestExecutor::new( + TestRunParameters::from_init_peer_id("client"), + vec![], + vec!["relay", "p1", "p2", "p3"].into_iter().map(Into::into), + &script, + ) + .unwrap(); + + let mut queue = std::collections::vec_deque::VecDeque::new(); + let mut relay_outcomes = Vec::::new(); + queue.push_back("client".to_string()); + while !queue.is_empty() { + let peer = queue.pop_front().unwrap(); + if let Some(outcomes) = engine.execution_iter(peer.as_str()) { + for outcome in outcomes { + assert_eq!(outcome.ret_code, 0, "{:?}", outcome); + + for peer in &outcome.next_peer_pks { + queue.push_back(peer.clone()); + } + + if peer == "relay" { + relay_outcomes.push(outcome); + } + } + } else { + println!("peer: {}, no executions", peer); + } + } + + let last_relay_data = relay_outcomes.last().unwrap(); + let last_relay_trace = trace_from_result(last_relay_data); + let last_fold = last_relay_trace + .iter() + .filter_map(|state| match state { + ExecutedState::Fold(fold_result) => Some(fold_result), + _ => None, + }) + .last() + .unwrap(); + + assert_eq!(last_fold.lore.len(), 18); +} diff --git a/air/tests/test_module/features/streams/mod.rs b/air/tests/test_module/features/streams/mod.rs index 582dbd56..79829d52 100644 --- a/air/tests/test_module/features/streams/mod.rs +++ b/air/tests/test_module/features/streams/mod.rs @@ -15,6 +15,7 @@ */ mod ap_with_fold; +mod merging; mod recursive_streams; mod streams; mod streams_early_exit; diff --git a/air/tests/test_module/features/streams/streams_early_exit.rs b/air/tests/test_module/features/streams/streams_early_exit.rs index e6b938bb..c61efa55 100644 --- a/air/tests/test_module/features/streams/streams_early_exit.rs +++ b/air/tests/test_module/features/streams/streams_early_exit.rs @@ -115,9 +115,9 @@ fn par_early_exit() { executed_state::par(5, 1), executed_state::par(3, 1), executed_state::par(1, 1), - executed_state::stream_string("1", 1), - executed_state::stream_string("2", 2), - executed_state::stream_string("1", 1), + executed_state::stream_string("1", 0), + executed_state::stream_string("2", 0), + executed_state::stream_string("1", 0), executed_state::stream_string("success result from fallible_call_service", 0), executed_state::service_failed(1, "failed result from fallible_call_service"), executed_state::stream_string("success result from fallible_call_service", 0), @@ -155,7 +155,7 @@ fn par_early_exit() { trace_error: TraceHandlerError::MergeError(MergeError::IncorrectCallResult(CallResultError::ValuesNotEqual { prev_value: Value::Stream { value: rc!(json!("1")), - generation: 1, + generation: 0, }, current_value: Value::Stream { value: rc!(json!("non_exist_value")), @@ -302,44 +302,44 @@ fn fold_par_early_exit() { executed_state::scalar_string_array(vec!["c1", "c2"]), executed_state::scalar_string_array(vec!["d1", "d2"]), executed_state::stream_string("a1", 0), - executed_state::stream_string("a2", 1), + executed_state::stream_string("a2", 0), executed_state::stream_string("b1", 0), - executed_state::stream_string("b2", 1), + executed_state::stream_string("b2", 0), executed_state::stream_string("c1", 0), - executed_state::stream_string("c2", 1), + executed_state::stream_string("c2", 0), executed_state::stream_string("d1", 0), - executed_state::stream_string("d2", 1), + executed_state::stream_string("d2", 0), executed_state::par(69, 1), executed_state::fold(vec![ - executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(48, 0)), + executed_state::subtrace_lore(4, subtrace_desc(14, 34), subtrace_desc(82, 0)), executed_state::subtrace_lore(5, subtrace_desc(48, 34), subtrace_desc(82, 0)), ]), - executed_state::par(33, 0), + executed_state::par(33, 34), executed_state::fold(vec![ - executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(32, 0)), + executed_state::subtrace_lore(6, subtrace_desc(16, 16), subtrace_desc(48, 0)), executed_state::subtrace_lore(7, subtrace_desc(32, 16), subtrace_desc(48, 0)), ]), - executed_state::par(15, 0), + executed_state::par(15, 16), executed_state::par(13, 1), executed_state::fold(vec![ - executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(25, 0)), + executed_state::subtrace_lore(8, subtrace_desc(19, 6), subtrace_desc(31, 0)), executed_state::subtrace_lore(9, subtrace_desc(25, 6), subtrace_desc(31, 0)), ]), - executed_state::par(5, 0), + executed_state::par(5, 6), executed_state::fold(vec![ - executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(23, 0)), + executed_state::subtrace_lore(10, subtrace_desc(21, 2), subtrace_desc(25, 0)), executed_state::subtrace_lore(11, subtrace_desc(23, 2), subtrace_desc(25, 0)), ]), - executed_state::par(1, 0), + executed_state::par(1, 2), executed_state::scalar_string(unit_call_service_result), executed_state::par(1, 0), executed_state::scalar_string(unit_call_service_result), executed_state::par(5, 0), executed_state::fold(vec![ - executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(29, 0)), + executed_state::subtrace_lore(10, subtrace_desc(27, 2), subtrace_desc(31, 0)), executed_state::subtrace_lore(11, subtrace_desc(29, 2), subtrace_desc(31, 0)), ]), - executed_state::par(1, 0), + executed_state::par(1, 2), executed_state::scalar_string(unit_call_service_result), executed_state::par(1, 0), executed_state::scalar_string(unit_call_service_result), @@ -347,7 +347,7 @@ fn fold_par_early_exit() { executed_state::par(15, 0), executed_state::par(13, 1), executed_state::fold(vec![ - executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(41, 0)), + executed_state::subtrace_lore(8, subtrace_desc(35, 6), subtrace_desc(47, 0)), executed_state::subtrace_lore(9, subtrace_desc(41, 6), subtrace_desc(47, 0)), ]), ]; diff --git a/air/tests/test_module/instructions/canon.rs b/air/tests/test_module/instructions/canon.rs index 23f6c68a..2645f7bb 100644 --- a/air/tests/test_module/instructions/canon.rs +++ b/air/tests/test_module/instructions/canon.rs @@ -130,7 +130,7 @@ fn canon_fixes_stream_correct() { executed_state::stream_number(1, 0), executed_state::par(1, 1), executed_state::stream_number(2, 1), - executed_state::stream_number(3, 2), + executed_state::stream_number(3, 1), executed_state::scalar_number(4), executed_state::canon( json!({"tetraplet": {"function_name": "", "json_path": "", "peer_pk": "peer_id_3", "service_id": ""}, diff --git a/air/tests/test_module/issues/issue_302.rs b/air/tests/test_module/issues/issue_302.rs index a4c55652..ba3204e9 100644 --- a/air/tests/test_module/issues/issue_302.rs +++ b/air/tests/test_module/issues/issue_302.rs @@ -55,7 +55,7 @@ fn issue_302() { executed_state::par(1, 3), executed_state::stream_number(2, 1), executed_state::stream_number(1, 0), - executed_state::stream_number(0, 2), + executed_state::stream_number(0, 1), executed_state::scalar(json!([1, 2, 0])), ]; assert_eq!(actual_trace.deref(), expected_trace); diff --git a/air/tests/test_module/issues/issue_363.rs b/air/tests/test_module/issues/issue_363.rs new file mode 100644 index 00000000..9926734e --- /dev/null +++ b/air/tests/test_module/issues/issue_363.rs @@ -0,0 +1,112 @@ +/* + * Copyright 2022 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use air_test_utils::prelude::*; + +//https://github.com/fluencelabs/aquavm/issues/363 +#[test] +fn issue_363() { + let client_peer_id = "client"; + let mut client_vm = create_avm( + set_variable_call_service(json!([ + ["p1", [[["p1", 1], ["p2", 3]], [["p1", 4], ["p2", 5]]]], + ["p2", [[["p2", 13], ["p1", 14]], [["p2", 16], ["p1", 18]]]] + ])), + client_peer_id, + ); + + let p1_peer_id = "p1"; + let mut p1_vm = create_avm(set_variable_call_service(json!("p1")), p1_peer_id); + let p2_peer_id = "p2"; + let mut p2_vm = create_avm(set_variable_call_service(json!("p2")), p2_peer_id); + + let script = r#" + (seq + (seq + (call "client" ("get" "data") [] permutations) + (seq + (fold permutations pair + (seq + (fold pair.$.[1] peer_ids + (seq + (seq + (call pair.$.[0] ("op" "noop") []) + (ap peer_ids $inner) + ) + (next peer_ids) + ) + ) + (next pair) + ) + ) + (seq + (null) + (fold $inner ns + (par + (fold ns pair + (seq + (seq + (call pair.$.[0] ("op" "noop") []) + (ap pair.$.[1] $result) + ) + (next pair) + ) + ) + (next ns) + ) + ) + ) + ) + ) (null) + ) + "#; + + let client_result = checked_call_vm!(client_vm, <_>::default(), script, "", ""); + let p1_result_1 = checked_call_vm!(p1_vm, <_>::default(), script, "", client_result.data.clone()); + let p2_result_1 = checked_call_vm!(p2_vm, <_>::default(), script, "", p1_result_1.data.clone()); + + let p2_trace_1 = trace_from_result(&p2_result_1); + let fold_position = TracePos::from(9); + let fold_p2 = p2_trace_1.get(fold_position).unwrap(); + if let ExecutedState::Fold(fold) = fold_p2 { + assert_eq!(fold.lore.len(), 4); + assert_eq!(fold.lore[0].subtraces_desc[0].subtrace_len, 2); + assert_eq!(fold.lore[1].subtraces_desc[0].subtrace_len, 2); + assert_eq!(fold.lore[2].subtraces_desc[0].subtrace_len, 4); + assert_eq!(fold.lore[3].subtraces_desc[0].subtrace_len, 4); + } else { + panic!("expected fold at pos 9") + } + + let p1_result_2 = checked_call_vm!( + p1_vm, + <_>::default(), + script, + p1_result_1.data.clone(), + p2_result_1.data.clone() + ); + let p1_trace_2 = trace_from_result(&p1_result_2); + let fold_p1 = p1_trace_2.get(fold_position).unwrap(); + if let ExecutedState::Fold(fold) = fold_p1 { + assert_eq!(fold.lore.len(), 4); + assert_eq!(fold.lore[0].subtraces_desc[0].subtrace_len, 4); + assert_eq!(fold.lore[1].subtraces_desc[0].subtrace_len, 4); + assert_eq!(fold.lore[2].subtraces_desc[0].subtrace_len, 5); + assert_eq!(fold.lore[3].subtraces_desc[0].subtrace_len, 5); + } else { + panic!("expected fold at pos 9") + } +} diff --git a/air/tests/test_module/issues/mod.rs b/air/tests/test_module/issues/mod.rs index f6b7b2ec..7b2ef9d3 100644 --- a/air/tests/test_module/issues/mod.rs +++ b/air/tests/test_module/issues/mod.rs @@ -35,3 +35,4 @@ mod issue_331; mod issue_346; mod issue_348; mod issue_356; +mod issue_363; diff --git a/crates/air-lib/interpreter-data/CHANGELOG.md b/crates/air-lib/interpreter-data/CHANGELOG.md index 68e24a3c..212a7604 100644 --- a/crates/air-lib/interpreter-data/CHANGELOG.md +++ b/crates/air-lib/interpreter-data/CHANGELOG.md @@ -1,3 +1,8 @@ +## Version 0.4.0 + +[PR 356](https://github.com/fluencelabs/aquavm/pull/358): +- temporary fix of a bug with fold and canon compatibility + ## Version 0.3.0 [PR 292](https://github.com/fluencelabs/aquavm/pull/292): diff --git a/crates/air-lib/interpreter-data/Cargo.toml b/crates/air-lib/interpreter-data/Cargo.toml index 9d0ab98b..276c2e2b 100644 --- a/crates/air-lib/interpreter-data/Cargo.toml +++ b/crates/air-lib/interpreter-data/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "air-interpreter-data" description = "Data format of the AIR interpreter" -version = "0.3.0" +version = "0.4.0" authors = ["Fluence Labs"] edition = "2018" license = "Apache-2.0" diff --git a/crates/air-lib/trace-handler/src/errors.rs b/crates/air-lib/trace-handler/src/errors.rs index dba1ad83..1201aca4 100644 --- a/crates/air-lib/trace-handler/src/errors.rs +++ b/crates/air-lib/trace-handler/src/errors.rs @@ -18,8 +18,6 @@ use super::data_keeper::KeeperError; use super::merger::MergeError; use super::state_automata::StateFSMError; -use air_interpreter_data::ExecutedState; -use air_interpreter_data::TracePos; use thiserror::Error as ThisError; /// Errors arose out of merging previous data with a new. @@ -35,25 +33,3 @@ pub enum TraceHandlerError { #[error(transparent)] StateFSMError(#[from] StateFSMError), } - -#[derive(ThisError, Debug)] -#[allow(clippy::enum_variant_names)] -pub enum GenerationCompatificationError { - #[error("trying to change generation of an invalid trace position {0}")] - TracePosPointsToNowhere(TracePos), - - #[error( - "trying to change generation of a state {state} on {position} position, the state doesn't contain generation" - )] - TracePosPointsToInvalidState { position: TracePos, state: ExecutedState }, -} - -impl GenerationCompatificationError { - pub fn points_to_nowhere(position: TracePos) -> Self { - GenerationCompatificationError::TracePosPointsToNowhere(position) - } - - pub fn points_to_invalid_state(position: TracePos, state: ExecutedState) -> Self { - GenerationCompatificationError::TracePosPointsToInvalidState { position, state } - } -} diff --git a/crates/air-lib/trace-handler/src/handler.rs b/crates/air-lib/trace-handler/src/handler.rs index 0e909b2f..d33a97c9 100644 --- a/crates/air-lib/trace-handler/src/handler.rs +++ b/crates/air-lib/trace-handler/src/handler.rs @@ -55,34 +55,6 @@ impl TraceHandler { (prev_len, current_len) } - - pub fn update_generation( - &mut self, - trace_pos: TracePos, - generation: u32, - ) -> Result<(), GenerationCompatificationError> { - let state = self - .data_keeper - .result_trace - .get_mut(trace_pos) - .ok_or_else(|| GenerationCompatificationError::points_to_nowhere(trace_pos))?; - - match state { - ExecutedState::Ap(ap_result) => ap_result.res_generations = vec![generation], - ExecutedState::Call(CallResult::Executed(Value::Stream { - generation: call_generation, - .. - })) => *call_generation = generation, - state => { - return Err(GenerationCompatificationError::points_to_invalid_state( - trace_pos, - state.clone(), - )) - } - } - - Ok(()) - } } impl TraceHandler { diff --git a/crates/air-lib/trace-handler/src/lib.rs b/crates/air-lib/trace-handler/src/lib.rs index 77c1c67e..0610f191 100644 --- a/crates/air-lib/trace-handler/src/lib.rs +++ b/crates/air-lib/trace-handler/src/lib.rs @@ -32,7 +32,6 @@ mod handler; pub mod merger; mod state_automata; -pub use errors::GenerationCompatificationError; pub use errors::TraceHandlerError; pub use handler::TraceHandler; pub use state_automata::SubgraphType;