feat(execution-engine): introduce resolver (#574)

This commit is contained in:
Mike Voronov 2023-04-12 16:09:12 +01:00 committed by GitHub
parent fc92309a6f
commit a66541de49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 181 additions and 478 deletions

View File

@ -31,8 +31,6 @@ use crate::JValue;
use crate::LambdaAST;
use crate::SecurityTetraplet;
pub(crate) use stream::StreamJvaluableIngredients;
use std::borrow::Cow;
/// Represent a value that could be transform to a JValue with or without tetraplets.

View File

@ -86,8 +86,9 @@ impl JValuable for StreamJvaluableIngredients<'_> {
}
}
use crate::execution_step::boxed_value::StreamIter;
use crate::execution_step::boxed_value::stream::StreamIter;
#[allow(dead_code)]
impl<'stream> StreamJvaluableIngredients<'stream> {
pub(crate) fn new(stream: &'stream Stream, generation: Generation) -> Self {
Self { stream, generation }

View File

@ -20,7 +20,6 @@ mod jvaluable;
mod scalar;
mod stream;
mod utils;
mod variable;
pub(crate) use canon_stream::*;
pub(crate) use iterable::*;
@ -29,8 +28,6 @@ pub(crate) use scalar::ScalarRef;
pub(crate) use scalar::ValueAggregate;
pub(crate) use stream::Generation;
pub(crate) use stream::Stream;
pub(crate) use stream::StreamIter;
pub(crate) use utils::populate_tetraplet_with_lambda;
pub(crate) use variable::Variable;
use super::ExecutionResult;

View File

@ -1,66 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::Generation;
use air_parser::{ast, AirPos};
#[derive(Clone, Copy, Debug)]
pub(crate) enum Variable<'i> {
Scalar {
name: &'i str,
},
#[allow(dead_code)] // it will be used in BoxedValues
Stream {
name: &'i str,
generation: Generation,
position: AirPos,
},
CanonStream {
name: &'i str,
},
}
impl<'i> Variable<'i> {
pub(crate) fn scalar(name: &'i str) -> Self {
Self::Scalar { name }
}
pub(crate) fn canon_stream(name: &'i str) -> Self {
Self::CanonStream { name }
}
}
impl<'i> From<&ast::ImmutableVariable<'i>> for Variable<'i> {
fn from(ast_variable: &ast::ImmutableVariable<'i>) -> Self {
use ast::ImmutableVariable::*;
match ast_variable {
Scalar(scalar) => Self::scalar(scalar.name),
CanonStream(canon_stream) => Self::canon_stream(canon_stream.name),
}
}
}
impl<'i> From<&ast::ImmutableVariableWithLambda<'i>> for Variable<'i> {
fn from(ast_variable: &ast::ImmutableVariableWithLambda<'i>) -> Self {
use ast::ImmutableVariableWithLambda::*;
match ast_variable {
Scalar(scalar) => Self::scalar(scalar.name),
CanonStream(canon_stream) => Self::canon_stream(canon_stream.name),
}
}
}

View File

@ -20,9 +20,7 @@ mod utils;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::Variable;
use crate::execution_step::instructions::ValueAggregate;
use crate::execution_step::resolver::apply_lambda;
use crate::log_instruction;
use crate::trace_to_exec_err;
use crate::JValue;

View File

