mirror of
https://github.com/fluencelabs/aquavm
synced 2025-04-25 07:12:16 +00:00
WIP Trait to pass multiple attributes
This commit is contained in:
parent
bdc645f344
commit
bcc91cbfde
@ -44,26 +44,24 @@ impl StreamMap {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_value(key: &(impl Into<JValue> + Serialize), value: ValueAggregate) -> Self {
|
||||
pub(crate) fn from_value(key: &(impl Into<JValue> + Serialize), value: &ValueAggregate) -> Self {
|
||||
let obj = StreamMap::from_key_value(key, value.result.as_ref());
|
||||
let value = ValueAggregate::new(obj, value.tetraplet.clone(), value.trace_pos);
|
||||
Self {
|
||||
stream: Stream::from_value(ValueAggregate::new(obj, value.tetraplet, value.trace_pos)),
|
||||
stream: Stream::from_value(value),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn insert(
|
||||
&mut self,
|
||||
key: &(impl Into<JValue> + Serialize),
|
||||
value: ValueAggregate,
|
||||
value: &ValueAggregate,
|
||||
generation: Generation,
|
||||
source: ValueSource,
|
||||
) -> ExecutionResult<GenerationIdx> {
|
||||
let obj = StreamMap::from_key_value(key, value.result.as_ref());
|
||||
self.stream.add_value(
|
||||
ValueAggregate::new(obj, value.tetraplet, value.trace_pos),
|
||||
generation,
|
||||
source,
|
||||
)
|
||||
let value = ValueAggregate::new(obj, value.tetraplet.clone(), value.trace_pos);
|
||||
self.stream.add_value(value, generation, source)
|
||||
}
|
||||
|
||||
pub(crate) fn compactify(self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
|
||||
@ -104,10 +102,8 @@ mod test {
|
||||
|
||||
let generation_idx = 0;
|
||||
let generation = Generation::Nth(generation_idx.into());
|
||||
let stream_map = StreamMap::from_value(
|
||||
&key.clone(),
|
||||
ValueAggregate::new(value.clone(), <_>::default(), 0.into()),
|
||||
);
|
||||
let value_aggregate: ValueAggregate = ValueAggregate::new(value.clone(), <_>::default(), 0.into());
|
||||
let stream_map = StreamMap::from_value(&key.clone(), &value_aggregate);
|
||||
|
||||
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
|
||||
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
|
||||
@ -116,10 +112,8 @@ mod test {
|
||||
assert_eq!(internal_stream_iter.next(), None);
|
||||
|
||||
let key = 42;
|
||||
let stream_map = StreamMap::from_value(
|
||||
&key.clone(),
|
||||
ValueAggregate::new(value.clone(), <_>::default(), 0.into()),
|
||||
);
|
||||
let value_aggregate = ValueAggregate::new(value.clone(), <_>::default(), 0.into());
|
||||
let stream_map = StreamMap::from_value(&key.clone(), &value_aggregate);
|
||||
|
||||
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
|
||||
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
|
||||
@ -134,13 +128,14 @@ mod test {
|
||||
let key12 = String::from("some_key");
|
||||
let value = Rc::new(obj.clone());
|
||||
let generation_idx = 0;
|
||||
let va = ValueAggregate::new(value.clone(), <_>::default(), 0.into());
|
||||
let mut stream_map = StreamMap::from_value(&key12, va.clone());
|
||||
let value_aggregate: ValueAggregate = ValueAggregate::new(value.clone(), <_>::default(), 0.into());
|
||||
let mut stream_map = StreamMap::from_value(&key12, &value_aggregate);
|
||||
let generation = Generation::Nth(generation_idx.into());
|
||||
let generation_idx_res = stream_map
|
||||
.insert(&key12.clone(), va.clone(), generation, ValueSource::CurrentData)
|
||||
.insert(&key12.clone(), &value_aggregate, generation, ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
assert_eq!(generation_idx_res, generation_idx);
|
||||
|
||||
let examplar = StreamMap::from_key_value(&key12.clone(), value.as_ref());
|
||||
let s = stream_map
|
||||
.stream
|
||||
@ -150,23 +145,27 @@ mod test {
|
||||
assert!(s);
|
||||
let key3 = "other_key";
|
||||
let generation_idx = stream_map
|
||||
.insert(&key3.clone(), va.clone(), generation, ValueSource::CurrentData)
|
||||
.insert(&key3.clone(), &value_aggregate, generation, ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
assert_eq!(generation_idx_res, generation_idx);
|
||||
|
||||
let key4 = 42;
|
||||
let generation_idx = stream_map
|
||||
.insert(&key4, va, generation, ValueSource::CurrentData)
|
||||
.insert(&key4, &value_aggregate, generation, ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
assert_eq!(generation_idx_res, generation_idx);
|
||||
|
||||
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
|
||||
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
||||
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
||||
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
|
||||
let examplar = StreamMap::from_key_value(&key3.clone(), value.as_ref());
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
||||
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
|
||||
let examplar = StreamMap::from_key_value(&key4, value.as_ref());
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
@ -37,6 +37,7 @@ use std::collections::HashMap;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
// TODO This module should be unified with its Stream counterpart.
|
||||
pub(crate) struct StreamMapValueDescriptor<'stream_name> {
|
||||
pub value: ValueAggregate,
|
||||
pub name: &'stream_name str,
|
||||
@ -161,7 +162,7 @@ impl StreamMaps {
|
||||
} = value_descriptor;
|
||||
|
||||
match self.get_mut(name, position) {
|
||||
Some(stream_map) => stream_map.insert(key, value, generation, source),
|
||||
Some(stream_map) => stream_map.insert(key, &value, generation, source),
|
||||
None => {
|
||||
// streams could be created in three ways:
|
||||
// - after met new instruction with stream name that isn't present in streams
|
||||
@ -170,7 +171,7 @@ impl StreamMaps {
|
||||
// for global streams
|
||||
// - and by this function, and if there is no such a streams in streams,
|
||||
// it means that a new global one should be created.
|
||||
let stream_map = StreamMap::from_value(key, value);
|
||||
let stream_map = StreamMap::from_value(key, &value);
|
||||
let descriptor = StreamMapDescriptor::global(stream_map);
|
||||
self.stream_maps.insert(name.to_string(), vec![descriptor]);
|
||||
let generation = 0;
|
||||
|
@ -14,9 +14,9 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
pub(crate) mod stream_descriptor;
|
||||
pub(crate) mod stream_value_descriptor;
|
||||
pub(crate) mod utils;
|
||||
mod stream_descriptor;
|
||||
mod stream_value_descriptor;
|
||||
mod utils;
|
||||
|
||||
use crate::execution_step::ExecutionResult;
|
||||
use crate::execution_step::Stream;
|
||||
|
@ -21,20 +21,20 @@ use air_parser::AirPos;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
pub(crate) struct StreamDescriptor {
|
||||
pub(crate) span: Span,
|
||||
pub(crate) stream: Stream,
|
||||
pub(super) struct StreamDescriptor {
|
||||
pub(super) span: Span,
|
||||
pub(super) stream: Stream,
|
||||
}
|
||||
|
||||
impl StreamDescriptor {
|
||||
pub(crate) fn global(stream: Stream) -> Self {
|
||||
pub(super) fn global(stream: Stream) -> Self {
|
||||
Self {
|
||||
span: Span::new(0.into(), usize::MAX.into()),
|
||||
stream,
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn restricted(stream: Stream, span: Span) -> Self {
|
||||
pub(super) fn restricted(stream: Stream, span: Span) -> Self {
|
||||
Self { span, stream }
|
||||
}
|
||||
}
|
||||
@ -45,7 +45,7 @@ impl fmt::Display for StreamDescriptor {
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn find_closest<'d>(
|
||||
pub(super) fn find_closest<'d>(
|
||||
descriptors: impl DoubleEndedIterator<Item = &'d StreamDescriptor>,
|
||||
position: AirPos,
|
||||
) -> Option<&'d Stream> {
|
||||
@ -59,7 +59,7 @@ pub(crate) fn find_closest<'d>(
|
||||
None
|
||||
}
|
||||
|
||||
pub(crate) fn find_closest_mut<'d>(
|
||||
pub(super) fn find_closest_mut<'d>(
|
||||
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamDescriptor>,
|
||||
position: AirPos,
|
||||
) -> Option<&'d mut Stream> {
|
||||
|
@ -21,7 +21,7 @@ use air_interpreter_data::GlobalStreamGens;
|
||||
|
||||
use std::collections::HashMap;
|
||||
|
||||
pub(crate) fn merge_global_streams(
|
||||
pub(super) fn merge_global_streams(
|
||||
previous_global_streams: GlobalStreamGens,
|
||||
current_global_streams: GlobalStreamGens,
|
||||
) -> HashMap<String, Vec<StreamDescriptor>> {
|
||||
|
@ -77,7 +77,6 @@ fn to_merger_ap_result(
|
||||
fn populate_context<'ctx>(
|
||||
ap_result: &ast::ApResult<'ctx>,
|
||||
merger_ap_result: &MergerApResult,
|
||||
// key_argument: Option<&ApArgument<'ctx>>,
|
||||
result: ValueAggregate,
|
||||
exec_ctx: &mut ExecutionCtx<'ctx>,
|
||||
) -> ExecutionResult<Option<GenerationIdx>> {
|
||||
|
@ -29,8 +29,6 @@ use crate::log_instruction;
|
||||
use crate::trace_to_exec_err;
|
||||
use crate::CatchableError;
|
||||
|
||||
// use crate::log_instruction;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_parser::ast::ApMap;
|
||||
use air_parser::ast::ApMapKey;
|
||||
|
@ -16,24 +16,19 @@
|
||||
|
||||
pub(crate) mod completeness_updater;
|
||||
mod stream_cursor;
|
||||
|
||||
use std::rc::Rc;
|
||||
pub(crate) mod stream_execute_helpers;
|
||||
|
||||
use super::fold::*;
|
||||
use super::fold_scalar::fold;
|
||||
use super::ExecutableInstruction;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::FoldStreamLikeIngredients;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::boxed_value::Stream;
|
||||
use crate::log_instruction;
|
||||
use crate::trace_to_exec_err;
|
||||
use air_parser::ast::Instruction;
|
||||
use air_parser::AirPos;
|
||||
use completeness_updater::FoldGenerationObserver;
|
||||
use stream_cursor::StreamCursor;
|
||||
|
||||
use air_parser::ast::FoldStream;
|
||||
use air_parser::AirPos;
|
||||
pub(crate) use stream_execute_helpers::execute_with_stream;
|
||||
|
||||
impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
@ -54,112 +49,11 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
||||
trace_ctx,
|
||||
get_mut_stream_from_context,
|
||||
get_stream_from_context,
|
||||
self.iterable.name,
|
||||
&self.iterable.position,
|
||||
(
|
||||
self,
|
||||
self.iterator.name,
|
||||
self.instruction.clone(),
|
||||
self.last_instruction.clone(),
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn execute_with_stream<'i>(
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
get_mut_stream: for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c mut Stream,
|
||||
get_stream: for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c Stream,
|
||||
iterable_name: &str,
|
||||
iterable_pos: &AirPos,
|
||||
fold_related_args: (
|
||||
&impl ToString,
|
||||
&'i str,
|
||||
Rc<Instruction<'i>>,
|
||||
Option<Rc<Instruction<'i>>>,
|
||||
),
|
||||
) -> ExecutionResult<()> {
|
||||
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
|
||||
|
||||
let fold_impl_to_str = fold_related_args.0;
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_impl_to_str)?;
|
||||
|
||||
let mut stream_cursor = StreamCursor::new();
|
||||
let mut stream_iterable = stream_cursor.construct_iterables(get_stream(iterable_name, *iterable_pos, exec_ctx));
|
||||
let mut observer = FoldGenerationObserver::new();
|
||||
// this cycle manages recursive streams
|
||||
while !stream_iterable.is_empty() {
|
||||
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
|
||||
// write operation to this stream to write to this new generation
|
||||
add_new_generation_if_non_empty(get_mut_stream(iterable_name, *iterable_pos, exec_ctx));
|
||||
let fold_related_args = (
|
||||
fold_related_args.0,
|
||||
fold_related_args.1,
|
||||
fold_related_args.2.clone(),
|
||||
fold_related_args.3.clone(),
|
||||
fold_id,
|
||||
);
|
||||
execute_iterations(stream_iterable, fold_related_args, &mut observer, exec_ctx, trace_ctx)?;
|
||||
|
||||
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
|
||||
// and likely that stream could be mutably borrowed in execute_iterations
|
||||
let stream = remove_new_generation_if_non_empty(get_mut_stream(iterable_name, *iterable_pos, exec_ctx));
|
||||
|
||||
stream_iterable = stream_cursor.construct_iterables(stream)
|
||||
}
|
||||
|
||||
observer.update_completeness(exec_ctx);
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), fold_impl_to_str)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executes fold iteration over all generation that stream had at the moment of call.
|
||||
/// It must return only uncatchable errors (such as ones from TraceHandler), though
|
||||
/// catchable errors are suppressed and not propagated from this function, because of determinism.
|
||||
/// The issue with determinism here lies in invariant that all previous executed states
|
||||
/// must be met.
|
||||
pub(super) fn execute_iterations<'i>(
|
||||
iterables: Vec<IterableValue>,
|
||||
fold_related_args: (
|
||||
&impl ToString,
|
||||
&'i str,
|
||||
Rc<Instruction<'i>>,
|
||||
Option<Rc<Instruction<'i>>>,
|
||||
u32,
|
||||
),
|
||||
generation_observer: &mut FoldGenerationObserver,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
let (fold_impl_to_str, fold_stream_it_name, fold_instruction, fold_last_instruction, fold_id) = fold_related_args;
|
||||
for iterable in iterables.into_iter() {
|
||||
let value = match iterable.peek() {
|
||||
Some(value) => value,
|
||||
// it's ok, because some generation level of a stream on some point inside execution
|
||||
// flow could contain zero values
|
||||
None => continue,
|
||||
};
|
||||
let value_pos = value.pos();
|
||||
trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos), fold_impl_to_str)?;
|
||||
let result = fold(
|
||||
iterable,
|
||||
IterableType::Stream(fold_id),
|
||||
fold_stream_it_name,
|
||||
fold_instruction.clone(),
|
||||
fold_last_instruction.clone(),
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
);
|
||||
throw_error_if_not_catchable(result)?;
|
||||
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_impl_to_str)?;
|
||||
|
||||
generation_observer.observe_completeness(exec_ctx.is_subgraph_complete());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
fn get_mut_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c mut ExecutionCtx<'_>) -> &'c mut Stream {
|
||||
exec_ctx.streams.get_mut(name, position).unwrap()
|
||||
@ -169,23 +63,26 @@ fn get_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c mut E
|
||||
exec_ctx.streams.get_mut(name, position).unwrap()
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) {
|
||||
stream.add_new_generation_if_non_empty();
|
||||
}
|
||||
impl<'i> FoldStreamLikeIngredients for FoldStream<'i> {
|
||||
type Item = air_parser::ast::Stream<'i>;
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream {
|
||||
stream.remove_last_generation_if_empty();
|
||||
stream
|
||||
}
|
||||
fn iterable_name(&self) -> &'i str {
|
||||
self.iterable.name
|
||||
}
|
||||
|
||||
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
|
||||
/// not deterministic.
|
||||
pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
Err(error) if error.is_catchable() => Ok(()),
|
||||
error @ Err(_) => error,
|
||||
fn iterable_pos(&self) -> air_parser::AirPos {
|
||||
self.iterable.position
|
||||
}
|
||||
|
||||
fn iterator_name(&self) -> &'i str {
|
||||
self.iterator.name
|
||||
}
|
||||
|
||||
fn instruction(&self) -> std::rc::Rc<air_parser::ast::Instruction<'i>> {
|
||||
self.instruction.clone()
|
||||
}
|
||||
|
||||
fn last_instruction(&self) -> Option<std::rc::Rc<air_parser::ast::Instruction<'i>>> {
|
||||
self.last_instruction.clone()
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,137 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::completeness_updater::FoldGenerationObserver;
|
||||
use super::stream_cursor::StreamCursor;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::boxed_value::Stream;
|
||||
use crate::execution_step::instructions::fold::IterableType;
|
||||
use crate::execution_step::instructions::fold::IterableValue;
|
||||
use crate::execution_step::instructions::fold_scalar::fold;
|
||||
use crate::execution_step::instructions::FoldStreamLikeIngredients;
|
||||
use crate::trace_to_exec_err;
|
||||
use air_parser::AirPos;
|
||||
|
||||
type MutStreamGetterClosure = for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c mut Stream;
|
||||
type StreamGetterClosure = for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c Stream;
|
||||
|
||||
pub(crate) fn execute_with_stream<'i>(
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
get_mut_stream: MutStreamGetterClosure,
|
||||
get_stream: StreamGetterClosure,
|
||||
fold_stream_like: &'i (impl FoldStreamLikeIngredients + ToString),
|
||||
) -> ExecutionResult<()> {
|
||||
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
|
||||
|
||||
let iterable_name = fold_stream_like.iterable_name();
|
||||
let iterable_pos = fold_stream_like.iterable_pos();
|
||||
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_stream_like)?;
|
||||
|
||||
let mut stream_cursor = StreamCursor::new();
|
||||
let mut stream_iterable = stream_cursor.construct_iterables(get_stream(iterable_name, iterable_pos, exec_ctx));
|
||||
let mut observer = FoldGenerationObserver::new();
|
||||
// this cycle manages recursive streams
|
||||
while !stream_iterable.is_empty() {
|
||||
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
|
||||
// write operation to this stream to write to this new generation
|
||||
add_new_generation_if_non_empty(get_mut_stream(iterable_name, iterable_pos, exec_ctx));
|
||||
execute_iterations(
|
||||
stream_iterable,
|
||||
&mut observer,
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
fold_stream_like,
|
||||
fold_id,
|
||||
)?;
|
||||
|
||||
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
|
||||
// and likely that stream could be mutably borrowed in execute_iterations
|
||||
let stream = remove_new_generation_if_non_empty(get_mut_stream(iterable_name, iterable_pos, exec_ctx));
|
||||
|
||||
stream_iterable = stream_cursor.construct_iterables(stream)
|
||||
}
|
||||
|
||||
observer.update_completeness(exec_ctx);
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), fold_stream_like)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executes fold iteration over all generation that stream had at the moment of call.
|
||||
/// It must return only uncatchable errors (such as ones from TraceHandler), though
|
||||
/// catchable errors are suppressed and not propagated from this function, because of determinism.
|
||||
/// The issue with determinism here lies in invariant that all previous executed states
|
||||
/// must be met.
|
||||
pub(super) fn execute_iterations<'i>(
|
||||
iterables: Vec<IterableValue>,
|
||||
generation_observer: &mut FoldGenerationObserver,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
fold_stream_like: &'i (impl FoldStreamLikeIngredients + ToString),
|
||||
fold_id: u32,
|
||||
) -> ExecutionResult<()> {
|
||||
// let (fold_impl_to_str, fold_stream_it_name, fold_instruction, fold_last_instruction, fold_id) = fold_related_args;
|
||||
let iterator_name = fold_stream_like.iterator_name();
|
||||
for iterable in iterables.into_iter() {
|
||||
let value = match iterable.peek() {
|
||||
Some(value) => value,
|
||||
// it's ok, because some generation level of a stream on some point inside execution
|
||||
// flow could contain zero values
|
||||
None => continue,
|
||||
};
|
||||
let value_pos = value.pos();
|
||||
trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos), fold_stream_like)?;
|
||||
let result = fold(
|
||||
iterable,
|
||||
IterableType::Stream(fold_id),
|
||||
iterator_name,
|
||||
fold_stream_like.instruction(),
|
||||
fold_stream_like.last_instruction(),
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
);
|
||||
throw_error_if_not_catchable(result)?;
|
||||
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream_like)?;
|
||||
|
||||
generation_observer.observe_completeness(exec_ctx.is_subgraph_complete());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) {
|
||||
stream.add_new_generation_if_non_empty();
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
pub(super) fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream {
|
||||
stream.remove_last_generation_if_empty();
|
||||
stream
|
||||
}
|
||||
|
||||
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
|
||||
/// not deterministic.
|
||||
pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
Err(error) if error.is_catchable() => Ok(()),
|
||||
error @ Err(_) => error,
|
||||
}
|
||||
}
|
@ -17,6 +17,7 @@
|
||||
use super::ExecutableInstruction;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::FoldStreamLikeIngredients;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::instructions::fold_stream::execute_with_stream;
|
||||
use crate::execution_step::Stream;
|
||||
@ -44,14 +45,7 @@ impl<'i> ExecutableInstruction<'i> for FoldStreamMap<'i> {
|
||||
trace_ctx,
|
||||
get_mut_stream_from_context,
|
||||
get_stream_from_context,
|
||||
self.iterable.name,
|
||||
&self.iterable.position,
|
||||
(
|
||||
self,
|
||||
self.iterator.name,
|
||||
self.instruction.clone(),
|
||||
self.last_instruction.clone(),
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
@ -69,3 +63,27 @@ fn get_mut_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c m
|
||||
fn get_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c mut ExecutionCtx<'_>) -> &'c Stream {
|
||||
exec_ctx.stream_maps.get_mut(name, position).unwrap().get_stream_ref()
|
||||
}
|
||||
|
||||
impl<'i> FoldStreamLikeIngredients for FoldStreamMap<'i> {
|
||||
type Item = air_parser::ast::StreamMap<'i>;
|
||||
|
||||
fn iterable_name(&self) -> &'i str {
|
||||
self.iterable.name
|
||||
}
|
||||
|
||||
fn iterable_pos(&self) -> air_parser::AirPos {
|
||||
self.iterable.position
|
||||
}
|
||||
|
||||
fn iterator_name(&self) -> &'i str {
|
||||
self.iterator.name
|
||||
}
|
||||
|
||||
fn instruction(&self) -> std::rc::Rc<air_parser::ast::Instruction<'_>> {
|
||||
self.instruction.clone()
|
||||
}
|
||||
|
||||
fn last_instruction(&self) -> Option<std::rc::Rc<air_parser::ast::Instruction<'_>>> {
|
||||
self.last_instruction.clone()
|
||||
}
|
||||
}
|
||||
|
@ -34,6 +34,8 @@ mod par;
|
||||
mod seq;
|
||||
mod xor;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
pub(crate) use call::triplet::resolve_peer_id_to_string;
|
||||
pub(crate) use fold::FoldState;
|
||||
|
||||
@ -142,6 +144,16 @@ macro_rules! log_instruction {
|
||||
};
|
||||
}
|
||||
|
||||
pub trait FoldStreamLikeIngredients {
|
||||
type Item;
|
||||
|
||||
fn iterable_name(&self) -> &str;
|
||||
fn iterable_pos(&self) -> air_parser::AirPos;
|
||||
fn iterator_name(&self) -> &str;
|
||||
fn instruction(&self) -> Rc<Instruction<'_>>;
|
||||
fn last_instruction(&self) -> Option<Rc<Instruction<'_>>>;
|
||||
}
|
||||
|
||||
/// This macro converts joinable errors to Ok and sets subgraph complete to false.
|
||||
#[macro_export]
|
||||
macro_rules! joinable {
|
||||
|
@ -433,8 +433,8 @@ fn ap_stream_map() {
|
||||
let script = f!(r#"
|
||||
(seq
|
||||
(seq
|
||||
(ap "{vm_1_peer_id}" "{service_name1}" %map)
|
||||
(ap "{vm_1_peer_id}" "{service_name2}" %map)
|
||||
(ap ("{vm_1_peer_id}" "{service_name1}") %map)
|
||||
(ap ("{vm_1_peer_id}" "{service_name2}") %map)
|
||||
)
|
||||
(fold %map i
|
||||
(seq
|
||||
|
@ -552,7 +552,7 @@ fn float_map_key_is_unsupported() {
|
||||
|
||||
let map_name = "%map";
|
||||
let join_stream_script = f!(r#"
|
||||
(ap 0.5 "serv1" %map)
|
||||
(ap (0.5 "serv1") %map)
|
||||
"#);
|
||||
|
||||
let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap();
|
||||
@ -570,7 +570,7 @@ fn unsupported_map_key_type() {
|
||||
let join_stream_script = f!(r#"
|
||||
(seq
|
||||
(ap "a" some)
|
||||
(ap some "serv1" %map)
|
||||
(ap (some "serv1") %map)
|
||||
)
|
||||
"#);
|
||||
|
||||
|
@ -62,7 +62,7 @@ pub struct Ap<'i> {
|
||||
pub result: ApResult<'i>,
|
||||
}
|
||||
|
||||
/// (ap key value %map)
|
||||
/// (ap (key value) %map)
|
||||
#[derive(Serialize, Debug, PartialEq)]
|
||||
pub struct ApMap<'i> {
|
||||
pub key: ApMapKey<'i>,
|
||||
|
@ -193,7 +193,7 @@ fn ap_with_stream_map() {
|
||||
let value = "some_string";
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap "{key_name}" "{value}" %stream_map)
|
||||
(ap ("{key_name}" "{value}") %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
@ -208,7 +208,7 @@ fn ap_with_stream_map() {
|
||||
// but populate_context will raise an error
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap {key_name} "{value}" %stream_map)
|
||||
(ap ({key_name} "{value}") %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
@ -224,7 +224,7 @@ fn ap_with_stream_map() {
|
||||
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap "{key_name}" {value} %stream_map)
|
||||
(ap ("{key_name}" {value}) %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
@ -239,7 +239,7 @@ fn ap_with_stream_map() {
|
||||
// but populate_context will raise an error
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap {key_name} {value} %stream_map)
|
||||
(ap ({key_name} {value}) %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
@ -256,7 +256,7 @@ fn ap_with_stream_map() {
|
||||
let key_name = 123;
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap {key_name} {value} %stream_map)
|
||||
(ap ({key_name} {value}) %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
|
@ -166,7 +166,7 @@ impl<'i> VariableValidator<'i> {
|
||||
ApMapKey::CanonStream(stream) => self.met_canon_stream(stream, span),
|
||||
ApMapKey::CanonStreamWithLambda(stream) => self.met_canon_stream_wl(stream, span),
|
||||
}
|
||||
self.met_variable_name(ap_map.map.name, span);
|
||||
self.met_variable_name_definition(ap_map.map.name, span);
|
||||
}
|
||||
|
||||
pub(super) fn finalize(self) -> Vec<ErrorRecovery<AirPos, Token<'i>, ParserError>> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user