implement the first part of a set of changes

This commit is contained in:
vms 2022-09-20 03:17:20 +03:00
parent 20bb230a3a
commit e98fd54ba3
25 changed files with 533 additions and 430 deletions

4
Cargo.lock generated
View File

@ -13,7 +13,7 @@ dependencies = [
[[package]]
name = "air"
version = "0.28.0"
version = "0.29.0"
dependencies = [
"air-execution-info-collector",
"air-interpreter-data",
@ -72,7 +72,7 @@ version = "0.1.0"
[[package]]
name = "air-interpreter"
version = "0.28.0"
version = "0.29.0"
dependencies = [
"air",
"air-interpreter-interface",

View File

@ -38,6 +38,7 @@ impl<'i> super::ExecutableInstruction<'i> for Call<'i> {
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(call, exec_ctx, trace_ctx);
println!("> {}", self);
exec_ctx.tracker.meet_call();
let resolved_call = joinable!(ResolvedCall::new(self, exec_ctx), exec_ctx)

View File

@ -160,6 +160,7 @@ impl<'i> ResolvedCall<'i> {
MergerCallResult::CallResult { value, trace_pos } => (value, trace_pos),
MergerCallResult::Empty => return Ok(StateDescriptor::no_previous_state()),
};
println!(" call result {:?}", call_result);
handle_prev_state(
&self.tetraplet,

View File

@ -22,6 +22,8 @@ use std::rc::Rc;
pub(crate) struct FoldState<'i> {
pub(crate) iterable: IterableValue,
pub(crate) iterable_type: IterableType,
// true of iterator exhausted and reverse execution started
pub(crate) back_iteration_started: bool,
pub(crate) instr_head: Rc<Instruction<'i>>,
}
@ -40,6 +42,7 @@ impl<'i> FoldState<'i> {
Self {
iterable,
iterable_type,
back_iteration_started: false,
instr_head,
}
}

View File

@ -14,6 +14,7 @@
* limitations under the License.
*/
mod completeness_updater;
mod stream_cursor;
use super::fold::*;
@ -25,6 +26,7 @@ use super::TraceHandler;
use crate::execution_step::boxed_value::Stream;
use crate::log_instruction;
use crate::trace_to_exec_err;
use completeness_updater::FoldCompletenessUpdater;
use stream_cursor::StreamCursor;
use air_parser::ast;
@ -38,8 +40,11 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
let iterable = &self.iterable;
let stream = match exec_ctx.streams.get(iterable.name, iterable.position) {
Some(stream) => stream,
// it's possible to met streams without variables at the moment in fold, they are treated as empty
None => return Ok(()),
None => {
// having empty streams means that it haven't been met yet, and it's needed to wait
exec_ctx.subgraph_complete = false;
return Ok(());
}
};
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
@ -47,26 +52,37 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
let mut stream_cursor = StreamCursor::new();
let mut stream_iterable = stream_cursor.construct_iterables(stream);
let mut completeness_updater = FoldCompletenessUpdater::new();
let mut result = Ok(true);
let mut result = Ok(());
// this cycle manages recursive streams
while !stream_iterable.is_empty() {
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
// write operation to this stream to write to this new generation
add_new_generation_if_non_empty(&self.iterable, exec_ctx);
result = execute_iterations(stream_iterable, self, fold_id, exec_ctx, trace_ctx);
result = execute_iterations(
stream_iterable,
self,
fold_id,
&mut completeness_updater,
exec_ctx,
trace_ctx,
);
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
// and likely that stream could be mutably borrowed in execute_iterations
let stream = remove_new_generation_if_non_empty(&self.iterable, exec_ctx);
if should_stop_iteration(&result) {
if result.is_err() {
println!("result is error {:?}", result);
break;
}
stream_iterable = stream_cursor.construct_iterables(stream)
}
completeness_updater.set_completeness(exec_ctx);
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?;
result.map(|_| ())
result
}
}
@ -74,10 +90,11 @@ fn execute_iterations<'i>(
iterables: Vec<IterableValue>,
fold_stream: &FoldStream<'i>,
fold_id: u32,
completeness_updater: &mut FoldCompletenessUpdater,
exec_ctx: &mut ExecutionCtx<'i>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<bool> {
for iterable in iterables {
) -> ExecutionResult<()> {
for iterable in iterables.into_iter() {
let value = match iterable.peek() {
Some(value) => value,
// it's ok, because some generation level of a stream on some point inside execution
@ -96,22 +113,12 @@ fn execute_iterations<'i>(
trace_ctx,
);
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream)?;
completeness_updater.observe_completeness(exec_ctx.subgraph_complete);
result?;
if !exec_ctx.subgraph_complete {
break;
}
}
Ok(exec_ctx.subgraph_complete)
}
fn should_stop_iteration(iteration_result: &ExecutionResult<bool>) -> bool {
match &iteration_result {
Ok(result) if !result => true,
Ok(_) => false,
Err(_) => true,
}
Ok(())
}
/// Safety: this function should be called iff stream is present in context

View File

@ -0,0 +1,37 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionCtx;
pub(super) struct FoldCompletenessUpdater {
subtree_complete: bool,
}
impl FoldCompletenessUpdater {
pub(super) fn new() -> Self {
Self {
subtree_complete: false,
}
}
pub(super) fn observe_completeness(&mut self, completeness: bool) {
self.subtree_complete |= completeness;
}
pub(super) fn set_completeness(self, exec_ctx: &mut ExecutionCtx<'_>) {
exec_ctx.subgraph_complete = self.subtree_complete;
}
}

View File

@ -28,13 +28,21 @@ use air_parser::ast::Next;
impl<'i> super::ExecutableInstruction<'i> for Next<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(next, exec_ctx, trace_ctx);
println!("> next");
let iterator_name = &self.iterator.name;
let fold_state = exec_ctx.scalars.get_iterable_mut(iterator_name)?;
maybe_meet_iteration_end(self, fold_state, trace_ctx)?;
if !fold_state.iterable.next() {
println!("> next back");
maybe_meet_back_iterator(self, fold_state, trace_ctx)?;
if !fold_state.back_iteration_started && matches!(fold_state.iterable_type, IterableType::Stream(_)) {
// this set the last iteration of a next to not executed for fold over streams
// for more info see https://github.com/fluencelabs/aquavm/issues/333
exec_ctx.subgraph_complete = false;
fold_state.back_iteration_started = true;
}
// just do nothing to exit
return Ok(());

View File

@ -33,6 +33,7 @@ use air_trace_handler::SubgraphType;
impl<'i> ExecutableInstruction<'i> for Par<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(par, exec_ctx, trace_ctx);
println!("> par");
let mut completeness_updater = ParCompletenessUpdater::new();
trace_to_exec_err!(trace_ctx.meet_par_start(), self)?;
@ -79,7 +80,7 @@ fn execute_subgraph<'i>(
}
};
completeness_updater.update_completeness(exec_ctx, subgraph_type);
completeness_updater.observe_completeness(exec_ctx, subgraph_type);
Ok(result)
}