@ -15,6 +15,7 @@
*/
use super::*;
use crate::execution_step::resolver::Resolvable;
use crate::execution_step::PEEK_ALLOWED_ON_NON_EMPTY;
use crate::UncatchableError;
@ -65,7 +66,7 @@ fn apply_last_error<'i>(
exec_ctx: &ExecutionCtx<'i>,
trace_ctx: &TraceHandler,
) -> ExecutionResult<ValueAggregate> {
let (value, mut tetraplets) = crate::execution_step::resolver::prepare_last_error(error_accessor, exec_ctx)?;
let (value, mut tetraplets) = error_accessor.resolve(exec_ctx)?;
let value = Rc::new(value);
// removing is safe because prepare_last_error always returns a vec with one element.
let tetraplet = tetraplets.remove(0);
@ -105,11 +106,9 @@ fn apply_scalar_wl(
exec_ctx: &ExecutionCtx<'_>,
trace_ctx: &TraceHandler,
) -> ExecutionResult<ValueAggregate> {
let variable = Variable::scalar(ast_scalar.name);
let (jvalue, tetraplet) = apply_lambda(variable, &ast_scalar.lambda, exec_ctx)?;
let tetraplet = Rc::new(tetraplet);
let (value, mut tetraplets) = ast_scalar.resolve(exec_ctx)?;
let position = trace_ctx.trace_pos().map_err(UncatchableError::from)?;
let result = ValueAggregate::new(Rc::new(jvalue), tetraplet, position);
let result = ValueAggregate::new(Rc::new(value), tetraplets.remove(0), position);
Ok(result)
}

View File

@ -20,6 +20,7 @@ use super::call_result_setter::*;
use super::prev_result_handler::*;
use super::triplet::resolve;
use super::*;
use crate::execution_step::resolver::Resolvable;
use crate::execution_step::RcSecurityTetraplet;
use crate::execution_step::RcSecurityTetraplets;
use crate::execution_step::UncatchableError;
@ -228,14 +229,12 @@ impl<'i> ResolvedCall<'i> {
&self,
exec_ctx: &ExecutionCtx<'i>,
) -> ExecutionResult<(Vec<serde_json::Value>, Vec<RcSecurityTetraplets>)> {
use crate::execution_step::resolver::resolve_to_args;
let function_args = self.function_arg_paths.iter();
let mut call_arguments = Vec::with_capacity(function_args.len());
let mut tetraplets = Vec::with_capacity(function_args.len());
for instruction_value in function_args {
let (arg, tetraplet) = resolve_to_args(instruction_value, exec_ctx)?;
let (arg, tetraplet) = instruction_value.resolve(exec_ctx)?;
call_arguments.push(arg);
tetraplets.push(tetraplet);
}

View File

@ -16,6 +16,7 @@
use super::ExecutionCtx;
use super::ExecutionResult;
use crate::execution_step::resolver::Resolvable;
use crate::execution_step::CatchableError;
use crate::JValue;
@ -48,18 +49,14 @@ pub(crate) fn resolve_peer_id_to_string<'i>(
value: &ast::ResolvableToPeerIdVariable<'i>,
exec_ctx: &ExecutionCtx<'i>,
) -> ExecutionResult<String> {
use crate::execution_step::resolver;
use ast::ResolvableToPeerIdVariable::*;
let ((jvalue, _), name) = match value {
InitPeerId => return Ok(exec_ctx.run_parameters.init_peer_id.to_string()),
Literal(value) => return Ok(value.to_string()),
Scalar(scalar) => (resolver::resolve_ast_scalar(scalar, exec_ctx)?, scalar.name),
ScalarWithLambda(scalar) => (resolver::resolve_ast_scalar_wl(scalar, exec_ctx)?, scalar.name),
CanonStreamWithLambda(canon_stream) => (
resolver::resolve_ast_canon_wl(canon_stream, exec_ctx)?,
canon_stream.name,
),
Scalar(scalar) => (scalar.resolve(exec_ctx)?, scalar.name),
ScalarWithLambda(scalar) => (scalar.resolve(exec_ctx)?, scalar.name),
CanonStreamWithLambda(canon_stream) => (canon_stream.resolve(exec_ctx)?, canon_stream.name),
};
try_jvalue_to_string(jvalue, name)
@ -72,17 +69,13 @@ pub(crate) fn resolve_to_string<'i>(
value: &ast::ResolvableToStringVariable<'i>,
exec_ctx: &ExecutionCtx<'i>,
) -> ExecutionResult<String> {
use crate::execution_step::resolver;
use ast::ResolvableToStringVariable::*;
let ((jvalue, _), name) = match value {
Literal(value) => return Ok(value.to_string()),
Scalar(scalar) => (resolver::resolve_ast_scalar(scalar, exec_ctx)?, scalar.name),
ScalarWithLambda(scalar) => (resolver::resolve_ast_scalar_wl(scalar, exec_ctx)?, scalar.name),
CanonStreamWithLambda(canon_stream) => (
resolver::resolve_ast_canon_wl(canon_stream, exec_ctx)?,
canon_stream.name,
),
Scalar(scalar) => (scalar.resolve(exec_ctx)?, scalar.name),
ScalarWithLambda(scalar) => (scalar.resolve(exec_ctx)?, scalar.name),
CanonStreamWithLambda(canon_stream) => (canon_stream.resolve(exec_ctx)?, canon_stream.name),
};
try_jvalue_to_string(jvalue, name)

View File

@ -1,183 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::execution_step::execution_context::ExecutionCtx;
use crate::execution_step::instructions::ExecutionResult;
use crate::execution_step::resolver::prepare_last_error;
use crate::execution_step::resolver::resolve_ast_variable;
use crate::execution_step::resolver::resolve_ast_variable_wl;
use crate::JValue;
use air_parser::ast;
use std::borrow::Cow;
#[tracing::instrument(skip_all)]
pub(crate) fn are_matchable_eq<'ctx>(
left: &ast::ImmutableValue<'_>,
right: &ast::ImmutableValue<'_>,
exec_ctx: &'ctx ExecutionCtx<'_>,
) -> ExecutionResult<bool> {
use ast::ImmutableValue::*;
match (left, right) {
// TODO: introduce traits to refactor this in boxed values
(InitPeerId, InitPeerId) => Ok(true),
(InitPeerId, matchable) | (matchable, InitPeerId) => compare_matchable(
matchable,
exec_ctx,
make_string_comparator(exec_ctx.run_parameters.init_peer_id.as_str()),
),
(LastError(error_accessor), matchable) | (matchable, LastError(error_accessor)) => {
let (value, _) = prepare_last_error(error_accessor, exec_ctx)?;
compare_matchable(matchable, exec_ctx, make_object_comparator(value))
}
(Literal(left_name), Literal(right_name)) => Ok(left_name == right_name),
(Literal(value), matchable) | (matchable, Literal(value)) => {
compare_matchable(matchable, exec_ctx, make_string_comparator(value))
}
(Timestamp, Timestamp) => Ok(true),
(Timestamp, matchable) | (matchable, Timestamp) => compare_matchable(
matchable,
exec_ctx,
make_object_comparator(JValue::Number(exec_ctx.run_parameters.timestamp.into())),
),
(TTL, TTL) => Ok(true),
(TTL, matchable) | (matchable, TTL) => compare_matchable(
matchable,
exec_ctx,
make_object_comparator(JValue::Number(exec_ctx.run_parameters.ttl.into())),
),
(EmptyArray, EmptyArray) => Ok(true),
(EmptyArray, matchable) | (matchable, EmptyArray) => {
compare_matchable(matchable, exec_ctx, make_object_comparator(JValue::Array(vec![])))
}
(Boolean(left_boolean), Boolean(right_boolean)) => Ok(left_boolean == right_boolean),
(Boolean(value), matchable) | (matchable, Boolean(value)) => {
compare_matchable(matchable, exec_ctx, make_object_comparator((*value).into()))
}
(Number(left_number), Number(right_number)) => Ok(left_number == right_number),
(Number(value), matchable) | (matchable, Number(value)) => {
compare_matchable(matchable, exec_ctx, make_object_comparator(value.into()))
}
(Variable(left_variable), Variable(right_variable)) => {
let (left_value, _) = resolve_ast_variable(left_variable, exec_ctx)?;
let (right_value, _) = resolve_ast_variable(right_variable, exec_ctx)?;
Ok(left_value == right_value)
}
(Variable(left_variable), VariableWithLambda(right_variable)) => {
let (left_value, _) = resolve_ast_variable(left_variable, exec_ctx)?;
let (right_value, _) = resolve_ast_variable_wl(right_variable, exec_ctx)?;
Ok(left_value == right_value)
}
(VariableWithLambda(left_variable), Variable(right_variable)) => {
let (left_value, _) = resolve_ast_variable_wl(left_variable, exec_ctx)?;
let (right_value, _) = resolve_ast_variable(right_variable, exec_ctx)?;
Ok(left_value == right_value)
}
(VariableWithLambda(left_variable), VariableWithLambda(right_variable)) => {
let (left_value, _) = resolve_ast_variable_wl(left_variable, exec_ctx)?;
let (right_value, _) = resolve_ast_variable_wl(right_variable, exec_ctx)?;
Ok(left_value == right_value)
}
}
}
type Comparator<'a> = Box<dyn Fn(Cow<'_, JValue>) -> bool + 'a>;
fn compare_matchable<'ctx>(
matchable: &ast::ImmutableValue<'_>,
exec_ctx: &'ctx ExecutionCtx<'_>,
comparator: Comparator<'ctx>,
) -> ExecutionResult<bool> {
use ast::ImmutableValue::*;
match matchable {
InitPeerId => {
let init_peer_id = exec_ctx.run_parameters.init_peer_id.clone();
let jvalue = init_peer_id.as_str().into();
Ok(comparator(Cow::Owned(jvalue)))
}
LastError(error_accessor) => {
let (jvalue, _) = prepare_last_error(error_accessor, exec_ctx)?;
Ok(comparator(Cow::Owned(jvalue)))
}
Literal(str) => {
let jvalue = str.to_string().into();
Ok(comparator(Cow::Owned(jvalue)))
}
Timestamp => {
let jvalue = exec_ctx.run_parameters.timestamp.into();
Ok(comparator(Cow::Owned(jvalue)))
}
TTL => {
let jvalue = exec_ctx.run_parameters.ttl.into();
Ok(comparator(Cow::Owned(jvalue)))
}
Number(number) => {
let jvalue = number.into();
Ok(comparator(Cow::Owned(jvalue)))
}
Boolean(bool) => {
let jvalue = (*bool).into();
Ok(comparator(Cow::Owned(jvalue)))
}
EmptyArray => {
let jvalue = JValue::Array(vec![]);
Ok(comparator(Cow::Owned(jvalue)))
}
Variable(variable) => {
let (jvalue, _) = resolve_ast_variable(variable, exec_ctx)?;
Ok(comparator(Cow::Owned(jvalue)))
}
VariableWithLambda(variable) => {
let (jvalue, _) = resolve_ast_variable_wl(variable, exec_ctx)?;
Ok(comparator(Cow::Owned(jvalue)))
}
}
}
fn make_string_comparator(comparable_string: &str) -> Comparator<'_> {
use std::ops::Deref;
Box::new(move |jvalue: Cow<'_, JValue>| -> bool {
match jvalue.deref() {
JValue::String(value) => value == comparable_string,
_ => false,
}
})
}
fn make_object_comparator(comparable_value: JValue) -> Comparator<'static> {
use std::ops::Deref;
Box::new(move |jvalue: Cow<'_, JValue>| -> bool { jvalue.deref() == &comparable_value })
}

