mirror of
https://github.com/fluencelabs/aquavm
synced 2025-06-26 13:11:35 +00:00
chore(execution-engine): Some stream-related LambdaError are unjoinab… (#503)
chore(execution-engine): Some stream-related LambdaError are unjoinable b/c: canon stream replaces normal stream, when canon stream is used, it is materialized and its size is known
This commit is contained in:
@ -141,15 +141,6 @@ impl Joinable for CatchableError {
|
|||||||
log_join!(" waiting for an argument with name '{}'", var_name);
|
log_join!(" waiting for an argument with name '{}'", var_name);
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
LambdaApplierError(LambdaError::StreamNotHaveEnoughValues { stream_size, idx }) => {
|
|
||||||
log_join!(" waiting for an argument with idx '{}' on stream with size '{}'", idx, stream_size);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
LambdaApplierError(LambdaError::EmptyStream) => {
|
|
||||||
log_join!(" waiting on empty stream for path ");
|
|
||||||
true
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,7 +82,7 @@ fn select_by_path_from_stream<'value>(
|
|||||||
let value = lambda_to_execution_error!(stream
|
let value = lambda_to_execution_error!(stream
|
||||||
.peekable()
|
.peekable()
|
||||||
.nth(idx as usize)
|
.nth(idx as usize)
|
||||||
.ok_or(LambdaError::StreamNotHaveEnoughValues { stream_size, idx }))?;
|
.ok_or(LambdaError::CanonStreamNotHaveEnoughValues { stream_size, idx }))?;
|
||||||
|
|
||||||
let result = select_by_path_from_scalar(value, body.iter(), exec_ctx)?;
|
let result = select_by_path_from_scalar(value, body.iter(), exec_ctx)?;
|
||||||
let select_result = StreamSelectResult::from_cow(result, idx);
|
let select_result = StreamSelectResult::from_cow(result, idx);
|
||||||
|
@ -22,7 +22,7 @@ use thiserror::Error as ThisError;
|
|||||||
#[derive(Debug, Clone, ThisError)]
|
#[derive(Debug, Clone, ThisError)]
|
||||||
pub enum LambdaError {
|
pub enum LambdaError {
|
||||||
#[error("lambda is applied to a stream that have only '{stream_size}' elements, but '{idx}' requested")]
|
#[error("lambda is applied to a stream that have only '{stream_size}' elements, but '{idx}' requested")]
|
||||||
StreamNotHaveEnoughValues { stream_size: usize, idx: u32 },
|
CanonStreamNotHaveEnoughValues { stream_size: usize, idx: u32 },
|
||||||
|
|
||||||
/// An error occurred while trying to apply lambda to an empty stream.
|
/// An error occurred while trying to apply lambda to an empty stream.
|
||||||
#[error("lambda is applied to an empty stream")]
|
#[error("lambda is applied to an empty stream")]
|
||||||
|
@ -64,35 +64,6 @@ fn dont_wait_on_json_path() {
|
|||||||
assert_eq!(result.next_peer_pks, vec![test_params.init_peer_id]);
|
assert_eq!(result.next_peer_pks, vec![test_params.init_peer_id]);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn wait_on_empty_stream_json_path() {
|
|
||||||
let local_peer_id = "local_peer_id";
|
|
||||||
let mut local_vm = create_avm(echo_call_service(), local_peer_id);
|
|
||||||
|
|
||||||
let join_stream_script = f!(r#"
|
|
||||||
(seq
|
|
||||||
(seq
|
|
||||||
(call "{local_peer_id}" ("" "") [[]] nodes)
|
|
||||||
(fold nodes n
|
|
||||||
(par
|
|
||||||
(call n ("" "") [n] $ns)
|
|
||||||
(next n)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
)
|
|
||||||
(seq
|
|
||||||
(canon "{local_peer_id}" $ns #ns)
|
|
||||||
(call "{local_peer_id}" ("" "") [#ns.$.[0] #ns.$.[1] #ns])
|
|
||||||
)
|
|
||||||
)"#);
|
|
||||||
|
|
||||||
let result = checked_call_vm!(local_vm, <_>::default(), join_stream_script, "", "");
|
|
||||||
print_trace(&result, "");
|
|
||||||
let actual_trace = trace_from_result(&result);
|
|
||||||
|
|
||||||
assert_eq!(actual_trace.len(), 2); // only the first call and canon should produce a trace
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn dont_wait_on_json_path_on_scalars() {
|
fn dont_wait_on_json_path_on_scalars() {
|
||||||
let array = json!([1u32, 2u32, 3u32, 4u32, 5u32]);
|
let array = json!([1u32, 2u32, 3u32, 4u32, 5u32]);
|
||||||
|
@ -132,36 +132,6 @@ fn flattening_streams() {
|
|||||||
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![0, 1])));
|
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![0, 1])));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn flattening_empty_values() {
|
|
||||||
let stream_value = json!(
|
|
||||||
{"args": []}
|
|
||||||
);
|
|
||||||
|
|
||||||
let set_variable_peer_id = "set_variable";
|
|
||||||
let mut set_variable_vm = create_avm(set_variable_call_service(stream_value), set_variable_peer_id);
|
|
||||||
|
|
||||||
let closure_call_args = ClosureCallArgs::default();
|
|
||||||
let local_peer_id = "local_peer_id";
|
|
||||||
let mut local_vm = create_avm(create_check_service_closure(closure_call_args.clone()), local_peer_id);
|
|
||||||
|
|
||||||
let script = f!(r#"
|
|
||||||
(seq
|
|
||||||
(call "{set_variable_peer_id}" ("" "") [] $stream)
|
|
||||||
(seq
|
|
||||||
(canon "{local_peer_id}" $stream #stream)
|
|
||||||
(call "{local_peer_id}" ("" "") [#stream.$.[1]!]) ; here #stream.$.[1] returns an empty array
|
|
||||||
)
|
|
||||||
)
|
|
||||||
"#);
|
|
||||||
|
|
||||||
let result = checked_call_vm!(set_variable_vm, <_>::default(), script.clone(), "", "");
|
|
||||||
let result = checked_call_vm!(local_vm, <_>::default(), script, "", result.data);
|
|
||||||
|
|
||||||
assert!(is_interpreter_succeded(&result));
|
|
||||||
assert_eq!(closure_call_args.args_var, Rc::new(RefCell::new(vec![])));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
#[ignore]
|
#[ignore]
|
||||||
fn test_handling_non_flattening_values() {
|
fn test_handling_non_flattening_values() {
|
||||||
|
Reference in New Issue
Block a user