View File

@ -31,7 +31,7 @@ impl ParCompletenessUpdater {
}
}
pub(super) fn update_completeness(&mut self, exec_ctx: &ExecutionCtx<'_>, subgraph_type: SubgraphType) {
pub(super) fn observe_completeness(&mut self, exec_ctx: &ExecutionCtx<'_>, subgraph_type: SubgraphType) {
match subgraph_type {
SubgraphType::Left => self.left_subgraph_complete = exec_ctx.subgraph_complete,
SubgraphType::Right => self.right_subgraph_complete = exec_ctx.subgraph_complete,

View File

@ -27,6 +27,7 @@ impl<'i> super::ExecutableInstruction<'i> for Seq<'i> {
exec_ctx.subgraph_complete = true;
self.0.execute(exec_ctx, trace_ctx)?;
println!("> seq {}", exec_ctx.subgraph_complete);
if exec_ctx.subgraph_complete {
self.1.execute(exec_ctx, trace_ctx)?;

View File

@ -1,42 +1,27 @@
(seq
(seq
(call "{0}" ("" "") ["stream_1"] stream_peers_1)
(call "{0}" ("" "") ["stream_2"] stream_peers_2)
)
(seq
(par
(fold stream_peers_1 v1
(par
(seq
(call v1 ("" "") [v1] $stream_1)
(call v1 ("" "") [v1] $stream_1)
)
(next v1)
)
)
(fold stream_peers_2 v2
(par
(seq
(call v2 ("" "") [v2] $stream_2)
(call v2 ("" "") [v2] $stream_2)
)
(next v2)
)
)
)
(fold $stream_1 v1
(seq
(fold $stream_2 v2
(seq
(seq
(call "{1}" ("" "") [v1 v2])
(next v2)
)
(call "{1}" ("" "") [v1 v2])
)
)
(next v1)
)
)
)
)
(seq
(call "{0}" ("" "") ["stream_1"] stream_peers_1)
(call "{0}" ("" "") ["stream_2"] stream_peers_2))
(seq
(par
(fold stream_peers_1 v1
(par
(seq
(call v1 ("" "") [v1] $stream_1)
(call v1 ("" "") [v1] $stream_1))
(next v1)))
(fold stream_peers_2 v2
(par
(seq
(call v2 ("" "") [v2] $stream_2)
(call v2 ("" "") [v2] $stream_2))
(next v2))))
(fold $stream_1 v1
(seq
(fold $stream_2 v2
(seq
(par
(call "{1}" ("" "") [v1 v2])
(next v2))
(call "{1}" ("" "") [v1 v2])))
(next v1)))))

View File

@ -39,26 +39,19 @@ fn ap_with_fold() {
(seq
(seq
(fold permutations pair
(seq
(par
(fold pair.$.[1]! peer_ids
(seq
(par
(ap peer_ids $inner)
(next peer_ids)
)
)
(next pair)
)
)
(next peer_ids)))
(next pair)))
(fold $inner ns
(next ns)
)
)
(par
(next ns)
(null))))
(seq
(call "{local_vm_peer_id}" ("op" "noop") [])
(call "{local_vm_peer_id}" ("return" "") [$inner])
)
)
)
(call "{local_vm_peer_id}" ("return" "") [$inner]))))
"#);
let result = checked_call_vm!(set_variable_vm, <_>::default(), &script, "", "");