View File

@ -14,6 +14,20 @@
* limitations under the License.
*/
mod comparator;
use crate::execution_step::resolver::Resolvable;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::ExecutionResult;
pub(super) use comparator::are_matchable_eq;
use air_parser::ast;
#[tracing::instrument(skip_all)]
pub(crate) fn are_matchable_eq<'ctx>(
left: &ast::ImmutableValue<'_>,
right: &ast::ImmutableValue<'_>,
exec_ctx: &'ctx ExecutionCtx<'_>,
) -> ExecutionResult<bool> {
let (left_value, _) = left.resolve(exec_ctx)?;
let (right_value, _) = right.resolve(exec_ctx)?;
Ok(left_value == right_value)
}

View File

@ -17,9 +17,8 @@
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::Variable;
use crate::execution_step::execution_context::check_error_object;
use crate::execution_step::resolver;
use crate::execution_step::resolver::Resolvable;
use crate::execution_step::CatchableError;
use crate::execution_step::LastError;
use crate::execution_step::RcSecurityTetraplet;
@ -52,7 +51,7 @@ impl<'i> super::ExecutableInstruction<'i> for Fail<'i> {
}
fn fail_with_scalar<'i>(scalar: &ast::Scalar<'i>, exec_ctx: &mut ExecutionCtx<'i>) -> ExecutionResult<()> {
let (value, mut tetraplet) = resolver::resolve_ast_scalar(scalar, exec_ctx)?;
let (value, mut tetraplet) = scalar.resolve(exec_ctx)?;
// tetraplets always have one element here and it'll be refactored after boxed value
let tetraplet = tetraplet.remove(0);
check_error_object(&value).map_err(CatchableError::InvalidLastErrorObjectError)?;
@ -61,7 +60,7 @@ fn fail_with_scalar<'i>(scalar: &ast::Scalar<'i>, exec_ctx: &mut ExecutionCtx<'i
}
fn fail_with_scalar_wl<'i>(scalar: &ast::ScalarWithLambda<'i>, exec_ctx: &mut ExecutionCtx<'i>) -> ExecutionResult<()> {
let (value, mut tetraplet) = resolver::resolve_ast_scalar_wl(scalar, exec_ctx)?;
let (value, mut tetraplet) = scalar.resolve(exec_ctx)?;
// tetraplets always have one element here and it'll be refactored after boxed value
let tetraplet = tetraplet.remove(0);
check_error_object(&value).map_err(CatchableError::InvalidLastErrorObjectError)?;
@ -92,13 +91,12 @@ fn fail_with_canon_stream(
ast_canon: &ast::CanonStreamWithLambda<'_>,
exec_ctx: &mut ExecutionCtx<'_>,
) -> ExecutionResult<()> {
let variable = Variable::CanonStream { name: ast_canon.name };
let (value, mut tetraplets) = ast_canon.resolve(exec_ctx)?;
let (value, tetraplet) = resolver::apply_lambda(variable, &ast_canon.lambda, exec_ctx)?;
// tetraplets always have one element here and it'll be refactored after boxed value
check_error_object(&value).map_err(CatchableError::InvalidLastErrorObjectError)?;
fail_with_error_object(exec_ctx, Rc::new(value), Some(Rc::new(tetraplet)))
fail_with_error_object(exec_ctx, Rc::new(value), Some(tetraplets.remove(0)))
}
fn fail_with_last_error(exec_ctx: &mut ExecutionCtx<'_>) -> ExecutionResult<()> {

View File

@ -14,8 +14,13 @@
* limitations under the License.
*/
mod resolve;
mod resolvable_impl;
pub(crate) use resolve::*;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::ExecutionResult;
use crate::execution_step::RcSecurityTetraplets;
use crate::JValue;
use super::RcSecurityTetraplets;
pub(crate) trait Resolvable {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)>;
}

