mirror of
https://github.com/fluencelabs/aquavm
synced 2025-06-27 13:41:32 +00:00
Particle File Vault (#120)
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -153,7 +153,7 @@ checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "avm-server"
|
name = "avm-server"
|
||||||
version = "0.7.0"
|
version = "0.8.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"air-interpreter-interface",
|
"air-interpreter-interface",
|
||||||
"fluence-faas",
|
"fluence-faas",
|
||||||
|
@ -130,8 +130,8 @@ fn duplicate_variables() {
|
|||||||
// Check that string literals can be used as call parameters.
|
// Check that string literals can be used as call parameters.
|
||||||
#[test]
|
#[test]
|
||||||
fn string_parameters() {
|
fn string_parameters() {
|
||||||
let call_service: CallServiceClosure = Box::new(|_, args| -> Option<IValue> {
|
let call_service: CallServiceClosure = Box::new(|args| -> Option<IValue> {
|
||||||
let arg = match &args[2] {
|
let arg = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -123,8 +123,8 @@ fn create_service() {
|
|||||||
let add_blueprint_response = String::from("add_blueprint response");
|
let add_blueprint_response = String::from("add_blueprint response");
|
||||||
let create_response = String::from("create response");
|
let create_response = String::from("create response");
|
||||||
|
|
||||||
let call_service: CallServiceClosure = Box::new(move |_, args| -> Option<IValue> {
|
let call_service: CallServiceClosure = Box::new(move |args| -> Option<IValue> {
|
||||||
let builtin_service = match &args[0] {
|
let builtin_service = match &args.function_args[0] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -72,18 +72,18 @@ fn client_host_function(
|
|||||||
);
|
);
|
||||||
|
|
||||||
let all_info_inner = all_info.clone();
|
let all_info_inner = all_info.clone();
|
||||||
let host_function: CallServiceClosure = Box::new(move |_, args| -> Option<IValue> {
|
let host_function: CallServiceClosure = Box::new(move |args| -> Option<IValue> {
|
||||||
let service_name = match &args[0] {
|
let service_name = match &args.function_args[0] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let function_name = match &args[1] {
|
let function_name = match &args.function_args[1] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let function_args = match &args[2] {
|
let function_args = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
@ -132,18 +132,18 @@ fn peer_host_function(
|
|||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
Box::new(move |_, args| -> Option<IValue> {
|
Box::new(move |args| -> Option<IValue> {
|
||||||
let service_name = match &args[0] {
|
let service_name = match &args.function_args[0] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let function_name = match &args[1] {
|
let function_name = match &args.function_args[1] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let function_args = match &args[2] {
|
let function_args = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -14,6 +14,9 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
use pretty_assertions::assert_eq;
|
||||||
|
use serde_json::json;
|
||||||
|
|
||||||
use air_test_utils::call_vm;
|
use air_test_utils::call_vm;
|
||||||
use air_test_utils::create_avm;
|
use air_test_utils::create_avm;
|
||||||
use air_test_utils::executed_state;
|
use air_test_utils::executed_state;
|
||||||
@ -24,22 +27,19 @@ use air_test_utils::ExecutionTrace;
|
|||||||
use air_test_utils::IValue;
|
use air_test_utils::IValue;
|
||||||
use air_test_utils::NEVec;
|
use air_test_utils::NEVec;
|
||||||
|
|
||||||
use pretty_assertions::assert_eq;
|
|
||||||
use serde_json::json;
|
|
||||||
|
|
||||||
type JValue = serde_json::Value;
|
type JValue = serde_json::Value;
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn data_merge() {
|
fn data_merge() {
|
||||||
use executed_state::*;
|
use executed_state::*;
|
||||||
|
|
||||||
let neighborhood_call_service1: CallServiceClosure = Box::new(|_, _| -> Option<IValue> {
|
let neighborhood_call_service1: CallServiceClosure = Box::new(|_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![IValue::S32(0), IValue::String(String::from("[\"A\", \"B\"]"))]).unwrap(),
|
NEVec::new(vec![IValue::S32(0), IValue::String(String::from("[\"A\", \"B\"]"))]).unwrap(),
|
||||||
))
|
))
|
||||||
});
|
});
|
||||||
|
|
||||||
let neighborhood_call_service2: CallServiceClosure = Box::new(|_, _| -> Option<IValue> {
|
let neighborhood_call_service2: CallServiceClosure = Box::new(|_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![IValue::S32(0), IValue::String(String::from("[\"A\", \"B\"]"))]).unwrap(),
|
NEVec::new(vec![IValue::S32(0), IValue::String(String::from("[\"A\", \"B\"]"))]).unwrap(),
|
||||||
))
|
))
|
||||||
@ -166,15 +166,15 @@ fn data_merge() {
|
|||||||
fn acc_merge() {
|
fn acc_merge() {
|
||||||
env_logger::init();
|
env_logger::init();
|
||||||
|
|
||||||
let neighborhood_call_service: CallServiceClosure = Box::new(|_, args| -> Option<IValue> {
|
let neighborhood_call_service: CallServiceClosure = Box::new(|args| -> Option<IValue> {
|
||||||
let args_count = match &args[1] {
|
let args_count = match &args.function_args[1] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let args_count = (args_count.as_bytes()[0] - b'0') as usize;
|
let args_count = (args_count.as_bytes()[0] - b'0') as usize;
|
||||||
|
|
||||||
let args_json = match &args[2] {
|
let args_json = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
@ -231,8 +231,8 @@ fn fold_merge() {
|
|||||||
"stream2".to_string() => r#"["s4", "s5", "s6"]"#.to_string(),
|
"stream2".to_string() => r#"["s4", "s5", "s6"]"#.to_string(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let local_vm_service_call: CallServiceClosure = Box::new(|_, values| -> Option<IValue> {
|
let local_vm_service_call: CallServiceClosure = Box::new(|args| -> Option<IValue> {
|
||||||
let args = match &values[2] {
|
let args = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -37,22 +37,22 @@ struct ClosureCallArgs {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn create_check_service_closure(closure_call_args: ClosureCallArgs) -> CallServiceClosure {
|
fn create_check_service_closure(closure_call_args: ClosureCallArgs) -> CallServiceClosure {
|
||||||
Box::new(move |_, args| -> Option<IValue> {
|
Box::new(move |args| -> Option<IValue> {
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
|
||||||
let service_id = match &args[0] {
|
let service_id = match &args.function_args[0] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
*closure_call_args.service_id_var.deref().borrow_mut() = service_id.clone();
|
*closure_call_args.service_id_var.deref().borrow_mut() = service_id.clone();
|
||||||
|
|
||||||
let function_name = match &args[1] {
|
let function_name = match &args.function_args[1] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
*closure_call_args.function_name_var.deref().borrow_mut() = function_name.clone();
|
*closure_call_args.function_name_var.deref().borrow_mut() = function_name.clone();
|
||||||
|
|
||||||
let call_args = match &args[2] {
|
let call_args = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -29,7 +29,7 @@ use serde_json::json;
|
|||||||
fn join_chat() {
|
fn join_chat() {
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
|
|
||||||
let members_call_service1: CallServiceClosure = Box::new(|_, _| -> Option<IValue> {
|
let members_call_service1: CallServiceClosure = Box::new(|_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![
|
NEVec::new(vec![
|
||||||
IValue::S32(0),
|
IValue::S32(0),
|
||||||
@ -193,7 +193,7 @@ fn join_chat() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn join() {
|
fn join() {
|
||||||
let members_call_service1: CallServiceClosure = Box::new(|_, _| -> Option<IValue> {
|
let members_call_service1: CallServiceClosure = Box::new(|_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(),
|
NEVec::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(),
|
||||||
))
|
))
|
||||||
@ -249,7 +249,7 @@ fn join() {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn init_peer_id() {
|
fn init_peer_id() {
|
||||||
let members_call_service1: CallServiceClosure = Box::new(|_, _| -> Option<IValue> {
|
let members_call_service1: CallServiceClosure = Box::new(|_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(),
|
NEVec::new(vec![IValue::S32(0), IValue::String(String::from(r#"[["A"], ["B"]]"#))]).unwrap(),
|
||||||
))
|
))
|
||||||
|
@ -33,8 +33,8 @@ fn create_check_service_closure(
|
|||||||
args_to_check: ArgToCheck<LastError>,
|
args_to_check: ArgToCheck<LastError>,
|
||||||
tetraplets_to_check: ArgToCheck<Vec<Vec<SecurityTetraplet>>>,
|
tetraplets_to_check: ArgToCheck<Vec<Vec<SecurityTetraplet>>>,
|
||||||
) -> CallServiceClosure {
|
) -> CallServiceClosure {
|
||||||
Box::new(move |_, args| -> Option<IValue> {
|
Box::new(move |args| -> Option<IValue> {
|
||||||
let call_args = match &args[2] {
|
let call_args = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
@ -42,7 +42,7 @@ fn create_check_service_closure(
|
|||||||
let mut call_args: Vec<LastError> =
|
let mut call_args: Vec<LastError> =
|
||||||
serde_json::from_str(call_args).expect("json deserialization shouldn't fail");
|
serde_json::from_str(call_args).expect("json deserialization shouldn't fail");
|
||||||
|
|
||||||
let tetraplets = match &args[3] {
|
let tetraplets = match &args.function_args[3] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -25,8 +25,8 @@ use serde_json::Value as JValue;
|
|||||||
#[test]
|
#[test]
|
||||||
fn empty_stream() {
|
fn empty_stream() {
|
||||||
fn arg_type_check_closure() -> CallServiceClosure {
|
fn arg_type_check_closure() -> CallServiceClosure {
|
||||||
Box::new(move |_, args| -> Option<IValue> {
|
Box::new(move |args| -> Option<IValue> {
|
||||||
let call_args = match &args[2] {
|
let call_args = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
@ -1,7 +1,7 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "avm-server"
|
name = "avm-server"
|
||||||
description = "Fluence AIR VM"
|
description = "Fluence AIR VM"
|
||||||
version = "0.7.0"
|
version = "0.8.0"
|
||||||
authors = ["Fluence Labs"]
|
authors = ["Fluence Labs"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license = "Apache-2.0"
|
license = "Apache-2.0"
|
||||||
|
@ -14,11 +14,14 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
use crate::call_service::CallServiceArgs;
|
||||||
use crate::config::AVMConfig;
|
use crate::config::AVMConfig;
|
||||||
|
use crate::data_store::{create_vault_effect, particle_vault_dir, prev_data_file};
|
||||||
|
use crate::errors::AVMError::CleanupParticleError;
|
||||||
use crate::AVMError;
|
use crate::AVMError;
|
||||||
|
use crate::InterpreterOutcome;
|
||||||
use crate::{CallServiceClosure, IType, Result};
|
use crate::{CallServiceClosure, IType, Result};
|
||||||
|
|
||||||
use crate::InterpreterOutcome;
|
|
||||||
use fluence_faas::FluenceFaaS;
|
use fluence_faas::FluenceFaaS;
|
||||||
use fluence_faas::HostImportDescriptor;
|
use fluence_faas::HostImportDescriptor;
|
||||||
use fluence_faas::IValue;
|
use fluence_faas::IValue;
|
||||||
@ -61,6 +64,7 @@ pub struct ParticleParameters {
|
|||||||
pub struct AVM {
|
pub struct AVM {
|
||||||
faas: SendSafeFaaS,
|
faas: SendSafeFaaS,
|
||||||
particle_data_store: PathBuf,
|
particle_data_store: PathBuf,
|
||||||
|
vault_dir: PathBuf,
|
||||||
/// file name of the AIR interpreter .wasm
|
/// file name of the AIR interpreter .wasm
|
||||||
wasm_filename: String,
|
wasm_filename: String,
|
||||||
/// information about the particle that is being executed at the moment
|
/// information about the particle that is being executed at the moment
|
||||||
@ -70,10 +74,16 @@ pub struct AVM {
|
|||||||
impl AVM {
|
impl AVM {
|
||||||
/// Create AVM with provided config.
|
/// Create AVM with provided config.
|
||||||
pub fn new(config: AVMConfig) -> Result<Self> {
|
pub fn new(config: AVMConfig) -> Result<Self> {
|
||||||
use AVMError::InvalidDataStorePath;
|
use AVMError::{CreateVaultDirError, InvalidDataStorePath};
|
||||||
|
|
||||||
let current_particle: Arc<Mutex<ParticleParameters>> = <_>::default();
|
let current_particle: Arc<Mutex<ParticleParameters>> = <_>::default();
|
||||||
let call_service = call_service_descriptor(current_particle.clone(), config.call_service);
|
let particle_data_store = config.particle_data_store;
|
||||||
|
let vault_dir = config.vault_dir;
|
||||||
|
let call_service = call_service_descriptor(
|
||||||
|
current_particle.clone(),
|
||||||
|
config.call_service,
|
||||||
|
vault_dir.clone(),
|
||||||
|
);
|
||||||
let (wasm_dir, wasm_filename) = split_dirname(config.air_wasm_path)?;
|
let (wasm_dir, wasm_filename) = split_dirname(config.air_wasm_path)?;
|
||||||
|
|
||||||
let faas_config = make_faas_config(
|
let faas_config = make_faas_config(
|
||||||
@ -85,13 +95,15 @@ impl AVM {
|
|||||||
);
|
);
|
||||||
let faas = FluenceFaaS::with_raw_config(faas_config)?;
|
let faas = FluenceFaaS::with_raw_config(faas_config)?;
|
||||||
|
|
||||||
let particle_data_store = config.particle_data_store;
|
|
||||||
std::fs::create_dir_all(&particle_data_store)
|
std::fs::create_dir_all(&particle_data_store)
|
||||||
.map_err(|e| InvalidDataStorePath(e, particle_data_store.clone()))?;
|
.map_err(|e| InvalidDataStorePath(e, particle_data_store.clone()))?;
|
||||||
|
std::fs::create_dir_all(&vault_dir)
|
||||||
|
.map_err(|e| CreateVaultDirError(e, vault_dir.clone()))?;
|
||||||
|
|
||||||
let avm = Self {
|
let avm = Self {
|
||||||
faas: SendSafeFaaS(faas),
|
faas: SendSafeFaaS(faas),
|
||||||
particle_data_store,
|
particle_data_store,
|
||||||
|
vault_dir,
|
||||||
wasm_filename,
|
wasm_filename,
|
||||||
current_particle,
|
current_particle,
|
||||||
};
|
};
|
||||||
@ -111,7 +123,7 @@ impl AVM {
|
|||||||
let particle_id = particle_id.into();
|
let particle_id = particle_id.into();
|
||||||
let init_user_id = init_user_id.into();
|
let init_user_id = init_user_id.into();
|
||||||
|
|
||||||
let prev_data_path = self.particle_data_store.join(&particle_id);
|
let prev_data_path = prev_data_file(&self.particle_data_store, &particle_id);
|
||||||
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
|
// TODO: check for errors related to invalid file content (such as invalid UTF8 string)
|
||||||
let prev_data = std::fs::read_to_string(&prev_data_path)
|
let prev_data = std::fs::read_to_string(&prev_data_path)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
@ -136,6 +148,19 @@ impl AVM {
|
|||||||
Ok(outcome)
|
Ok(outcome)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Remove particle directories and files:
|
||||||
|
/// - prev data file
|
||||||
|
/// - particle file vault directory
|
||||||
|
pub fn cleanup_particle(&self, particle_id: &str) -> Result<()> {
|
||||||
|
let prev_data = prev_data_file(&self.particle_data_store, particle_id);
|
||||||
|
std::fs::remove_file(&prev_data).map_err(|err| CleanupParticleError(err, prev_data))?;
|
||||||
|
|
||||||
|
let vault_dir = particle_vault_dir(&self.vault_dir, particle_id);
|
||||||
|
std::fs::remove_dir_all(&vault_dir).map_err(|err| CleanupParticleError(err, vault_dir))?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
fn update_current_particle(&self, particle_id: String, init_user_id: String) {
|
fn update_current_particle(&self, particle_id: String, init_user_id: String) {
|
||||||
let mut params = self.current_particle.lock();
|
let mut params = self.current_particle.lock();
|
||||||
params.particle_id = particle_id;
|
params.particle_id = particle_id;
|
||||||
@ -160,13 +185,22 @@ fn prepare_args(
|
|||||||
fn call_service_descriptor(
|
fn call_service_descriptor(
|
||||||
params: Arc<Mutex<ParticleParameters>>,
|
params: Arc<Mutex<ParticleParameters>>,
|
||||||
call_service: CallServiceClosure,
|
call_service: CallServiceClosure,
|
||||||
|
vault_dir: PathBuf,
|
||||||
) -> HostImportDescriptor {
|
) -> HostImportDescriptor {
|
||||||
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
|
let call_service_closure: HostExportedFunc = Box::new(move |_, ivalues: Vec<IValue>| {
|
||||||
let params = {
|
let params = {
|
||||||
let lock = params.lock();
|
let lock = params.lock();
|
||||||
lock.deref().clone()
|
lock.deref().clone()
|
||||||
};
|
};
|
||||||
call_service(params, ivalues)
|
|
||||||
|
let create_vault = create_vault_effect(&vault_dir, ¶ms.particle_id);
|
||||||
|
|
||||||
|
let args = CallServiceArgs {
|
||||||
|
particle_parameters: params,
|
||||||
|
function_args: ivalues,
|
||||||
|
create_vault,
|
||||||
|
};
|
||||||
|
call_service(args)
|
||||||
});
|
});
|
||||||
|
|
||||||
HostImportDescriptor {
|
HostImportDescriptor {
|
||||||
|
28
avm/server/src/call_service.rs
Normal file
28
avm/server/src/call_service.rs
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 Fluence Labs Limited
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use crate::{AVMError, IValue, ParticleParameters};
|
||||||
|
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
pub type Effect<T> = Box<dyn Fn() -> Result<T, AVMError> + 'static>;
|
||||||
|
pub struct CallServiceArgs {
|
||||||
|
pub particle_parameters: ParticleParameters,
|
||||||
|
pub function_args: Vec<IValue>,
|
||||||
|
pub create_vault: Effect<PathBuf>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type CallServiceClosure = Box<dyn Fn(CallServiceArgs) -> Option<IValue> + 'static>;
|
@ -14,14 +14,10 @@
|
|||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
use crate::avm::ParticleParameters;
|
use crate::CallServiceClosure;
|
||||||
use crate::IValue;
|
|
||||||
|
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
|
||||||
pub type CallServiceClosure =
|
|
||||||
Box<dyn Fn(ParticleParameters, Vec<IValue>) -> Option<IValue> + 'static>;
|
|
||||||
|
|
||||||
/// Describes behaviour of the AVM.
|
/// Describes behaviour of the AVM.
|
||||||
pub struct AVMConfig {
|
pub struct AVMConfig {
|
||||||
/// Path to a AIR interpreter Wasm file.
|
/// Path to a AIR interpreter Wasm file.
|
||||||
@ -37,6 +33,10 @@ pub struct AVMConfig {
|
|||||||
/// AVM uses it to store data obtained after interpreter execution, and load it as a prev_data by particle_id.
|
/// AVM uses it to store data obtained after interpreter execution, and load it as a prev_data by particle_id.
|
||||||
pub particle_data_store: PathBuf,
|
pub particle_data_store: PathBuf,
|
||||||
|
|
||||||
|
/// Path to a directory to store shared directories called Particle File Vault.
|
||||||
|
/// These directories are shared between services called in the span of a same particle execution.
|
||||||
|
pub vault_dir: PathBuf,
|
||||||
|
|
||||||
/// Mask used to filter logs, for details see `log_utf8_string` in fluence-faas.
|
/// Mask used to filter logs, for details see `log_utf8_string` in fluence-faas.
|
||||||
pub logging_mask: i32,
|
pub logging_mask: i32,
|
||||||
}
|
}
|
||||||
|
39
avm/server/src/data_store.rs
Normal file
39
avm/server/src/data_store.rs
Normal file
@ -0,0 +1,39 @@
|
|||||||
|
/*
|
||||||
|
* Copyright 2021 Fluence Labs Limited
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
use crate::call_service::Effect;
|
||||||
|
use crate::errors::AVMError::CreateVaultDirError;
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
|
pub fn create_vault_effect(vault_dir: &Path, particle_id: &str) -> Effect<PathBuf> {
|
||||||
|
let particle_vault_dir = particle_vault_dir(vault_dir, particle_id);
|
||||||
|
let closure = move || {
|
||||||
|
std::fs::create_dir_all(&particle_vault_dir)
|
||||||
|
.map_err(|err| CreateVaultDirError(err, particle_vault_dir.clone()))?;
|
||||||
|
Ok(particle_vault_dir.clone())
|
||||||
|
};
|
||||||
|
|
||||||
|
Box::new(closure)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn particle_vault_dir(vault_dir: &Path, particle_id: &str) -> PathBuf {
|
||||||
|
vault_dir.join(particle_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn prev_data_file(particle_data_store: &Path, particle_id: &str) -> PathBuf {
|
||||||
|
particle_data_store.join(particle_id)
|
||||||
|
}
|
@ -33,11 +33,19 @@ pub enum AVMError {
|
|||||||
|
|
||||||
/// I/O errors while persisting resulted data.
|
/// I/O errors while persisting resulted data.
|
||||||
#[error("an error occurred while saving prev data {0:?} by {1:?} path")]
|
#[error("an error occurred while saving prev data {0:?} by {1:?} path")]
|
||||||
PersistDataError(IOError, PathBuf),
|
PersistDataError(#[source] IOError, PathBuf),
|
||||||
|
|
||||||
/// Errors related to particle_data_store path from supplied config.
|
/// Errors related to particle_data_store path from supplied config.
|
||||||
#[error("an error occurred while creating data storage {0:?} by {1:?} path")]
|
#[error("an error occurred while creating data storage {0:?} by {1:?} path")]
|
||||||
InvalidDataStorePath(IOError, PathBuf),
|
InvalidDataStorePath(#[source] IOError, PathBuf),
|
||||||
|
|
||||||
|
/// Failed to create Particle File Vault directory (thrown inside Effect)
|
||||||
|
#[error("error creating Particle File Vault {1:?}: {0:?}")]
|
||||||
|
CreateVaultDirError(#[source] IOError, PathBuf),
|
||||||
|
|
||||||
|
/// Failed to remove particle directories (called by node after particle's ttl is expired)
|
||||||
|
#[error("error cleaning up particle directory {1:?}: {0:?}")]
|
||||||
|
CleanupParticleError(#[source] IOError, PathBuf),
|
||||||
|
|
||||||
/// Specified path to AIR interpreter .wasm file was invalid
|
/// Specified path to AIR interpreter .wasm file was invalid
|
||||||
#[error("path to AIR interpreter .wasm ({invalid_path:?}) is invalid: {reason}; IO Error: {io_error:?}")]
|
#[error("path to AIR interpreter .wasm ({invalid_path:?}) is invalid: {reason}; IO Error: {io_error:?}")]
|
||||||
|
@ -25,13 +25,16 @@
|
|||||||
)]
|
)]
|
||||||
|
|
||||||
mod avm;
|
mod avm;
|
||||||
|
mod call_service;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod data_store;
|
||||||
mod errors;
|
mod errors;
|
||||||
|
|
||||||
pub use avm::ParticleParameters;
|
pub use avm::ParticleParameters;
|
||||||
pub use avm::AVM;
|
pub use avm::AVM;
|
||||||
|
pub use call_service::CallServiceArgs;
|
||||||
|
pub use call_service::CallServiceClosure;
|
||||||
pub use config::AVMConfig;
|
pub use config::AVMConfig;
|
||||||
pub use config::CallServiceClosure;
|
|
||||||
pub use errors::AVMError;
|
pub use errors::AVMError;
|
||||||
|
|
||||||
// Re-exports
|
// Re-exports
|
||||||
|
@ -51,15 +51,16 @@ pub fn create_avm(call_service: CallServiceClosure, current_peer_id: impl Into<S
|
|||||||
air_wasm_path: PathBuf::from("../target/wasm32-wasi/debug/air_interpreter_server.wasm"),
|
air_wasm_path: PathBuf::from("../target/wasm32-wasi/debug/air_interpreter_server.wasm"),
|
||||||
call_service,
|
call_service,
|
||||||
current_peer_id: current_peer_id.into(),
|
current_peer_id: current_peer_id.into(),
|
||||||
|
vault_dir: tmp_dir.join("vault"),
|
||||||
particle_data_store: tmp_dir,
|
particle_data_store: tmp_dir,
|
||||||
logging_mask: i32::max_value(),
|
logging_mask: i32::MAX,
|
||||||
};
|
};
|
||||||
|
|
||||||
AVM::new(config).expect("vm should be created")
|
AVM::new(config).expect("vm should be created")
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn unit_call_service() -> CallServiceClosure {
|
pub fn unit_call_service() -> CallServiceClosure {
|
||||||
Box::new(|_, _| -> Option<IValue> {
|
Box::new(|_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![
|
NEVec::new(vec![
|
||||||
IValue::S32(0),
|
IValue::S32(0),
|
||||||
@ -71,8 +72,8 @@ pub fn unit_call_service() -> CallServiceClosure {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn echo_string_call_service() -> CallServiceClosure {
|
pub fn echo_string_call_service() -> CallServiceClosure {
|
||||||
Box::new(|_, args| -> Option<IValue> {
|
Box::new(|args| -> Option<IValue> {
|
||||||
let arg = match &args[2] {
|
let arg = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
@ -87,8 +88,8 @@ pub fn echo_string_call_service() -> CallServiceClosure {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn echo_number_call_service() -> CallServiceClosure {
|
pub fn echo_number_call_service() -> CallServiceClosure {
|
||||||
Box::new(|_, args| -> Option<IValue> {
|
Box::new(|args| -> Option<IValue> {
|
||||||
let arg = match &args[2] {
|
let arg = match &args.function_args[2] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
@ -103,7 +104,7 @@ pub fn echo_number_call_service() -> CallServiceClosure {
|
|||||||
|
|
||||||
pub fn set_variable_call_service(json: impl Into<String>) -> CallServiceClosure {
|
pub fn set_variable_call_service(json: impl Into<String>) -> CallServiceClosure {
|
||||||
let json = json.into();
|
let json = json.into();
|
||||||
Box::new(move |_, _| -> Option<IValue> {
|
Box::new(move |_| -> Option<IValue> {
|
||||||
Some(IValue::Record(
|
Some(IValue::Record(
|
||||||
NEVec::new(vec![IValue::S32(0), IValue::String(json.clone())]).unwrap(),
|
NEVec::new(vec![IValue::S32(0), IValue::String(json.clone())]).unwrap(),
|
||||||
))
|
))
|
||||||
@ -111,8 +112,8 @@ pub fn set_variable_call_service(json: impl Into<String>) -> CallServiceClosure
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn set_variables_call_service(ret_mapping: HashMap<String, String>) -> CallServiceClosure {
|
pub fn set_variables_call_service(ret_mapping: HashMap<String, String>) -> CallServiceClosure {
|
||||||
Box::new(move |_, args| -> Option<IValue> {
|
Box::new(move |args| -> Option<IValue> {
|
||||||
let arg_name = match &args[2] {
|
let arg_name = match &args.function_args[2] {
|
||||||
IValue::String(json_str) => {
|
IValue::String(json_str) => {
|
||||||
let json = serde_json::from_str(json_str).expect("a valid json");
|
let json = serde_json::from_str(json_str).expect("a valid json");
|
||||||
match json {
|
match json {
|
||||||
@ -140,8 +141,8 @@ pub fn set_variables_call_service(ret_mapping: HashMap<String, String>) -> CallS
|
|||||||
pub fn fallible_call_service(fallible_service_id: impl Into<String>) -> CallServiceClosure {
|
pub fn fallible_call_service(fallible_service_id: impl Into<String>) -> CallServiceClosure {
|
||||||
let fallible_service_id = fallible_service_id.into();
|
let fallible_service_id = fallible_service_id.into();
|
||||||
|
|
||||||
Box::new(move |_, args| -> Option<IValue> {
|
Box::new(move |args| -> Option<IValue> {
|
||||||
let builtin_service = match &args[0] {
|
let builtin_service = match &args.function_args[0] {
|
||||||
IValue::String(str) => str,
|
IValue::String(str) => str,
|
||||||
_ => unreachable!(),
|
_ => unreachable!(),
|
||||||
};
|
};
|
||||||
|
Reference in New Issue
Block a user