View File

@ -124,7 +124,7 @@ fn recursive_stream_many_iterations() {
let expected_fold = executed_state::fold(vec![
executed_state::subtrace_lore(0, subtrace_desc(3, 2), subtrace_desc(5, 0)),
executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)),
executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(9, 0)),
executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)),
executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)),
executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)),
executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)),
@ -257,27 +257,21 @@ fn recursive_stream_error_handling() {
(seq
(seq
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
)
(call "{vm_peer_id_1}" ("" "stream_value") [] $stream))
(fold $stream iterator
(seq
(call "{vm_peer_id_1}" ("" "stop") [] value)
(xor
(match value "stop"
(null)
)
(null))
(seq
(ap value $stream)
(next iterator)
)
)
)
)
)
(call "{vm_peer_id_2}" ("" "") ["{result_value}"])
)"#);
(next iterator))))))
(call "{vm_peer_id_2}" ("" "") ["{result_value}"]))
"#);
let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
println!("\n\n\n----\n");
let result = checked_call_vm!(vm_2, <_>::default(), &script, "", result.data);
let actual_trace = trace_from_result(&result);
let actual_last_state = &actual_trace[10.into()];
@ -356,7 +350,7 @@ fn recursive_stream_inner_fold() {
let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data);
let actual_trace = trace_from_result(&result);
let actual_last_state = &actual_trace[22.into()];
let actual_last_state = &actual_trace[31.into()];
let expected_last_state = executed_state::scalar_string(result_value);
assert_eq!(actual_last_state, &expected_last_state);

View File

@ -1,74 +1,47 @@
(seq
(seq
(seq
(seq
(seq
(call "{0}" ("" "") ["stream_1"] stream_1_ingredients)
(call "{0}" ("" "") ["stream_2"] stream_2_ingredients)
)
(call "{0}" ("" "") ["stream_3"] stream_3_ingredients)
)
(call "{0}" ("" "") ["stream_4"] stream_4_ingredients)
)
(seq
(seq
(seq
(fold stream_1_ingredients v1
(seq
(call "{1}" ("" "") [v1] $stream_1)
(next v1)
)
)
(fold stream_2_ingredients v2
(seq
(call "{1}" ("" "") [v2] $stream_2)
(next v2)
)
)
)
(fold stream_3_ingredients v3
(seq
(call "{1}" ("" "") [v3] $stream_3)
(next v3)
)
)
)
(fold stream_4_ingredients v4
(seq
(call "{1}" ("" "") [v4] $stream_4)
(next v4)
)
)
)
)
(par
(xor
(fold $stream_1 v1
(seq
(fold $stream_2 v2
(seq
(seq
(fold $stream_3 v3
(seq
(fold $stream_4 v4
(seq
(seq
(seq
(call "{0}" ("" "") ["stream_1"] stream_1_ingredients)
(call "{0}" ("" "") ["stream_2"] stream_2_ingredients))
(call "{0}" ("" "") ["stream_3"] stream_3_ingredients))
(call "{0}" ("" "") ["stream_4"] stream_4_ingredients))
(seq
(seq
(seq
(fold stream_1_ingredients v1
(seq
(call "{1}" ("" "") [v1] $stream_1)
(next v1)))
(fold stream_2_ingredients v2
(seq
(call "{1}" ("" "") [v2] $stream_2)
(next v2))))
(fold stream_3_ingredients v3
(seq
(call "{1}" ("" "") [v3] $stream_3)
(next v3))))
(fold stream_4_ingredients v4
(seq
(call "{1}" ("" "") [v4] $stream_4)
(next v4)))))
(par
(xor
(fold $stream_1 v1
(seq
(fold $stream_2 v2
(seq
(seq
(call "{2}" ("" "") [])
(next v4)
)
)
(next v3)
)
)
(call "{3}" ("error" "") []) ; will trigger an error
)
(next v2)
)
)
(next v1)
)
)
(call "{4}" ("" "") [%last_error%])
)
(call "{5}" ("" "") ["last_peer"])
)
)
(fold $stream_3 v3
(par
(fold $stream_4 v4
(par
(call "{2}" ("" "") [])
(next v4)))
(next v3)))
(call "{3}" ("error" "") [])) ; will trigger an error
(next v2)))
(next v1)))
(call "{4}" ("" "") [%last_error%]))
(call "{5}" ("" "") ["last_peer"])))

