mirror of
https://github.com/fluencelabs/aquavm
synced 2025-07-30 21:42:04 +00:00
BREAKING CHANGE:

1. Call values in the trace have CID references to structures that have call arguments' hash and CID references to values and tetraplets.
2. If call value is unused, it is serialized with `Unused` variant, and CID references are not stored.

Previous data scheme was (Scalar as an example, other cases are similar):

```
Scalar(CID<JValue>) ---<value_store>----> JValue
```

New data scheme is much more sophisticated:

```
Scalar(CID<ServiceResultAggregate>) ---+
                                       |
  +----<service_result_store>----------+
  |
  +-------> ServiceResultAggregate:
               value_cid ------------<value_store>----> JValue
               tetraplet_cid --------<tetraplet_store>----> SecurityTetraplet
               argument_hash: String
```

`Stream` variant is similar, however, `Unused` is different: it has value CID only, but the value is not stored into the `value_store`:

```
Unused(Rc<CID<JValue>>) ---> X
```

Co-authored-by: Mike Voronov <michail.vms@gmail.com>
457 lines
17 KiB
Rust
457 lines
17 KiB
Rust
/*
|
|
* Copyright 2022 Fluence Labs Limited
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*/
|
|
|
|
use air_interpreter_data::ExecutionTrace;
|
|
use air_test_utils::prelude::*;
|
|
|
|
use pretty_assertions::assert_eq;
|
|
|
|
/// Folds over a stream whose fold body may append to that same stream, with a
/// service that answers `"stop"` on the very first check.
///
/// The expected trace holds the two initial stream values, a fold state with
/// one single-state subtrace per initial value, and one `"stop"` scalar per
/// iteration; no `ap` state is expected.
#[test]
fn recursive_stream_with_early_exit() {
    let vm_peer_id = "vm_peer_id";
    // The call service picks its answer by function name.
    let variable_mappings = maplit::hashmap! {
        "stream_value".to_string() => json!(1),
        "stop".to_string() => json!("stop"),
    };
    let mut vm = create_avm(
        set_variables_call_service(variable_mappings, VariableOptionSource::FunctionName),
        vm_peer_id,
    );

    let script = f!(r#"
        (seq
            (seq
                (call "{vm_peer_id}" ("" "stream_value") [] $stream)
                (call "{vm_peer_id}" ("" "stream_value") [] $stream)
            )
            (fold $stream iterator
                (seq
                    (call "{vm_peer_id}" ("" "stop") [] value)
                    (xor
                        (match value "stop"
                            (null)
                        )
                        (seq
                            (ap value $stream)
                            (next iterator)
                        )
                    )
                )
            )
        )"#);

    let result = checked_call_vm!(vm, <_>::default(), script, "", "");
    let actual_trace = trace_from_result(&result);
    let expected_state = vec![
        stream!(1, 0, peer = vm_peer_id, function = "stream_value"),
        stream!(1, 1, peer = vm_peer_id, function = "stream_value"),
        executed_state::fold(vec![
            executed_state::subtrace_lore(0, subtrace_desc(3, 1), subtrace_desc(4, 0)),
            executed_state::subtrace_lore(1, subtrace_desc(4, 1), subtrace_desc(5, 0)),
        ]),
        scalar!("stop", peer = vm_peer_id, function = "stop"),
        scalar!("stop", peer = vm_peer_id, function = "stop"),
    ];

    assert_eq!(actual_trace, expected_state);
}
|
|
|
|
/// Runs a recursive fold for many iterations before the service tells it to stop.
///
/// The call service of the first peer counts its invocations: it answers
/// `"non_stop"` while the counter is below `stop_request_id` and `"stop"` from
/// then on. The test checks the fold state produced on the first peer, that the
/// first peer ends by sending a request, and that the second peer then records
/// the final call result.
#[test]
fn recursive_stream_many_iterations() {
    let vm_peer_id_1 = "vm_peer_id_1";

    // Invocation counter shared across calls of the closure below.
    let request_id = std::cell::Cell::new(0);
    let stop_request_id = 10;
    let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
        let uncelled_request_id = request_id.get();

        let result = if uncelled_request_id >= stop_request_id {
            CallServiceResult::ok(json!("stop"))
        } else {
            CallServiceResult::ok(json!("non_stop"))
        };

        request_id.set(uncelled_request_id + 1);
        result
    });

    let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);

    let vm_peer_id_2 = "vm_peer_id_2";
    let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);

    let result_value = "result_value";
    let script = f!(r#"
        (seq
            (seq
                (seq
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
                )
                (fold $stream iterator
                    (seq
                        (call "{vm_peer_id_1}" ("" "stop") [] value)
                        (xor
                            (match value "stop"
                                (null)
                            )
                            (seq
                                (ap value $stream)
                                (next iterator)
                            )
                        )
                    )
                )
            )
            (call "{vm_peer_id_2}" ("" "") ["{result_value}"])
        )"#);

    let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
    let actual_trace = trace_from_result(&result);
    let actual_fold = &actual_trace[2.into()];

    // Two fold layouts are accepted.
    // NOTE(review): the reason both layouts are valid is not visible in this file — confirm.
    let expected_fold_v1 = executed_state::fold(vec![
        executed_state::subtrace_lore(0, subtrace_desc(3, 2), subtrace_desc(5, 0)),
        executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)),
        executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)),
        executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)),
        executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)),
        executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)),
        executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(19, 0)),
        executed_state::subtrace_lore(14, subtrace_desc(17, 2), subtrace_desc(19, 0)),
        executed_state::subtrace_lore(16, subtrace_desc(19, 1), subtrace_desc(20, 0)),
    ]);

    let expected_fold_v2 = executed_state::fold(vec![
        executed_state::subtrace_lore(0, subtrace_desc(3, 2), subtrace_desc(5, 0)),
        executed_state::subtrace_lore(1, subtrace_desc(5, 2), subtrace_desc(7, 0)),
        executed_state::subtrace_lore(4, subtrace_desc(7, 2), subtrace_desc(11, 0)),
        executed_state::subtrace_lore(6, subtrace_desc(9, 2), subtrace_desc(11, 0)),
        executed_state::subtrace_lore(8, subtrace_desc(11, 2), subtrace_desc(15, 0)),
        executed_state::subtrace_lore(10, subtrace_desc(13, 2), subtrace_desc(15, 0)),
        executed_state::subtrace_lore(12, subtrace_desc(15, 2), subtrace_desc(18, 0)),
        executed_state::subtrace_lore(14, subtrace_desc(17, 1), subtrace_desc(18, 0)),
        executed_state::subtrace_lore(16, subtrace_desc(18, 2), subtrace_desc(20, 0)),
        executed_state::subtrace_lore(19, subtrace_desc(20, 1), subtrace_desc(21, 0)),
    ]);

    let test_passed = (actual_fold == &expected_fold_v1) || (actual_fold == &expected_fold_v2);
    // Print the actual state on failure so a mismatch can be diagnosed from the test log.
    assert!(
        test_passed,
        "fold state matches neither expected layout: {:?}",
        actual_fold
    );

    let actual_last_state = actual_trace.last().unwrap();
    let expected_last_state = executed_state::request_sent_by(vm_peer_id_1);
    assert_eq!(actual_last_state, &expected_last_state);

    let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data);
    let actual_trace = trace_from_result(&result);
    let actual_last_state = actual_trace.last().unwrap();
    let expected_last_state = unused!(result_value, peer = vm_peer_id_2, args = [result_value]);
    assert_eq!(actual_last_state, &expected_last_state);
}
|
|
|
|
/// Recursive fold whose terminating branch uses a variable set by another peer.
///
/// The first peer's service answers `"non_join"` while its invocation counter
/// is below `stop_request_id` and `"join"` afterwards. When the fold body sees
/// `"join"` it calls the second peer with `join_variable`, which is produced by
/// the third peer in the other `par` branch. The script is executed on
/// peers 1, 3 and 2 in that order and the whole resulting trace is compared.
#[test]
fn recursive_stream_join() {
    let vm_peer_id_1 = "vm_peer_id_1";

    // Invocation counter shared across calls of the closure below.
    let request_id = std::cell::Cell::new(0);
    let stop_request_id = 5;
    let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
        let uncelled_request_id = request_id.get();

        let result = if uncelled_request_id >= stop_request_id {
            CallServiceResult::ok(json!("join"))
        } else {
            CallServiceResult::ok(json!("non_join"))
        };

        request_id.set(uncelled_request_id + 1);
        result
    });

    let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);

    let vm_peer_id_2 = "vm_peer_id_2";
    let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);
    let vm_peer_id_3 = "vm_peer_id_3";
    let mut vm_3 = create_avm(echo_call_service(), vm_peer_id_3);

    let result_value = "result_value";
    let script = f!(r#"
        (seq
            (seq
                (par
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
                    (call "{vm_peer_id_3}" ("" "stream_value") [""] join_variable)
                )
                (fold $stream iterator
                    (seq
                        (call "{vm_peer_id_1}" ("" "") [""] value)
                        (xor
                            (match value "join"
                                (call "{vm_peer_id_2}" ("" "") [join_variable])
                            )
                            (seq
                                (ap value $stream)
                                (next iterator)
                            )
                        )
                    )
                )
            )
            (call "{vm_peer_id_2}" ("" "") ["{result_value}"])
        )"#);

    // Each run continues from the data produced by the previous peer.
    let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
    let result = checked_call_vm!(vm_3, <_>::default(), &script, "", result.data);
    let result = checked_call_vm!(vm_2, <_>::default(), &script, "", result.data);
    let actual_trace = trace_from_result(&result);
    let expected_trace = ExecutionTrace::from(vec![
        executed_state::par(1, 1),
        stream!("non_join", 0, peer = vm_peer_id_1, function = "stream_value"),
        scalar!("", peer = vm_peer_id_3, function = "stream_value", args = [""]),
        executed_state::fold(vec![
            executed_state::subtrace_lore(1, subtrace_desc(4, 2), subtrace_desc(6, 0)),
            executed_state::subtrace_lore(5, subtrace_desc(6, 2), subtrace_desc(8, 0)),
            executed_state::subtrace_lore(7, subtrace_desc(8, 2), subtrace_desc(10, 0)),
            executed_state::subtrace_lore(9, subtrace_desc(10, 2), subtrace_desc(12, 0)),
            executed_state::subtrace_lore(11, subtrace_desc(12, 2), subtrace_desc(14, 0)),
        ]),
        scalar!("non_join", peer = vm_peer_id_1, args = [""]),
        executed_state::ap(1),
        scalar!("non_join", peer = vm_peer_id_1, args = [""]),
        executed_state::ap(2),
        scalar!("non_join", peer = vm_peer_id_1, args = [""]),
        executed_state::ap(3),
        scalar!("non_join", peer = vm_peer_id_1, args = [""]),
        executed_state::ap(4),
        scalar!("join", peer = vm_peer_id_1, args = [""]),
        unused!("", peer = vm_peer_id_2, args = [""]),
        unused!(result_value, peer = vm_peer_id_2, args = [result_value]),
    ]);
    assert_eq!(actual_trace, expected_trace);
}
|
|
|
|
/// Recursive fold in which the service starts failing after a few iterations.
///
/// The first peer's service answers `"non_stop"` while its invocation counter
/// is below `stop_request_id` and returns an error (code `1`, value `"error"`)
/// afterwards. The whole script is wrapped in `xor`, and the trace produced on
/// the first peer is compared against the expected one, which ends with the
/// failed call states.
#[test]
fn recursive_stream_error_handling() {
    let vm_peer_id_1 = "vm_peer_id_1";

    // Invocation counter shared across calls of the closure below.
    let request_id = std::cell::Cell::new(0);
    let stop_request_id = 5;
    let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
        let uncelled_request_id = request_id.get();

        let result = if uncelled_request_id >= stop_request_id {
            CallServiceResult::err(1, json!("error"))
        } else {
            CallServiceResult::ok(json!("non_stop"))
        };

        request_id.set(uncelled_request_id + 1);
        result
    });

    let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);

    let result_value = "result_value";
    let vm_peer_id_2 = "vm_peer_id_2";
    let script = f!(r#"
        (xor
            (seq
                (seq
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream)
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream))
                (fold $stream iterator
                    (seq
                        (call "{vm_peer_id_1}" ("" "stop") [] value)
                        (xor
                            (match value "stop"
                                (null))
                            (seq
                                (ap value $stream)
                                (next iterator)
                            )
                        )
                    )
                )
            )
            (call "{vm_peer_id_2}" ("" "") ["{result_value}"]))
        "#);

    let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
    let actual_trace = trace_from_result(&result);

    let expected_trace = vec![
        stream!("non_stop", 0, peer = vm_peer_id_1, function = "stream_value"),
        stream!("non_stop", 1, peer = vm_peer_id_1, function = "stream_value"),
        executed_state::fold(vec![
            subtrace_lore(0, SubTraceDesc::new(3.into(), 2), SubTraceDesc::new(5.into(), 0)),
            subtrace_lore(1, SubTraceDesc::new(5.into(), 2), SubTraceDesc::new(7.into(), 0)),
            subtrace_lore(4, SubTraceDesc::new(7.into(), 2), SubTraceDesc::new(10.into(), 0)),
            subtrace_lore(6, SubTraceDesc::new(9.into(), 1), SubTraceDesc::new(10.into(), 0)),
            subtrace_lore(8, SubTraceDesc::new(10.into(), 1), SubTraceDesc::new(11.into(), 0)),
        ]),
        scalar!("non_stop", peer = vm_peer_id_1, function = "stop"),
        executed_state::ap(2),
        scalar!("non_stop", peer = vm_peer_id_1, function = "stop"),
        executed_state::ap(2),
        scalar!("non_stop", peer = vm_peer_id_1, function = "stop"),
        executed_state::ap(3),
        failed!(1, "error", peer = vm_peer_id_1, function = "stop"),
        failed!(1, "error", peer = vm_peer_id_1, function = "stop"),
    ];

    assert_eq!(actual_trace, expected_trace);
}
|
|
|
|
/// Recursive fold over `$stream_1` whose body contains a second recursive fold
/// over `$stream_2`.
///
/// The first peer's service answers `"non_stop"` while its invocation counter
/// is below `stop_request_id` and `"stop"` afterwards. Only the final state is
/// checked: after running on both peers, the last trace entry must be the
/// result of the concluding call on the second peer.
#[test]
fn recursive_stream_inner_fold() {
    let vm_peer_id_1 = "vm_peer_id_1";

    // Invocation counter shared across calls of the closure below.
    let request_id = std::cell::Cell::new(0);
    let stop_request_id = 10;
    let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
        let uncelled_request_id = request_id.get();

        let result = if uncelled_request_id >= stop_request_id {
            CallServiceResult::ok(json!("stop"))
        } else {
            CallServiceResult::ok(json!("non_stop"))
        };

        request_id.set(uncelled_request_id + 1);
        result
    });

    let mut vm_1 = create_avm(give_n_results_and_then_stop, vm_peer_id_1);

    let vm_peer_id_2 = "vm_peer_id_2";
    let mut vm_2 = create_avm(echo_call_service(), vm_peer_id_2);

    let result_value = "result_value";
    let script = f!(r#"
        (seq
            (seq
                (seq
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream_1)
                    (call "{vm_peer_id_1}" ("" "stream_value") [] $stream_2))
                (fold $stream_1 iterator_1
                    (seq
                        (call "{vm_peer_id_1}" ("" "stop") [] value)
                        (xor
                            (match value "stop"
                                (null))
                            (seq
                                (seq
                                    (ap value $stream_1)
                                    (fold $stream_2 iterator_2
                                        (seq
                                            (call "{vm_peer_id_1}" ("" "stop") [] value)
                                            (xor
                                                (match value "stop"
                                                    (null))
                                                (seq
                                                    (ap value $stream_2)
                                                    (next iterator_2))))))
                                (next iterator_1))))))
            (call "{vm_peer_id_2}" ("" "") ["{result_value}"]))
        "#);

    // The second run continues from the data produced by the first peer.
    let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
    let result = checked_call_vm!(vm_2, <_>::default(), script, "", result.data);
    let actual_trace = trace_from_result(&result);

    let actual_last_state = actual_trace.last().unwrap();
    let expected_last_state = unused!(result_value, peer = vm_peer_id_2, args = [result_value]);
    assert_eq!(actual_last_state, &expected_last_state);
}
|
|
|
|
/// Recursive fold over a `new`-scoped stream that is fed by a service call made
/// inside the fold body.
///
/// The service answers `"yes"` while its invocation counter is below
/// `stop_request_id` and `"no"` afterwards. The fold body calls the service
/// again only while the current element matches `"yes"`. The test asserts that
/// the fold state at trace position 2 has `stop_request_id + 1` lore entries.
#[test]
fn recursive_stream_fold_with_n_service_call() {
    let vm_peer_id = "vm_peer_id_1";

    // Invocation counter shared across calls of the closure below.
    let request_id = std::cell::Cell::new(0);
    let stop_request_id = 10;
    let give_n_results_and_then_stop: CallServiceClosure = Box::new(move |_params| {
        let uncelled_request_id = request_id.get();

        let result = if uncelled_request_id >= stop_request_id {
            CallServiceResult::ok(json!("no"))
        } else {
            CallServiceResult::ok(json!("yes"))
        };

        request_id.set(uncelled_request_id + 1);
        result
    });

    let mut vm = create_avm(give_n_results_and_then_stop, vm_peer_id);

    let script = f!(r#"
        (xor
            (seq
                (seq
                    (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
                    (new $loop
                        (new $result
                            (seq
                                (seq
                                    (ap "yes" $loop)
                                    (fold $loop l
                                        (seq
                                            (seq
                                                (xor
                                                    (match l "yes"
                                                        (xor
                                                            (call %init_peer_id% ("yesno" "get") [] $loop)
                                                            (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
                                                        )
                                                    )
                                                    (null)
                                                )
                                                (ap "success" $result)
                                            )
                                            (next l)
                                        )
                                    )
                                )
                                (seq
                                    (canon %init_peer_id% $result #canon_stream)
                                    (call %init_peer_id% ("op" "identity") [#canon_stream] result-fix)
                                )
                            )
                        )
                    )
                )
                (xor
                    (call %init_peer_id% ("callbackSrv" "response") [result-fix])
                    (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
                )
            )
            (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
        )
        "#);

    let test_params = TestRunParameters::from_init_peer_id(vm_peer_id);
    let result = checked_call_vm!(vm, test_params, &script, "", "");
    let actual_trace = trace_from_result(&result);
    // Position 2 is expected to hold the fold state; report what was found otherwise.
    let actual_fold_state = match &actual_trace[2.into()] {
        ExecutedState::Fold(fold_result) => fold_result,
        state => panic!("state at trace position 2 should be a fold, got {:?}", state),
    };
    let expected_fold_lores = stop_request_id + 1;

    assert_eq!(actual_fold_state.lore.len(), expected_fold_lores);
}
|