diff --git a/Cargo.lock b/Cargo.lock index a5ff4b77..f9cb2214 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -37,6 +37,7 @@ version = "0.1.0" dependencies = [ "aqua-test-utils", "aquamarine-vm", + "boolinator", "env_logger", "fluence", "jsonpath_lib", @@ -1050,9 +1051,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96fe57af81d28386a513cbc6858332abc6117cfdb5999647c6444b8f43a370a5" +checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" dependencies = [ "serde_derive", ] @@ -1078,9 +1079,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f630a6370fd8e457873b4bd2ffdae75408bc291ba72be773772a4c2a065d9ae8" +checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" dependencies = [ "proc-macro2", "quote", diff --git a/artifacts/aquamarine.wasm b/artifacts/aquamarine.wasm index e9522520..0c394581 100644 Binary files a/artifacts/aquamarine.wasm and b/artifacts/aquamarine.wasm differ diff --git a/stepper/.rustfmt.toml b/stepper/.rustfmt.toml new file mode 100644 index 00000000..75306517 --- /dev/null +++ b/stepper/.rustfmt.toml @@ -0,0 +1 @@ +max_width = 120 diff --git a/stepper/Cargo.toml b/stepper/Cargo.toml index 189c611a..ab46afad 100644 --- a/stepper/Cargo.toml +++ b/stepper/Cargo.toml @@ -22,6 +22,7 @@ serde_sexpr = "0.1.0" jsonpath_lib = "0.2.5" +boolinator = "2.4.0" log = "0.4.11" once_cell = "1.4.1" serde_json = "1.0" diff --git a/stepper/src/air/call.rs b/stepper/src/air/call.rs index e1b80037..7a376fb9 100644 --- a/stepper/src/air/call.rs +++ b/stepper/src/air/call.rs @@ -14,12 +14,13 @@ * limitations under the License. */ +mod parsed_call; +mod utils; + +use parsed_call::ParsedCall; + use super::CallEvidenceCtx; -use super::CallResult; -use super::EvidenceState; use super::ExecutionCtx; -use crate::AquamarineError; -use crate::JValue; use crate::Result; use serde_derive::Deserialize; @@ -40,14 +41,14 @@ const CURRENT_PEER_ALIAS: &str = "%current_peer_id%"; #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(untagged)] -pub enum PeerPart { +pub(self) enum PeerPart { PeerPk(String), - PeerPkWithPkServiceId(String, String), + PeerPkWithServiceId(String, String), } #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] #[serde(untagged)] -pub enum FunctionPart { +pub(self) enum FunctionPart { FuncName(String), ServiceIdWithFuncName(String, String), } @@ -55,341 +56,15 @@ pub enum FunctionPart { #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] pub(crate) struct Call(PeerPart, FunctionPart, Vec, String); -#[derive(Debug, PartialEq, Eq)] -struct ParsedCall { - peer_pk: String, - service_id: String, - function_name: String, - function_arg_paths: Vec, - result_variable_name: String, -} - impl super::ExecutableInstruction for Call { fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { - log::info!( - "call {:?} is called with contexts: {:?} {:?}", - self, - exec_ctx, - call_ctx - ); + log::info!("call {:?} is called with contexts: {:?} {:?}", self, exec_ctx, call_ctx); let parsed_call = ParsedCall::new(self, exec_ctx)?; parsed_call.execute(exec_ctx, call_ctx) } } -impl ParsedCall { - pub fn new(call: &Call, exec_ctx: &ExecutionCtx) -> Result { - let (peer_pk, service_id, func_name) = Self::prepare_peer_fn_parts(call, exec_ctx)?; - let result_variable_name = Self::parse_result_variable_name(call)?; - - Ok(Self { - peer_pk: peer_pk.to_string(), - service_id: service_id.to_string(), - function_name: func_name.to_string(), - function_arg_paths: call.2.clone(), - result_variable_name: result_variable_name.to_string(), - }) - } - - pub fn execute( - self, - exec_ctx: &mut ExecutionCtx, - call_ctx: &mut CallEvidenceCtx, - ) -> Result<()> { - let should_executed = self.prepare_evidence_state(call_ctx, &exec_ctx.current_peer_id)?; - if !should_executed { - return Ok(()); - } - - if self.peer_pk != exec_ctx.current_peer_id && self.peer_pk != CURRENT_PEER_ALIAS { - set_remote_call_result(self.peer_pk, exec_ctx, call_ctx); - - return Ok(()); - } - - let function_args = self.extract_args_by_paths(exec_ctx)?; - let function_args = serde_json::to_string(&function_args) - .map_err(|e| AquamarineError::FuncArgsSerializationError(function_args, e))?; - - let result = - unsafe { crate::call_service(self.service_id, self.function_name, function_args) }; - - if result.ret_code != crate::CALL_SERVICE_SUCCESS { - return Err(AquamarineError::LocalServiceError(result.result)); - } - - let result: JValue = serde_json::from_str(&result.result) - .map_err(|e| AquamarineError::CallServiceResultDeserializationError(result, e))?; - set_local_call_result(self.result_variable_name, exec_ctx, call_ctx, result) - } - - fn prepare_peer_fn_parts<'a>( - raw_call: &'a Call, - exec_ctx: &'a ExecutionCtx, - ) -> Result<(&'a str, &'a str, &'a str)> { - let (peer_pk, service_id, func_name) = match (&raw_call.0, &raw_call.1) { - ( - PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), - FunctionPart::ServiceIdWithFuncName(_service_id, func_name), - ) => Ok((peer_pk, peer_service_id, func_name)), - ( - PeerPart::PeerPkWithPkServiceId(peer_pk, peer_service_id), - FunctionPart::FuncName(func_name), - ) => Ok((peer_pk, peer_service_id, func_name)), - ( - PeerPart::PeerPk(peer_pk), - FunctionPart::ServiceIdWithFuncName(service_id, func_name), - ) => Ok((peer_pk, service_id, func_name)), - (PeerPart::PeerPk(_), FunctionPart::FuncName(_)) => { - Err(AquamarineError::InstructionError(String::from( - "call should have service id specified by peer part or function part", - ))) - } - }?; - - let peer_pk = if peer_pk != CURRENT_PEER_ALIAS { - Self::prepare_call_arg(peer_pk, exec_ctx)? - } else { - peer_pk - }; - - let service_id = Self::prepare_call_arg(service_id, exec_ctx)?; - let func_name = Self::prepare_call_arg(func_name, exec_ctx)?; - - Ok((peer_pk, service_id, func_name)) - } - - fn extract_args_by_paths(&self, ctx: &ExecutionCtx) -> Result { - let mut result = Vec::with_capacity(self.function_arg_paths.len()); - - for arg_path in self.function_arg_paths.iter() { - if is_string_literal(arg_path) { - result.push(JValue::String(arg_path[1..arg_path.len() - 1].to_string())); - } else { - let arg = Self::get_args_by_path(arg_path, ctx)?; - result.extend(arg.into_iter().cloned()); - } - } - - Ok(JValue::Array(result)) - } - - fn parse_result_variable_name(call: &Call) -> Result<&str> { - let result_variable_name = &call.3; - - if result_variable_name.is_empty() { - return Err(AquamarineError::InstructionError(String::from( - "result name of a call instruction must be non empty", - ))); - } - - if super::RESERVED_KEYWORDS.contains(result_variable_name.as_str()) { - return Err(AquamarineError::ReservedKeywordError( - result_variable_name.to_string(), - )); - } - - if is_string_literal(result_variable_name) { - return Err(AquamarineError::InstructionError(String::from( - "result name of a call instruction must be non string literal", - ))); - } - - Ok(result_variable_name) - } - - fn get_args_by_path<'args_path, 'ctx>( - args_path: &'args_path str, - ctx: &'ctx ExecutionCtx, - ) -> Result> { - let mut split_arg: Vec<&str> = args_path.splitn(2, '.').collect(); - let arg_path_head = split_arg.remove(0); - - let value_by_head = match (ctx.data.get(arg_path_head), ctx.folds.get(arg_path_head)) { - (_, Some(fold_state)) => match ctx.data.get(&fold_state.iterable_name) { - Some(JValue::Array(values)) => &values[fold_state.cursor], - Some(v) => { - return Err(AquamarineError::IncompatibleJValueType( - v.clone(), - String::from("array"), - )) - } - None => { - return Err(AquamarineError::VariableNotFound( - fold_state.iterable_name.clone(), - )) - } - }, - (Some(value), None) => value, - (None, None) => { - return Err(AquamarineError::VariableNotFound(arg_path_head.to_string())) - } - }; - - if split_arg.is_empty() { - return Ok(vec![value_by_head]); - } - - let json_path = split_arg.remove(0); - let values = jsonpath_lib::select(value_by_head, json_path).map_err(|e| { - AquamarineError::VariableNotInJsonPath( - value_by_head.clone(), - String::from(json_path), - e, - ) - })?; - - Ok(values) - } - - fn prepare_call_arg<'a>(arg_path: &'a str, ctx: &'a ExecutionCtx) -> Result<&'a str> { - if super::RESERVED_KEYWORDS.contains(arg_path) { - return Err(AquamarineError::ReservedKeywordError(arg_path.to_string())); - } - - if is_string_literal(arg_path) { - return Ok(&arg_path[1..arg_path.len() - 1]); - } - - let args = Self::get_args_by_path(arg_path, ctx)?; - if args.is_empty() { - return Err(AquamarineError::VariableNotFound(arg_path.to_string())); - } - - if args.len() != 1 { - return Err(AquamarineError::MultipleValuesInJsonPath( - arg_path.to_string(), - )); - } - - match args[0] { - JValue::String(str) => Ok(str), - v => Err(AquamarineError::IncompatibleJValueType( - v.clone(), - String::from("string"), - )), - } - } - - fn prepare_evidence_state( - &self, - call_ctx: &mut CallEvidenceCtx, - current_peer_id: &str, - ) -> Result { - if call_ctx.unused_subtree_elements_count == 0 { - log::info!("call evidence: previous state wasn't found"); - return Ok(true); - } - - call_ctx.unused_subtree_elements_count -= 1; - // unwrap is safe here, because current_states length's been checked - let prev_state = call_ctx.current_states.pop_front().unwrap(); - - log::info!("call evidence: previous state found {:?}", prev_state); - - match &prev_state { - // this call was failed on one of the previous executions, - // here it's needed to bubble this special error up - EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => { - let err_msg = err_msg.clone(); - call_ctx.new_states.push(prev_state); - Err(AquamarineError::LocalServiceError(err_msg)) - } - EvidenceState::Call(CallResult::RequestSent) => { - // check whether current node can execute this call - if self.peer_pk == current_peer_id { - Ok(true) - } else { - call_ctx.new_states.push(prev_state); - Ok(false) - } - } - // this instruction's been already executed - EvidenceState::Call(CallResult::Executed) => { - call_ctx.new_states.push(prev_state); - Ok(false) - } - // state has inconsistent order - return a error, call shouldn't be executed - par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState( - par_state.clone(), - String::from("call"), - )), - } - } -} - -fn set_local_call_result( - result_variable_name: String, - exec_ctx: &mut ExecutionCtx, - call_ctx: &mut CallEvidenceCtx, - result: JValue, -) -> Result<()> { - use std::collections::hash_map::Entry; - - let new_evidence_state = EvidenceState::Call(CallResult::Executed); - let is_array = result_variable_name.ends_with("[]"); - - if !is_array { - // if result is not an array, simply insert it into data - if exec_ctx - .data - .insert(result_variable_name.clone(), result) - .is_some() - { - return Err(AquamarineError::MultipleVariablesFound( - result_variable_name, - )); - } - - log::info!("call evidence: adding new state {:?}", new_evidence_state); - call_ctx.new_states.push(new_evidence_state); - - return Ok(()); - } - - // if result is an array, insert result to the end of the array - match exec_ctx - .data - // unwrap is safe because it's been checked for [] - .entry(result_variable_name.strip_suffix("[]").unwrap().to_string()) - { - Entry::Occupied(mut entry) => match entry.get_mut() { - JValue::Array(values) => values.push(result), - v => { - return Err(AquamarineError::IncompatibleJValueType( - v.clone(), - String::from("Array"), - )) - } - }, - Entry::Vacant(entry) => { - entry.insert(JValue::Array(vec![result])); - } - } - - log::info!("call evidence: adding new state {:?}", new_evidence_state); - call_ctx.new_states.push(new_evidence_state); - - Ok(()) -} - -fn set_remote_call_result( - peer_pk: String, - exec_ctx: &mut ExecutionCtx, - call_ctx: &mut CallEvidenceCtx, -) { - exec_ctx.next_peer_pks.push(peer_pk); - - let new_evidence_state = EvidenceState::Call(CallResult::RequestSent); - log::info!("call evidence: adding new state {:?}", new_evidence_state); - call_ctx.new_states.push(new_evidence_state); -} - -fn is_string_literal(value: &str) -> bool { - value.starts_with('"') && value.ends_with('"') -} - #[cfg(test)] mod tests { use crate::JValue; @@ -413,11 +88,7 @@ mod tests { ); let res = vm - .call(json!([ - String::from("asd"), - script, - String::from("{\"value\": \"test\"}"), - ])) + .call(json!(["asd", script, "{}", "{\"value\": \"test\"}",])) .expect("call should be successful"); let res: JValue = serde_json::from_str(&res.data).unwrap(); @@ -431,11 +102,7 @@ mod tests { ); let res = vm - .call(json!([ - String::from("asd"), - script, - String::from("{\"value\": \"test\"}"), - ])) + .call(json!(["asd", script, "{}", "{\"value\": \"test\"}",])) .expect("call should be successful"); let res: JValue = serde_json::from_str(&res.data).unwrap(); @@ -454,11 +121,7 @@ mod tests { ); let res = vm - .call(json!([ - String::from("asd"), - script, - String::from("{\"value\": \"test\"}"), - ])) + .call(json!(["asd", script, "{}", "{\"value\": \"test\"}",])) .expect("call should be successful"); assert_eq!(res.next_peer_pks, vec![remote_peer_id]); @@ -468,16 +131,10 @@ mod tests { fn variables() { let mut vm = create_aqua_vm(echo_string_call_service(), ""); - let script = format!( - r#"(call (remote_peer_id ("some_service_id" "local_fn_name") ("param") result_name))"#, - ); + let script = format!(r#"(call (remote_peer_id ("some_service_id" "local_fn_name") ("param") result_name))"#,); let res = vm - .call(json!([ - String::from("asd"), - script, - String::from("{\"remote_peer_id\": \"some_peer_id\"}"), - ])) + .call(json!(["asd", script, "{}", "{\"remote_peer_id\": \"some_peer_id\"}",])) .expect("call should be successful"); assert_eq!(res.next_peer_pks, vec![String::from("some_peer_id")]); @@ -503,11 +160,7 @@ mod tests { ); let res = vm - .call(json!([ - String::from("asd"), - script, - json!({"arg3": "arg3_value"}).to_string(), - ])) + .call(json!(["asd", script, "{}", json!({"arg3": "arg3_value"}).to_string(),])) .expect("call should be successful"); let jdata: JValue = serde_json::from_str(&res.data).expect("should be valid json"); diff --git a/stepper/src/air/call/parsed_call.rs b/stepper/src/air/call/parsed_call.rs new file mode 100644 index 00000000..3a4a379c --- /dev/null +++ b/stepper/src/air/call/parsed_call.rs @@ -0,0 +1,250 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::utils::is_string_literal; +use super::Call; +use super::CURRENT_PEER_ALIAS; +use crate::air::ExecutionCtx; +use crate::air::RESERVED_KEYWORDS; +use crate::call_evidence::CallEvidenceCtx; +use crate::call_evidence::CallResult; +use crate::call_evidence::EvidenceState; +use crate::AquamarineError; +use crate::JValue; +use crate::Result; + +#[derive(Debug, PartialEq, Eq)] +pub(super) struct ParsedCall { + peer_pk: String, + service_id: String, + function_name: String, + function_arg_paths: Vec, + result_variable_name: String, +} + +impl ParsedCall { + pub(super) fn new(raw_call: &Call, exec_ctx: &ExecutionCtx) -> Result { + let (peer_pk, service_id, func_name) = Self::prepare_peer_fn_parts(raw_call, exec_ctx)?; + let result_variable_name = Self::parse_result_variable_name(raw_call)?; + + Ok(Self { + peer_pk: peer_pk.to_string(), + service_id: service_id.to_string(), + function_name: func_name.to_string(), + function_arg_paths: raw_call.2.clone(), + result_variable_name: result_variable_name.to_string(), + }) + } + + pub(super) fn execute(self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { + let should_executed = self.prepare_evidence_state(call_ctx, &exec_ctx.current_peer_id)?; + if !should_executed { + return Ok(()); + } + + if self.peer_pk != exec_ctx.current_peer_id && self.peer_pk != CURRENT_PEER_ALIAS { + super::utils::set_remote_call_result(self.peer_pk, exec_ctx, call_ctx); + + return Ok(()); + } + + let function_args = self.extract_args_by_paths(exec_ctx)?; + let function_args = serde_json::to_string(&function_args) + .map_err(|e| AquamarineError::FuncArgsSerializationError(function_args, e))?; + + let result = unsafe { crate::call_service(self.service_id, self.function_name, function_args) }; + + if result.ret_code != crate::CALL_SERVICE_SUCCESS { + return Err(AquamarineError::LocalServiceError(result.result)); + } + + let result: JValue = serde_json::from_str(&result.result) + .map_err(|e| AquamarineError::CallServiceResultDeserializationError(result, e))?; + super::utils::set_local_call_result(self.result_variable_name, exec_ctx, call_ctx, result) + } + + fn prepare_peer_fn_parts<'a>( + raw_call: &'a Call, + exec_ctx: &'a ExecutionCtx, + ) -> Result<(&'a str, &'a str, &'a str)> { + use super::FunctionPart::*; + use super::PeerPart::*; + + let (peer_pk, service_id, func_name) = match (&raw_call.0, &raw_call.1) { + (PeerPkWithServiceId(peer_pk, peer_service_id), ServiceIdWithFuncName(_service_id, func_name)) => { + Ok((peer_pk, peer_service_id, func_name)) + } + (PeerPkWithServiceId(peer_pk, peer_service_id), FuncName(func_name)) => { + Ok((peer_pk, peer_service_id, func_name)) + } + (PeerPk(peer_pk), ServiceIdWithFuncName(service_id, func_name)) => Ok((peer_pk, service_id, func_name)), + (PeerPk(_), FuncName(_)) => Err(AquamarineError::InstructionError(String::from( + "call should have service id specified by peer part or function part", + ))), + }?; + + let peer_pk = if peer_pk != CURRENT_PEER_ALIAS { + Self::prepare_call_arg(peer_pk, exec_ctx)? + } else { + peer_pk + }; + + let service_id = Self::prepare_call_arg(service_id, exec_ctx)?; + let func_name = Self::prepare_call_arg(func_name, exec_ctx)?; + + Ok((peer_pk, service_id, func_name)) + } + + fn extract_args_by_paths(&self, ctx: &ExecutionCtx) -> Result { + let mut result = Vec::with_capacity(self.function_arg_paths.len()); + + for arg_path in self.function_arg_paths.iter() { + if is_string_literal(arg_path) { + result.push(JValue::String(arg_path[1..arg_path.len() - 1].to_string())); + } else { + let arg = Self::get_args_by_path(arg_path, ctx)?; + result.extend(arg.into_iter().cloned()); + } + } + + Ok(JValue::Array(result)) + } + + fn parse_result_variable_name(call: &Call) -> Result<&str> { + let result_variable_name = &call.3; + + if result_variable_name.is_empty() { + return Err(AquamarineError::InstructionError(String::from( + "result name of a call instruction must be non empty", + ))); + } + + if RESERVED_KEYWORDS.contains(result_variable_name.as_str()) { + return Err(AquamarineError::ReservedKeywordError(result_variable_name.to_string())); + } + + if is_string_literal(result_variable_name) { + return Err(AquamarineError::InstructionError(String::from( + "result name of a call instruction must be non string literal", + ))); + } + + Ok(result_variable_name) + } + + fn get_args_by_path<'args_path, 'ctx>( + args_path: &'args_path str, + ctx: &'ctx ExecutionCtx, + ) -> Result> { + let mut split_arg: Vec<&str> = args_path.splitn(2, '.').collect(); + let arg_path_head = split_arg.remove(0); + + let value_by_head = match (ctx.data.get(arg_path_head), ctx.folds.get(arg_path_head)) { + (_, Some(fold_state)) => match ctx.data.get(&fold_state.iterable_name) { + Some(JValue::Array(values)) => &values[fold_state.cursor], + Some(v) => { + return Err(AquamarineError::IncompatibleJValueType( + v.clone(), + String::from("array"), + )) + } + None => return Err(AquamarineError::VariableNotFound(fold_state.iterable_name.clone())), + }, + (Some(value), None) => value, + (None, None) => return Err(AquamarineError::VariableNotFound(arg_path_head.to_string())), + }; + + if split_arg.is_empty() { + return Ok(vec![value_by_head]); + } + + let json_path = split_arg.remove(0); + let values = jsonpath_lib::select(value_by_head, json_path) + .map_err(|e| AquamarineError::VariableNotInJsonPath(value_by_head.clone(), String::from(json_path), e))?; + + Ok(values) + } + + fn prepare_call_arg<'a>(arg_path: &'a str, ctx: &'a ExecutionCtx) -> Result<&'a str> { + if RESERVED_KEYWORDS.contains(arg_path) { + return Err(AquamarineError::ReservedKeywordError(arg_path.to_string())); + } + + if is_string_literal(arg_path) { + return Ok(&arg_path[1..arg_path.len() - 1]); + } + + let args = Self::get_args_by_path(arg_path, ctx)?; + if args.is_empty() { + return Err(AquamarineError::VariableNotFound(arg_path.to_string())); + } + + if args.len() != 1 { + return Err(AquamarineError::MultipleValuesInJsonPath(arg_path.to_string())); + } + + match args[0] { + JValue::String(str) => Ok(str), + v => Err(AquamarineError::IncompatibleJValueType( + v.clone(), + String::from("string"), + )), + } + } + + fn prepare_evidence_state(&self, call_ctx: &mut CallEvidenceCtx, current_peer_id: &str) -> Result { + if call_ctx.current_subtree_elements_count == 0 { + log::info!("call evidence: previous state wasn't found"); + return Ok(true); + } + + call_ctx.current_subtree_elements_count -= 1; + // unwrap is safe here, because current_subtree_elements_count depends on current_path len, + // and it's been checked previously + let prev_state = call_ctx.current_path.pop_front().unwrap(); + + log::info!("call evidence: previous state found {:?}", prev_state); + + match &prev_state { + // this call was failed on one of the previous executions, + // here it's needed to bubble this special error up + EvidenceState::Call(CallResult::CallServiceFailed(err_msg)) => { + let err_msg = err_msg.clone(); + call_ctx.new_path.push_back(prev_state); + Err(AquamarineError::LocalServiceError(err_msg)) + } + EvidenceState::Call(CallResult::RequestSent) => { + // check whether current node can execute this call + if self.peer_pk == current_peer_id { + Ok(true) + } else { + call_ctx.new_path.push_back(prev_state); + Ok(false) + } + } + // this instruction's been already executed + EvidenceState::Call(CallResult::Executed) => { + call_ctx.new_path.push_back(prev_state); + Ok(false) + } + // state has inconsistent order - return a error, call shouldn't be executed + par_state @ EvidenceState::Par(..) => Err(AquamarineError::InvalidEvidenceState( + par_state.clone(), + String::from("call"), + )), + } + } +} diff --git a/stepper/src/air/call/utils.rs b/stepper/src/air/call/utils.rs new file mode 100644 index 00000000..26b15347 --- /dev/null +++ b/stepper/src/air/call/utils.rs @@ -0,0 +1,82 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::ExecutionCtx; +use crate::call_evidence::CallEvidenceCtx; +use crate::call_evidence::CallResult; +use crate::call_evidence::EvidenceState; +use crate::AquamarineError; +use crate::JValue; +use crate::Result; + +pub(super) fn set_local_call_result( + result_variable_name: String, + exec_ctx: &mut ExecutionCtx, + call_ctx: &mut CallEvidenceCtx, + result: JValue, +) -> Result<()> { + use std::collections::hash_map::Entry::{Occupied, Vacant}; + + let new_evidence_state = EvidenceState::Call(CallResult::Executed); + let is_array = result_variable_name.ends_with("[]"); + + if !is_array { + // if result is not an array, simply insert it into data + if exec_ctx.data.insert(result_variable_name.clone(), result).is_some() { + return Err(AquamarineError::MultipleVariablesFound(result_variable_name)); + } + + log::info!("call evidence: adding new state {:?}", new_evidence_state); + call_ctx.new_path.push_back(new_evidence_state); + + return Ok(()); + } + + // unwrap is safe because it's been checked for [] + let result_variable_name = result_variable_name.strip_suffix("[]").unwrap().to_string(); + // if result is an array, insert result to the end of the array + match exec_ctx.data.entry(result_variable_name) { + Occupied(mut entry) => match entry.get_mut() { + JValue::Array(values) => values.push(result), + v => { + return Err(AquamarineError::IncompatibleJValueType( + v.clone(), + String::from("Array"), + )) + } + }, + Vacant(entry) => { + entry.insert(JValue::Array(vec![result])); + } + } + + log::info!("call evidence: adding new state {:?}", new_evidence_state); + call_ctx.new_path.push_back(new_evidence_state); + + Ok(()) +} + +pub(super) fn set_remote_call_result(peer_pk: String, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) { + exec_ctx.next_peer_pks.push(peer_pk); + + let new_evidence_state = EvidenceState::Call(CallResult::RequestSent); + log::info!("call evidence: adding new state {:?}", new_evidence_state); + call_ctx.new_path.push_back(new_evidence_state); +} + +pub(super) fn is_string_literal(value: &str) -> bool { + value.starts_with('"') && value.ends_with('"') +} diff --git a/stepper/src/air/fold.rs b/stepper/src/air/fold.rs index 7ad52de4..416b8223 100644 --- a/stepper/src/air/fold.rs +++ b/stepper/src/air/fold.rs @@ -70,11 +70,7 @@ impl super::ExecutableInstruction for Fold { String::from("Array"), )) } - None => { - return Err(AquamarineError::VariableNotFound(String::from( - iterable_name, - ))) - } + None => return Err(AquamarineError::VariableNotFound(String::from(iterable_name))), }; let fold_state = FoldState { @@ -83,11 +79,7 @@ impl super::ExecutableInstruction for Fold { instr_head: instr_head.clone(), }; - if exec_ctx - .folds - .insert(iterator_name.clone(), fold_state) - .is_some() - { + if exec_ctx.folds.insert(iterator_name.clone(), fold_state).is_some() { return Err(AquamarineError::MultipleFoldStates(iterable_name.clone())); } @@ -100,12 +92,7 @@ impl super::ExecutableInstruction for Fold { impl super::ExecutableInstruction for Next { fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { - log::info!( - "next {:?} is called with contexts {:?} {:?}", - self, - exec_ctx, - call_ctx - ); + log::info!("next {:?} is called with contexts {:?} {:?}", self, exec_ctx, call_ctx); let iterator_name = &self.0; let fold_state = exec_ctx @@ -118,7 +105,7 @@ impl super::ExecutableInstruction for Next { .expect("this has been checked on the fold instruction"); let value_len = match value { JValue::Array(array) => array.len(), - _ => unreachable!(), + _ => unreachable!("iterable value shouldn't changed inside fold"), }; fold_state.cursor += 1; @@ -168,9 +155,10 @@ mod tests { let res = vm .call(json!([ - String::from("asd"), + "asd", lfold, - String::from("{\"Iterable\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"), + "{}", + "{\"Iterable\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}", ])) .expect("call should be successful"); @@ -195,9 +183,10 @@ mod tests { let res = vm .call(json!([ - String::from("asd"), + "asd", rfold, - String::from("{\"Iterable\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"), + "{}", + "{\"Iterable\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}", ])) .expect("call should be successful"); @@ -227,9 +216,10 @@ mod tests { let res = vm .call(json!([ - String::from("asd"), + "asd", script, - String::from("{\"Iterable1\": [\"1\",\"2\",\"3\",\"4\",\"5\"], \"Iterable2\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"), + "{}", + "{\"Iterable1\": [\"1\",\"2\",\"3\",\"4\",\"5\"], \"Iterable2\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}", ])) .expect("call should be successful"); @@ -260,12 +250,12 @@ mod tests { ))"#, ); - let res = vm - .call(json!([ - String::from("asd"), - script, - String::from("{\"Iterable1\": [\"1\",\"2\",\"3\",\"4\",\"5\"], \"Iterable2\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}"), - ])); + let res = vm.call(json!([ + "asd", + script, + "{}", + "{\"Iterable1\": [\"1\",\"2\",\"3\",\"4\",\"5\"], \"Iterable2\": [\"1\",\"2\",\"3\",\"4\",\"5\"]}", + ])); assert!(res.is_err()); let error = res.err().unwrap(); @@ -276,9 +266,7 @@ mod tests { assert_eq!( error, - StepperError::MultipleFoldStates(String::from( - "multiple fold states found for iterable Iterable2" - )) + StepperError::MultipleFoldStates(String::from("multiple fold states found for iterable Iterable2")) ); } } diff --git a/stepper/src/air/mod.rs b/stepper/src/air/mod.rs index 597a9b56..286f622e 100644 --- a/stepper/src/air/mod.rs +++ b/stepper/src/air/mod.rs @@ -25,7 +25,6 @@ mod xor; pub(crate) use execution_context::ExecutionCtx; pub(self) use crate::call_evidence::CallEvidenceCtx; -pub(self) use crate::call_evidence::CallResult; pub(self) use crate::call_evidence::EvidenceState; use crate::Result; diff --git a/stepper/src/air/null.rs b/stepper/src/air/null.rs index 50dd0f11..e255116b 100644 --- a/stepper/src/air/null.rs +++ b/stepper/src/air/null.rs @@ -26,11 +26,7 @@ pub(crate) struct Null {} impl super::ExecutableInstruction for Null { fn execute(&self, exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx) -> Result<()> { - log::info!( - "null is called with contexts: {:?} {:?}", - exec_ctx, - call_ctx - ); + log::info!("null is called with contexts: {:?} {:?}", exec_ctx, call_ctx); Ok(()) } diff --git a/stepper/src/air/par.rs b/stepper/src/air/par.rs index 8919a37f..aed2f722 100644 --- a/stepper/src/air/par.rs +++ b/stepper/src/air/par.rs @@ -19,7 +19,6 @@ use super::EvidenceState; use super::ExecutableInstruction; use super::ExecutionCtx; use super::Instruction; -use crate::AquamarineError; use crate::Result; use serde_derive::Deserialize; @@ -34,52 +33,44 @@ impl ExecutableInstruction for Par { let (left_subtree_size, right_subtree_size) = extract_subtree_sizes(call_ctx)?; - let pre_states_count = call_ctx.current_states.len(); - let pre_unused_elements = call_ctx.unused_subtree_elements_count; + let pre_states_count = call_ctx.current_path.len(); + let pre_unused_elements = call_ctx.current_subtree_elements_count; - let pre_new_states_count = call_ctx.new_states.len(); - call_ctx.new_states.push(EvidenceState::Par(0, 0)); + let pre_new_states_count = call_ctx.new_path.len(); + call_ctx.new_path.push_back(EvidenceState::Par(0, 0)); - let new_left_subtree_size = - execute_subtree(&self.0, left_subtree_size, exec_ctx, call_ctx)?; - let new_right_subtree_size = - execute_subtree(&self.1, right_subtree_size, exec_ctx, call_ctx)?; + let new_left_subtree_size = execute_subtree(&self.0, left_subtree_size, exec_ctx, call_ctx)?; + let new_right_subtree_size = execute_subtree(&self.1, right_subtree_size, exec_ctx, call_ctx)?; - let new_par_evidence_state = - EvidenceState::Par(new_left_subtree_size, new_right_subtree_size); - log::info!( - "call evidence: adding new state {:?}", - new_par_evidence_state - ); - call_ctx.new_states[pre_new_states_count] = new_par_evidence_state; + let new_par_evidence_state = EvidenceState::Par(new_left_subtree_size, new_right_subtree_size); + log::info!("call evidence: adding new state {:?}", new_par_evidence_state); + call_ctx.new_path[pre_new_states_count] = new_par_evidence_state; - let post_states_count = call_ctx.current_states.len(); - call_ctx.unused_subtree_elements_count = - pre_unused_elements - (pre_states_count - post_states_count); + let post_states_count = call_ctx.current_path.len(); + call_ctx.current_subtree_elements_count = pre_unused_elements - (pre_states_count - post_states_count); Ok(()) } } fn extract_subtree_sizes(call_ctx: &mut CallEvidenceCtx) -> Result<(usize, usize)> { - if call_ctx.unused_subtree_elements_count == 0 { + use crate::AquamarineError::InvalidEvidenceState; + + if call_ctx.current_subtree_elements_count == 0 { return Ok((0, 0)); } - call_ctx.unused_subtree_elements_count -= 1; + call_ctx.current_subtree_elements_count -= 1; log::info!( "call evidence: the previous state was found {:?}", - call_ctx.current_states[0] + call_ctx.current_path[0] ); // unwrap is safe here because of length's been checked - match call_ctx.current_states.pop_front().unwrap() { + match call_ctx.current_path.pop_front().unwrap() { EvidenceState::Par(left, right) => Ok((left, right)), - state => Err(AquamarineError::InvalidEvidenceState( - state, - String::from("par"), - )), + state => Err(InvalidEvidenceState(state, String::from("par"))), } } @@ -89,13 +80,13 @@ fn execute_subtree( exec_ctx: &mut ExecutionCtx, call_ctx: &mut CallEvidenceCtx, ) -> Result { - call_ctx.unused_subtree_elements_count = subtree_size; - let before_states_count = call_ctx.new_states.len(); + call_ctx.current_subtree_elements_count = subtree_size; + let before_states_count = call_ctx.new_path.len(); // execute subtree subtree.execute(exec_ctx, call_ctx)?; - Ok(call_ctx.new_states.len() - before_states_count) + Ok(call_ctx.new_path.len() - before_states_count) } #[cfg(test)] @@ -120,16 +111,13 @@ mod tests { ); let mut res = vm - .call(json!([String::from("asd"), script, String::from("{}"),])) + .call(json!(["asd", script, "{}", "{}",])) .expect("call should be successful"); let peers_result: HashSet<_> = res.next_peer_pks.drain(..).collect(); - let peers_right: HashSet<_> = vec![ - String::from("remote_peer_id_1"), - String::from("remote_peer_id_2"), - ] - .drain(..) - .collect(); + let peers_right: HashSet<_> = vec![String::from("remote_peer_id_1"), String::from("remote_peer_id_2")] + .drain(..) + .collect(); assert_eq!(peers_result, peers_right); } @@ -147,7 +135,7 @@ mod tests { ); let res = vm - .call(json!([String::from("asd"), script, String::from("{}"),])) + .call(json!(["asd", script, "{}", "{}",])) .expect("call should be successful"); assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]); diff --git a/stepper/src/air/seq.rs b/stepper/src/air/seq.rs index 23517660..59fcb7d0 100644 --- a/stepper/src/air/seq.rs +++ b/stepper/src/air/seq.rs @@ -60,15 +60,16 @@ mod tests { ); let res = vm - .call(json!([String::from("asd"), script, String::from("{}")])) + .call(json!(["asd", script, "{}", "{}",])) .expect("call should be successful"); assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_1")]); let res = vm .call(json!([ - String::from("asd"), + "asd", script, + "{}", json!({ "__call": [{"call": "executed"}] } @@ -93,7 +94,7 @@ mod tests { ); let res = vm - .call(json!([String::from("asd"), script, String::from("{}"),])) + .call(json!(["asd", script, "{}", "{}",])) .expect("call should be successful"); assert_eq!(res.next_peer_pks, vec![String::from("remote_peer_id_2")]); diff --git a/stepper/src/air/xor.rs b/stepper/src/air/xor.rs index 29a56094..6ac3bb47 100644 --- a/stepper/src/air/xor.rs +++ b/stepper/src/air/xor.rs @@ -17,7 +17,7 @@ use super::CallEvidenceCtx; use super::ExecutionCtx; use super::Instruction; -use crate::AquamarineError; +use crate::AquamarineError::LocalServiceError; use crate::Result; use serde_derive::Deserialize; @@ -31,7 +31,7 @@ impl super::ExecutableInstruction for Xor { log::info!("xor is called with contexts: {:?} {:?}", exec_ctx, call_ctx); match self.0.execute(exec_ctx, call_ctx) { - Err(AquamarineError::LocalServiceError(_)) => self.1.execute(exec_ctx, call_ctx), + Err(LocalServiceError(_)) => self.1.execute(exec_ctx, call_ctx), res => res, } } @@ -64,11 +64,7 @@ mod tests { } else { // return success for services with other ids Some(IValue::Record( - Vec1::new(vec![ - IValue::S32(0), - IValue::String(String::from("\"res\"")), - ]) - .unwrap(), + Vec1::new(vec![IValue::S32(0), IValue::String(String::from("\"res\""))]).unwrap(), )) } }); @@ -84,11 +80,7 @@ mod tests { ); let res = vm - .call(json!([ - String::from("asd"), - script, - json!({"arg3": "arg3_value"}).to_string(), - ])) + .call(json!(["asd", script, "{}", json!({"arg3": "arg3_value"}).to_string(),])) .expect("call should be successful"); let jdata: JValue = serde_json::from_str(&res.data).expect("should be valid json"); @@ -104,11 +96,7 @@ mod tests { ); let res = vm - .call(json!([ - String::from("asd"), - script, - json!({"arg3": "arg3_value"}).to_string(), - ])) + .call(json!(["asd", script, "{}", json!({"arg3": "arg3_value"}).to_string(),])) .expect("call should be successful"); let jdata: JValue = serde_json::from_str(&res.data).expect("should be valid json"); diff --git a/stepper/src/call_evidence/context.rs b/stepper/src/call_evidence/context.rs index be4be92d..4355b4f3 100644 --- a/stepper/src/call_evidence/context.rs +++ b/stepper/src/call_evidence/context.rs @@ -14,27 +14,26 @@ * limitations under the License. */ -use super::EvidenceState; +use super::CallEvidencePath; use serde::Deserialize; use serde::Serialize; -use std::collections::VecDeque; - #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub(crate) struct CallEvidenceCtx { - pub(crate) current_states: VecDeque, - pub(crate) unused_subtree_elements_count: usize, - pub(crate) new_states: Vec, + pub(crate) current_path: CallEvidencePath, + pub(crate) current_subtree_elements_count: usize, + // TODO: consider change it to Vec for optimization + pub(crate) new_path: CallEvidencePath, } impl CallEvidenceCtx { - pub fn new(current_states: VecDeque) -> Self { - let right = current_states.len(); + pub fn new(current_path: CallEvidencePath) -> Self { + let current_subtree_elements_count = current_path.len(); Self { - current_states, - unused_subtree_elements_count: right, - new_states: vec![], + current_path, + current_subtree_elements_count, + new_path: CallEvidencePath::new(), } } } diff --git a/stepper/src/call_evidence/mod.rs b/stepper/src/call_evidence/mod.rs index fb0038fb..965c3db0 100644 --- a/stepper/src/call_evidence/mod.rs +++ b/stepper/src/call_evidence/mod.rs @@ -18,5 +18,7 @@ mod context; mod state; pub(crate) use context::CallEvidenceCtx; +pub(crate) use state::merge_call_states; +pub(crate) use state::CallEvidencePath; pub(crate) use state::CallResult; pub(crate) use state::EvidenceState; diff --git a/stepper/src/call_evidence/state.rs b/stepper/src/call_evidence/state.rs index 0487d78e..18ec76c1 100644 --- a/stepper/src/call_evidence/state.rs +++ b/stepper/src/call_evidence/state.rs @@ -14,8 +14,13 @@ * limitations under the License. */ +use crate::Result; + use serde::Deserialize; use serde::Serialize; +use std::cmp::max; + +pub(crate) type CallEvidencePath = std::collections::VecDeque; #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "snake_case")] @@ -36,3 +41,242 @@ pub(crate) enum EvidenceState { Par(usize, usize), Call(CallResult), } + +pub(crate) fn merge_call_states( + mut prev_path: CallEvidencePath, + mut current_path: CallEvidencePath, +) -> Result { + let mut merged_path = CallEvidencePath::new(); + + let prev_subtree_size = prev_path.len(); + let current_subtree_size = current_path.len(); + + handle_subtree( + &mut prev_path, + prev_subtree_size, + &mut current_path, + current_subtree_size, + &mut merged_path, + )?; + + log::info!("merged path: {:?}", merged_path); + + Ok(merged_path) +} + +fn handle_subtree( + prev_path: &mut CallEvidencePath, + mut prev_subtree_size: usize, + current_path: &mut CallEvidencePath, + mut current_subtree_size: usize, + result_path: &mut CallEvidencePath, +) -> Result<()> { + use crate::AquamarineError::EvidencePathTooSmall; + use crate::AquamarineError::IncompatibleEvidenceStates; + use EvidenceState::Call; + use EvidenceState::Par; + + loop { + let prev_state = if prev_subtree_size != 0 { + prev_subtree_size -= 1; + prev_path.pop_front() + } else { + None + }; + + let current_state = if current_subtree_size != 0 { + current_subtree_size -= 1; + current_path.pop_front() + } else { + None + }; + + match (prev_state, current_state) { + (Some(Call(prev_call)), Some(Call(call))) => { + let resulted_call = handle_call(prev_call, call)?; + result_path.push_back(Call(resulted_call)); + } + (Some(Par(prev_left, prev_right)), Some(Par(current_left, current_right))) => { + result_path.push_back(Par(max(prev_left, current_left), max(prev_right, current_right))); + + handle_subtree(prev_path, prev_left, current_path, current_left, result_path)?; + handle_subtree(prev_path, prev_right, current_path, current_right, result_path)?; + + prev_subtree_size -= prev_left + prev_right; + current_subtree_size -= current_left + current_right; + } + (None, Some(s)) => { + if current_path.len() < current_subtree_size { + return Err(EvidencePathTooSmall(current_path.len(), current_subtree_size)); + } + + result_path.push_back(s); + result_path.extend(current_path.drain(..current_subtree_size)); + break; + } + (Some(s), None) => { + if prev_path.len() < prev_subtree_size { + return Err(EvidencePathTooSmall(prev_path.len(), prev_subtree_size)); + } + + result_path.push_back(s); + result_path.extend(prev_path.drain(..prev_subtree_size)); + break; + } + (None, None) => break, + // this match arn represents (Call, Par) and (Par, Call) states + (Some(prev_state), Some(current_state)) => { + return Err(IncompatibleEvidenceStates(prev_state, current_state)) + } + } + } + + Ok(()) +} + +fn handle_call(prev_call_result: CallResult, current_call_result: CallResult) -> Result { + use crate::AquamarineError::IncompatibleCallResults; + use CallResult::*; + + match (&prev_call_result, ¤t_call_result) { + (CallServiceFailed(prev_err_msg), CallServiceFailed(err_msg)) => { + if prev_err_msg != err_msg { + return Err(IncompatibleCallResults(prev_call_result, current_call_result)); + } + Ok(current_call_result) + } + (RequestSent, CallServiceFailed(_)) => Ok(current_call_result), + (CallServiceFailed(_), RequestSent) => Ok(prev_call_result), + (RequestSent, RequestSent) => Ok(prev_call_result), + (RequestSent, Executed) => Ok(current_call_result), + (Executed, RequestSent) => Ok(prev_call_result), + (Executed, Executed) => Ok(prev_call_result), + (CallServiceFailed(_), Executed) => Err(IncompatibleCallResults(prev_call_result, current_call_result)), + (Executed, CallServiceFailed(_)) => Err(IncompatibleCallResults(prev_call_result, current_call_result)), + } +} + +#[cfg(test)] +mod tests { + use crate::call_evidence::CallResult; + use crate::call_evidence::EvidenceState; + use crate::call_evidence::{merge_call_states, CallEvidencePath}; + + #[test] + fn merge_call_states_1() { + use CallResult::*; + use EvidenceState::*; + + let mut prev_path = CallEvidencePath::new(); + prev_path.push_back(Par(1, 1)); + prev_path.push_back(Call(RequestSent)); + prev_path.push_back(Call(Executed)); + prev_path.push_back(Par(1, 1)); + prev_path.push_back(Call(RequestSent)); + prev_path.push_back(Call(Executed)); + + let mut current_path = CallEvidencePath::new(); + current_path.push_back(Par(1, 1)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(RequestSent)); + current_path.push_back(Par(1, 1)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(RequestSent)); + + let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful"); + + let mut right_merged_path = CallEvidencePath::new(); + right_merged_path.push_back(Par(1, 1)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Par(1, 1)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + + assert_eq!(merged_path, right_merged_path); + } + + #[test] + fn merge_call_states_2() { + use CallResult::*; + use EvidenceState::*; + + let mut prev_path = CallEvidencePath::new(); + prev_path.push_back(Par(1, 0)); + prev_path.push_back(Call(RequestSent)); + prev_path.push_back(Par(1, 1)); + prev_path.push_back(Call(RequestSent)); + prev_path.push_back(Call(Executed)); + + let mut current_path = CallEvidencePath::new(); + current_path.push_back(Par(2, 2)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(RequestSent)); + current_path.push_back(Par(1, 1)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(RequestSent)); + + let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful"); + + let mut right_merged_path = CallEvidencePath::new(); + right_merged_path.push_back(Par(2, 2)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(RequestSent)); + right_merged_path.push_back(Par(1, 1)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + + assert_eq!(merged_path, right_merged_path); + } + + #[test] + fn merge_call_states_3() { + use CallResult::*; + use EvidenceState::*; + + let mut prev_path = CallEvidencePath::new(); + prev_path.push_back(Call(Executed)); + prev_path.push_back(Par(2, 0)); + prev_path.push_back(Par(1, 0)); + prev_path.push_back(Call(RequestSent)); + prev_path.push_back(Par(1, 2)); + prev_path.push_back(Call(RequestSent)); + prev_path.push_back(Call(Executed)); + prev_path.push_back(Call(RequestSent)); + + let mut current_path = CallEvidencePath::new(); + current_path.push_back(Call(RequestSent)); + current_path.push_back(Par(3, 3)); + current_path.push_back(Par(1, 1)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(Executed)); + current_path.push_back(Par(1, 1)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(RequestSent)); + current_path.push_back(Par(1, 1)); + current_path.push_back(Call(Executed)); + current_path.push_back(Call(RequestSent)); + + let merged_path = merge_call_states(prev_path, current_path).expect("merging should be successful"); + + let mut right_merged_path = CallEvidencePath::new(); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Par(3, 3)); + right_merged_path.push_back(Par(1, 1)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Par(1, 1)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(RequestSent)); + right_merged_path.push_back(Par(1, 2)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(Executed)); + right_merged_path.push_back(Call(RequestSent)); + + assert_eq!(merged_path, right_merged_path); + } +} diff --git a/stepper/src/defines.rs b/stepper/src/defines.rs index 8de6e60b..afdc597f 100644 --- a/stepper/src/defines.rs +++ b/stepper/src/defines.rs @@ -22,8 +22,10 @@ use serde_derive::Serialize; pub(crate) type Result = std::result::Result; pub(crate) type AquaData = std::collections::HashMap; pub(crate) type JValue = serde_json::Value; + pub(crate) use crate::errors::AquamarineError; pub(crate) use crate::stepper_outcome::StepperOutcome; +pub(crate) use crate::stepper_outcome::STEPPER_SUCCESS; pub(crate) const CALL_SERVICE_SUCCESS: i32 = 0; diff --git a/stepper/src/errors.rs b/stepper/src/errors.rs index 14ab1ed4..fd3dce4f 100644 --- a/stepper/src/errors.rs +++ b/stepper/src/errors.rs @@ -14,7 +14,7 @@ * limitations under the License. */ -use crate::call_evidence::EvidenceState; +use crate::call_evidence::{CallResult, EvidenceState}; use crate::CallServiceResult; use crate::JValue; use crate::StepperOutcome; @@ -85,6 +85,15 @@ pub(crate) enum AquamarineError { /// Errors occurred when reserved keyword is used for variable name. ReservedKeywordError(String), + + /// Errors occurred when previous and current evidence states are incompatible. + IncompatibleEvidenceStates(EvidenceState, EvidenceState), + + /// Errors occurred when previous and current call results are incompatible. + IncompatibleCallResults(CallResult, CallResult), + + /// Errors occurred when evidence path contains less elements then corresponding Par has. + EvidencePathTooSmall(usize, usize), } impl Error for AquamarineError {} @@ -92,19 +101,13 @@ impl Error for AquamarineError {} impl std::fmt::Display for AquamarineError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { match self { - AquamarineError::SExprParseError(err) => { - write!(f, "aqua script can't be parsed: {:?}", err) + AquamarineError::SExprParseError(err) => write!(f, "aqua script can't be parsed: {:?}", err), + AquamarineError::DataDeserializationError(err) => { + write!(f, "an error occurred while data deserialization: {:?}", err) + } + AquamarineError::DataSerializationError(err) => { + write!(f, "an error occurred while data serialization: {:?}", err) } - AquamarineError::DataDeserializationError(err) => write!( - f, - "an error occurred while data deserialization: {:?}", - err - ), - AquamarineError::DataSerializationError(err) => write!( - f, - "an error occurred while data serialization: {:?}", - err - ), AquamarineError::FuncArgsSerializationError(args, err) => write!( f, "function arguments {} can't be serialized or deserialized with an error: {:?}", @@ -118,67 +121,64 @@ impl std::fmt::Display for AquamarineError { AquamarineError::CurrentPeerIdEnvError(err, env_name) => write!( f, "the environment variable \"{}\" can't be obtained: {:?}", - env_name, - err + env_name, err ), AquamarineError::InstructionError(err_msg) => write!(f, "{}", err_msg), AquamarineError::LocalServiceError(err_msg) => write!(f, "{}", err_msg), - AquamarineError::VariableNotFound(variable_name) => write!( - f, - "variable with name {} isn't present in data", - variable_name - ), - AquamarineError::MultipleVariablesFound(variable_name) => write!( - f, - "multiple variables found for name {} in data", - variable_name - ), + AquamarineError::VariableNotFound(variable_name) => { + write!(f, "variable with name {} isn't present in data", variable_name) + } + AquamarineError::MultipleVariablesFound(variable_name) => { + write!(f, "multiple variables found for name {} in data", variable_name) + } AquamarineError::VariableNotInJsonPath(value, json_path, json_path_err) => write!( f, "variable with path {} not found in {:?} with error: {:?}", json_path, value, json_path_err ), - AquamarineError::IncompatibleJValueType(avalue, desired_type) => write!( - f, - "got avalue \"{:?}\", but {} type is needed", - avalue, - desired_type, - ), - AquamarineError::MultipleValuesInJsonPath(json_path) => write!( - f, - "multiple variables found for this json path {}", - json_path - ), - AquamarineError::FoldStateNotFound(iterator) => write!( - f, - "fold state not found for this iterable {}", - iterator - ), - AquamarineError::MultipleFoldStates(iterator) => write!( - f, - "multiple fold states found for iterable {}", - iterator - ), - AquamarineError::InvalidEvidenceState(found_state, expected) => write!( + AquamarineError::IncompatibleJValueType(jvalue, desired_type) => { + write!(f, "got avalue \"{:?}\", but {} type is needed", jvalue, desired_type,) + } + AquamarineError::MultipleValuesInJsonPath(json_path) => { + write!(f, "multiple variables found for this json path {}", json_path) + } + AquamarineError::FoldStateNotFound(iterator) => { + write!(f, "fold state not found for this iterable {}", iterator) + } + AquamarineError::MultipleFoldStates(iterator) => { + write!(f, "multiple fold states found for iterable {}", iterator) + } + AquamarineError::InvalidEvidenceState(found, expected) => write!( f, "invalid evidence state: expected {}, but found {:?}", - expected, found_state - ), - AquamarineError::CallEvidenceDeserializationError(err) => write!( - f, - "an error occurred while data deserialization: {:?}", - err - ), - AquamarineError::CallEvidenceSerializationError(err) => write!( - f, - "an error occurred while data serialization: {:?}", - err + expected, found ), + AquamarineError::CallEvidenceDeserializationError(err) => { + write!(f, "an error occurred while data deserialization: {:?}", err) + } + AquamarineError::CallEvidenceSerializationError(err) => { + write!(f, "an error occurred while data serialization: {:?}", err) + } AquamarineError::ReservedKeywordError(variable_name) => write!( f, "a variable can't be named as {} because this name is reserved", variable_name ), + AquamarineError::IncompatibleEvidenceStates(prev_state, current_state) => write!( + f, + "previous and current data have incompatible states: {:?} {:?}", + prev_state, current_state + ), + AquamarineError::IncompatibleCallResults(prev_call_result, current_call_result) => write!( + f, + "previous and current call results are incompatible: {:?} {:?}", + prev_call_result, current_call_result + ), + AquamarineError::EvidencePathTooSmall(actual_count, desired_count) => write!( + f, + "evidence path remains {} elements, but {} requires by Par", + actual_count, desired_count + ), } } } @@ -217,6 +217,9 @@ impl Into for AquamarineError { AquamarineError::CallEvidenceDeserializationError(..) => 17, AquamarineError::CallEvidenceSerializationError(..) => 18, AquamarineError::ReservedKeywordError(..) => 19, + AquamarineError::IncompatibleEvidenceStates(..) => 20, + AquamarineError::IncompatibleCallResults(..) => 21, + AquamarineError::EvidencePathTooSmall(..) => 21, }; StepperOutcome { diff --git a/stepper/src/execution.rs b/stepper/src/execution.rs index b8cc795e..b5c14414 100644 --- a/stepper/src/execution.rs +++ b/stepper/src/execution.rs @@ -14,134 +14,45 @@ * limitations under the License. */ -use super::StepperOutcome; +mod epilog; +mod prolog; +mod utils; + +use epilog::make_result_data; +use prolog::make_contexts; +use prolog::prepare; +use utils::dedup; + use crate::air::ExecutableInstruction; -use crate::air::ExecutionCtx; -use crate::air::Instruction; -use crate::call_evidence::EvidenceState; -use crate::get_current_peer_id; -use crate::AquaData; -use crate::AquamarineError; use crate::Result; +use crate::StepperOutcome; +use crate::STEPPER_SUCCESS; -use crate::call_evidence::CallEvidenceCtx; +pub(self) const CALL_EVIDENCE_CTX_KEY: &str = "__call"; -use std::collections::VecDeque; - -pub(crate) fn execute_aqua(init_user_id: String, aqua: String, data: String) -> StepperOutcome { +pub(crate) fn execute_aqua(init_user_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome { log::info!( - "stepper invoked with user_id = {}, aqua = {:?}, data = {:?}", + "stepper invoked with user_id = {}, aqua = {:?}, prev_data = {:?}, data = {:?}", init_user_id, aqua, + prev_data, data ); - execute_aqua_impl(init_user_id, aqua, data).unwrap_or_else(Into::into) + execute_aqua_impl(init_user_id, aqua, prev_data, data).unwrap_or_else(Into::into) } -fn execute_aqua_impl(_init_user_id: String, aqua: String, data: String) -> Result { - let mut parsed_data: AquaData = - serde_json::from_str(&data).map_err(AquamarineError::DataDeserializationError)?; - let formatted_aqua = format_aqua(aqua); - let parsed_aqua = serde_sexpr::from_str::(&formatted_aqua)?; +fn execute_aqua_impl(_init_user_id: String, aqua: String, prev_data: String, data: String) -> Result { + let (prev_data, data, aqua) = prepare(prev_data, data, aqua)?; + let (mut exec_ctx, mut call_ctx) = make_contexts(prev_data, data)?; - log::info!( - "\nparsed_aqua: {:?}\nparsed_data: {:?}", - parsed_aqua, - parsed_data - ); + aqua.execute(&mut exec_ctx, &mut call_ctx)?; - let current_peer_id = get_current_peer_id() - .map_err(|e| AquamarineError::CurrentPeerIdEnvError(e, String::from("CURRENT_PEER_ID")))?; - - let call_evidence_ctx_key: &str = "__call"; - let current_states: VecDeque = match parsed_data.remove(call_evidence_ctx_key) { - Some(jvalue) => serde_json::from_value(jvalue) - .map_err(AquamarineError::CallEvidenceDeserializationError)?, - None => VecDeque::new(), - }; - - let mut execution_ctx = ExecutionCtx::new(parsed_data, current_peer_id); - let mut call_evidence_ctx = CallEvidenceCtx::new(current_states); - - parsed_aqua.execute(&mut execution_ctx, &mut call_evidence_ctx)?; - - let serialized_call_ctx = serde_json::to_value(call_evidence_ctx.new_states) - .map_err(AquamarineError::CallEvidenceSerializationError)?; - execution_ctx - .data - .insert(call_evidence_ctx_key.to_string(), serialized_call_ctx); - - let data = serde_json::to_string(&execution_ctx.data) - .map_err(AquamarineError::DataSerializationError)?; + let data = make_result_data(exec_ctx.data, call_ctx)?; Ok(StepperOutcome { - ret_code: 0, + ret_code: STEPPER_SUCCESS, data, - next_peer_pks: dedup(execution_ctx.next_peer_pks), + next_peer_pks: dedup(exec_ctx.next_peer_pks), }) } - -/// Formats aqua script in a form of S-expressions to a form compatible with the serde_sexpr crate. -fn format_aqua(aqua: String) -> String { - use std::iter::FromIterator; - - let mut formatted_aqua = Vec::with_capacity(aqua.len()); - // whether to skip the next whitespace - let mut skip_next_whitespace = false; - // whether c was a closing brace - let mut was_cbr = false; - - for c in aqua.chars() { - let is_whitespace = c == ' '; - if (skip_next_whitespace && is_whitespace) || c == '\n' { - continue; - } - - let is_cbr = c == ')'; - - skip_next_whitespace = is_whitespace || c == '(' || is_cbr; - if was_cbr && !is_cbr { - formatted_aqua.push(' '); - } - - was_cbr = is_cbr; - formatted_aqua.push(c) - } - - String::from_iter(formatted_aqua.into_iter()) -} - -use std::hash::Hash; - -fn dedup(mut vec: Vec) -> Vec { - use std::collections::HashSet; - - let set: HashSet<_> = vec.drain(..).collect(); // dedup - set.into_iter().collect() -} - -#[cfg(test)] -mod tests { - #[test] - fn format_aqua_test() { - let aqua = format!( - r#"(( (( (seq ( - (call (%current_peer_id% (add_module ||) (module) module)) - (seq ( - (call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id)) - (seq ( - (call (%current_peer_id% (create ||) (blueprint_id) service_id)) - (call ({} (|| ||) (service_id) client_result)) - ) ) - ) ) - ))"#, - "abc" - ); - - let aqua = super::format_aqua(aqua); - let formatted_aqua = String::from("(((((seq ((call (%current_peer_id% (add_module ||) (module) module)) (seq ((call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id)) (seq ((call (%current_peer_id% (create ||) (blueprint_id) service_id)) (call (abc (|| ||) (service_id) client_result))))))))"); - - assert_eq!(aqua, formatted_aqua); - } -} diff --git a/stepper/src/execution/epilog.rs b/stepper/src/execution/epilog.rs new file mode 100644 index 00000000..8b2f7005 --- /dev/null +++ b/stepper/src/execution/epilog.rs @@ -0,0 +1,33 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::CALL_EVIDENCE_CTX_KEY; +use crate::call_evidence::CallEvidenceCtx; +use crate::AquaData; +use crate::AquamarineError::CallEvidenceSerializationError as CallSeError; +use crate::AquamarineError::DataSerializationError as DataSeError; +use crate::Result; + +pub(super) fn make_result_data(mut data: AquaData, call_ctx: CallEvidenceCtx) -> Result { + use serde_json::{to_string, to_value}; + + let serialized_call_ctx = to_value(call_ctx.new_path).map_err(CallSeError)?; + data.insert(CALL_EVIDENCE_CTX_KEY.to_string(), serialized_call_ctx); + + let data = to_string(&data).map_err(DataSeError)?; + + Ok(data) +} diff --git a/stepper/src/execution/prolog.rs b/stepper/src/execution/prolog.rs new file mode 100644 index 00000000..c1008eaf --- /dev/null +++ b/stepper/src/execution/prolog.rs @@ -0,0 +1,99 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::utils::format_aqua; +use super::CALL_EVIDENCE_CTX_KEY; +use crate::air::ExecutionCtx; +use crate::air::Instruction; +use crate::call_evidence::merge_call_states; +use crate::call_evidence::CallEvidenceCtx; +use crate::call_evidence::EvidenceState; +use crate::get_current_peer_id; +use crate::AquaData; +use crate::AquamarineError; +use crate::Result; + +use std::collections::VecDeque; + +/// Parse and prepare supplied data and aqua script. +pub(super) fn prepare(prev_data: String, data: String, aqua: String) -> Result<(AquaData, AquaData, Instruction)> { + use AquamarineError::DataDeserializationError as DataDeError; + + let parsed_prev_data: AquaData = serde_json::from_str(&prev_data).map_err(DataDeError)?; + let parsed_data: AquaData = serde_json::from_str(&data).map_err(DataDeError)?; + + let formatted_aqua = format_aqua(aqua); + let parsed_aqua: Instruction = serde_sexpr::from_str(&formatted_aqua)?; + + log::info!( + "\nparsed aqua: {:?}\nparsed prev_data: {:?}\nparsed data: {:?}", + parsed_aqua, + parsed_prev_data, + parsed_data + ); + + Ok((parsed_prev_data, parsed_data, parsed_aqua)) +} + +/// Make execution and call evidence contexts from supplied data. +/// Internally, it unites variable from previous and current data and merges call evidence paths. +pub(super) fn make_contexts(mut prev_data: AquaData, mut data: AquaData) -> Result<(ExecutionCtx, CallEvidenceCtx)> { + use AquamarineError::CallEvidenceDeserializationError as CallDeError; + use AquamarineError::CurrentPeerIdEnvError as EnvError; + + let current_peer_id = get_current_peer_id().map_err(|e| EnvError(e, String::from("CURRENT_PEER_ID")))?; + + let prev_states: VecDeque = match prev_data.remove(CALL_EVIDENCE_CTX_KEY) { + Some(jvalue) => serde_json::from_value(jvalue).map_err(CallDeError)?, + None => VecDeque::new(), + }; + + let states: VecDeque = match data.remove(CALL_EVIDENCE_CTX_KEY) { + Some(jvalue) => serde_json::from_value(jvalue).map_err(CallDeError)?, + None => VecDeque::new(), + }; + + let data = merge_data(prev_data, data)?; + let current_states = merge_call_states(prev_states, states)?; + + let execution_ctx = ExecutionCtx::new(data, current_peer_id); + let call_evidence_ctx = CallEvidenceCtx::new(current_states); + + Ok((execution_ctx, call_evidence_ctx)) +} + +fn merge_data(mut prev_data: AquaData, data: AquaData) -> Result { + use boolinator::Boolinator; + use std::collections::hash_map::Entry::{Occupied, Vacant}; + use AquamarineError::MultipleVariablesFound; + + for (key, value) in data { + match prev_data.entry(key) { + Vacant(entry) => { + entry.insert(value); + } + // check that data has equal values for the same key + Occupied(entry) => { + entry + .get() + .eq(&value) + .ok_or_else(|| MultipleVariablesFound(entry.key().clone()))?; + } + } + } + + Ok(prev_data) +} diff --git a/stepper/src/execution/utils.rs b/stepper/src/execution/utils.rs new file mode 100644 index 00000000..042833b5 --- /dev/null +++ b/stepper/src/execution/utils.rs @@ -0,0 +1,80 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::hash::Hash; + +/// Formats aqua script in a form of S-expressions to a form compatible with the serde_sexpr crate. +pub(super) fn format_aqua(aqua: String) -> String { + use std::iter::FromIterator; + + let mut formatted_aqua = Vec::with_capacity(aqua.len()); + // whether to skip the next whitespace + let mut skip_next_whitespace = false; + // whether c was a closing brace + let mut was_cbr = false; + + for c in aqua.chars() { + let is_whitespace = c == ' '; + if (skip_next_whitespace && is_whitespace) || c == '\n' { + continue; + } + + let is_cbr = c == ')'; + + skip_next_whitespace = is_whitespace || c == '(' || is_cbr; + if was_cbr && !is_cbr { + formatted_aqua.push(' '); + } + + was_cbr = is_cbr; + formatted_aqua.push(c) + } + + String::from_iter(formatted_aqua.into_iter()) +} + +/// Deduplicate values in a supplied vector. +pub(super) fn dedup(mut vec: Vec) -> Vec { + use std::collections::HashSet; + + let set: HashSet<_> = vec.drain(..).collect(); + set.into_iter().collect() +} + +#[cfg(test)] +mod tests { + #[test] + fn format_aqua_test() { + let aqua = format!( + r#"(( (( (seq ( + (call (%current_peer_id% (add_module ||) (module) module)) + (seq ( + (call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id)) + (seq ( + (call (%current_peer_id% (create ||) (blueprint_id) service_id)) + (call ({} (|| ||) (service_id) client_result)) + ) ) + ) ) + ))"#, + "abc" + ); + + let aqua = super::format_aqua(aqua); + let formatted_aqua = String::from("(((((seq ((call (%current_peer_id% (add_module ||) (module) module)) (seq ((call (%current_peer_id% (add_blueprint ||) (blueprint) blueprint_id)) (seq ((call (%current_peer_id% (create ||) (blueprint_id) service_id)) (call (abc (|| ||) (service_id) client_result))))))))"); + + assert_eq!(aqua, formatted_aqua); + } +} diff --git a/stepper/src/fce.rs b/stepper/src/fce.rs index f2438bfa..7bee635e 100644 --- a/stepper/src/fce.rs +++ b/stepper/src/fce.rs @@ -14,6 +14,7 @@ * limitations under the License. */ +#![allow(improper_ctypes)] #![warn(rust_2018_idioms)] #![deny( dead_code, @@ -45,8 +46,8 @@ pub fn main() { } #[fce] -pub fn invoke(init_user_id: String, aqua: String, data: String) -> StepperOutcome { - execute_aqua(init_user_id, aqua, data) +pub fn invoke(init_user_id: String, aqua: String, prev_data: String, data: String) -> StepperOutcome { + execute_aqua(init_user_id, aqua, prev_data, data) } pub fn get_current_peer_id() -> std::result::Result { diff --git a/stepper/src/stepper_outcome.rs b/stepper/src/stepper_outcome.rs index 87fe3004..f3c1460f 100644 --- a/stepper/src/stepper_outcome.rs +++ b/stepper/src/stepper_outcome.rs @@ -17,6 +17,8 @@ use fluence::fce; use serde::{Deserialize, Serialize}; +pub const STEPPER_SUCCESS: i32 = 0; + #[fce] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct StepperOutcome { diff --git a/stepper/src/wasm_bindgen.rs b/stepper/src/wasm_bindgen.rs index 7d533b8f..423038b2 100644 --- a/stepper/src/wasm_bindgen.rs +++ b/stepper/src/wasm_bindgen.rs @@ -14,6 +14,18 @@ * limitations under the License. */ +#![allow(unused_attributes)] +#![warn(rust_2018_idioms)] +#![deny( + dead_code, + nonstandard_style, + unused_imports, + unused_mut, + unused_variables, + // unused_unsafe, + unreachable_patterns +)] + mod air; mod call_evidence; mod defines; @@ -34,8 +46,8 @@ pub fn main() { } #[wasm_bindgen] -pub fn invoke(init_user_id: String, aqua: String, data: String) -> String { - let outcome = execute_aqua(init_user_id, aqua, data); +pub fn invoke(init_user_id: String, aqua: String, prev_data: String, data: String) -> String { + let outcome = execute_aqua(init_user_id, aqua, prev_data, data); serde_json::to_string(&outcome).expect("Cannot parse StepperOutcome") } diff --git a/stepper/tests/air_basic.rs b/stepper/tests/air_basic.rs index 123d8134..f605c4c2 100644 --- a/stepper/tests/air_basic.rs +++ b/stepper/tests/air_basic.rs @@ -40,11 +40,10 @@ fn seq_par_call() { ); let res = vm - .call(json!([String::from("asd"), script, String::from("{}"),])) + .call(json!(["asd", script, "{}", "{}",])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res.data).expect("stepper should return valid json"); let right_json = json!( { "result_1" : "test", @@ -75,11 +74,10 @@ fn par_par_call() { ); let res = vm - .call(json!([String::from("asd"), script, String::from("{}"),])) + .call(json!(["asd", script, "{}", "{}",])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res.data).expect("stepper should return valid json"); let right_json = json!( { "result_1" : "test", @@ -146,18 +144,14 @@ fn create_service() { }; Some(IValue::Record( - Vec1::new(vec![ - IValue::S32(0), - IValue::String(format!("\"{}\"", response)), - ]) - .unwrap(), + Vec1::new(vec![IValue::S32(0), IValue::String(format!("\"{}\"", response))]).unwrap(), )) }); let mut vm = create_aqua_vm(call_service, ""); let res = vm - .call(json!([String::from("init_user_pk"), script, data,])) + .call(json!(["init_user_pk", script, "{}", data,])) .expect("should be successful"); let resulted_data: JValue = serde_json::from_str(&res.data).expect("should be correct json"); diff --git a/stepper/tests/call_evidence_basic.rs b/stepper/tests/call_evidence_basic.rs index 8f2cc29a..d5cb4d06 100644 --- a/stepper/tests/call_evidence_basic.rs +++ b/stepper/tests/call_evidence_basic.rs @@ -27,8 +27,6 @@ type JValue = serde_json::Value; #[test] fn evidence_seq_par_call() { - env_logger::init(); - let mut vm = create_aqua_vm(unit_call_service(), ""); let script = String::from( @@ -44,8 +42,9 @@ fn evidence_seq_par_call() { let res = vm .call(json!([ - String::from("asd"), + "asd", script, + "{}", json!({ "__call": [ { "par": [1,1] }, @@ -57,8 +56,7 @@ fn evidence_seq_par_call() { ])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res.data).expect("stepper should return valid json"); let right_json = json!( { "result_2": "test", @@ -91,8 +89,9 @@ fn evidence_par_par_call() { let res = vm .call(json!([ - String::from("asd"), + "asd", script, + "{}", json!({ "__call": [ { "par": [3,0] }, @@ -105,8 +104,7 @@ fn evidence_par_par_call() { ])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res.data).expect("stepper should return valid json"); let right_json = json!( { "result_1" : "test", @@ -145,23 +143,22 @@ fn evidence_seq_seq() { ); let res1 = vm2 - .call(json!([String::from("asd"), script, String::from("{}")])) + .call(json!(["asd", script, "{}", "{}",])) .expect("should be successful"); assert_eq!(res1.next_peer_pks, vec![peer_id_1.clone()]); let res2 = vm1 - .call(json!([String::from("asd"), script, res1.data])) + .call(json!(["asd", script, "{}", res1.data,])) .expect("should be successful"); assert_eq!(res2.next_peer_pks, vec![peer_id_2.clone()]); let res3 = vm2 - .call(json!([String::from("asd"), script, res2.data])) + .call(json!(["asd", script, "{}", res2.data,])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res3.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res3.data).expect("stepper should return valid json"); let right_json = json!( { "void0": "test", @@ -234,18 +231,14 @@ fn evidence_create_service() { }; Some(IValue::Record( - Vec1::new(vec![ - IValue::S32(0), - IValue::String(format!("\"{}\"", response)), - ]) - .unwrap(), + Vec1::new(vec![IValue::S32(0), IValue::String(format!("\"{}\"", response))]).unwrap(), )) }); let mut vm = create_aqua_vm(call_service, ""); let res = vm - .call(json!([String::from("init_user_pk"), script, data,])) + .call(json!(["init_user_pk", script, "{}", data,])) .expect("should be successful"); let resulted_data: JValue = serde_json::from_str(&res.data).expect("should be correct json"); @@ -261,7 +254,7 @@ fn evidence_create_service() { #[test] fn evidence_par_seq_fold_call() { - let return_numbers_call_service: HostExportedFunc = Box::new(|_, args| -> Option { + let return_numbers_call_service: HostExportedFunc = Box::new(|_, _| -> Option { Some(IValue::Record( Vec1::new(vec![ IValue::S32(0), @@ -295,8 +288,9 @@ fn evidence_par_seq_fold_call() { let res1 = vm2 .call(json!([ - String::from("asd"), + "asd", script, + "{}", json!({ "__call": [] }) @@ -305,25 +299,24 @@ fn evidence_par_seq_fold_call() { .expect("should be successful"); let res2 = vm1 - .call(json!([String::from("asd"), script, res1.data])) + .call(json!(["asd", script, "{}", res1.data,])) .expect("should be successful"); let mut data = res2.data; for _ in 0..100 { let res3 = vm2 - .call(json!([String::from("asd"), script, data])) + .call(json!(["asd", script, "{}", data,])) .expect("should be successful"); data = res3.data; } let res4 = vm3 - .call(json!([String::from("asd"), script, data])) + .call(json!(["asd", script, "{}", data,])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res4.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res4.data).expect("stepper should return valid json"); let right_json = json!( { "result_2": "test", @@ -362,7 +355,7 @@ fn evidence_par_seq_fold_call() { #[test] fn evidence_par_seq_fold_in_cycle_call() { - let return_numbers_call_service: HostExportedFunc = Box::new(|_, args| -> Option { + let return_numbers_call_service: HostExportedFunc = Box::new(|_, _| -> Option { Some(IValue::Record( Vec1::new(vec![ IValue::S32(0), @@ -398,26 +391,25 @@ fn evidence_par_seq_fold_in_cycle_call() { for _ in 0..100 { let res1 = vm1 - .call(json!([String::from("asd"), script, data])) + .call(json!(["asd", script, "{}", data])) .expect("should be successful"); data = res1.data; let res2 = vm2 - .call(json!([String::from("asd"), script, data])) + .call(json!(["asd", script, "{}", data])) .expect("should be successful"); data = res2.data; let res3 = vm3 - .call(json!([String::from("asd"), script, data])) + .call(json!(["asd", script, "{}", data])) .expect("should be successful"); data = res3.data; } - let resulted_json: JValue = - serde_json::from_str(&data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&data).expect("stepper should return valid json"); let right_json = json!( { "result_2": "test", @@ -479,23 +471,22 @@ fn evidence_seq_par_seq_seq() { ); let res1 = vm2 - .call(json!([String::from("asd"), script, String::from("{}")])) + .call(json!(["asd", script, "{}", "{}",])) .expect("should be successful"); assert_eq!(res1.next_peer_pks, vec![peer_id_1.clone()]); let res2 = vm1 - .call(json!([String::from("asd"), script, res1.data])) + .call(json!(["asd", script, "{}", res1.data])) .expect("should be successful"); assert_eq!(res2.next_peer_pks, vec![peer_id_2.clone()]); let res3 = vm2 - .call(json!([String::from("asd"), script, res2.data])) + .call(json!(["asd", script, "{}", res2.data])) .expect("should be successful"); - let resulted_json: JValue = - serde_json::from_str(&res3.data).expect("stepper should return valid json"); + let resulted_json: JValue = serde_json::from_str(&res3.data).expect("stepper should return valid json"); let right_json = json!( { "result_1": "test", diff --git a/stepper/tests/data_merge.rs b/stepper/tests/data_merge.rs new file mode 100644 index 00000000..b20fe7de --- /dev/null +++ b/stepper/tests/data_merge.rs @@ -0,0 +1,187 @@ +/* + * Copyright 2020 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use aqua_test_utils::create_aqua_vm; +use aqua_test_utils::echo_number_call_service; +use aqua_test_utils::unit_call_service; +use aquamarine_vm::vec1::Vec1; +use aquamarine_vm::HostExportedFunc; +use aquamarine_vm::IValue; + +use serde_json::json; + +type JValue = serde_json::Value; + +#[test] +fn data_merge() { + let neighborhood_call_service1: HostExportedFunc = Box::new(|_, _| -> Option { + Some(IValue::Record( + Vec1::new(vec![IValue::S32(0), IValue::String(String::from("[\"A\", \"B\"]"))]).unwrap(), + )) + }); + + let neighborhood_call_service2: HostExportedFunc = Box::new(|_, _| -> Option { + Some(IValue::Record( + Vec1::new(vec![IValue::S32(0), IValue::String(String::from("[\"A\", \"B\"]"))]).unwrap(), + )) + }); + + let mut vm1 = create_aqua_vm(neighborhood_call_service1, "A"); + let mut vm2 = create_aqua_vm(neighborhood_call_service2, "B"); + + let script = String::from( + r#" + (seq ( + (call (%current_peer_id% ("neighborhood" "") () neighborhood)) + (seq ( + (seq ( + (fold (neighborhood i + (par ( + (call (i ("add_provider" "") () void[])) + (next i) + )) + )) + (fold (neighborhood i + (par ( + (call (i ("get_providers" "") () providers[])) + (next i) + )) + )) + )) + (seq ( + (call ("A" ("identity" "") () void[])) + (call ("B" ("" "") () none)) + )) + )) + )) + "#, + ); + + let res1 = vm1 + .call(json!(["asd", script, "{}", "{}"])) + .expect("should be successful"); + + let res2 = vm2 + .call(json!(["asd", script, "{}", "{}"])) + .expect("should be successful"); + + let res3 = vm2 + .call(json!(["asd", script, res1.data, res2.data])) + .expect("should be successful"); + + let res4 = vm1 + .call(json!(["asd", script, res1.data, res2.data])) + .expect("should be successful"); + + let res5 = vm2 + .call(json!(["asd", script, res3.data, res4.data])) + .expect("should be successful"); + + let res6 = vm1 + .call(json!(["asd", script, res3.data, res4.data])) + .expect("should be successful"); + + let resulted_json3: JValue = serde_json::from_str(&res3.data).expect("stepper should return valid json"); + + let right_json3 = json!( { + "void": [["A", "B"]], + "neighborhood": ["A", "B"], + "providers": [["A", "B"]], + "__call": [ + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "par": [1,2] }, + { "call": "request_sent" }, + { "par": [1,0] }, + { "call": "executed" }, + ] + }); + + assert_eq!(resulted_json3, right_json3); + assert_eq!(res3.next_peer_pks, vec![String::from("A")]); + + let resulted_json4: JValue = serde_json::from_str(&res4.data).expect("stepper should return valid json"); + + let right_json4 = json!( { + "void": [["A", "B"]], + "neighborhood": ["A", "B"], + "providers": [["A", "B"]], + "__call": [ + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(resulted_json4, right_json4); + assert_eq!(res4.next_peer_pks, vec![String::from("B")]); + + let resulted_json5: JValue = serde_json::from_str(&res5.data).expect("stepper should return valid json"); + + let right_json5 = json!( { + "void": [["A", "B"]], + "neighborhood": ["A", "B"], + "providers": [["A", "B"]], + "__call": [ + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "call": "request_sent" }, + ] + }); + + assert_eq!(resulted_json5, right_json5); + assert_eq!(res5.next_peer_pks, vec![String::from("A")]); + + let resulted_json6: JValue = serde_json::from_str(&res6.data).expect("stepper should return valid json"); + + let right_json6 = json!( { + "void": [["A", "B"], ["A", "B"]], + "neighborhood": ["A", "B"], + "providers": [["A", "B"]], + "__call": [ + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "par": [1,2] }, + { "call": "executed" }, + { "par": [1,0] }, + { "call": "executed" }, + { "call": "executed" }, + { "call": "request_sent" } + ] + }); + + assert_eq!(resulted_json6, right_json6); + assert_eq!(res6.next_peer_pks, vec![String::from("B")]); +}