View File

@ -1,74 +1,47 @@
(seq
(seq
(seq
(seq
(seq
(call "{0}" ("" "") ["stream_1"] stream_1_ingredients)
(call "{0}" ("" "") ["stream_2"] stream_2_ingredients)
)
(call "{0}" ("" "") ["stream_3"] stream_3_ingredients)
)
(call "{0}" ("" "") ["stream_4"] stream_4_ingredients)
)
(seq
(seq
(seq
(fold stream_1_ingredients v1
(seq
(call "{1}" ("" "") [v1] $stream_1)
(next v1)
)
)
(fold stream_2_ingredients v2
(seq
(call "{1}" ("" "") [v2] $stream_2)
(next v2)
)
)
)
(fold stream_3_ingredients v3
(seq
(call "{1}" ("" "") [v3] $stream_3)
(next v3)
)
)
)
(fold stream_4_ingredients v4
(seq
(call "{1}" ("" "") [v4] $stream_4)
(next v4)
)
)
)
)
(par
(xor
(fold $stream_1 v1
(par
(fold $stream_2 v2
(par
(par
(fold $stream_3 v3
(par
(fold $stream_4 v4
(seq
(seq
(seq
(call "{0}" ("" "") ["stream_1"] stream_1_ingredients)
(call "{0}" ("" "") ["stream_2"] stream_2_ingredients))
(call "{0}" ("" "") ["stream_3"] stream_3_ingredients))
(call "{0}" ("" "") ["stream_4"] stream_4_ingredients))
(seq
(seq
(seq
(fold stream_1_ingredients v1
(seq
(call "{1}" ("" "") [v1] $stream_1)
(next v1)))
(fold stream_2_ingredients v2
(seq
(call "{1}" ("" "") [v2] $stream_2)
(next v2))))
(fold stream_3_ingredients v3
(seq
(call "{1}" ("" "") [v3] $stream_3)
(next v3))))
(fold stream_4_ingredients v4
(seq
(call "{1}" ("" "") [v4] $stream_4)
(next v4)))))
(par
(xor
(fold $stream_1 v1
(par
(fold $stream_2 v2
(par
(par
(call "{2}" ("" "") [])
(next v4)
)
)
(next v3)
)
)
(call "{3}" ("error" "") []) ; will trigger an error
)
(next v2)
)
)
(next v1)
)
)
(call "{4}" ("" "") [%last_error%])
)
(call "{5}" ("" "") [])
)
)
(fold $stream_3 v3
(par
(fold $stream_4 v4
(par
(call "{2}" ("" "") [])
(next v4)))
(next v3)))
(call "{3}" ("error" "") [])) ; will trigger an error
(next v2)))
(next v1)))
(call "{4}" ("" "") [%last_error%]))
(call "{5}" ("" "") [])))

View File

@ -1,31 +1,22 @@
(seq
(xor
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(seq
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{3}" ("error" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{3}" ("error" "") [] $stream)
)
(call "{3}" ("error" "") [] $stream)
)
)
(null)
)
(call "{0}" ("" "") []) ;; this one is needed to check check that sliders switched correctly
(xor
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(seq
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{3}" ("error" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{3}" ("error" "") [] $stream))
(call "{3}" ("error" "") [] $stream)))
(null))
(call "{0}" ("" "") []) ;; this one is needed to check check that sliders switched correctly
)

