mirror of
https://github.com/fluencelabs/aquavm
synced 2025-04-24 23:02:16 +00:00
feat(execution-engine): Stream Map initial support [fixes VM-283,VM-284] (#592)
feat(execution-engine): StreamMap initial support for ap and new instructions [fixes VM-283,VM-284]
This commit is contained in:
parent
252d43b348
commit
9d7d34a452
@ -19,6 +19,7 @@ mod iterable;
|
||||
mod jvaluable;
|
||||
mod scalar;
|
||||
mod stream;
|
||||
mod stream_map;
|
||||
mod utils;
|
||||
|
||||
pub(crate) use canon_stream::*;
|
||||
@ -31,6 +32,7 @@ pub(crate) use scalar::ServiceResultAggregate;
|
||||
pub(crate) use scalar::ValueAggregate;
|
||||
pub(crate) use stream::Generation;
|
||||
pub(crate) use stream::Stream;
|
||||
pub(crate) use stream_map::StreamMap;
|
||||
pub(crate) use utils::populate_tetraplet_with_lambda;
|
||||
|
||||
use super::ExecutionResult;
|
||||
|
201
air/src/execution_step/boxed_value/stream_map.rs
Normal file
201
air/src/execution_step/boxed_value/stream_map.rs
Normal file
@ -0,0 +1,201 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::stream::*;
|
||||
use super::ValueAggregate;
|
||||
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
|
||||
use crate::execution_step::ExecutionResult;
|
||||
use crate::JValue;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_trace_handler::merger::ValueSource;
|
||||
use air_trace_handler::TraceHandler;
|
||||
|
||||
use serde_json::json;
|
||||
use std::rc::Rc;
|
||||
|
||||
fn from_key_value(key: StreamMapKey<'_>, value: &JValue) -> Rc<JValue> {
|
||||
Rc::new(json!({ "key": key, "value": value }))
|
||||
}
|
||||
|
||||
#[derive(Debug, Default, Clone)]
|
||||
pub struct StreamMap {
|
||||
stream: Stream,
|
||||
}
|
||||
|
||||
impl StreamMap {
|
||||
pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self {
|
||||
Self {
|
||||
stream: Stream::from_generations_count(previous_count, current_count),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn from_value(key: StreamMapKey<'_>, value: &ValueAggregate) -> Self {
|
||||
let obj = from_key_value(key, value.get_result());
|
||||
let value = ValueAggregate::new(
|
||||
obj,
|
||||
value.get_tetraplet(),
|
||||
value.get_trace_pos(),
|
||||
value.get_provenance(),
|
||||
);
|
||||
Self {
|
||||
stream: Stream::from_value(value),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn insert(
|
||||
&mut self,
|
||||
key: StreamMapKey<'_>,
|
||||
value: &ValueAggregate,
|
||||
generation: Generation,
|
||||
source: ValueSource,
|
||||
) -> ExecutionResult<GenerationIdx> {
|
||||
let obj = from_key_value(key, value.get_result());
|
||||
let value = ValueAggregate::new(
|
||||
obj,
|
||||
value.get_tetraplet(),
|
||||
value.get_trace_pos(),
|
||||
value.get_provenance(),
|
||||
);
|
||||
self.stream.add_value(value, generation, source)
|
||||
}
|
||||
|
||||
pub(crate) fn compactify(self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
|
||||
self.stream.compactify(trace_ctx)
|
||||
}
|
||||
|
||||
pub(crate) fn get_mut_stream_ref(&mut self) -> &mut Stream {
|
||||
&mut self.stream
|
||||
}
|
||||
}
|
||||
|
||||
use std::fmt;
|
||||
|
||||
impl fmt::Display for StreamMap {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
self.stream.fmt(f)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::Generation;
|
||||
use super::StreamMap;
|
||||
use crate::execution_step::boxed_value::stream_map::from_key_value;
|
||||
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
|
||||
use crate::execution_step::ValueAggregate;
|
||||
use air_trace_handler::merger::ValueSource;
|
||||
use serde_json::json;
|
||||
use std::borrow::Cow;
|
||||
use std::rc::Rc;
|
||||
|
||||
#[test]
|
||||
fn test_from_value() {
|
||||
let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]);
|
||||
let key_str = "some_key";
|
||||
let key = StreamMapKey::Str(Cow::Borrowed(key_str));
|
||||
let value = Rc::new(obj.clone());
|
||||
|
||||
let generation_idx = 0;
|
||||
let generation = Generation::Nth(generation_idx.into());
|
||||
let value_aggregate: ValueAggregate = ValueAggregate::new(
|
||||
value.clone(),
|
||||
<_>::default(),
|
||||
0.into(),
|
||||
air_interpreter_data::Provenance::literal(),
|
||||
);
|
||||
let stream_map = StreamMap::from_value(key.clone(), &value_aggregate);
|
||||
|
||||
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
|
||||
let v = internal_stream_iter.next().map(|e| e.get_result()).unwrap();
|
||||
let examplar = from_key_value(key, value.as_ref());
|
||||
assert_eq!(*v, examplar);
|
||||
assert_eq!(internal_stream_iter.next(), None);
|
||||
|
||||
let key = StreamMapKey::I64(42.into());
|
||||
let value_aggregate = ValueAggregate::new(
|
||||
value.clone(),
|
||||
<_>::default(),
|
||||
0.into(),
|
||||
air_interpreter_data::Provenance::literal(),
|
||||
);
|
||||
let stream_map = StreamMap::from_value(key.clone(), &value_aggregate);
|
||||
|
||||
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
|
||||
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
|
||||
let examplar = from_key_value(key, value.as_ref());
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
assert_eq!(internal_stream_iter.next(), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_insert() {
|
||||
let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]);
|
||||
let key_str = "some_key";
|
||||
let key12 = StreamMapKey::Str(Cow::Borrowed(key_str));
|
||||
let value = Rc::new(obj.clone());
|
||||
let generation_idx = 0;
|
||||
let value_aggregate: ValueAggregate = ValueAggregate::new(
|
||||
value.clone(),
|
||||
<_>::default(),
|
||||
0.into(),
|
||||
air_interpreter_data::Provenance::literal(),
|
||||
);
|
||||
let mut stream_map = StreamMap::from_value(key12.clone(), &value_aggregate);
|
||||
let generation = Generation::Nth(generation_idx.into());
|
||||
let generation_idx_res = stream_map
|
||||
.insert(key12.clone(), &value_aggregate, generation, ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
assert_eq!(generation_idx_res, generation_idx);
|
||||
|
||||
let examplar = from_key_value(key12, value.as_ref());
|
||||
let s = stream_map
|
||||
.stream
|
||||
.iter(generation)
|
||||
.unwrap()
|
||||
.all(|e| *e.get_result().as_ref() == *examplar.as_ref());
|
||||
assert!(s);
|
||||
|
||||
let key_str = "other_key";
|
||||
let key3 = StreamMapKey::Str(Cow::Borrowed(key_str));
|
||||
let generation_idx = stream_map
|
||||
.insert(key3.clone(), &value_aggregate, generation, ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
assert_eq!(generation_idx_res, generation_idx);
|
||||
|
||||
let key4 = StreamMapKey::I64(42.into());
|
||||
let generation_idx = stream_map
|
||||
.insert(key4.clone(), &value_aggregate, generation, ValueSource::CurrentData)
|
||||
.unwrap();
|
||||
assert_eq!(generation_idx_res, generation_idx);
|
||||
|
||||
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
|
||||
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
||||
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
||||
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
|
||||
let examplar = from_key_value(key3, value.as_ref());
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
|
||||
let v = internal_stream_iter.next().map(|e| e.get_result().as_ref()).unwrap();
|
||||
let examplar = from_key_value(key4, value.as_ref());
|
||||
assert_eq!(*v, *examplar.as_ref());
|
||||
assert_eq!(internal_stream_iter.next(), None);
|
||||
}
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
|
||||
use super::Joinable;
|
||||
use super::LastErrorAffectable;
|
||||
use crate::execution_step::execution_context::errors::StreamMapError;
|
||||
use crate::execution_step::execution_context::LastErrorObjectError;
|
||||
use crate::execution_step::lambda_applier::LambdaError;
|
||||
use crate::JValue;
|
||||
@ -93,6 +94,10 @@ pub enum CatchableError {
|
||||
variable_name: String,
|
||||
actual_value: JValue,
|
||||
},
|
||||
|
||||
/// Stream map related errors.
|
||||
#[error(transparent)]
|
||||
StreamMapError(#[from] StreamMapError),
|
||||
}
|
||||
|
||||
impl From<LambdaError> for Rc<CatchableError> {
|
||||
|
@ -18,6 +18,7 @@ use super::ExecutionCidState;
|
||||
use super::LastError;
|
||||
use super::LastErrorDescriptor;
|
||||
use super::Scalars;
|
||||
use super::StreamMaps;
|
||||
use super::Streams;
|
||||
|
||||
use air_execution_info_collector::InstructionTracker;
|
||||
@ -42,6 +43,9 @@ pub(crate) struct ExecutionCtx<'i> {
|
||||
/// Contains all streams.
|
||||
pub(crate) streams: Streams,
|
||||
|
||||
/// Contains all stream maps.
|
||||
pub(crate) stream_maps: StreamMaps,
|
||||
|
||||
/// Set of peer public keys that should receive resulted data.
|
||||
pub(crate) next_peer_pks: Vec<String>,
|
||||
|
||||
|
@ -18,6 +18,7 @@ mod cid_state;
|
||||
mod context;
|
||||
mod last_error;
|
||||
mod scalar_variables;
|
||||
mod stream_maps_variables;
|
||||
mod streams_variables;
|
||||
|
||||
pub use last_error::*;
|
||||
@ -25,4 +26,5 @@ pub use last_error::*;
|
||||
pub use cid_state::ExecutionCidState;
|
||||
pub(crate) use context::*;
|
||||
pub(crate) use scalar_variables::*;
|
||||
pub(crate) use stream_maps_variables::*;
|
||||
pub(crate) use streams_variables::*;
|
||||
|
@ -0,0 +1,261 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
pub(crate) mod errors;
|
||||
pub(crate) mod stream_map_key;
|
||||
|
||||
use self::stream_map_key::StreamMapKey;
|
||||
use crate::execution_step::boxed_value::StreamMap;
|
||||
use crate::execution_step::ExecutionResult;
|
||||
use crate::execution_step::Generation;
|
||||
use crate::execution_step::ValueAggregate;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_interpreter_data::RestrictedStreamGens;
|
||||
use air_interpreter_data::RestrictedStreamMapGens;
|
||||
use air_parser::ast::Span;
|
||||
use air_parser::AirPos;
|
||||
use air_trace_handler::merger::ValueSource;
|
||||
use air_trace_handler::TraceHandler;
|
||||
|
||||
use std::collections::hash_map::Entry::{Occupied, Vacant};
|
||||
use std::collections::HashMap;
|
||||
use std::fmt;
|
||||
|
||||
// TODO This module should be unified with its Stream counterpart.
|
||||
pub(crate) struct StreamMapValueDescriptor<'stream_name> {
|
||||
pub value: ValueAggregate,
|
||||
pub name: &'stream_name str,
|
||||
pub source: ValueSource,
|
||||
pub generation: Generation,
|
||||
pub position: AirPos,
|
||||
}
|
||||
|
||||
impl<'stream_name> StreamMapValueDescriptor<'stream_name> {
|
||||
pub fn new(
|
||||
value: ValueAggregate,
|
||||
name: &'stream_name str,
|
||||
source: ValueSource,
|
||||
generation: Generation,
|
||||
position: AirPos,
|
||||
) -> Self {
|
||||
Self {
|
||||
value,
|
||||
name,
|
||||
source,
|
||||
generation,
|
||||
position,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) struct StreamMapDescriptor {
|
||||
pub(super) span: Span,
|
||||
pub(super) stream_map: StreamMap,
|
||||
}
|
||||
|
||||
impl StreamMapDescriptor {
|
||||
pub(super) fn global(stream_map: StreamMap) -> Self {
|
||||
Self {
|
||||
span: Span::new(0.into(), usize::MAX.into()),
|
||||
stream_map,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn restricted(stream_map: StreamMap, span: Span) -> Self {
|
||||
Self { span, stream_map }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StreamMapDescriptor {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, " <{}> - <{}>: {}", self.span.left, self.span.right, self.stream_map)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn find_closest<'d>(
|
||||
descriptors: impl DoubleEndedIterator<Item = &'d StreamMapDescriptor>,
|
||||
position: AirPos,
|
||||
) -> Option<&'d StreamMap> {
|
||||
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
|
||||
descriptors
|
||||
.rev()
|
||||
.find(|d| d.span.contains_position(position))
|
||||
.map(|d| &d.stream_map)
|
||||
}
|
||||
|
||||
pub(super) fn find_closest_mut<'d>(
|
||||
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamMapDescriptor>,
|
||||
position: AirPos,
|
||||
) -> Option<&'d mut StreamMap> {
|
||||
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
|
||||
descriptors
|
||||
.rev()
|
||||
.find(|d| d.span.contains_position(position))
|
||||
.map(|d| &mut d.stream_map)
|
||||
}
|
||||
#[derive(Default)]
|
||||
pub(crate) struct StreamMaps {
|
||||
// this one is optimized for speed (not for memory), because it's unexpected
|
||||
// that a script could have a lot of new.
|
||||
// TODO: use shared string (Rc<String>) to avoid copying.
|
||||
stream_maps: HashMap<String, Vec<StreamMapDescriptor>>,
|
||||
|
||||
/// Contains stream generations from previous data that a restricted stream
|
||||
/// should have at the scope start.
|
||||
previous_restricted_stream_maps_gens: RestrictedStreamMapGens,
|
||||
|
||||
/// Contains stream generations from current data that a restricted stream
|
||||
/// should have at the scope start.
|
||||
current_restricted_stream_maps_gens: RestrictedStreamMapGens,
|
||||
|
||||
/// Contains stream generations that each private stream had at the scope end.
|
||||
/// Then it's placed into data
|
||||
new_restricted_stream_maps_gens: RestrictedStreamMapGens,
|
||||
}
|
||||
|
||||
impl StreamMaps {
|
||||
pub(crate) fn get(&self, name: &str, position: AirPos) -> Option<&StreamMap> {
|
||||
self.stream_maps
|
||||
.get(name)
|
||||
.and_then(|descriptors| find_closest(descriptors.iter(), position))
|
||||
}
|
||||
|
||||
pub(crate) fn get_mut(&mut self, name: &str, position: AirPos) -> Option<&mut StreamMap> {
|
||||
self.stream_maps
|
||||
.get_mut(name)
|
||||
.and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position))
|
||||
}
|
||||
|
||||
pub(crate) fn add_stream_map_value(
|
||||
&mut self,
|
||||
key: StreamMapKey<'_>,
|
||||
value_descriptor: StreamMapValueDescriptor<'_>,
|
||||
) -> ExecutionResult<GenerationIdx> {
|
||||
let StreamMapValueDescriptor {
|
||||
value,
|
||||
name,
|
||||
source,
|
||||
generation,
|
||||
position,
|
||||
} = value_descriptor;
|
||||
|
||||
match self.get_mut(name, position) {
|
||||
Some(stream_map) => stream_map.insert(key, &value, generation, source),
|
||||
None => {
|
||||
// streams could be created in three ways:
|
||||
// - after met new instruction with stream name that isn't present in streams
|
||||
// (it's the only way to create restricted streams)
|
||||
// - by calling add_global_stream with generation that come from data
|
||||
// for global streams
|
||||
// - and by this function, and if there is no such a streams in streams,
|
||||
// it means that a new global one should be created.
|
||||
let stream_map = StreamMap::from_value(key, &value);
|
||||
let descriptor = StreamMapDescriptor::global(stream_map);
|
||||
self.stream_maps.insert(name.to_string(), vec![descriptor]);
|
||||
let generation = 0;
|
||||
Ok(generation.into())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: usize) {
|
||||
let name = name.into();
|
||||
let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration);
|
||||
|
||||
let new_stream_map = StreamMap::from_generations_count(prev_gens_count, current_gens_count);
|
||||
let new_descriptor = StreamMapDescriptor::restricted(new_stream_map, span);
|
||||
match self.stream_maps.entry(name) {
|
||||
Occupied(mut entry) => {
|
||||
entry.get_mut().push(new_descriptor);
|
||||
}
|
||||
Vacant(entry) => {
|
||||
entry.insert(vec![new_descriptor]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn meet_scope_end(
|
||||
&mut self,
|
||||
name: String,
|
||||
position: AirPos,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
// unwraps are safe here because met_scope_end must be called after met_scope_start
|
||||
let stream_map_descriptors = self.stream_maps.get_mut(&name).unwrap();
|
||||
// delete a stream after exit from a scope
|
||||
let last_descriptor = stream_map_descriptors.pop().unwrap();
|
||||
if stream_map_descriptors.is_empty() {
|
||||
// streams should contain only non-empty stream embodiments
|
||||
self.stream_maps.remove(&name);
|
||||
}
|
||||
let gens_count = last_descriptor.stream_map.compactify(trace_ctx)?;
|
||||
|
||||
self.collect_stream_generation(name, position, gens_count);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn stream_generation_from_data(
|
||||
&self,
|
||||
name: &str,
|
||||
position: AirPos,
|
||||
iteration: usize,
|
||||
) -> (GenerationIdx, GenerationIdx) {
|
||||
let previous_generation =
|
||||
Self::restricted_stream_generation(&self.previous_restricted_stream_maps_gens, name, position, iteration)
|
||||
.unwrap_or_default();
|
||||
let current_generation =
|
||||
Self::restricted_stream_generation(&self.current_restricted_stream_maps_gens, name, position, iteration)
|
||||
.unwrap_or_default();
|
||||
|
||||
(previous_generation, current_generation)
|
||||
}
|
||||
|
||||
fn restricted_stream_generation(
|
||||
restricted_stream_maps_gens: &RestrictedStreamGens,
|
||||
name: &str,
|
||||
position: AirPos,
|
||||
iteration: usize,
|
||||
) -> Option<GenerationIdx> {
|
||||
restricted_stream_maps_gens
|
||||
.get(name)
|
||||
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
|
||||
.copied()
|
||||
}
|
||||
|
||||
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) {
|
||||
match self.new_restricted_stream_maps_gens.entry(name) {
|
||||
Occupied(mut streams) => streams.get_mut().entry(position).or_default().push(generation),
|
||||
Vacant(entry) => {
|
||||
let iterations = maplit::hashmap! {
|
||||
position => vec![generation],
|
||||
};
|
||||
entry.insert(iterations);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StreamMaps {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
for (name, descriptors) in self.stream_maps.iter() {
|
||||
if let Some(last_descriptor) = descriptors.last() {
|
||||
writeln!(f, "{name} => {last_descriptor}")?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
@ -0,0 +1,30 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use thiserror::Error as ThisError;
|
||||
|
||||
/// Describes errors related to applying lambdas to values.
|
||||
#[derive(Debug, Clone, ThisError)]
|
||||
pub enum StreamMapError {
|
||||
#[error("unsupported type for {variable_name} map's key")]
|
||||
UnsupportedMapKeyType { variable_name: String },
|
||||
}
|
||||
|
||||
pub fn unsupported_map_key_type(variable_name: &str) -> StreamMapError {
|
||||
StreamMapError::UnsupportedMapKeyType {
|
||||
variable_name: variable_name.to_string(),
|
||||
}
|
||||
}
|
@ -0,0 +1,72 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use crate::execution_step::execution_context::stream_maps_variables::errors::unsupported_map_key_type;
|
||||
use crate::CatchableError;
|
||||
use crate::ExecutionError;
|
||||
use crate::JValue;
|
||||
|
||||
use serde::Serialize;
|
||||
use std::borrow::Cow;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) enum StreamMapKey<'i> {
|
||||
Str(Cow<'i, str>),
|
||||
U64(u64),
|
||||
I64(i64),
|
||||
}
|
||||
|
||||
impl<'i> StreamMapKey<'i> {
|
||||
pub(crate) fn from_value(value: JValue, map_name: &str) -> Result<Self, ExecutionError> {
|
||||
match value {
|
||||
JValue::String(s) => Ok(StreamMapKey::Str(Cow::Owned(s))),
|
||||
JValue::Number(n) if n.is_i64() => Ok(StreamMapKey::I64(n.as_i64().unwrap())),
|
||||
JValue::Number(n) if n.is_u64() => Ok(StreamMapKey::U64(n.as_u64().unwrap())),
|
||||
_ => Err(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<i64> for StreamMapKey<'_> {
|
||||
fn from(value: i64) -> Self {
|
||||
StreamMapKey::I64(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<u64> for StreamMapKey<'_> {
|
||||
fn from(value: u64) -> Self {
|
||||
StreamMapKey::U64(value)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> From<&'i str> for StreamMapKey<'i> {
|
||||
fn from(value: &'i str) -> Self {
|
||||
StreamMapKey::Str(Cow::Borrowed(value))
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> Serialize for StreamMapKey<'i> {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
match self {
|
||||
StreamMapKey::Str(s) => serializer.serialize_str(s),
|
||||
StreamMapKey::U64(n) => serializer.serialize_u64(*n),
|
||||
StreamMapKey::I64(n) => serializer.serialize_i64(*n),
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,74 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use crate::execution_step::StreamMap;
|
||||
|
||||
use air_parser::ast::Span;
|
||||
use air_parser::AirPos;
|
||||
|
||||
use std::fmt;
|
||||
|
||||
pub(super) struct StreamMapDescriptor {
|
||||
pub(super) span: Span,
|
||||
pub(super) stream_map: StreamMap,
|
||||
}
|
||||
|
||||
impl StreamMapDescriptor {
|
||||
pub(super) fn global(stream_map: StreamMap) -> Self {
|
||||
Self {
|
||||
span: Span::new(0.into(), usize::MAX.into()),
|
||||
stream_map,
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn restricted(stream_map: StreamMap, span: Span) -> Self {
|
||||
Self { span, stream_map }
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StreamMapDescriptor {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, " <{}> - <{}>: {}", self.span.left, self.span.right, self.stream_map)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) fn find_closest<'d>(
|
||||
descriptors: impl DoubleEndedIterator<Item = &'d StreamMapDescriptor>,
|
||||
position: AirPos,
|
||||
) -> Option<&'d StreamMap> {
|
||||
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
|
||||
for descriptor in descriptors.rev() {
|
||||
if descriptor.span.contains_position(position) {
|
||||
return Some(&descriptor.stream_map);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
pub(super) fn find_closest_mut<'d>(
|
||||
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamMapDescriptor>,
|
||||
position: AirPos,
|
||||
) -> Option<&'d mut StreamMap> {
|
||||
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
|
||||
for descriptor in descriptors.rev() {
|
||||
if descriptor.span.contains_position(position) {
|
||||
return Some(&mut descriptor.stream_map);
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
@ -14,8 +14,8 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
mod apply_to_arguments;
|
||||
mod utils;
|
||||
pub(super) mod apply_to_arguments;
|
||||
pub(super) mod utils;
|
||||
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
@ -24,7 +24,8 @@ use crate::execution_step::ValueAggregate;
|
||||
use crate::log_instruction;
|
||||
use crate::trace_to_exec_err;
|
||||
use crate::JValue;
|
||||
use apply_to_arguments::*;
|
||||
|
||||
use apply_to_arguments::apply_to_arg;
|
||||
use utils::*;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
|
@ -23,7 +23,7 @@ use air_interpreter_data::Provenance;
|
||||
use air_lambda_parser::LambdaAST;
|
||||
use air_parser::ast;
|
||||
|
||||
pub(super) fn apply_to_arg(
|
||||
pub(crate) fn apply_to_arg(
|
||||
argument: &ast::ApArgument<'_>,
|
||||
exec_ctx: &ExecutionCtx<'_>,
|
||||
trace_ctx: &TraceHandler,
|
||||
|
@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use crate::execution_step::execution_context::StreamMapValueDescriptor;
|
||||
use crate::execution_step::execution_context::StreamValueDescriptor;
|
||||
use crate::execution_step::Generation;
|
||||
use crate::execution_step::ValueAggregate;
|
||||
@ -45,3 +46,28 @@ pub(super) fn generate_value_descriptor<'stream>(
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn generate_map_value_descriptor<'stream>(
|
||||
value: ValueAggregate,
|
||||
stream: &'stream ast::StreamMap<'_>,
|
||||
ap_result: &MergerApResult,
|
||||
) -> StreamMapValueDescriptor<'stream> {
|
||||
use air_trace_handler::merger::ValueSource;
|
||||
|
||||
match ap_result {
|
||||
MergerApResult::NotMet => StreamMapValueDescriptor::new(
|
||||
value,
|
||||
stream.name,
|
||||
ValueSource::PreviousData,
|
||||
Generation::Last,
|
||||
stream.position,
|
||||
),
|
||||
MergerApResult::Met(met_result) => StreamMapValueDescriptor::new(
|
||||
value,
|
||||
stream.name,
|
||||
met_result.value_source,
|
||||
Generation::Nth(met_result.generation),
|
||||
stream.position,
|
||||
),
|
||||
}
|
||||
}
|
||||
|
103
air/src/execution_step/instructions/ap_map.rs
Normal file
103
air/src/execution_step/instructions/ap_map.rs
Normal file
@ -0,0 +1,103 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::ap::utils::generate_map_value_descriptor;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::execution_context::stream_map_key::StreamMapKey;
|
||||
use crate::execution_step::instructions::ap::apply_to_arguments::apply_to_arg;
|
||||
use crate::execution_step::resolver::Resolvable;
|
||||
use crate::execution_step::ValueAggregate;
|
||||
use crate::log_instruction;
|
||||
use crate::trace_to_exec_err;
|
||||
use crate::unsupported_map_key_type;
|
||||
use crate::CatchableError;
|
||||
use crate::ExecutionError;
|
||||
|
||||
use air_interpreter_data::GenerationIdx;
|
||||
use air_parser::ast::ApMap;
|
||||
use air_parser::ast::ApMapKey;
|
||||
use air_parser::ast::Number;
|
||||
use air_parser::ast::StreamMap;
|
||||
use air_trace_handler::merger::MergerApResult;
|
||||
|
||||
impl<'i> super::ExecutableInstruction<'i> for ApMap<'i> {
|
||||
#[tracing::instrument(level = "debug", skip(exec_ctx, trace_ctx))]
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
log_instruction!(call, exec_ctx, trace_ctx);
|
||||
// this applying should be at the very beginning of this function,
|
||||
// because it's necessary to check argument lambda, for more details see
|
||||
// https://github.com/fluencelabs/aquavm/issues/216
|
||||
let result = apply_to_arg(&self.value, exec_ctx, trace_ctx, true)?;
|
||||
|
||||
let merger_ap_result = to_merger_ap_map_result(&self, trace_ctx)?;
|
||||
let key = resolve_if_needed(&self.key, exec_ctx, self.map.name)?;
|
||||
let generation = populate_context(key, &self.map, &merger_ap_result, result, exec_ctx)?;
|
||||
maybe_update_trace(generation, trace_ctx);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn to_merger_ap_map_result(instr: &impl ToString, trace_ctx: &mut TraceHandler) -> ExecutionResult<MergerApResult> {
|
||||
let merger_ap_result = trace_to_exec_err!(trace_ctx.meet_ap_start(), instr)?;
|
||||
Ok(merger_ap_result)
|
||||
}
|
||||
|
||||
fn populate_context<'ctx>(
|
||||
key: StreamMapKey<'ctx>,
|
||||
ap_map_result: &StreamMap<'ctx>,
|
||||
merger_ap_result: &MergerApResult,
|
||||
result: ValueAggregate,
|
||||
exec_ctx: &mut ExecutionCtx<'ctx>,
|
||||
) -> ExecutionResult<GenerationIdx> {
|
||||
let value_descriptor = generate_map_value_descriptor(result, ap_map_result, merger_ap_result);
|
||||
exec_ctx.stream_maps.add_stream_map_value(key, value_descriptor)
|
||||
}
|
||||
|
||||
fn resolve_if_needed<'ctx>(
|
||||
key: &ApMapKey<'ctx>,
|
||||
exec_ctx: &mut ExecutionCtx<'ctx>,
|
||||
map_name: &str,
|
||||
) -> Result<StreamMapKey<'ctx>, ExecutionError> {
|
||||
match key {
|
||||
&ApMapKey::Literal(s) => Ok(s.into()),
|
||||
ApMapKey::Number(n) => match n {
|
||||
&Number::Int(i) => Ok(i.into()),
|
||||
Number::Float(_) => Err(CatchableError::StreamMapError(unsupported_map_key_type(map_name)).into()),
|
||||
},
|
||||
ApMapKey::Scalar(s) => resolve(s, exec_ctx, map_name),
|
||||
ApMapKey::ScalarWithLambda(s) => resolve(s, exec_ctx, map_name),
|
||||
ApMapKey::CanonStreamWithLambda(c) => resolve(c, exec_ctx, map_name),
|
||||
}
|
||||
}
|
||||
|
||||
fn resolve<'ctx>(
|
||||
resolvable: &impl Resolvable,
|
||||
exec_ctx: &mut ExecutionCtx<'_>,
|
||||
map_name: &str,
|
||||
) -> Result<StreamMapKey<'ctx>, ExecutionError> {
|
||||
let (value, _, _) = resolvable.resolve(exec_ctx)?;
|
||||
StreamMapKey::from_value(value, map_name)
|
||||
}
|
||||
|
||||
fn maybe_update_trace(generation: GenerationIdx, trace_ctx: &mut TraceHandler) {
|
||||
use air_interpreter_data::ApResult;
|
||||
|
||||
let final_ap_result = ApResult::new(generation);
|
||||
trace_ctx.meet_ap_end(final_ap_result);
|
||||
}
|
@ -14,129 +14,45 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
mod completeness_updater;
|
||||
pub(super) mod completeness_updater;
|
||||
mod stream_cursor;
|
||||
pub(super) mod stream_execute_helpers;
|
||||
|
||||
use super::fold::*;
|
||||
use super::fold_scalar::fold;
|
||||
use super::ExecutableInstruction;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::boxed_value::Stream;
|
||||
use crate::execution_step::instructions::fold_stream::stream_execute_helpers::execute_with_stream;
|
||||
use crate::log_instruction;
|
||||
use crate::trace_to_exec_err;
|
||||
use completeness_updater::FoldGenerationObserver;
|
||||
use stream_cursor::StreamCursor;
|
||||
|
||||
use air_parser::ast;
|
||||
use air_parser::ast::FoldStream;
|
||||
|
||||
impl<'i> ExecutableInstruction<'i> for FoldStream<'i> {
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
log_instruction!(fold, exec_ctx, trace_ctx);
|
||||
exec_ctx.tracker.meet_fold_stream();
|
||||
|
||||
let iterable = &self.iterable;
|
||||
let stream = match exec_ctx.streams.get(iterable.name, iterable.position) {
|
||||
Some(stream) => stream,
|
||||
None => {
|
||||
// having empty streams means that it haven't been met yet, and it's needed to wait
|
||||
exec_ctx.make_subgraph_incomplete();
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), self)?;
|
||||
|
||||
let mut stream_cursor = StreamCursor::new();
|
||||
let mut stream_iterable = stream_cursor.construct_iterables(stream);
|
||||
let mut observer = FoldGenerationObserver::new();
|
||||
|
||||
// this cycle manages recursive streams
|
||||
while !stream_iterable.is_empty() {
|
||||
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
|
||||
// write operation to this stream to write to this new generation
|
||||
add_new_generation_if_non_empty(&self.iterable, exec_ctx);
|
||||
execute_iterations(stream_iterable, self, fold_id, &mut observer, exec_ctx, trace_ctx)?;
|
||||
|
||||
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
|
||||
// and likely that stream could be mutably borrowed in execute_iterations
|
||||
let stream = remove_new_generation_if_non_empty(&self.iterable, exec_ctx);
|
||||
|
||||
stream_iterable = stream_cursor.construct_iterables(stream)
|
||||
if exec_ctx.streams.get(iterable.name, iterable.position).is_none() {
|
||||
// having empty streams means that it haven't been met yet, and it's needed to wait
|
||||
exec_ctx.make_subgraph_incomplete();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
observer.update_completeness(exec_ctx);
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
let get_mut_stream: &dyn for<'ctx> Fn(&'ctx mut ExecutionCtx<'_>) -> &'ctx mut Stream =
|
||||
&|exec_ctx: &mut ExecutionCtx<'_>| -> &mut Stream {
|
||||
exec_ctx.streams.get_mut(iterable.name, iterable.position).unwrap()
|
||||
};
|
||||
|
||||
/// Executes fold iteration over all generation that stream had at the moment of call.
|
||||
/// It must return only uncatchable errors (such as ones from TraceHandler), though
|
||||
/// catchable errors are suppressed and not propagated from this function, because of determinism.
|
||||
/// The issue with determinism here lies in invariant that all previous executed states
|
||||
/// must be met.
|
||||
fn execute_iterations<'i>(
|
||||
iterables: Vec<IterableValue>,
|
||||
fold_stream: &FoldStream<'i>,
|
||||
fold_id: u32,
|
||||
generation_observer: &mut FoldGenerationObserver,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
for iterable in iterables.into_iter() {
|
||||
let value = match iterable.peek() {
|
||||
Some(value) => value,
|
||||
// it's ok, because some generation level of a stream on some point inside execution
|
||||
// flow could contain zero values
|
||||
None => continue,
|
||||
};
|
||||
|
||||
let value_pos = value.pos();
|
||||
trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos), fold_stream)?;
|
||||
let result = fold(
|
||||
iterable,
|
||||
IterableType::Stream(fold_id),
|
||||
fold_stream.iterator.name,
|
||||
fold_stream.instruction.clone(),
|
||||
fold_stream.last_instruction.clone(),
|
||||
execute_with_stream(
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
);
|
||||
throw_error_if_not_catchable(result)?;
|
||||
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream)?;
|
||||
|
||||
generation_observer.observe_completeness(exec_ctx.is_subgraph_complete());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
fn add_new_generation_if_non_empty(stream: &ast::Stream<'_>, exec_ctx: &mut ExecutionCtx<'_>) {
|
||||
let stream = exec_ctx.streams.get_mut(stream.name, stream.position).unwrap();
|
||||
stream.add_new_generation_if_non_empty();
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
fn remove_new_generation_if_non_empty<'ctx>(
|
||||
stream: &ast::Stream<'_>,
|
||||
exec_ctx: &'ctx mut ExecutionCtx<'_>,
|
||||
) -> &'ctx Stream {
|
||||
let stream = exec_ctx.streams.get_mut(stream.name, stream.position).unwrap();
|
||||
stream.remove_last_generation_if_empty();
|
||||
stream
|
||||
}
|
||||
|
||||
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
|
||||
/// not deterministic.
|
||||
fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
Err(error) if error.is_catchable() => Ok(()),
|
||||
error @ Err(_) => error,
|
||||
get_mut_stream,
|
||||
self,
|
||||
self.iterator.name,
|
||||
self.instruction.clone(),
|
||||
self.last_instruction.clone(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,161 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::completeness_updater::FoldGenerationObserver;
|
||||
use super::stream_cursor::StreamCursor;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::boxed_value::Stream;
|
||||
use crate::execution_step::instructions::fold::IterableType;
|
||||
use crate::execution_step::instructions::fold::IterableValue;
|
||||
use crate::execution_step::instructions::fold_scalar::fold;
|
||||
use crate::trace_to_exec_err;
|
||||
|
||||
use air_parser::ast::Instruction;
|
||||
|
||||
use std::rc::Rc;
|
||||
|
||||
struct FoldStreamLikeIngredients<'i> {
|
||||
iterable_name: &'i str,
|
||||
instruction: Rc<Instruction<'i>>,
|
||||
last_instruction: Option<Rc<Instruction<'i>>>,
|
||||
fold_id: u32,
|
||||
}
|
||||
|
||||
impl<'i> FoldStreamLikeIngredients<'i> {
|
||||
fn new(
|
||||
iterable_name: &'i str,
|
||||
instruction: Rc<Instruction<'i>>,
|
||||
last_instruction: Option<Rc<Instruction<'i>>>,
|
||||
fold_id: u32,
|
||||
) -> Self {
|
||||
Self {
|
||||
iterable_name,
|
||||
instruction,
|
||||
last_instruction,
|
||||
fold_id,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn execute_with_stream<'i>(
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
get_mut_stream: impl for<'ctx> Fn(&'ctx mut ExecutionCtx<'_>) -> &'ctx mut Stream,
|
||||
fold_to_string: &impl ToString,
|
||||
iterable_name: &'i str,
|
||||
instruction: Rc<Instruction<'i>>,
|
||||
last_instruction: Option<Rc<Instruction<'i>>>,
|
||||
) -> ExecutionResult<()> {
|
||||
let fold_id = exec_ctx.tracker.meet_fold_stream();
|
||||
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), fold_to_string)?;
|
||||
|
||||
let mut stream_cursor = StreamCursor::new();
|
||||
let mut stream_iterable = stream_cursor.construct_iterables(get_mut_stream(exec_ctx));
|
||||
let mut observer = FoldGenerationObserver::new();
|
||||
// this cycle manages recursive streams
|
||||
while !stream_iterable.is_empty() {
|
||||
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
|
||||
// write operation to this stream to write to this new generation
|
||||
add_new_generation_if_non_empty(get_mut_stream(exec_ctx));
|
||||
let ingredients =
|
||||
FoldStreamLikeIngredients::new(iterable_name, instruction.clone(), last_instruction.clone(), fold_id);
|
||||
execute_iterations(
|
||||
stream_iterable,
|
||||
fold_to_string,
|
||||
ingredients,
|
||||
&mut observer,
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
)?;
|
||||
|
||||
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
|
||||
// and likely that stream could be mutably borrowed in execute_iterations
|
||||
let stream = remove_new_generation_if_non_empty(get_mut_stream(exec_ctx));
|
||||
|
||||
stream_iterable = stream_cursor.construct_iterables(stream)
|
||||
}
|
||||
|
||||
observer.update_completeness(exec_ctx);
|
||||
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), fold_to_string)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Executes fold iteration over all generation that stream had at the moment of call.
|
||||
/// It must return only uncatchable errors (such as ones from TraceHandler), though
|
||||
/// catchable errors are suppressed and not propagated from this function, because of determinism.
|
||||
/// The issue with determinism here lies in invariant that all previous executed states
|
||||
/// must be met.
|
||||
fn execute_iterations<'i>(
|
||||
iterables: Vec<IterableValue>,
|
||||
fold_to_string: &impl ToString,
|
||||
ingredients: FoldStreamLikeIngredients<'i>,
|
||||
generation_observer: &mut FoldGenerationObserver,
|
||||
exec_ctx: &mut ExecutionCtx<'i>,
|
||||
trace_ctx: &mut TraceHandler,
|
||||
) -> ExecutionResult<()> {
|
||||
for iterable in iterables {
|
||||
let value = match iterable.peek() {
|
||||
Some(value) => value,
|
||||
// it's ok, because some generation level of a stream on some point inside execution
|
||||
// flow could contain zero values
|
||||
None => continue,
|
||||
};
|
||||
let value_pos = value.pos();
|
||||
trace_to_exec_err!(
|
||||
trace_ctx.meet_iteration_start(ingredients.fold_id, value_pos),
|
||||
fold_to_string
|
||||
)?;
|
||||
let result = fold(
|
||||
iterable,
|
||||
IterableType::Stream(ingredients.fold_id),
|
||||
ingredients.iterable_name,
|
||||
ingredients.instruction.clone(),
|
||||
ingredients.last_instruction.clone(),
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
);
|
||||
throw_error_if_not_catchable(result)?;
|
||||
trace_to_exec_err!(trace_ctx.meet_generation_end(ingredients.fold_id), fold_to_string)?;
|
||||
|
||||
generation_observer.observe_completeness(exec_ctx.is_subgraph_complete());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
pub(super) fn add_new_generation_if_non_empty(stream: &mut Stream) {
|
||||
stream.add_new_generation_if_non_empty();
|
||||
}
|
||||
|
||||
/// Safety: this function should be called iff stream is present in context
|
||||
pub(super) fn remove_new_generation_if_non_empty(stream: &mut Stream) -> &Stream {
|
||||
stream.remove_last_generation_if_empty();
|
||||
stream
|
||||
}
|
||||
|
||||
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
|
||||
/// not deterministic.
|
||||
pub(super) fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
Err(error) if error.is_catchable() => Ok(()),
|
||||
error @ Err(_) => error,
|
||||
}
|
||||
}
|
57
air/src/execution_step/instructions/fold_stream_map.rs
Normal file
57
air/src/execution_step/instructions/fold_stream_map.rs
Normal file
@ -0,0 +1,57 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use super::ExecutableInstruction;
|
||||
use super::ExecutionCtx;
|
||||
use super::ExecutionResult;
|
||||
use super::TraceHandler;
|
||||
use crate::execution_step::instructions::fold_stream::stream_execute_helpers::execute_with_stream;
|
||||
use crate::execution_step::Stream;
|
||||
use crate::log_instruction;
|
||||
|
||||
use air_parser::ast::FoldStreamMap;
|
||||
|
||||
impl<'i> ExecutableInstruction<'i> for FoldStreamMap<'i> {
|
||||
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
log_instruction!(fold, exec_ctx, trace_ctx);
|
||||
|
||||
let iterable = &self.iterable;
|
||||
if exec_ctx.stream_maps.get(iterable.name, iterable.position).is_none() {
|
||||
// having empty streams means that it haven't been met yet, and it's needed to wait
|
||||
exec_ctx.make_subgraph_incomplete();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let get_mut_stream: &dyn for<'ctx> Fn(&'ctx mut ExecutionCtx<'_>) -> &'ctx mut Stream =
|
||||
&|exec_ctx: &mut ExecutionCtx<'_>| -> &mut Stream {
|
||||
exec_ctx
|
||||
.stream_maps
|
||||
.get_mut(iterable.name, iterable.position)
|
||||
.unwrap()
|
||||
.get_mut_stream_ref()
|
||||
};
|
||||
|
||||
execute_with_stream(
|
||||
exec_ctx,
|
||||
trace_ctx,
|
||||
get_mut_stream,
|
||||
self,
|
||||
self.iterator.name,
|
||||
self.instruction.clone(),
|
||||
self.last_instruction.clone(),
|
||||
)
|
||||
}
|
||||
}
|
@ -15,6 +15,7 @@
|
||||
*/
|
||||
|
||||
mod ap;
|
||||
mod ap_map;
|
||||
mod call;
|
||||
mod canon;
|
||||
mod compare_matchable;
|
||||
@ -22,6 +23,7 @@ mod fail;
|
||||
mod fold;
|
||||
mod fold_scalar;
|
||||
mod fold_stream;
|
||||
mod fold_stream_map;
|
||||
mod match_;
|
||||
mod mismatch;
|
||||
mod never;
|
||||
@ -76,10 +78,12 @@ impl<'i> ExecutableInstruction<'i> for Instruction<'i> {
|
||||
Instruction::Call(call) => call.execute(exec_ctx, trace_ctx),
|
||||
|
||||
Instruction::Ap(ap) => execute!(self, ap, exec_ctx, trace_ctx),
|
||||
Instruction::ApMap(ap_map) => execute!(self, ap_map, exec_ctx, trace_ctx),
|
||||
Instruction::Canon(canon) => execute!(self, canon, exec_ctx, trace_ctx),
|
||||
Instruction::Fail(fail) => execute!(self, fail, exec_ctx, trace_ctx),
|
||||
Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx),
|
||||
Instruction::FoldStream(fold) => execute!(self, fold, exec_ctx, trace_ctx),
|
||||
Instruction::FoldStreamMap(fold) => execute!(self, fold, exec_ctx, trace_ctx),
|
||||
Instruction::Never(never) => execute!(self, never, exec_ctx, trace_ctx),
|
||||
Instruction::New(new) => execute!(self, new, exec_ctx, trace_ctx),
|
||||
Instruction::Next(next) => execute!(self, next, exec_ctx, trace_ctx),
|
||||
|
@ -56,6 +56,13 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
|
||||
let iteration = usize::try_from(iteration).unwrap();
|
||||
exec_ctx.streams.meet_scope_start(stream.name, new.span, iteration);
|
||||
}
|
||||
NewArgument::StreamMap(stream_map) => {
|
||||
let iteration = exec_ctx.tracker.new_tracker.get_iteration(position);
|
||||
let iteration = usize::try_from(iteration).unwrap();
|
||||
exec_ctx
|
||||
.stream_maps
|
||||
.meet_scope_start(stream_map.name, new.span, iteration);
|
||||
}
|
||||
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_start_scalar(scalar.name.to_string()),
|
||||
NewArgument::CanonStream(canon_stream) => exec_ctx
|
||||
.scalars
|
||||
@ -68,11 +75,13 @@ fn prolog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>) {
|
||||
fn epilog<'i>(new: &New<'i>, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
|
||||
let position = new.span.left;
|
||||
match &new.argument {
|
||||
NewArgument::Stream(stream) => {
|
||||
NewArgument::Stream(stream) => exec_ctx
|
||||
.streams
|
||||
.meet_scope_end(stream.name.to_string(), position, trace_ctx),
|
||||
NewArgument::StreamMap(stream_map) => {
|
||||
exec_ctx
|
||||
.streams
|
||||
.meet_scope_end(stream.name.to_string(), position, trace_ctx)?;
|
||||
Ok(())
|
||||
.stream_maps
|
||||
.meet_scope_end(stream_map.name.to_string(), position, trace_ctx)
|
||||
}
|
||||
NewArgument::Scalar(scalar) => exec_ctx.scalars.meet_new_end_scalar(scalar.name),
|
||||
NewArgument::CanonStream(canon_stream) => exec_ctx.scalars.meet_new_end_canon_stream(canon_stream.name),
|
||||
|
@ -35,6 +35,8 @@ mod utils;
|
||||
pub use air_interpreter_interface::InterpreterOutcome;
|
||||
pub use air_interpreter_interface::RunParameters;
|
||||
pub use air_interpreter_interface::INTERPRETER_SUCCESS;
|
||||
pub use execution_step::execution_context::errors::unsupported_map_key_type;
|
||||
pub use execution_step::execution_context::errors::StreamMapError;
|
||||
pub use execution_step::execution_context::ExecutionCidState;
|
||||
pub use execution_step::execution_context::LastError;
|
||||
pub use execution_step::CatchableError;
|
||||
|
@ -14,6 +14,7 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use air::ExecutionCidState;
|
||||
use air_test_utils::prelude::*;
|
||||
|
||||
use std::cell::RefCell;
|
||||
@ -456,3 +457,67 @@ fn ap_canon_stream() {
|
||||
let expected_tetraplet = RefCell::new(vec![vec![SecurityTetraplet::new(vm_1_peer_id, "", "", "")]]);
|
||||
assert_eq!(tetraplet_checker.as_ref(), &expected_tetraplet);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ap_stream_map() {
|
||||
let vm_1_peer_id = "vm_1_peer_id";
|
||||
let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id);
|
||||
|
||||
let service_name1 = "serv1";
|
||||
let service_name2 = "serv2";
|
||||
let script = f!(r#"
|
||||
(seq
|
||||
(seq
|
||||
(ap ("{vm_1_peer_id}" "{service_name1}") %map)
|
||||
(ap ("{vm_1_peer_id}" "{service_name2}") %map)
|
||||
)
|
||||
(fold %map i
|
||||
(seq
|
||||
(call i.$.key (i.$.key i.$.value) [i] u)
|
||||
(next i)
|
||||
)
|
||||
)
|
||||
)
|
||||
"#);
|
||||
|
||||
let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
|
||||
let actual_trace = trace_from_result(&result);
|
||||
let generation_idx = 0;
|
||||
let mut cid_tracker = ExecutionCidState::new();
|
||||
let service_result1 = json!({
|
||||
"key": vm_1_peer_id,
|
||||
"value": service_name1,
|
||||
});
|
||||
let service_result2 = json!({
|
||||
"key": vm_1_peer_id,
|
||||
"value": service_name2,
|
||||
});
|
||||
let service_args1 = vec![service_result1.clone()];
|
||||
let service_args2 = vec![service_result2.clone()];
|
||||
|
||||
let expected_state = ExecutionTrace::from(vec![
|
||||
executed_state::ap(generation_idx),
|
||||
executed_state::ap(generation_idx),
|
||||
executed_state::fold(vec![
|
||||
subtrace_lore(0, SubTraceDesc::new(3.into(), 1), SubTraceDesc::new(5.into(), 0)),
|
||||
subtrace_lore(1, SubTraceDesc::new(4.into(), 1), SubTraceDesc::new(5.into(), 0)),
|
||||
]),
|
||||
scalar_tracked!(
|
||||
service_result1,
|
||||
cid_tracker,
|
||||
peer = vm_1_peer_id,
|
||||
service = vm_1_peer_id,
|
||||
function = service_name1,
|
||||
args = service_args1
|
||||
),
|
||||
scalar_tracked!(
|
||||
service_result2,
|
||||
cid_tracker,
|
||||
peer = vm_1_peer_id,
|
||||
service = vm_1_peer_id,
|
||||
function = service_name2,
|
||||
args = service_args2
|
||||
),
|
||||
]);
|
||||
assert_eq!(actual_trace, expected_state);
|
||||
}
|
||||
|
@ -14,8 +14,10 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
use air::unsupported_map_key_type;
|
||||
use air::CatchableError;
|
||||
use air::LambdaError;
|
||||
|
||||
use air_test_utils::prelude::*;
|
||||
|
||||
#[test]
|
||||
@ -542,3 +544,36 @@ fn canon_stream_not_have_enough_values_call_arg() {
|
||||
CatchableError::LambdaApplierError(LambdaError::CanonStreamNotHaveEnoughValues { stream_size: 0, idx: 0 });
|
||||
assert!(check_error(&result, expected_error));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unsupported_map_keytype_float() {
|
||||
let mut local_vm = create_avm(unit_call_service(), "local_peer_id");
|
||||
|
||||
let map_name = "%map";
|
||||
let join_stream_script = f!(r#"
|
||||
(ap (0.5 "serv1") %map)
|
||||
"#);
|
||||
|
||||
let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap();
|
||||
let expected_error = CatchableError::StreamMapError(unsupported_map_key_type(map_name));
|
||||
assert!(check_error(&result, expected_error));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn unsupported_map_keytype() {
|
||||
let local_peer_id = "local_peer_id";
|
||||
let obj_arg = json!({"a": {"b": 1},});
|
||||
let mut local_vm = create_avm(set_variable_call_service(obj_arg), local_peer_id);
|
||||
|
||||
let map_name = "%map";
|
||||
let join_stream_script = f!(r#"
|
||||
(seq
|
||||
(call "{local_peer_id}" ("" "") [] scalar)
|
||||
(ap (scalar.$.a "serv1") %map)
|
||||
)
|
||||
"#);
|
||||
|
||||
let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap();
|
||||
let expected_error = CatchableError::StreamMapError(unsupported_map_key_type(map_name));
|
||||
assert!(check_error(&result, expected_error));
|
||||
}
|
||||
|
@ -24,6 +24,7 @@ use super::ImmutableVariableWithLambda;
|
||||
use super::Scalar;
|
||||
use super::ScalarWithLambda;
|
||||
use super::Stream;
|
||||
use super::StreamMap;
|
||||
|
||||
use air_lambda_ast::LambdaAST;
|
||||
|
||||
@ -111,6 +112,15 @@ pub enum ApResult<'i> {
|
||||
Stream(Stream<'i>),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub enum ApMapKey<'i> {
|
||||
Literal(&'i str),
|
||||
Number(Number),
|
||||
Scalar(Scalar<'i>),
|
||||
ScalarWithLambda(ScalarWithLambda<'i>),
|
||||
CanonStreamWithLambda(CanonStreamWithLambda<'i>),
|
||||
}
|
||||
|
||||
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
|
||||
pub enum Number {
|
||||
Int(i64),
|
||||
@ -136,5 +146,7 @@ pub enum NewArgument<'i> {
|
||||
#[serde(borrow)]
|
||||
Stream(Stream<'i>),
|
||||
#[serde(borrow)]
|
||||
StreamMap(StreamMap<'i>),
|
||||
#[serde(borrow)]
|
||||
CanonStream(CanonStream<'i>),
|
||||
}
|
||||
|
@ -27,6 +27,7 @@ impl<'i> NewArgument<'i> {
|
||||
Self::Scalar(scalar) => scalar.name,
|
||||
Self::Stream(stream) => stream.name,
|
||||
Self::CanonStream(canon_stream) => canon_stream.name,
|
||||
Self::StreamMap(stream_map) => stream_map.name,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -108,6 +108,20 @@ impl fmt::Display for ApArgument<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ApMapKey<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
use ApMapKey::*;
|
||||
|
||||
match self {
|
||||
Literal(str) => write!(f, r#""{str}""#),
|
||||
Number(number) => write!(f, "{number}"),
|
||||
Scalar(scalar) => write!(f, "{scalar}"),
|
||||
ScalarWithLambda(scalar) => write!(f, "{scalar}"),
|
||||
CanonStreamWithLambda(canon_stream) => write!(f, "{canon_stream}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Triplet<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(
|
||||
@ -124,6 +138,7 @@ impl fmt::Display for NewArgument<'_> {
|
||||
Self::Scalar(scalar) => write!(f, "{scalar}"),
|
||||
Self::Stream(stream) => write!(f, "{stream}"),
|
||||
Self::CanonStream(canon_stream) => write!(f, "{canon_stream}"),
|
||||
Self::StreamMap(stream_map) => write!(f, "{stream_map}"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,7 @@ use std::rc::Rc;
|
||||
pub enum Instruction<'i> {
|
||||
Call(Call<'i>),
|
||||
Ap(Ap<'i>),
|
||||
ApMap(ApMap<'i>),
|
||||
Canon(Canon<'i>),
|
||||
Seq(Seq<'i>),
|
||||
Par(Par<'i>),
|
||||
@ -38,6 +39,7 @@ pub enum Instruction<'i> {
|
||||
Fail(Fail<'i>),
|
||||
FoldScalar(FoldScalar<'i>),
|
||||
FoldStream(FoldStream<'i>),
|
||||
FoldStreamMap(FoldStreamMap<'i>),
|
||||
Never(Never),
|
||||
New(New<'i>),
|
||||
Next(Next<'i>),
|
||||
@ -60,6 +62,14 @@ pub struct Ap<'i> {
|
||||
pub result: ApResult<'i>,
|
||||
}
|
||||
|
||||
/// (ap key value %map)
|
||||
#[derive(Serialize, Debug, PartialEq)]
|
||||
pub struct ApMap<'i> {
|
||||
pub key: ApMapKey<'i>,
|
||||
pub value: ApArgument<'i>,
|
||||
pub map: StreamMap<'i>,
|
||||
}
|
||||
|
||||
/// (canon peer_id $stream #canon_stream)
|
||||
#[derive(Serialize, Debug, PartialEq, Eq)]
|
||||
pub struct Canon<'i> {
|
||||
@ -136,6 +146,18 @@ pub struct FoldStream<'i> {
|
||||
pub span: Span,
|
||||
}
|
||||
|
||||
/// (fold stream_iterable iterator instruction)
|
||||
#[derive(Serialize, Debug, PartialEq)]
|
||||
pub struct FoldStreamMap<'i> {
|
||||
pub iterable: StreamMap<'i>,
|
||||
#[serde(borrow)]
|
||||
pub iterator: Scalar<'i>,
|
||||
pub instruction: Rc<Instruction<'i>>,
|
||||
// option is needed to provide a graceful period of adoption
|
||||
pub last_instruction: Option<Rc<Instruction<'i>>>,
|
||||
pub span: Span,
|
||||
}
|
||||
|
||||
/// (fold stream_iterable iterator instruction)
|
||||
#[derive(Serialize, Debug, PartialEq, Eq)]
|
||||
pub struct Next<'i> {
|
||||
|
@ -22,6 +22,12 @@ impl<'i> Ap<'i> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> ApMap<'i> {
|
||||
pub fn new(key: ApMapKey<'i>, value: ApArgument<'i>, map: StreamMap<'i>) -> Self {
|
||||
Self { key, value, map }
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> Call<'i> {
|
||||
pub fn new(
|
||||
triplet: Triplet<'i>,
|
||||
@ -141,6 +147,24 @@ impl<'i> FoldStream<'i> {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> FoldStreamMap<'i> {
|
||||
pub fn new(
|
||||
iterable: StreamMap<'i>,
|
||||
iterator: Scalar<'i>,
|
||||
instruction: Instruction<'i>,
|
||||
last_instruction: Option<Instruction<'i>>,
|
||||
span: Span,
|
||||
) -> Self {
|
||||
Self {
|
||||
iterable,
|
||||
iterator,
|
||||
instruction: Rc::new(instruction),
|
||||
last_instruction: last_instruction.map(Rc::new),
|
||||
span,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> Next<'i> {
|
||||
pub fn new(iterator: Scalar<'i>) -> Self {
|
||||
Self { iterator }
|
||||
|
@ -26,6 +26,7 @@ impl fmt::Display for Instruction<'_> {
|
||||
Call(call) => write!(f, "{call}"),
|
||||
Canon(canon) => write!(f, "{canon}"),
|
||||
Ap(ap) => write!(f, "{ap}"),
|
||||
ApMap(ap_map) => write!(f, "{ap_map}"),
|
||||
Seq(seq) => write!(f, "{seq}"),
|
||||
Par(par) => write!(f, "{par}"),
|
||||
Xor(xor) => write!(f, "{xor}"),
|
||||
@ -34,6 +35,7 @@ impl fmt::Display for Instruction<'_> {
|
||||
Fail(fail) => write!(f, "{fail}"),
|
||||
FoldScalar(fold) => write!(f, "{fold}"),
|
||||
FoldStream(fold) => write!(f, "{fold}"),
|
||||
FoldStreamMap(fold) => write!(f, "{fold}"),
|
||||
Never(never) => write!(f, "{never}"),
|
||||
Next(next) => write!(f, "{next}"),
|
||||
New(new) => write!(f, "{new}"),
|
||||
@ -68,6 +70,12 @@ impl fmt::Display for Ap<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for ApMap<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "ap ({} {}) {}", self.key, self.value, self.map)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Fail<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
@ -97,6 +105,12 @@ impl fmt::Display for FoldStream<'_> {
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for FoldStreamMap<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "fold {} {}", self.iterable, self.iterator)
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for Seq<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "seq")
|
||||
|
@ -79,3 +79,10 @@ pub enum ImmutableVariableWithLambda<'i> {
|
||||
#[serde(borrow)]
|
||||
CanonStream(CanonStreamWithLambda<'i>),
|
||||
}
|
||||
|
||||
/// A map based on top of a stream.
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
|
||||
pub struct StreamMap<'i> {
|
||||
pub name: &'i str,
|
||||
pub position: AirPos,
|
||||
}
|
||||
|
@ -119,3 +119,9 @@ impl<'i> ImmutableVariableWithLambda<'i> {
|
||||
Self::Scalar(scalar)
|
||||
}
|
||||
}
|
||||
|
||||
impl<'i> StreamMap<'i> {
|
||||
pub fn new(name: &'i str, position: AirPos) -> Self {
|
||||
Self { name, position }
|
||||
}
|
||||
}
|
||||
|
@ -68,3 +68,9 @@ impl fmt::Display for ImmutableVariableWithLambda<'_> {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl fmt::Display for StreamMap<'_> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
write!(f, "{}", self.name)
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,16 @@ Instr: Box<Instruction<'input>> = {
|
||||
Box::new(Instruction::Ap(apply))
|
||||
},
|
||||
|
||||
<left: @L> "(" ap "(" <key:ApMapKey> <value:ApArgument> ")" <map:StreamMap> ")" <right: @R> => {
|
||||
let map = StreamMap::new(map.0, map.1);
|
||||
let apply = ApMap::new(key, value, map);
|
||||
|
||||
let span = Span::new(left, right);
|
||||
validator.met_ap_map(&apply, span);
|
||||
|
||||
Box::new(Instruction::ApMap(apply))
|
||||
},
|
||||
|
||||
"(" seq <l:Instr> <r:Instr> ")" => Box::new(Instruction::Seq(Seq::new(l, r))),
|
||||
"(" par <l:Instr> <r:Instr> ")" => Box::new(Instruction::Par(Par::new(l, r))),
|
||||
"(" never ")" => Box::new(Instruction::Never(Never)),
|
||||
@ -83,6 +93,17 @@ Instr: Box<Instruction<'input>> = {
|
||||
Box::new(Instruction::FoldStream(fold))
|
||||
},
|
||||
|
||||
<left: @L> "(" fold <stream_map:StreamMap> <iterator:Scalar> <instruction:Instr> <last_instruction:Instr?> ")" <right: @R> => {
|
||||
let iterable = StreamMap::new(stream_map.0, stream_map.1);
|
||||
let iterator = Scalar::new(iterator.0, iterator.1);
|
||||
let span = Span::new(left, right);
|
||||
let fold = FoldStreamMap::new(iterable, iterator, *instruction, last_instruction.map(|v| *v), span);
|
||||
|
||||
validator.meet_fold_stream_map(&fold, span);
|
||||
|
||||
Box::new(Instruction::FoldStreamMap(fold))
|
||||
},
|
||||
|
||||
<left: @L> "(" next <iterator:Scalar> ")" <right: @R> => {
|
||||
let iterator = Scalar::new(iterator.0, iterator.1);
|
||||
let next = Next::new(iterator);
|
||||
@ -130,6 +151,14 @@ ApResult: ApResult<'input> = {
|
||||
<stream:Stream> => ApResult::stream(stream.0, stream.1),
|
||||
};
|
||||
|
||||
ApMapKey: ApMapKey<'input> = {
|
||||
<l:Literal> => ApMapKey::Literal(l),
|
||||
<n:Number> => ApMapKey::Number(n),
|
||||
<scalar:Scalar> => ApMapKey::Scalar(Scalar::new(scalar.0, scalar.1)),
|
||||
<scalar:ScalarWithLambda> => ApMapKey::ScalarWithLambda(ScalarWithLambda::new(scalar.0, scalar.1, scalar.2)),
|
||||
<canon_stream:CanonStreamWithLambda> => ApMapKey::CanonStreamWithLambda(CanonStreamWithLambda::new(canon_stream.0, canon_stream.1, canon_stream.2)),
|
||||
};
|
||||
|
||||
CallOutput: CallOutputValue<'input> = {
|
||||
<scalar:Scalar> => CallOutputValue::scalar(scalar.0, scalar.1),
|
||||
<stream:Stream> => CallOutputValue::stream(stream.0, stream.1),
|
||||
@ -177,6 +206,7 @@ ResolvableToStringVariable: ResolvableToStringVariable<'input> = {
|
||||
NewArgument: NewArgument<'input> = {
|
||||
<scalar:Scalar> => NewArgument::Scalar(Scalar::new(scalar.0, scalar.1)),
|
||||
<stream:Stream> => NewArgument::Stream(Stream::new(stream.0, stream.1)),
|
||||
<stream:StreamMap> => NewArgument::StreamMap(StreamMap::new(stream.0, stream.1)),
|
||||
<canon_stream:CanonStream> => NewArgument::CanonStream(CanonStream::new(canon_stream.0, canon_stream.1)),
|
||||
}
|
||||
|
||||
@ -240,6 +270,7 @@ extern {
|
||||
Scalar => Token::Scalar { name:<&'input str>, position: <AirPos> },
|
||||
ScalarWithLambda => Token::ScalarWithLambda { name: <&'input str>, lambda: <LambdaAST<'input>>, position: <AirPos> },
|
||||
Stream => Token::Stream { name: <&'input str>, position: <AirPos> },
|
||||
StreamMap => Token::StreamMap { name: <&'input str>, position: <AirPos> },
|
||||
CanonStream => Token::CanonStream { name: <&'input str>, position: <AirPos> },
|
||||
CanonStreamWithLambda => Token::CanonStreamWithLambda {name: <&'input str>, lambda:<LambdaAST<'input>>, position: <AirPos>},
|
||||
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -46,6 +46,13 @@ pub enum ParserError {
|
||||
"multiple next instructions for iterator '{iterator_name}' found for one fold, that is prohibited"
|
||||
)]
|
||||
MultipleNextInFold { span: Span, iterator_name: String },
|
||||
|
||||
#[error("unsupported variable key type in (ap {ap_key_type} value {ap_result_name})")]
|
||||
UnsupportedMapKeyType {
|
||||
span: Span,
|
||||
ap_key_type: String,
|
||||
ap_result_name: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl ParserError {
|
||||
@ -59,6 +66,7 @@ impl ParserError {
|
||||
Self::IteratorRestrictionNotAllowed { span, .. } => *span,
|
||||
Self::MultipleIterableValuesForOneIterator { span, .. } => *span,
|
||||
Self::MultipleNextInFold { span, .. } => *span,
|
||||
Self::UnsupportedMapKeyType { span, .. } => *span,
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,6 +104,18 @@ impl ParserError {
|
||||
iterator_name: iterator_name.into(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unsupported_map_key_type(
|
||||
span: Span,
|
||||
ap_key_type: impl Into<String>,
|
||||
ap_result_name: impl Into<String>,
|
||||
) -> Self {
|
||||
Self::UnsupportedMapKeyType {
|
||||
span,
|
||||
ap_key_type: ap_key_type.into(),
|
||||
ap_result_name: ap_result_name.into(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<std::convert::Infallible> for ParserError {
|
||||
|
@ -34,6 +34,7 @@ pub(super) fn try_parse_call_variable(
|
||||
enum MetTag {
|
||||
None,
|
||||
Stream,
|
||||
StreamMap,
|
||||
CanonStream,
|
||||
}
|
||||
|
||||
@ -291,6 +292,10 @@ impl<'input> CallVariableParser<'input> {
|
||||
name,
|
||||
position: self.start_pos,
|
||||
},
|
||||
MetTag::StreamMap => Token::StreamMap {
|
||||
name,
|
||||
position: self.start_pos,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -311,6 +316,11 @@ impl<'input> CallVariableParser<'input> {
|
||||
lambda,
|
||||
position: self.start_pos,
|
||||
},
|
||||
MetTag::StreamMap => Token::StreamMapWithLambda {
|
||||
name,
|
||||
lambda,
|
||||
position: self.start_pos,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@ -378,6 +388,7 @@ impl MetTag {
|
||||
match tag {
|
||||
'$' => Self::Stream,
|
||||
'#' => Self::CanonStream,
|
||||
'%' => Self::StreamMap,
|
||||
_ => Self::None,
|
||||
}
|
||||
}
|
||||
|
@ -198,6 +198,23 @@ fn stream() {
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn stream_map() {
|
||||
const STREAM_MAP: &str = "%stream_map____asdasd";
|
||||
|
||||
lexer_test(
|
||||
STREAM_MAP,
|
||||
Single(Ok((
|
||||
0.into(),
|
||||
Token::StreamMap {
|
||||
name: STREAM_MAP,
|
||||
position: 0.into(),
|
||||
},
|
||||
STREAM_MAP.len().into(),
|
||||
))),
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn canon_stream() {
|
||||
const CANON_STREAM: &str = "#stream____asdasd";
|
||||
|
@ -46,6 +46,11 @@ pub enum Token<'input> {
|
||||
lambda: LambdaAST<'input>,
|
||||
position: AirPos,
|
||||
},
|
||||
StreamMapWithLambda {
|
||||
name: &'input str,
|
||||
lambda: LambdaAST<'input>,
|
||||
position: AirPos,
|
||||
},
|
||||
CanonStream {
|
||||
name: &'input str,
|
||||
position: AirPos,
|
||||
@ -55,6 +60,10 @@ pub enum Token<'input> {
|
||||
lambda: LambdaAST<'input>,
|
||||
position: AirPos,
|
||||
},
|
||||
StreamMap {
|
||||
name: &'input str,
|
||||
position: AirPos,
|
||||
},
|
||||
|
||||
StringLiteral(&'input str),
|
||||
I64(i64),
|
||||
|
@ -184,3 +184,86 @@ fn ap_with_canon_stream_with_lambda() {
|
||||
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ap_with_stream_map() {
|
||||
// 4 variants
|
||||
let var_name = "%stream_map";
|
||||
let key_name = "keyo";
|
||||
let value = "some_string";
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap ("{key_name}" "{value}") %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
let expected = ap_with_map(
|
||||
ApMapKey::Literal(key_name),
|
||||
ApArgument::Literal(value),
|
||||
StreamMap::new(var_name, source_code.find(var_name).unwrap().into()),
|
||||
);
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
// It is possible to use Scalar as a key in the context of a parser
|
||||
// but populate_context will raise an error
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap ({key_name} "{value}") %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
let expected = ap_with_map(
|
||||
ApMapKey::Scalar(Scalar::new(
|
||||
key_name,
|
||||
source_code.find(key_name).unwrap().into(),
|
||||
)),
|
||||
ApArgument::Literal(value),
|
||||
StreamMap::new(var_name, source_code.find(var_name).unwrap().into()),
|
||||
);
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap ("{key_name}" {value}) %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
let expected = ap_with_map(
|
||||
ApMapKey::Literal(key_name),
|
||||
ApArgument::Scalar(Scalar::new(value, source_code.find(value).unwrap().into())),
|
||||
StreamMap::new(var_name, source_code.find(var_name).unwrap().into()),
|
||||
);
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
// It is possible to use Scalar as a key in the context of a parser
|
||||
// but populate_context will raise an error
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap ({key_name} {value}) %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
let expected = ap_with_map(
|
||||
ApMapKey::Scalar(Scalar::new(
|
||||
key_name,
|
||||
source_code.find(key_name).unwrap().into(),
|
||||
)),
|
||||
ApArgument::Scalar(Scalar::new(value, source_code.find(value).unwrap().into())),
|
||||
StreamMap::new(var_name, source_code.find(var_name).unwrap().into()),
|
||||
);
|
||||
assert_eq!(actual, expected);
|
||||
|
||||
let key_name = 123;
|
||||
let source_code = format!(
|
||||
r#"
|
||||
(ap ({key_name} {value}) %stream_map)
|
||||
"#
|
||||
);
|
||||
let actual = parse(source_code.as_str());
|
||||
let expected = ap_with_map(
|
||||
ApMapKey::Number(Number::Int(key_name)),
|
||||
ApArgument::Scalar(Scalar::new(value, source_code.find(value).unwrap().into())),
|
||||
StreamMap::new(var_name, source_code.find(var_name).unwrap().into()),
|
||||
);
|
||||
assert_eq!(actual, expected);
|
||||
}
|
||||
|
@ -199,6 +199,18 @@ pub(super) fn ap<'i>(argument: ApArgument<'i>, result: ApResult<'i>) -> Instruct
|
||||
Instruction::Ap(Ap { argument, result })
|
||||
}
|
||||
|
||||
pub(super) fn ap_with_map<'i>(
|
||||
key: ApMapKey<'i>,
|
||||
argument: ApArgument<'i>,
|
||||
result: StreamMap<'i>,
|
||||
) -> Instruction<'i> {
|
||||
Instruction::ApMap(ApMap {
|
||||
key: key,
|
||||
value: argument,
|
||||
map: result,
|
||||
})
|
||||
}
|
||||
|
||||
pub(super) fn canon<'i>(
|
||||
peer_pk: ResolvableToPeerIdVariable<'i>,
|
||||
stream: Stream<'i>,
|
||||
|
@ -105,3 +105,19 @@ fn iterators_cant_be_restricted() {
|
||||
ParserError::IteratorRestrictionNotAllowed { .. }
|
||||
));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_new_with_stream_map() {
|
||||
let source_code = r#"(new %stream
|
||||
(null)
|
||||
)
|
||||
"#;
|
||||
|
||||
let instruction = parse(source_code);
|
||||
let expected = new(
|
||||
NewArgument::StreamMap(StreamMap::new("%stream", 5.into())),
|
||||
null(),
|
||||
Span::new(0.into(), 41.into()),
|
||||
);
|
||||
assert_eq!(instruction, expected);
|
||||
}
|
||||
|
@ -58,6 +58,10 @@ pub struct VariableValidator<'i> {
|
||||
|
||||
/// Contains all names that should be checked that they are not iterators.
|
||||
not_iterators_candidates: Vec<(&'i str, Span)>,
|
||||
|
||||
// This contains info about unssuported map key arguments used with ap instruction,
|
||||
// namely (key map ApArgument)
|
||||
unsupported_map_keys: Vec<(String, &'i str, Span)>,
|
||||
}
|
||||
|
||||
impl<'i> VariableValidator<'i> {
|
||||
@ -112,6 +116,11 @@ impl<'i> VariableValidator<'i> {
|
||||
self.met_iterator_definition(&fold.iterator, span);
|
||||
}
|
||||
|
||||
pub(super) fn meet_fold_stream_map(&mut self, fold: &FoldStreamMap<'i>, span: Span) {
|
||||
self.met_variable_name(fold.iterable.name, span);
|
||||
self.met_iterator_definition(&fold.iterator, span);
|
||||
}
|
||||
|
||||
pub(super) fn met_new(&mut self, new: &New<'i>, span: Span) {
|
||||
self.not_iterators_candidates
|
||||
.push((new.argument.name(), span));
|
||||
@ -148,6 +157,17 @@ impl<'i> VariableValidator<'i> {
|
||||
self.met_variable_name_definition(ap.result.name(), span);
|
||||
}
|
||||
|
||||
pub(super) fn met_ap_map(&mut self, ap_map: &ApMap<'i>, span: Span) {
|
||||
let key = &ap_map.key;
|
||||
match key {
|
||||
ApMapKey::Literal(_) | ApMapKey::Number(_) => {}
|
||||
ApMapKey::Scalar(scalar) => self.met_scalar(scalar, span),
|
||||
ApMapKey::ScalarWithLambda(scalar) => self.met_scalar_wl(scalar, span),
|
||||
ApMapKey::CanonStreamWithLambda(stream) => self.met_canon_stream_wl(stream, span),
|
||||
}
|
||||
self.met_variable_name_definition(ap_map.map.name, span);
|
||||
}
|
||||
|
||||
pub(super) fn finalize(self) -> Vec<ErrorRecovery<AirPos, Token<'i>, ParserError>> {
|
||||
ValidatorErrorBuilder::new(self)
|
||||
.check_undefined_variables()
|
||||
@ -155,6 +175,7 @@ impl<'i> VariableValidator<'i> {
|
||||
.check_multiple_next_in_fold()
|
||||
.check_new_on_iterators()
|
||||
.check_iterator_for_multiple_definitions()
|
||||
.check_for_unsupported_map_keys()
|
||||
.build()
|
||||
}
|
||||
|
||||
@ -417,6 +438,19 @@ impl<'i> ValidatorErrorBuilder<'i> {
|
||||
self
|
||||
}
|
||||
|
||||
// Unsupported StreamMap keys check, e.g. Stream can not be a map key (ap ($stream "value") %map)
|
||||
fn check_for_unsupported_map_keys(mut self) -> Self {
|
||||
for (arg_key_type, ap_result_name, span) in self.validator.unsupported_map_keys.iter_mut() {
|
||||
let error = ParserError::unsupported_map_key_type(
|
||||
*span,
|
||||
arg_key_type.to_string(),
|
||||
*ap_result_name,
|
||||
);
|
||||
add_to_errors(&mut self.errors, *span, Token::New, error);
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
fn build(self) -> Vec<ErrorRecovery<AirPos, Token<'i>, ParserError>> {
|
||||
self.errors
|
||||
}
|
||||
|
@ -68,6 +68,7 @@ pub struct NewTracker {
|
||||
pub executed_count: HashMap<AirPos, u32>,
|
||||
}
|
||||
|
||||
// TODO: return seen_count from other methods of this structure
|
||||
impl InstructionTracker {
|
||||
pub fn meet_ap(&mut self) {
|
||||
self.ap.seen_count += 1;
|
||||
@ -89,8 +90,9 @@ impl InstructionTracker {
|
||||
self.fold.seen_scalar_count += 1;
|
||||
}
|
||||
|
||||
pub fn meet_fold_stream(&mut self) {
|
||||
pub fn meet_fold_stream(&mut self) -> u32 {
|
||||
self.fold.seen_stream_count += 1;
|
||||
self.fold.seen_stream_count
|
||||
}
|
||||
|
||||
pub fn meet_match(&mut self) {
|
||||
|
@ -22,6 +22,7 @@ use std::collections::HashMap;
|
||||
/// Mapping from a stream name to it's generation count.
|
||||
/// Similar to pi-calculus non-restricted names/channels.
|
||||
pub type GlobalStreamGens = HashMap<String, GenerationIdx>;
|
||||
pub type GlobalStreamMapGens = GlobalStreamGens;
|
||||
|
||||
/// Mapping from a stream name to
|
||||
/// position of a new instruction in a script that creates a scope for a stream
|
||||
@ -32,3 +33,4 @@ pub type GlobalStreamGens = HashMap<String, GenerationIdx>;
|
||||
/// where it was met.
|
||||
/// Similar to pi-calculus restricted names/channels.
|
||||
pub type RestrictedStreamGens = HashMap<String, HashMap<AirPos, Vec<GenerationIdx>>>;
|
||||
pub type RestrictedStreamMapGens = RestrictedStreamGens;
|
||||
|
@ -125,6 +125,7 @@ impl<W: io::Write> Beautifier<W> {
|
||||
match node {
|
||||
ast::Instruction::Call(call) => self.beautify_call(call, indent),
|
||||
ast::Instruction::Ap(ap) => self.beautify_simple(ap, indent),
|
||||
ast::Instruction::ApMap(ap_map) => self.beautify_simple(ap_map, indent),
|
||||
ast::Instruction::Canon(canon) => self.beautify_simple(canon, indent),
|
||||
ast::Instruction::Seq(seq) => self.beautify_seq(seq, indent),
|
||||
ast::Instruction::Par(par) => self.beautify_par(par, indent),
|
||||
@ -138,6 +139,9 @@ impl<W: io::Write> Beautifier<W> {
|
||||
ast::Instruction::FoldStream(fold_stream) => {
|
||||
self.beautify_fold_stream(fold_stream, indent)
|
||||
}
|
||||
ast::Instruction::FoldStreamMap(fold_stream_map) => {
|
||||
self.beautify_fold_stream_map(fold_stream_map, indent)
|
||||
}
|
||||
ast::Instruction::Never(never) => self.beautify_simple(never, indent),
|
||||
ast::Instruction::New(new) => self.beautify_new(new, indent),
|
||||
ast::Instruction::Next(next) => self.beautify_simple(next, indent),
|
||||
@ -215,6 +219,14 @@ impl<W: io::Write> Beautifier<W> {
|
||||
compound!(self, indent, fold)
|
||||
}
|
||||
|
||||
fn beautify_fold_stream_map(
|
||||
&mut self,
|
||||
fold: &ast::FoldStreamMap<'_>,
|
||||
indent: usize,
|
||||
) -> io::Result<()> {
|
||||
compound!(self, indent, fold)
|
||||
}
|
||||
|
||||
fn beautify_new(&mut self, new: &ast::New<'_>, indent: usize) -> io::Result<()> {
|
||||
compound!(self, indent, new)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user