View File

@ -0,0 +1,137 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::RcSecurityTetraplets;
use super::Resolvable;
use crate::execution_step::boxed_value::JValuable;
use crate::execution_step::execution_context::ExecutionCtx;
use crate::execution_step::lambda_applier::select_by_lambda_from_scalar;
use crate::execution_step::ExecutionResult;
use crate::JValue;
use crate::LambdaAST;
use crate::SecurityTetraplet;
use air_parser::ast;
use serde_json::json;
use std::rc::Rc;
/// Resolve value to called function arguments.
impl Resolvable for ast::ImmutableValue<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
use ast::ImmutableValue::*;
match self {
InitPeerId => resolve_const(ctx.run_parameters.init_peer_id.as_str(), ctx),
LastError(error_accessor) => error_accessor.resolve(ctx),
Literal(value) => resolve_const(value.to_string(), ctx),
Timestamp => resolve_const(ctx.run_parameters.timestamp, ctx),
TTL => resolve_const(ctx.run_parameters.ttl, ctx),
Boolean(value) => resolve_const(*value, ctx),
Number(value) => resolve_const(value, ctx),
EmptyArray => resolve_const(json!([]), ctx),
Variable(variable) => variable.resolve(ctx),
VariableWithLambda(variable) => variable.resolve(ctx),
}
}
}
pub(crate) fn resolve_const(
arg: impl Into<JValue>,
ctx: &ExecutionCtx<'_>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let jvalue = arg.into();
let tetraplet = SecurityTetraplet::literal_tetraplet(ctx.run_parameters.init_peer_id.as_ref());
let tetraplet = Rc::new(tetraplet);
Ok((jvalue, vec![tetraplet]))
}
impl Resolvable for Option<LambdaAST<'_>> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
use crate::LastError;
let LastError { error, tetraplet } = ctx.last_error();
let jvalue = match self {
Some(error_accessor) => select_by_lambda_from_scalar(error.as_ref(), error_accessor, ctx)?.into_owned(),
None => error.as_ref().clone(),
};
let tetraplets = match tetraplet {
Some(tetraplet) => vec![tetraplet.clone()],
None => {
let tetraplet = SecurityTetraplet::literal_tetraplet(ctx.run_parameters.init_peer_id.as_ref());
let tetraplet = Rc::new(tetraplet);
vec![tetraplet]
}
};
Ok((jvalue, tetraplets))
}
}
impl Resolvable for ast::Scalar<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let value = ctx.scalars.get_value(self.name)?.into_jvaluable();
let tetraplets = value.as_tetraplets();
Ok((value.into_jvalue(), tetraplets))
}
}
impl Resolvable for ast::CanonStream<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let value: &dyn JValuable = &ctx.scalars.get_canon_stream(self.name)?;
let tetraplets = value.as_tetraplets();
Ok((value.as_jvalue().into_owned(), tetraplets))
}
}
impl Resolvable for ast::ImmutableVariable<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
match self {
Self::Scalar(scalar) => scalar.resolve(ctx),
Self::CanonStream(canon_stream) => canon_stream.resolve(ctx),
}
}
}
impl Resolvable for ast::ScalarWithLambda<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let value = ctx.scalars.get_value(self.name)?.into_jvaluable();
let (value, tetraplet) = value.apply_lambda_with_tetraplets(&self.lambda, ctx)?;
let tetraplet = Rc::new(tetraplet);
Ok((value.into_owned(), vec![tetraplet]))
}
}
impl Resolvable for ast::CanonStreamWithLambda<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let value: &dyn JValuable = &ctx.scalars.get_canon_stream(self.name)?;
let (value, tetraplet) = value.apply_lambda_with_tetraplets(&self.lambda, ctx)?;
let tetraplet = Rc::new(tetraplet);
Ok((value.into_owned(), vec![tetraplet]))
}
}
impl Resolvable for ast::ImmutableVariableWithLambda<'_> {
fn resolve(&self, ctx: &ExecutionCtx<'_>) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
match self {
Self::Scalar(scalar) => scalar.resolve(ctx),
Self::CanonStream(canon_stream) => canon_stream.resolve(ctx),
}
}
}