View File

@ -1,33 +1,22 @@
(seq
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{2}" ("" "") [] $stream)
)
)
(fold $stream v
(seq
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{2}" ("" "") [] $stream)))
(fold $stream v
(seq
(call "{4}" ("" "") [v])
(call "{4}" ("" "") [v])
)
(next v)
)
)
)
(seq
(call "{4}" ("" "") [v])
(call "{4}" ("" "") [v]))
(next v))))

View File

@ -1,33 +1,22 @@
(seq
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{2}" ("" "") [] $stream)
)
)
(fold $stream v
(seq
(seq
(call "{4}" ("" "") [v])
(next v)
)
(call "{4}" ("" "") [v])
)
)
)
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{2}" ("" "") [] $stream)))
(fold $stream v
(par
(seq
(call "{4}" ("" "") [v])
(next v))
(call "{4}" ("" "") [v]))))

View File

@ -1,33 +1,22 @@
(seq
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{3}" ("" "") [] $stream)
)
(call "{1}" ("" "") [] $stream)
)
(call "{2}" ("" "") [] $stream)
)
)
(fold $stream v
(seq
(seq
(next v)
(call "{4}" ("" "") [v])
)
(call "{4}" ("" "") [v])
)
)
)
(seq
(call "{0}" ("" "") []) ;; initiator that should send data to stream generators
(par
(par
(par
(par
(par
(par
(call "{1}" ("" "") [] $stream)
(call "{2}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{3}" ("" "") [] $stream))
(call "{1}" ("" "") [] $stream))
(call "{2}" ("" "") [] $stream)))
(fold $stream v
(par
(seq
(next v)
(call "{4}" ("" "") [v 1]))
(call "{4}" ("" "") [v]))))

View File

@ -237,12 +237,15 @@ fn stream_merging_v1() {
executed_state::stream_string("1", 0),
executed_state::request_sent_by(initiator_id),
executed_state::fold(vec![
executed_state::subtrace_lore(7, subtrace_desc(15, 1), subtrace_desc(20, 1)),
executed_state::subtrace_lore(9, subtrace_desc(16, 1), subtrace_desc(19, 1)),
executed_state::subtrace_lore(12, subtrace_desc(17, 1), subtrace_desc(18, 1)),
executed_state::subtrace_lore(7, subtrace_desc(15, 2), subtrace_desc(23, 1)),
executed_state::subtrace_lore(9, subtrace_desc(17, 2), subtrace_desc(22, 1)),
executed_state::subtrace_lore(12, subtrace_desc(19, 2), subtrace_desc(21, 1)),
]),
executed_state::par(7, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(4, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
@ -275,19 +278,24 @@ fn stream_merging_v1() {
executed_state::stream_string("1", 0),
executed_state::stream_string("2", 1),
executed_state::fold(vec![
executed_state::subtrace_lore(7, subtrace_desc(15, 1), subtrace_desc(20, 1)),
executed_state::subtrace_lore(9, subtrace_desc(16, 1), subtrace_desc(19, 1)),
executed_state::subtrace_lore(12, subtrace_desc(17, 1), subtrace_desc(18, 1)),
executed_state::subtrace_lore(8, subtrace_desc(21, 1), subtrace_desc(24, 1)),
executed_state::subtrace_lore(13, subtrace_desc(22, 1), subtrace_desc(23, 1)),
executed_state::subtrace_lore(7, subtrace_desc(15, 2), subtrace_desc(23, 1)),
executed_state::subtrace_lore(9, subtrace_desc(17, 2), subtrace_desc(22, 1)),
executed_state::subtrace_lore(12, subtrace_desc(19, 2), subtrace_desc(21, 1)),
executed_state::subtrace_lore(8, subtrace_desc(24, 2), subtrace_desc(29, 1)),
executed_state::subtrace_lore(13, subtrace_desc(26, 2), subtrace_desc(28, 1)),
]),
executed_state::par(7, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(4, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(4, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
@ -319,25 +327,32 @@ fn stream_merging_v1() {
executed_state::stream_string("1", 0),
executed_state::stream_string("2", 1),
executed_state::fold(vec![
executed_state::subtrace_lore(7, subtrace_desc(15, 1), subtrace_desc(20, 1)),
executed_state::subtrace_lore(9, subtrace_desc(16, 1), subtrace_desc(19, 1)),
executed_state::subtrace_lore(12, subtrace_desc(17, 1), subtrace_desc(18, 1)),
executed_state::subtrace_lore(8, subtrace_desc(21, 1), subtrace_desc(24, 1)),
executed_state::subtrace_lore(13, subtrace_desc(22, 1), subtrace_desc(23, 1)),
executed_state::subtrace_lore(10, subtrace_desc(25, 1), subtrace_desc(28, 1)),
executed_state::subtrace_lore(11, subtrace_desc(26, 1), subtrace_desc(27, 1)),
executed_state::subtrace_lore(7, subtrace_desc(15, 2), subtrace_desc(23, 1)),
executed_state::subtrace_lore(9, subtrace_desc(17, 2), subtrace_desc(22, 1)),
executed_state::subtrace_lore(12, subtrace_desc(19, 2), subtrace_desc(21, 1)),
executed_state::subtrace_lore(8, subtrace_desc(24, 2), subtrace_desc(29, 1)),
executed_state::subtrace_lore(13, subtrace_desc(26, 2), subtrace_desc(28, 1)),
executed_state::subtrace_lore(10, subtrace_desc(30, 2), subtrace_desc(35, 1)),
executed_state::subtrace_lore(11, subtrace_desc(32, 2), subtrace_desc(34, 1)),
]),
executed_state::par(7, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(4, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(4, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(4, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 1),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
@ -366,9 +381,11 @@ fn stream_merging_v2() {
let initiator_result = checked_call_vm!(initiator, <_>::default(), &script, "", "");
let setter_1_res = checked_call_vm!(setter_1, <_>::default(), &script, "", initiator_result.data.clone());
let setter_2_res = checked_call_vm!(setter_2, <_>::default(), &script, "", initiator_result.data.clone());
let setter_3_res = checked_call_vm!(setter_3, <_>::default(), &script, "", initiator_result.data);
//let setter_2_res = checked_call_vm!(setter_2, <_>::default(), &script, "", initiator_result.data.clone());
//let setter_3_res = checked_call_vm!(setter_3, <_>::default(), &script, "", initiator_result.data);
println!("\n\n\n\n\n----\n\n");
print_trace(&setter_1_res, "setter 1");
let executor_result_1 = checked_call_vm!(executor, <_>::default(), &script, "", setter_1_res.data);
let actual_trace_1 = trace_from_result(&executor_result_1);
@ -389,9 +406,9 @@ fn stream_merging_v2() {
executed_state::stream_string("1", 0),
executed_state::request_sent_by(initiator_id),
executed_state::fold(vec![
executed_state::subtrace_lore(7, subtrace_desc(15, 0), subtrace_desc(19, 2)),
executed_state::subtrace_lore(9, subtrace_desc(15, 0), subtrace_desc(17, 2)),
executed_state::subtrace_lore(12, subtrace_desc(15, 0), subtrace_desc(15, 2)),
executed_state::subtrace_lore(7, subtrace_desc(15, 1), subtrace_desc(21, 2)),
executed_state::subtrace_lore(9, subtrace_desc(16, 1), subtrace_desc(19, 2)),
executed_state::subtrace_lore(12, subtrace_desc(17, 1), subtrace_desc(18, 2)),
]),
executed_state::scalar_string(unit_call_service_result),
executed_state::scalar_string(unit_call_service_result),
@ -402,6 +419,7 @@ fn stream_merging_v2() {
];
assert_eq!(actual_trace_1, expected_trace_1);
/*
let executor_result_2 = checked_call_vm!(
executor,
<_>::default(),
@ -495,4 +513,6 @@ fn stream_merging_v2() {
executed_state::scalar_string(unit_call_service_result),
];
assert_eq!(actual_trace_3, expected_trace_3);
*/
}

View File

@ -225,6 +225,7 @@ fn fold_early_exit() {
"",
last_error_receiver_result.data
);
print_trace(&last_peer_checker_result, "");
let actual_trace = trace_from_result(&last_peer_checker_result);
let unit_call_service_result = "result from unit_call_service";
@ -241,32 +242,38 @@ fn fold_early_exit() {
executed_state::stream_string("c2", 0),
executed_state::stream_string("d1", 0),
executed_state::stream_string("d2", 0),
executed_state::par(11, 1),
executed_state::par(17, 1),
executed_state::fold(vec![executed_state::subtrace_lore(
4,
subtrace_desc(14, 9),
subtrace_desc(23, 0),
subtrace_desc(14, 15),
subtrace_desc(29, 0),
)]),
executed_state::fold(vec![executed_state::subtrace_lore(
6,
subtrace_desc(15, 8),
subtrace_desc(23, 0),
subtrace_desc(15, 14),
subtrace_desc(29, 0),
)]),
executed_state::fold(vec![
executed_state::subtrace_lore(8, subtrace_desc(16, 3), subtrace_desc(22, 0)),
executed_state::subtrace_lore(9, subtrace_desc(19, 3), subtrace_desc(22, 0)),
executed_state::subtrace_lore(8, subtrace_desc(16, 6), subtrace_desc(28, 0)),
executed_state::subtrace_lore(9, subtrace_desc(22, 6), subtrace_desc(28, 0)),
]),
executed_state::par(5, 6),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(17, 1), subtrace_desc(19, 0)),
executed_state::subtrace_lore(11, subtrace_desc(18, 1), subtrace_desc(19, 0)),
executed_state::subtrace_lore(10, subtrace_desc(18, 2), subtrace_desc(22, 0)),
executed_state::subtrace_lore(11, subtrace_desc(20, 2), subtrace_desc(22, 0)),
]),
executed_state::par(1, 2),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(5, 0),
executed_state::fold(vec![
executed_state::subtrace_lore(10, subtrace_desc(20, 1), subtrace_desc(22, 0)),
executed_state::subtrace_lore(11, subtrace_desc(21, 1), subtrace_desc(22, 0)),
executed_state::subtrace_lore(10, subtrace_desc(24, 2), subtrace_desc(28, 0)),
executed_state::subtrace_lore(11, subtrace_desc(26, 2), subtrace_desc(28, 0)),
]),
executed_state::par(1, 2),
executed_state::scalar_string(unit_call_service_result),
executed_state::par(1, 0),
executed_state::scalar_string(unit_call_service_result),
executed_state::service_failed(1, "failed result from fallible_call_service"),
executed_state::scalar(json!({

View File

@ -422,3 +422,153 @@ fn shadowing_scope() {
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_waits_on_empty_stream() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(echo_call_service(), vm_peer_id);
let script = f!(r#"
(par
(call "" ("" "") [] $stream)
(fold $stream iterator
(seq
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
(next iterator))))
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![executed_state::par(1, 0), executed_state::request_sent_by(vm_peer_id)];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_seq_next_never_completes() {
let vm_peer_id = "vm_peer_id";
let mut vm = create_avm(set_variable_call_service(json!(1)), vm_peer_id);
let script = f!(r#"
(seq
(call "{vm_peer_id}" ("" "") [] $stream)
(seq
(fold $stream iterator
(seq
(call "{vm_peer_id}" ("" "") [iterator] $new_stream)
(next iterator)))
(call "{vm_peer_id}" ("" "") [])))
"#);
let result = checked_call_vm!(vm, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let expected_trace = vec![
executed_state::stream_number(1, 0),
executed_state::fold(vec![subtrace_lore(
0,
SubTraceDesc::new(2.into(), 1),
SubTraceDesc::new(3.into(), 0),
)]),
executed_state::stream_number(1, 0),
];
assert_eq!(actual_trace, expected_trace);
}
#[test]
fn fold_par_next_completes() {
let vm_1_peer_id = "vm_1_peer_id";
let mut vm_1 = create_avm(set_variable_call_service(json!(1)), vm_1_peer_id);
let vm_2_peer_id = "vm_2_peer_id";
let mut vm_2 = create_avm(set_variable_call_service(json!(1)), vm_2_peer_id);
let vm_3_peer_id = "vm_3_peer_id";
let mut vm_3 = create_avm(set_variable_call_service(json!(1)), vm_3_peer_id);
let vm_4_peer_id = "vm_4_peer_id";
let mut vm_4 = create_avm(set_variable_call_service(json!(1)), vm_4_peer_id);
let script = f!(r#"
(seq
(seq
(seq
(ap "{vm_2_peer_id}" $stream)
(ap "{vm_3_peer_id}" $stream))
(ap "{vm_4_peer_id}" $stream))
(seq
(fold $stream peer_id
(par
(call peer_id ("" "") [] $new_stream)
(next peer_id)))
(call "{vm_1_peer_id}" ("" "") []) ; this call should be executed if any of these three peers is reached
)
)
"#);
let result_1 = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
print_trace(&result_1, "");
let result_2 = checked_call_vm!(vm_2, <_>::default(), &script, "", result_1.data.clone());
let actual_trace = trace_from_result(&result_2);
let expected_trace = vec![
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::fold(vec![
subtrace_lore(0, SubTraceDesc::new(4.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(1, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(2, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)),
]),
executed_state::par(1, 4),
executed_state::stream_number(1, 0),
executed_state::par(1, 2),
executed_state::request_sent_by(vm_1_peer_id),
executed_state::par(1, 0),
executed_state::request_sent_by(vm_1_peer_id),
executed_state::request_sent_by(vm_2_peer_id),
];
assert_eq!(actual_trace, expected_trace);
let result_3 = checked_call_vm!(vm_3, <_>::default(), &script, "", result_1.data.clone());
let actual_trace = trace_from_result(&result_3);
let expected_trace = vec![
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::fold(vec![
subtrace_lore(0, SubTraceDesc::new(4.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(1, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(2, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)),
]),
executed_state::par(1, 4),
executed_state::request_sent_by(vm_1_peer_id),
executed_state::par(1, 2),
executed_state::stream_number(1, 0),
executed_state::par(1, 0),
executed_state::request_sent_by(vm_1_peer_id),
executed_state::request_sent_by(vm_3_peer_id),
];
assert_eq!(actual_trace, expected_trace);
let result_4 = checked_call_vm!(vm_4, <_>::default(), &script, "", result_1.data);
let actual_trace = trace_from_result(&result_4);
let expected_trace = vec![
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::ap(Some(0)),
executed_state::fold(vec![
subtrace_lore(0, SubTraceDesc::new(4.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(1, SubTraceDesc::new(6.into(), 2), SubTraceDesc::new(10.into(), 0)),
subtrace_lore(2, SubTraceDesc::new(8.into(), 2), SubTraceDesc::new(10.into(), 0)),
]),
executed_state::par(1, 4),
executed_state::request_sent_by(vm_1_peer_id),
executed_state::par(1, 2),
executed_state::request_sent_by(vm_1_peer_id),
executed_state::par(1, 0),
executed_state::stream_number(1, 0),
executed_state::request_sent_by(vm_4_peer_id),
];
assert_eq!(actual_trace, expected_trace);
}

View File

@ -44,7 +44,7 @@ fn new_with_global_streams_seq() {
(seq
(new $stream
(seq
(seq
(par
(call "{local_vm_peer_id_1}" ("" "") [i] $stream)
(next i)
)

View File

@ -1,34 +1,22 @@
(seq
(seq
(seq
(call "client_id" ("" "") ["relay"] relay)
(call "client_id" ("" "") ["client"] client)
)
(seq
(call relay ("dht" "neighborhood") [relay] neighs_top)
(seq
(fold neighs_top n
(seq
(call n ("dht" "neighborhood") [n] $neighs_inner)
(next n)
)
)
(fold $neighs_inner ns
(seq
(fold ns n
(seq
(call n ("op" "identify") [] $services)
(next n)
)
)
(next ns)
)
)
)
)
)
(seq
(call relay ("op" "identity") [])
(call client ("return" "") [$services $neighs_inner neighs_top])
)
)
(seq
(seq
(call "client_id" ("" "") ["relay"] relay)
(call "client_id" ("" "") ["client"] client))
(seq
(call relay ("dht" "neighborhood") [relay] neighs_top) ;
(seq
(fold neighs_top n
(seq
(call n ("dht" "neighborhood") [n] $neighs_inner)
(next n)))
(fold $neighs_inner ns
(seq
(fold ns n
(seq
(call n ("op" "identify") [] $services)
(next n)))
(next ns))))))
(seq
(call relay ("op" "identity") [])
(call client ("return" "") [$services $neighs_inner neighs_top])))

View File

@ -22,6 +22,7 @@ use crate::wasm_test_runner::WasmAirRunner as AirRunnerImpl;
use super::CallServiceClosure;
use avm_server::avm_runner::*;
use crate::print_trace;
use std::collections::HashMap;
use std::collections::HashSet;
@ -75,6 +76,7 @@ impl<R: AirRunner> TestRunner<R> {
let mut next_peer_pks = HashSet::new();
loop {
println!("call_results: {:?}", call_results);
let mut outcome: RawAVMOutcome = self
.runner
.call(
@ -87,6 +89,7 @@ impl<R: AirRunner> TestRunner<R> {
call_results,
)
.map_err(|e| e.to_string())?;
print_trace(&outcome, "");
next_peer_pks.extend(outcome.next_peer_pks);