From bcc91cbfde49e22739d8c9556bf635477bf61bcc Mon Sep 17 00:00:00 2001 From: Roman Nozdrin Date: Mon, 15 May 2023 22:16:20 +0000 Subject: [PATCH] WIP Trait to pass multiple attributes --- .../execution_step/boxed_value/stream_map.rs | 41 +++-- .../stream_maps_variables.rs | 5 +- .../execution_context/streams_variables.rs | 6 +- .../streams_variables/stream_descriptor.rs | 14 +- .../streams_variables/utils.rs | 2 +- air/src/execution_step/instructions/ap.rs | 1 - air/src/execution_step/instructions/ap_map.rs | 2 - .../instructions/fold_stream.rs | 151 +++--------------- .../fold_stream/stream_execute_helpers.rs | 137 ++++++++++++++++ .../instructions/fold_stream_map.rs | 34 +++- air/src/execution_step/instructions/mod.rs | 12 ++ air/tests/test_module/instructions/ap.rs | 4 +- .../negative_tests/execution_step.rs | 4 +- .../air-parser/src/ast/instructions.rs | 2 +- .../air-lib/air-parser/src/parser/tests/ap.rs | 10 +- .../air-parser/src/parser/validator.rs | 2 +- 16 files changed, 244 insertions(+), 183 deletions(-) create mode 100644 air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs diff --git a/air/src/execution_step/boxed_value/stream_map.rs b/air/src/execution_step/boxed_value/stream_map.rs index b7ec9bbc..89cc0b5b 100644 --- a/air/src/execution_step/boxed_value/stream_map.rs +++ b/air/src/execution_step/boxed_value/stream_map.rs @@ -44,26 +44,24 @@ impl StreamMap { } } - pub(crate) fn from_value(key: &(impl Into + Serialize), value: ValueAggregate) -> Self { + pub(crate) fn from_value(key: &(impl Into + Serialize), value: &ValueAggregate) -> Self { let obj = StreamMap::from_key_value(key, value.result.as_ref()); + let value = ValueAggregate::new(obj, value.tetraplet.clone(), value.trace_pos); Self { - stream: Stream::from_value(ValueAggregate::new(obj, value.tetraplet, value.trace_pos)), + stream: Stream::from_value(value), } } pub(crate) fn insert( &mut self, key: &(impl Into + Serialize), - value: ValueAggregate, + value: &ValueAggregate, generation: Generation, source: ValueSource, ) -> ExecutionResult { let obj = StreamMap::from_key_value(key, value.result.as_ref()); - self.stream.add_value( - ValueAggregate::new(obj, value.tetraplet, value.trace_pos), - generation, - source, - ) + let value = ValueAggregate::new(obj, value.tetraplet.clone(), value.trace_pos); + self.stream.add_value(value, generation, source) } pub(crate) fn compactify(self, trace_ctx: &mut TraceHandler) -> ExecutionResult { @@ -104,10 +102,8 @@ mod test { let generation_idx = 0; let generation = Generation::Nth(generation_idx.into()); - let stream_map = StreamMap::from_value( - &key.clone(), - ValueAggregate::new(value.clone(), <_>::default(), 0.into()), - ); + let value_aggregate: ValueAggregate = ValueAggregate::new(value.clone(), <_>::default(), 0.into()); + let stream_map = StreamMap::from_value(&key.clone(), &value_aggregate); let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap(); let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap(); @@ -116,10 +112,8 @@ mod test { assert_eq!(internal_stream_iter.next(), None); let key = 42; - let stream_map = StreamMap::from_value( - &key.clone(), - ValueAggregate::new(value.clone(), <_>::default(), 0.into()), - ); + let value_aggregate = ValueAggregate::new(value.clone(), <_>::default(), 0.into()); + let stream_map = StreamMap::from_value(&key.clone(), &value_aggregate); let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap(); let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap(); @@ -134,13 +128,14 @@ mod test { let key12 = String::from("some_key"); let value = Rc::new(obj.clone()); let generation_idx = 0; - let va = ValueAggregate::new(value.clone(), <_>::default(), 0.into()); - let mut stream_map = StreamMap::from_value(&key12, va.clone()); + let value_aggregate: ValueAggregate = ValueAggregate::new(value.clone(), <_>::default(), 0.into()); + let mut stream_map = StreamMap::from_value(&key12, &value_aggregate); let generation = Generation::Nth(generation_idx.into()); let generation_idx_res = stream_map - .insert(&key12.clone(), va.clone(), generation, ValueSource::CurrentData) + .insert(&key12.clone(), &value_aggregate, generation, ValueSource::CurrentData) .unwrap(); assert_eq!(generation_idx_res, generation_idx); + let examplar = StreamMap::from_key_value(&key12.clone(), value.as_ref()); let s = stream_map .stream @@ -150,23 +145,27 @@ mod test { assert!(s); let key3 = "other_key"; let generation_idx = stream_map - .insert(&key3.clone(), va.clone(), generation, ValueSource::CurrentData) + .insert(&key3.clone(), &value_aggregate, generation, ValueSource::CurrentData) .unwrap(); assert_eq!(generation_idx_res, generation_idx); + let key4 = 42; let generation_idx = stream_map - .insert(&key4, va, generation, ValueSource::CurrentData) + .insert(&key4, &value_aggregate, generation, ValueSource::CurrentData) .unwrap(); assert_eq!(generation_idx_res, generation_idx); let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap(); let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap(); assert_eq!(*v, *examplar.as_ref()); + let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap(); assert_eq!(*v, *examplar.as_ref()); + let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap(); let examplar = StreamMap::from_key_value(&key3.clone(), value.as_ref()); assert_eq!(*v, *examplar.as_ref()); + let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap(); let examplar = StreamMap::from_key_value(&key4, value.as_ref()); assert_eq!(*v, *examplar.as_ref()); diff --git a/air/src/execution_step/execution_context/stream_maps_variables.rs b/air/src/execution_step/execution_context/stream_maps_variables.rs index aee549e6..35f03b8b 100644 --- a/air/src/execution_step/execution_context/stream_maps_variables.rs +++ b/air/src/execution_step/execution_context/stream_maps_variables.rs @@ -37,6 +37,7 @@ use std::collections::HashMap; use std::fmt; +// TODO This module should be unified with its Stream counterpart. pub(crate) struct StreamMapValueDescriptor<'stream_name> { pub value: ValueAggregate, pub name: &'stream_name str, @@ -161,7 +162,7 @@ impl StreamMaps { } = value_descriptor; match self.get_mut(name, position) { - Some(stream_map) => stream_map.insert(key, value, generation, source), + Some(stream_map) => stream_map.insert(key, &value, generation, source), None => { // streams could be created in three ways: // - after met new instruction with stream name that isn't present in streams @@ -170,7 +171,7 @@ impl StreamMaps { // for global streams // - and by this function, and if there is no such a streams in streams, // it means that a new global one should be created. - let stream_map = StreamMap::from_value(key, value); + let stream_map = StreamMap::from_value(key, &value); let descriptor = StreamMapDescriptor::global(stream_map); self.stream_maps.insert(name.to_string(), vec![descriptor]); let generation = 0; diff --git a/air/src/execution_step/execution_context/streams_variables.rs b/air/src/execution_step/execution_context/streams_variables.rs index eab371a1..9f1d1a77 100644 --- a/air/src/execution_step/execution_context/streams_variables.rs +++ b/air/src/execution_step/execution_context/streams_variables.rs @@ -14,9 +14,9 @@ * limitations under the License. */ -pub(crate) mod stream_descriptor; -pub(crate) mod stream_value_descriptor; -pub(crate) mod utils; +mod stream_descriptor; +mod stream_value_descriptor; +mod utils; use crate::execution_step::ExecutionResult; use crate::execution_step::Stream; diff --git a/air/src/execution_step/execution_context/streams_variables/stream_descriptor.rs b/air/src/execution_step/execution_context/streams_variables/stream_descriptor.rs index 19b88587..e2390e97 100644 --- a/air/src/execution_step/execution_context/streams_variables/stream_descriptor.rs +++ b/air/src/execution_step/execution_context/streams_variables/stream_descriptor.rs @@ -21,20 +21,20 @@ use air_parser::AirPos; use std::fmt; -pub(crate) struct StreamDescriptor { - pub(crate) span: Span, - pub(crate) stream: Stream, +pub(super) struct StreamDescriptor { + pub(super) span: Span, + pub(super) stream: Stream, } impl StreamDescriptor { - pub(crate) fn global(stream: Stream) -> Self { + pub(super) fn global(stream: Stream) -> Self { Self { span: Span::new(0.into(), usize::MAX.into()), stream, } } - pub(crate) fn restricted(stream: Stream, span: Span) -> Self { + pub(super) fn restricted(stream: Stream, span: Span) -> Self { Self { span, stream } } } @@ -45,7 +45,7 @@ impl fmt::Display for StreamDescriptor { } } -pub(crate) fn find_closest<'d>( +pub(super) fn find_closest<'d>( descriptors: impl DoubleEndedIterator, position: AirPos, ) -> Option<&'d Stream> { @@ -59,7 +59,7 @@ pub(crate) fn find_closest<'d>( None } -pub(crate) fn find_closest_mut<'d>( +pub(super) fn find_closest_mut<'d>( descriptors: impl DoubleEndedIterator, position: AirPos, ) -> Option<&'d mut Stream> { diff --git a/air/src/execution_step/execution_context/streams_variables/utils.rs b/air/src/execution_step/execution_context/streams_variables/utils.rs index 6778d7bb..4cf6197f 100644 --- a/air/src/execution_step/execution_context/streams_variables/utils.rs +++ b/air/src/execution_step/execution_context/streams_variables/utils.rs @@ -21,7 +21,7 @@ use air_interpreter_data::GlobalStreamGens; use std::collections::HashMap; -pub(crate) fn merge_global_streams( +pub(super) fn merge_global_streams( previous_global_streams: GlobalStreamGens, current_global_streams: GlobalStreamGens, ) -> HashMap> { diff --git a/air/src/execution_step/instructions/ap.rs b/air/src/execution_step/instructions/ap.rs index a1099de5..43a2aff2 100644 --- a/air/src/execution_step/instructions/ap.rs +++ b/air/src/execution_step/instructions/ap.rs @@ -77,7 +77,6 @@ fn to_merger_ap_result( fn populate_context<'ctx>( ap_result: &ast::ApResult<'ctx>, merger_ap_result: &MergerApResult, - // key_argument: Option<&ApArgument<'ctx>>, result: ValueAggregate, exec_ctx: &mut ExecutionCtx<'ctx>, ) -> ExecutionResult> { diff --git a/air/src/execution_step/instructions/ap_map.rs b/air/src/execution_step/instructions/ap_map.rs index fd7d884b..37e483cd 100644 --- a/air/src/execution_step/instructions/ap_map.rs +++ b/air/src/execution_step/instructions/ap_map.rs @@ -29,8 +29,6 @@ use crate::log_instruction; use crate::trace_to_exec_err; use crate::CatchableError; -// use crate::log_instruction; - use air_interpreter_data::GenerationIdx; use air_parser::ast::ApMap; use air_parser::ast::ApMapKey; diff --git a/air/src/execution_step/instructions/fold_stream.rs b/air/src/execution_step/instructions/fold_stream.rs index dd433f57..b0e4aaf0 100644 --- a/air/src/execution_step/instructions/fold_stream.rs +++ b/air/src/execution_step/instructions/fold_stream.rs @@ -16,24 +16,19 @@ pub(crate) mod completeness_updater; mod stream_cursor; - -use std::rc::Rc; +pub(crate) mod stream_execute_helpers; use super::fold::*; -use super::fold_scalar::fold; use super::ExecutableInstruction; use super::ExecutionCtx; use super::ExecutionResult; +use super::FoldStreamLikeIngredients; use super::TraceHandler; use crate::execution_step::boxed_value::Stream; use crate::log_instruction; -use crate::trace_to_exec_err; -use air_parser::ast::Instruction; -use air_parser::AirPos; -use completeness_updater::FoldGenerationObserver; -use stream_cursor::StreamCursor; - use air_parser::ast::FoldStream; +use air_parser::AirPos; +pub(crate) use stream_execute_helpers::execute_with_stream; impl<'i> ExecutableInstruction<'i> for FoldStream<'i> { fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> { @@ -54,112 +49,11 @@ impl<'i> ExecutableInstruction<'i> for FoldStream<'i> { trace_ctx, get_mut_stream_from_context, get_stream_from_context, - self.iterable.name, - &self.iterable.position, - ( - self, - self.iterator.name, - self.instruction.clone(), - self.last_instruction.clone(), - ), + self, ) } } -pub(super) fn execute_with_stream<'i>( - exec_ctx: &mut ExecutionCtx<'i>, - trace_ctx: &mut TraceHandler, - get_mut_stream: for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c mut Stream, - get_stream: for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c Stream, - iterable_name: &str, - iterable_pos: &AirPos, - fold_related_args: ( - &impl ToString, - &'i str, - Rc>, - Option>>, - ), -) -> ExecutionResult<()> { - let fold_id = exec_ctx.tracker.fold.seen_stream_count; - - let fold_impl_to_str = fold_related_args.0; - trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_impl_to_str)?; - - let mut stream_cursor = StreamCursor::new(); - let mut stream_iterable = stream_cursor.construct_iterables(get_stream(iterable_name, *iterable_pos, exec_ctx)); - let mut observer = FoldGenerationObserver::new(); - // this cycle manages recursive streams - while !stream_iterable.is_empty() { - // add a new generation to made all consequence "new" (meaning that they are just executed on this peer) - // write operation to this stream to write to this new generation - add_new_generation_if_non_empty(get_mut_stream(iterable_name, *iterable_pos, exec_ctx)); - let fold_related_args = ( - fold_related_args.0, - fold_related_args.1, - fold_related_args.2.clone(), - fold_related_args.3.clone(), - fold_id, - ); - execute_iterations(stream_iterable, fold_related_args, &mut observer, exec_ctx, trace_ctx)?; - - // it's needed to get stream again, because RefCell allows only one mutable borrowing at time, - // and likely that stream could be mutably borrowed in execute_iterations - let stream = remove_new_generation_if_non_empty(get_mut_stream(iterable_name, *iterable_pos, exec_ctx)); - - stream_iterable = stream_cursor.construct_iterables(stream) - } - - observer.update_completeness(exec_ctx); - trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), fold_impl_to_str)?; - Ok(()) -} - -/// Executes fold iteration over all generation that stream had at the moment of call. -/// It must return only uncatchable errors (such as ones from TraceHandler), though -/// catchable errors are suppressed and not propagated from this function, because of determinism. -/// The issue with determinism here lies in invariant that all previous executed states -/// must be met. -pub(super) fn execute_iterations<'i>( - iterables: Vec, - fold_related_args: ( - &impl ToString, - &'i str, - Rc>, - Option>>, - u32, - ), - generation_observer: &mut FoldGenerationObserver, - exec_ctx: &mut ExecutionCtx<'i>, - trace_ctx: &mut TraceHandler, -) -> ExecutionResult<()> { - let (fold_impl_to_str, fold_stream_it_name, fold_instruction, fold_last_instruction, fold_id) = fold_related_args; - for iterable in iterables.into_iter() { - let value = match iterable.peek() { - Some(value) => value, - // it's ok, because some generation level of a stream on some point inside execution - // flow could contain zero values - None => continue, - }; - let value_pos = value.pos(); - trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos), fold_impl_to_str)?; - let result = fold( - iterable, - IterableType::Stream(fold_id), - fold_stream_it_name, - fold_instruction.clone(), - fold_last_instruction.clone(), - exec_ctx, - trace_ctx, - ); - throw_error_if_not_catchable(result)?; - trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_impl_to_str)?; - - generation_observer.observe_completeness(exec_ctx.is_subgraph_complete()); - } - - Ok(()) -} - /// Safety: this function should be called iff stream is present in context fn get_mut_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c mut ExecutionCtx<'_>) -> &'c mut Stream { exec_ctx.streams.get_mut(name, position).unwrap() @@ -169,23 +63,26 @@ fn get_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c mut E exec_ctx.streams.get_mut(name, position).unwrap() } -/// Safety: this function should be called iff stream is present in context -pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) { - stream.add_new_generation_if_non_empty(); -} +impl<'i> FoldStreamLikeIngredients for FoldStream<'i> { + type Item = air_parser::ast::Stream<'i>; -/// Safety: this function should be called iff stream is present in context -fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream { - stream.remove_last_generation_if_empty(); - stream -} + fn iterable_name(&self) -> &'i str { + self.iterable.name + } -/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be -/// not deterministic. -pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> { - match result { - Ok(_) => Ok(()), - Err(error) if error.is_catchable() => Ok(()), - error @ Err(_) => error, + fn iterable_pos(&self) -> air_parser::AirPos { + self.iterable.position + } + + fn iterator_name(&self) -> &'i str { + self.iterator.name + } + + fn instruction(&self) -> std::rc::Rc> { + self.instruction.clone() + } + + fn last_instruction(&self) -> Option>> { + self.last_instruction.clone() } } diff --git a/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs b/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs new file mode 100644 index 00000000..465a7901 --- /dev/null +++ b/air/src/execution_step/instructions/fold_stream/stream_execute_helpers.rs @@ -0,0 +1,137 @@ +/* + * Copyright 2023 Fluence Labs Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use super::completeness_updater::FoldGenerationObserver; +use super::stream_cursor::StreamCursor; +use super::ExecutionCtx; +use super::ExecutionResult; +use super::TraceHandler; +use crate::execution_step::boxed_value::Stream; +use crate::execution_step::instructions::fold::IterableType; +use crate::execution_step::instructions::fold::IterableValue; +use crate::execution_step::instructions::fold_scalar::fold; +use crate::execution_step::instructions::FoldStreamLikeIngredients; +use crate::trace_to_exec_err; +use air_parser::AirPos; + +type MutStreamGetterClosure = for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c mut Stream; +type StreamGetterClosure = for<'c> fn(&str, AirPos, &'c mut ExecutionCtx<'_>) -> &'c Stream; + +pub(crate) fn execute_with_stream<'i>( + exec_ctx: &mut ExecutionCtx<'i>, + trace_ctx: &mut TraceHandler, + get_mut_stream: MutStreamGetterClosure, + get_stream: StreamGetterClosure, + fold_stream_like: &'i (impl FoldStreamLikeIngredients + ToString), +) -> ExecutionResult<()> { + let fold_id = exec_ctx.tracker.fold.seen_stream_count; + + let iterable_name = fold_stream_like.iterable_name(); + let iterable_pos = fold_stream_like.iterable_pos(); + + trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_stream_like)?; + + let mut stream_cursor = StreamCursor::new(); + let mut stream_iterable = stream_cursor.construct_iterables(get_stream(iterable_name, iterable_pos, exec_ctx)); + let mut observer = FoldGenerationObserver::new(); + // this cycle manages recursive streams + while !stream_iterable.is_empty() { + // add a new generation to made all consequence "new" (meaning that they are just executed on this peer) + // write operation to this stream to write to this new generation + add_new_generation_if_non_empty(get_mut_stream(iterable_name, iterable_pos, exec_ctx)); + execute_iterations( + stream_iterable, + &mut observer, + exec_ctx, + trace_ctx, + fold_stream_like, + fold_id, + )?; + + // it's needed to get stream again, because RefCell allows only one mutable borrowing at time, + // and likely that stream could be mutably borrowed in execute_iterations + let stream = remove_new_generation_if_non_empty(get_mut_stream(iterable_name, iterable_pos, exec_ctx)); + + stream_iterable = stream_cursor.construct_iterables(stream) + } + + observer.update_completeness(exec_ctx); + trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), fold_stream_like)?; + Ok(()) +} + +/// Executes fold iteration over all generation that stream had at the moment of call. +/// It must return only uncatchable errors (such as ones from TraceHandler), though +/// catchable errors are suppressed and not propagated from this function, because of determinism. +/// The issue with determinism here lies in invariant that all previous executed states +/// must be met. +pub(super) fn execute_iterations<'i>( + iterables: Vec, + generation_observer: &mut FoldGenerationObserver, + exec_ctx: &mut ExecutionCtx<'i>, + trace_ctx: &mut TraceHandler, + fold_stream_like: &'i (impl FoldStreamLikeIngredients + ToString), + fold_id: u32, +) -> ExecutionResult<()> { + // let (fold_impl_to_str, fold_stream_it_name, fold_instruction, fold_last_instruction, fold_id) = fold_related_args; + let iterator_name = fold_stream_like.iterator_name(); + for iterable in iterables.into_iter() { + let value = match iterable.peek() { + Some(value) => value, + // it's ok, because some generation level of a stream on some point inside execution + // flow could contain zero values + None => continue, + }; + let value_pos = value.pos(); + trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos), fold_stream_like)?; + let result = fold( + iterable, + IterableType::Stream(fold_id), + iterator_name, + fold_stream_like.instruction(), + fold_stream_like.last_instruction(), + exec_ctx, + trace_ctx, + ); + throw_error_if_not_catchable(result)?; + trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream_like)?; + + generation_observer.observe_completeness(exec_ctx.is_subgraph_complete()); + } + + Ok(()) +} + +/// Safety: this function should be called iff stream is present in context +pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) { + stream.add_new_generation_if_non_empty(); +} + +/// Safety: this function should be called iff stream is present in context +pub(super) fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream { + stream.remove_last_generation_if_empty(); + stream +} + +/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be +/// not deterministic. +pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> { + match result { + Ok(_) => Ok(()), + Err(error) if error.is_catchable() => Ok(()), + error @ Err(_) => error, + } +} diff --git a/air/src/execution_step/instructions/fold_stream_map.rs b/air/src/execution_step/instructions/fold_stream_map.rs index 593a5c35..0a59682a 100644 --- a/air/src/execution_step/instructions/fold_stream_map.rs +++ b/air/src/execution_step/instructions/fold_stream_map.rs @@ -17,6 +17,7 @@ use super::ExecutableInstruction; use super::ExecutionCtx; use super::ExecutionResult; +use super::FoldStreamLikeIngredients; use super::TraceHandler; use crate::execution_step::instructions::fold_stream::execute_with_stream; use crate::execution_step::Stream; @@ -44,14 +45,7 @@ impl<'i> ExecutableInstruction<'i> for FoldStreamMap<'i> { trace_ctx, get_mut_stream_from_context, get_stream_from_context, - self.iterable.name, - &self.iterable.position, - ( - self, - self.iterator.name, - self.instruction.clone(), - self.last_instruction.clone(), - ), + self, ) } } @@ -69,3 +63,27 @@ fn get_mut_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c m fn get_stream_from_context<'c>(name: &str, position: AirPos, exec_ctx: &'c mut ExecutionCtx<'_>) -> &'c Stream { exec_ctx.stream_maps.get_mut(name, position).unwrap().get_stream_ref() } + +impl<'i> FoldStreamLikeIngredients for FoldStreamMap<'i> { + type Item = air_parser::ast::StreamMap<'i>; + + fn iterable_name(&self) -> &'i str { + self.iterable.name + } + + fn iterable_pos(&self) -> air_parser::AirPos { + self.iterable.position + } + + fn iterator_name(&self) -> &'i str { + self.iterator.name + } + + fn instruction(&self) -> std::rc::Rc> { + self.instruction.clone() + } + + fn last_instruction(&self) -> Option>> { + self.last_instruction.clone() + } +} diff --git a/air/src/execution_step/instructions/mod.rs b/air/src/execution_step/instructions/mod.rs index d96f553b..a7c1c495 100644 --- a/air/src/execution_step/instructions/mod.rs +++ b/air/src/execution_step/instructions/mod.rs @@ -34,6 +34,8 @@ mod par; mod seq; mod xor; +use std::rc::Rc; + pub(crate) use call::triplet::resolve_peer_id_to_string; pub(crate) use fold::FoldState; @@ -142,6 +144,16 @@ macro_rules! log_instruction { }; } +pub trait FoldStreamLikeIngredients { + type Item; + + fn iterable_name(&self) -> &str; + fn iterable_pos(&self) -> air_parser::AirPos; + fn iterator_name(&self) -> &str; + fn instruction(&self) -> Rc>; + fn last_instruction(&self) -> Option>>; +} + /// This macro converts joinable errors to Ok and sets subgraph complete to false. #[macro_export] macro_rules! joinable { diff --git a/air/tests/test_module/instructions/ap.rs b/air/tests/test_module/instructions/ap.rs index 6622ac26..759e3a4f 100644 --- a/air/tests/test_module/instructions/ap.rs +++ b/air/tests/test_module/instructions/ap.rs @@ -433,8 +433,8 @@ fn ap_stream_map() { let script = f!(r#" (seq (seq - (ap "{vm_1_peer_id}" "{service_name1}" %map) - (ap "{vm_1_peer_id}" "{service_name2}" %map) + (ap ("{vm_1_peer_id}" "{service_name1}") %map) + (ap ("{vm_1_peer_id}" "{service_name2}") %map) ) (fold %map i (seq diff --git a/air/tests/test_module/negative_tests/execution_step.rs b/air/tests/test_module/negative_tests/execution_step.rs index 460d9b6e..9137d109 100644 --- a/air/tests/test_module/negative_tests/execution_step.rs +++ b/air/tests/test_module/negative_tests/execution_step.rs @@ -552,7 +552,7 @@ fn float_map_key_is_unsupported() { let map_name = "%map"; let join_stream_script = f!(r#" - (ap 0.5 "serv1" %map) + (ap (0.5 "serv1") %map) "#); let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap(); @@ -570,7 +570,7 @@ fn unsupported_map_key_type() { let join_stream_script = f!(r#" (seq (ap "a" some) - (ap some "serv1" %map) + (ap (some "serv1") %map) ) "#); diff --git a/crates/air-lib/air-parser/src/ast/instructions.rs b/crates/air-lib/air-parser/src/ast/instructions.rs index d3b38571..f4bd5ae5 100644 --- a/crates/air-lib/air-parser/src/ast/instructions.rs +++ b/crates/air-lib/air-parser/src/ast/instructions.rs @@ -62,7 +62,7 @@ pub struct Ap<'i> { pub result: ApResult<'i>, } -/// (ap key value %map) +/// (ap (key value) %map) #[derive(Serialize, Debug, PartialEq)] pub struct ApMap<'i> { pub key: ApMapKey<'i>, diff --git a/crates/air-lib/air-parser/src/parser/tests/ap.rs b/crates/air-lib/air-parser/src/parser/tests/ap.rs index 7fe0c5ba..4cf033f8 100644 --- a/crates/air-lib/air-parser/src/parser/tests/ap.rs +++ b/crates/air-lib/air-parser/src/parser/tests/ap.rs @@ -193,7 +193,7 @@ fn ap_with_stream_map() { let value = "some_string"; let source_code = format!( r#" - (ap "{key_name}" "{value}" %stream_map) + (ap ("{key_name}" "{value}") %stream_map) "# ); let actual = parse(source_code.as_str()); @@ -208,7 +208,7 @@ fn ap_with_stream_map() { // but populate_context will raise an error let source_code = format!( r#" - (ap {key_name} "{value}" %stream_map) + (ap ({key_name} "{value}") %stream_map) "# ); let actual = parse(source_code.as_str()); @@ -224,7 +224,7 @@ fn ap_with_stream_map() { let source_code = format!( r#" - (ap "{key_name}" {value} %stream_map) + (ap ("{key_name}" {value}) %stream_map) "# ); let actual = parse(source_code.as_str()); @@ -239,7 +239,7 @@ fn ap_with_stream_map() { // but populate_context will raise an error let source_code = format!( r#" - (ap {key_name} {value} %stream_map) + (ap ({key_name} {value}) %stream_map) "# ); let actual = parse(source_code.as_str()); @@ -256,7 +256,7 @@ fn ap_with_stream_map() { let key_name = 123; let source_code = format!( r#" - (ap {key_name} {value} %stream_map) + (ap ({key_name} {value}) %stream_map) "# ); let actual = parse(source_code.as_str()); diff --git a/crates/air-lib/air-parser/src/parser/validator.rs b/crates/air-lib/air-parser/src/parser/validator.rs index ed0eb84a..9a7d85f0 100644 --- a/crates/air-lib/air-parser/src/parser/validator.rs +++ b/crates/air-lib/air-parser/src/parser/validator.rs @@ -166,7 +166,7 @@ impl<'i> VariableValidator<'i> { ApMapKey::CanonStream(stream) => self.met_canon_stream(stream, span), ApMapKey::CanonStreamWithLambda(stream) => self.met_canon_stream_wl(stream, span), } - self.met_variable_name(ap_map.map.name, span); + self.met_variable_name_definition(ap_map.map.name, span); } pub(super) fn finalize(self) -> Vec, ParserError>> {