View File

@ -1,187 +0,0 @@
/*
* Copyright 2020 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::RcSecurityTetraplets;
use crate::execution_step::boxed_value::JValuable;
use crate::execution_step::boxed_value::Variable;
use crate::execution_step::execution_context::ExecutionCtx;
use crate::execution_step::lambda_applier::select_by_lambda_from_scalar;
use crate::execution_step::ExecutionResult;
use crate::JValue;
use crate::LambdaAST;
use crate::SecurityTetraplet;
use air_parser::ast;
use serde_json::json;
use std::rc::Rc;
/// Resolve value to called function arguments.
pub(crate) fn resolve_to_args<'i>(
value: &ast::ImmutableValue<'i>,
ctx: &ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
use ast::ImmutableValue::*;
match value {
InitPeerId => prepare_const(ctx.run_parameters.init_peer_id.as_str(), ctx),
LastError(error_accessor) => prepare_last_error(error_accessor, ctx),
Literal(value) => prepare_const(value.to_string(), ctx),
Timestamp => prepare_const(ctx.run_parameters.timestamp, ctx),
TTL => prepare_const(ctx.run_parameters.ttl, ctx),
Boolean(value) => prepare_const(*value, ctx),
Number(value) => prepare_const(value, ctx),
EmptyArray => prepare_const(json!([]), ctx),
Variable(variable) => resolve_ast_variable(variable, ctx),
VariableWithLambda(variable) => resolve_ast_variable_wl(variable, ctx),
}
}
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn prepare_const(
arg: impl Into<JValue>,
ctx: &ExecutionCtx<'_>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let jvalue = arg.into();
let tetraplet = SecurityTetraplet::literal_tetraplet(ctx.run_parameters.init_peer_id.as_ref());
let tetraplet = Rc::new(tetraplet);
Ok((jvalue, vec![tetraplet]))
}
#[allow(clippy::unnecessary_wraps)]
pub(crate) fn prepare_last_error<'i>(
error_accessor: &Option<LambdaAST<'i>>,
ctx: &ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
use crate::LastError;
let LastError { error, tetraplet } = ctx.last_error();
let jvalue = match error_accessor {
Some(error_accessor) => select_by_lambda_from_scalar(error.as_ref(), error_accessor, ctx)?.into_owned(),
None => error.as_ref().clone(),
};
let tetraplets = match tetraplet {
Some(tetraplet) => vec![tetraplet.clone()],
None => {
let tetraplet = SecurityTetraplet::literal_tetraplet(ctx.run_parameters.init_peer_id.as_ref());
let tetraplet = Rc::new(tetraplet);
vec![tetraplet]
}
};
Ok((jvalue, tetraplets))
}
#[tracing::instrument(level = "trace", skip(ctx))]
pub(crate) fn resolve_variable<'ctx, 'i>(
variable: Variable<'_>,
ctx: &'ctx ExecutionCtx<'i>,
) -> ExecutionResult<Box<dyn JValuable + 'ctx>> {
use crate::execution_step::boxed_value::StreamJvaluableIngredients;
match variable {
Variable::Scalar { name, .. } => Ok(ctx.scalars.get_value(name)?.into_jvaluable()),
Variable::Stream {
name,
generation,
position,
} => {
match ctx.streams.get(name, position) {
Some(stream) => {
let ingredients = StreamJvaluableIngredients::new(stream, generation);
Ok(Box::new(ingredients))
}
// return an empty stream for not found stream
// here it ignores the join behaviour
None => Ok(Box::new(())),
}
}
Variable::CanonStream { name, .. } => {
let canon_stream = ctx.scalars.get_canon_stream(name)?;
Ok(Box::new(canon_stream))
}
}
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_variable<'ctx, 'i>(
ast_variable: &ast::ImmutableVariable<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let variable: Variable<'_> = ast_variable.into();
let value = resolve_variable(variable, exec_ctx)?;
let tetraplets = value.as_tetraplets();
Ok((value.into_jvalue(), tetraplets))
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_variable_wl<'ctx, 'i>(
ast_variable: &ast::ImmutableVariableWithLambda<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
let variable: Variable<'_> = ast_variable.into();
apply_lambda(variable, ast_variable.lambda(), exec_ctx).map(|(value, tetraplet)| {
let tetraplet = Rc::new(tetraplet);
(value, vec![tetraplet])
})
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_scalar<'ctx, 'i>(
ast_scalar: &ast::Scalar<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
// TODO: wrap lambda path with Rc to make this clone cheaper
let variable = ast::ImmutableVariable::Scalar(ast_scalar.clone());
resolve_ast_variable(&variable, exec_ctx)
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_scalar_wl<'ctx, 'i>(
ast_scalar: &ast::ScalarWithLambda<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
// TODO: wrap lambda path with Rc to make this clone cheaper
let variable = ast::ImmutableVariableWithLambda::Scalar(ast_scalar.clone());
resolve_ast_variable_wl(&variable, exec_ctx)
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn resolve_ast_canon_wl<'ctx, 'i>(
ast_canon: &ast::CanonStreamWithLambda<'_>,
exec_ctx: &'ctx ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, RcSecurityTetraplets)> {
// TODO: wrap lambda path with Rc to make this clone cheaper
let variable = ast::ImmutableVariableWithLambda::CanonStream(ast_canon.clone());
resolve_ast_variable_wl(&variable, exec_ctx)
}
#[tracing::instrument(level = "trace", skip(exec_ctx))]
pub(crate) fn apply_lambda<'i>(
variable: Variable<'_>,
lambda: &LambdaAST<'i>,
exec_ctx: &ExecutionCtx<'i>,
) -> ExecutionResult<(JValue, SecurityTetraplet)> {
let resolved = resolve_variable(variable, exec_ctx)?;
let (jvalue, tetraplet) = resolved.apply_lambda_with_tetraplets(lambda, exec_ctx)?;
// it's known that apply_lambda_with_tetraplets returns vec of one value
Ok((jvalue.into_owned(), tetraplet))
}