feat(execution-engine): Stream Map initial support [fixes VM-283,VM-284]

This commit is contained in:
Roman Nozdrin 2023-04-11 14:23:18 +00:00
parent 80486ed6ad
commit deeb1fb03a
46 changed files with 3274 additions and 1268 deletions

View File

@ -18,6 +18,7 @@ mod canon_stream;
mod cell_vec_resolved_call_result; mod cell_vec_resolved_call_result;
mod iterable_item; mod iterable_item;
mod resolved_call_result; mod resolved_call_result;
mod stream_map;
use super::iterable::IterableItem; use super::iterable::IterableItem;
use super::ExecutionResult; use super::ExecutionResult;

View File

@ -0,0 +1,83 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionResult;
use super::JValuable;
use crate::execution_step::boxed_value::Generation;
use crate::execution_step::boxed_value::StreamMap;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::RcSecurityTetraplets;
use crate::JValue;
use crate::LambdaAST;
use crate::SecurityTetraplet;
use std::borrow::Cow;
#[derive(Debug)]
pub(crate) struct StreamMapJvaluableIngredients<'stream> {
pub(crate) stream_map: &'stream StreamMap,
pub(crate) generation: Generation,
}
// TODO: this will be deleted soon, because it would be impossible to use streams without
// canonicalization as an arg of a call
impl JValuable for StreamMapJvaluableIngredients<'_> {
fn apply_lambda(&self, _lambda: &LambdaAST<'_>, _exec_ctx: &ExecutionCtx<'_>) -> ExecutionResult<Cow<'_, JValue>> {
unimplemented!("No such method for StreamMap");
}
fn apply_lambda_with_tetraplets(
&self,
_lambda: &LambdaAST<'_>,
_exec_ctx: &ExecutionCtx<'_>,
) -> ExecutionResult<(Cow<'_, JValue>, SecurityTetraplet)> {
unimplemented!("No such method for StreamMap");
}
fn as_jvalue(&self) -> Cow<'_, JValue> {
unimplemented!("No such method for StreamMap");
}
fn into_jvalue(self: Box<Self>) -> JValue {
unimplemented!("No such method for StreamMap");
}
fn as_tetraplets(&self) -> RcSecurityTetraplets {
unimplemented!("No such method for StreamMap");
}
}
use crate::execution_step::boxed_value::StreamIter;
#[allow(dead_code)]
impl<'stream> StreamMapJvaluableIngredients<'stream> {
pub(crate) fn new(stream_map: &'stream StreamMap, generation: Generation) -> Self {
Self { stream_map, generation }
}
pub(self) fn iter(&self) -> ExecutionResult<StreamIter<'_>> {
use crate::execution_step::UncatchableError::StreamMapDontHaveSuchGeneration;
match self.stream_map.iter(self.generation) {
Some(iter) => Ok(iter),
None => Err(StreamMapDontHaveSuchGeneration {
stream_map: self.stream_map.clone(),
generation: self.generation,
}
.into()),
}
}
}

View File

@ -19,6 +19,7 @@ mod iterable;
mod jvaluable; mod jvaluable;
mod scalar; mod scalar;
mod stream; mod stream;
mod stream_map;
mod utils; mod utils;
pub(crate) use canon_stream::*; pub(crate) use canon_stream::*;
@ -28,6 +29,8 @@ pub(crate) use scalar::ScalarRef;
pub(crate) use scalar::ValueAggregate; pub(crate) use scalar::ValueAggregate;
pub(crate) use stream::Generation; pub(crate) use stream::Generation;
pub(crate) use stream::Stream; pub(crate) use stream::Stream;
pub(crate) use stream::StreamIter;
pub(crate) use stream_map::*;
pub(crate) use utils::populate_tetraplet_with_lambda; pub(crate) use utils::populate_tetraplet_with_lambda;
use super::ExecutionResult; use super::ExecutionResult;

View File

@ -23,6 +23,8 @@ use air_interpreter_data::GenerationIdx;
use air_trace_handler::merger::ValueSource; use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler; use air_trace_handler::TraceHandler;
use std::slice::Iter;
/// Streams are CRDT-like append only data structures. They are guaranteed to have the same order /// Streams are CRDT-like append only data structures. They are guaranteed to have the same order
/// of values on each peer. /// of values on each peer.
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
@ -208,9 +210,13 @@ impl Stream {
} }
/// Removes empty generations from current values. /// Removes empty generations from current values.
fn remove_empty_generations(&mut self) { pub(crate) fn remove_empty_generations(&mut self) {
self.values.retain(|values| !values.is_empty()); self.values.retain(|values| !values.is_empty());
} }
pub(crate) fn get_values_iter(&self) -> Iter<'_, Vec<ValueAggregate>> {
self.values.iter()
}
} }
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]

View File

@ -0,0 +1,200 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::rc::Rc;
use super::ValueAggregate;
use crate::execution_step::ExecutionResult;
use crate::JValue;
use super::stream::*;
use air_interpreter_data::GenerationIdx;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
use serde::Serialize;
use serde_json::json;
#[derive(Debug, Default, Clone)]
pub struct StreamMap {
stream: Stream,
}
impl StreamMap {
fn envelope(key: &(impl Into<JValue> + Serialize + Clone), value: Rc<JValue>) -> Rc<JValue> {
Rc::new(json!({ "key": key.clone(), "value": value }))
}
pub(crate) fn from_generations_count(previous_count: GenerationIdx, current_count: GenerationIdx) -> Self {
Self {
stream: Stream::from_generations_count(previous_count, current_count),
}
}
pub(crate) fn from_value(key: &(impl Into<JValue> + Serialize + Clone), value: ValueAggregate) -> Self {
let obj = StreamMap::envelope(key, value.result);
Self {
stream: Stream::from_value(ValueAggregate::new(obj, value.tetraplet, value.trace_pos)),
}
}
pub(crate) fn insert(
&mut self,
key: &(impl Into<JValue> + Serialize + Clone),
value: ValueAggregate,
generation: Generation,
source: ValueSource,
) -> ExecutionResult<GenerationIdx> {
let obj = StreamMap::envelope(key, value.result);
self.stream.add_value(
ValueAggregate::new(obj, value.tetraplet, value.trace_pos),
generation,
source,
)
}
pub(crate) fn slice_iter(&self, start: Generation, end: Generation) -> Option<StreamSliceIter<'_>> {
self.stream.slice_iter(start, end)
}
pub(crate) fn iter(&self, generation: Generation) -> Option<StreamIter<'_>> {
self.stream.iter(generation)
}
pub(crate) fn compactify(self, trace_ctx: &mut TraceHandler) -> ExecutionResult<GenerationIdx> {
self.stream.compactify(trace_ctx)
}
pub(crate) fn last_non_empty_generation(&self) -> GenerationIdx {
self.stream.last_non_empty_generation()
}
pub(crate) fn remove_last_generation_if_empty(&mut self) -> bool {
self.stream.remove_last_generation_if_empty()
}
pub(crate) fn add_new_generation_if_non_empty(&mut self) -> bool {
self.stream.add_new_generation_if_non_empty()
}
}
use std::fmt;
impl fmt::Display for StreamMap {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if self.stream.is_empty() {
return write!(f, "[]");
}
writeln!(f, "[")?;
for (id, generation) in self.stream.get_values_iter().enumerate() {
write!(f, " -- {id}: ")?;
for value in generation.iter() {
write!(f, "{value:?}, ")?;
}
writeln!(f)?;
}
write!(f, "]")
}
}
#[cfg(test)]
mod test {
use super::Generation;
use super::StreamMap;
use crate::execution_step::ValueAggregate;
use air_trace_handler::merger::ValueSource;
use serde_json::json;
use std::rc::Rc;
#[test]
fn test_from_value() {
let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]);
let key = String::from("some_key");
let value = Rc::new(obj.clone());
let generation_idx = 0;
let generation = Generation::Nth(generation_idx.into());
let stream_map = StreamMap::from_value(
&key.clone(),
ValueAggregate::new(value.clone(), <_>::default(), 0.into()),
);
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
let examplar = StreamMap::envelope(&key, value.clone());
assert_eq!(*v, *examplar.as_ref());
assert_eq!(internal_stream_iter.next(), None);
let key = 42;
let stream_map = StreamMap::from_value(
&key.clone(),
ValueAggregate::new(value.clone(), <_>::default(), 0.into()),
);
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
let examplar = StreamMap::envelope(&key, value);
assert_eq!(*v, *examplar.as_ref());
assert_eq!(internal_stream_iter.next(), None);
}
#[test]
fn test_insert() {
let obj = json!([{"top_level": [{"first": 42},{"second": 43}]}]);
let key12 = String::from("some_key");
let value = Rc::new(obj.clone());
let generation_idx = 0;
let va = ValueAggregate::new(value.clone(), <_>::default(), 0.into());
let mut stream_map = StreamMap::from_value(&key12, va.clone());
let generation = Generation::Nth(generation_idx.into());
let generation_idx_res = stream_map
.insert(&key12.clone(), va.clone(), generation, ValueSource::CurrentData)
.unwrap();
assert_eq!(generation_idx_res, generation_idx);
let examplar = StreamMap::envelope(&key12.clone(), value.clone());
let s = stream_map
.stream
.iter(generation)
.unwrap()
.all(|e| *e.result.as_ref() == *examplar.as_ref());
assert!(s);
let key3 = "other_key";
let generation_idx = stream_map
.insert(&key3.clone(), va.clone(), generation, ValueSource::CurrentData)
.unwrap();
assert_eq!(generation_idx_res, generation_idx);
let key4 = 42;
let generation_idx = stream_map
.insert(&key4, va, generation, ValueSource::CurrentData)
.unwrap();
assert_eq!(generation_idx_res, generation_idx);
let mut internal_stream_iter = stream_map.stream.iter(generation).unwrap();
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
assert_eq!(*v, *examplar.as_ref());
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
assert_eq!(*v, *examplar.as_ref());
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
let examplar = StreamMap::envelope(&key3.clone(), value.clone());
assert_eq!(*v, *examplar.as_ref());
let v = internal_stream_iter.next().map(|e| e.result.as_ref()).unwrap();
let examplar = StreamMap::envelope(&key4, value.clone());
assert_eq!(*v, *examplar.as_ref());
assert_eq!(internal_stream_iter.next(), None);
}
}

View File

@ -16,6 +16,7 @@
use super::Joinable; use super::Joinable;
use super::LastErrorAffectable; use super::LastErrorAffectable;
use crate::execution_step::execution_context::errors::StreamMapError;
use crate::execution_step::execution_context::LastErrorObjectError; use crate::execution_step::execution_context::LastErrorObjectError;
use crate::execution_step::lambda_applier::LambdaError; use crate::execution_step::lambda_applier::LambdaError;
use crate::JValue; use crate::JValue;
@ -93,6 +94,10 @@ pub enum CatchableError {
variable_name: String, variable_name: String,
actual_value: JValue, actual_value: JValue,
}, },
/// Stream map related errors.
#[error(transparent)]
StreamMapError(#[from] StreamMapError),
} }
impl From<LambdaError> for Rc<CatchableError> { impl From<LambdaError> for Rc<CatchableError> {

View File

@ -15,6 +15,7 @@
*/ */
use super::Stream; use super::Stream;
use crate::execution_step::boxed_value::StreamMap;
use crate::execution_step::Generation; use crate::execution_step::Generation;
use crate::ToErrorCode; use crate::ToErrorCode;
@ -95,6 +96,16 @@ pub enum UncatchableError {
)] )]
StreamDontHaveSuchGeneration { stream: Stream, generation: Generation }, StreamDontHaveSuchGeneration { stream: Stream, generation: Generation },
/// Errors occurred while insertion of a value inside stream map that doesn't have corresponding generation.
#[error(
"stream map doesn't have generation with number {generation}, supplied to the interpreter data is corrupted,\n\
stream map is {stream_map:?}"
)]
StreamMapDontHaveSuchGeneration {
stream_map: StreamMap,
generation: Generation,
},
#[error("failed to deserialize to CallServiceFailed: {0}")] #[error("failed to deserialize to CallServiceFailed: {0}")]
MalformedCallServiceFailed(serde_json::Error), MalformedCallServiceFailed(serde_json::Error),
} }

View File

@ -18,6 +18,7 @@ use super::ExecutionCidState;
use super::LastError; use super::LastError;
use super::LastErrorDescriptor; use super::LastErrorDescriptor;
use super::Scalars; use super::Scalars;
use super::StreamMaps;
use super::Streams; use super::Streams;
use air_execution_info_collector::InstructionTracker; use air_execution_info_collector::InstructionTracker;
@ -37,6 +38,9 @@ pub(crate) struct ExecutionCtx<'i> {
/// Contains all streams. /// Contains all streams.
pub(crate) streams: Streams, pub(crate) streams: Streams,
/// Contains all streams.
pub(crate) stream_maps: StreamMaps,
/// Set of peer public keys that should receive resulted data. /// Set of peer public keys that should receive resulted data.
pub(crate) next_peer_pks: Vec<String>, pub(crate) next_peer_pks: Vec<String>,

View File

@ -18,6 +18,7 @@ mod cid_state;
mod context; mod context;
mod last_error; mod last_error;
mod scalar_variables; mod scalar_variables;
mod stream_maps_variables;
mod streams_variables; mod streams_variables;
pub use last_error::*; pub use last_error::*;
@ -25,4 +26,5 @@ pub use last_error::*;
pub use cid_state::ExecutionCidState; pub use cid_state::ExecutionCidState;
pub(crate) use context::*; pub(crate) use context::*;
pub(crate) use scalar_variables::*; pub(crate) use scalar_variables::*;
pub(crate) use stream_maps_variables::*;
pub(crate) use streams_variables::*; pub(crate) use streams_variables::*;

View File

@ -0,0 +1,320 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
pub(crate) mod errors;
use crate::execution_step::ExecutionResult;
use crate::ExecutionError;
use crate::JValue;
use crate::execution_step::boxed_value::StreamMap;
use crate::execution_step::execution_context::streams_variables::utils::*;
use crate::execution_step::Generation;
use crate::execution_step::ValueAggregate;
use air_interpreter_data::GenerationIdx;
use air_interpreter_data::GlobalStreamGens;
use air_interpreter_data::RestrictedStreamGens;
use air_interpreter_data::RestrictedStreamMapGens;
use air_parser::ast::Span;
use air_parser::AirPos;
use air_trace_handler::merger::ValueSource;
use air_trace_handler::TraceHandler;
use serde::Serialize;
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;
use std::fmt;
pub(crate) struct StreamMapValueDescriptor<'stream_name> {
pub value: ValueAggregate,
pub name: &'stream_name str,
pub source: ValueSource,
pub generation: Generation,
pub position: AirPos,
}
impl<'stream_name> StreamMapValueDescriptor<'stream_name> {
pub fn new(
value: ValueAggregate,
name: &'stream_name str,
source: ValueSource,
generation: Generation,
position: AirPos,
) -> Self {
Self {
value,
name,
source,
generation,
position,
}
}
}
pub(crate) struct StreamMapDescriptor {
pub(super) span: Span,
pub(super) stream_map: StreamMap,
}
impl StreamMapDescriptor {
pub(super) fn global(stream_map: StreamMap) -> Self {
Self {
span: Span::new(0.into(), usize::MAX.into()),
stream_map,
}
}
#[allow(dead_code)]
pub(super) fn restricted(stream_map: StreamMap, span: Span) -> Self {
Self { span, stream_map }
}
}
impl fmt::Display for StreamMapDescriptor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, " <{}> - <{}>: {}", self.span.left, self.span.right, self.stream_map)
}
}
pub(super) fn find_closest<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d StreamMapDescriptor>,
position: AirPos,
) -> Option<&'d StreamMap> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&descriptor.stream_map);
}
}
None
}
pub(super) fn find_closest_mut<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamMapDescriptor>,
position: AirPos,
) -> Option<&'d mut StreamMap> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&mut descriptor.stream_map);
}
}
None
}
#[allow(dead_code)]
#[derive(Default)]
pub(crate) struct StreamMaps {
// this one is optimized for speed (not for memory), because it's unexpected
// that a script could have a lot of new.
// TODO: use shared string (Rc<String>) to avoid copying.
stream_maps: HashMap<String, Vec<StreamMapDescriptor>>,
/// Contains stream generations from previous data that a restricted stream
/// should have at the scope start.
previous_restricted_stream_maps_gens: RestrictedStreamMapGens,
/// Contains stream generations from current data that a restricted stream
/// should have at the scope start.
current_restricted_stream_maps_gens: RestrictedStreamMapGens,
/// Contains stream generations that each private stream had at the scope end.
/// Then it's placed into data
new_restricted_stream_maps_gens: RestrictedStreamMapGens,
}
impl StreamMaps {
#[allow(dead_code)]
pub(crate) fn from_data(
previous_global_streams: GlobalStreamGens,
current_global_streams: GlobalStreamGens,
previous_restricted_stream_maps_gens: RestrictedStreamGens,
current_restricted_stream_maps_gens: RestrictedStreamGens,
) -> Self {
let stream_maps = merge_global_stream_like(previous_global_streams, current_global_streams);
Self {
stream_maps,
previous_restricted_stream_maps_gens,
current_restricted_stream_maps_gens,
new_restricted_stream_maps_gens: <_>::default(),
}
}
pub(crate) fn get(&self, name: &str, position: AirPos) -> Option<&StreamMap> {
self.stream_maps
.get(name)
.and_then(|descriptors| find_closest(descriptors.iter(), position))
}
pub(crate) fn get_mut(&mut self, name: &str, position: AirPos) -> Option<&mut StreamMap> {
self.stream_maps
.get_mut(name)
.and_then(|descriptors| find_closest_mut(descriptors.iter_mut(), position))
}
pub(crate) fn add_stream_map_value(
&mut self,
key: &(impl Into<JValue> + Serialize + Clone),
value_descriptor: StreamMapValueDescriptor<'_>,
) -> ExecutionResult<GenerationIdx> {
let StreamMapValueDescriptor {
value,
name,
source,
generation,
position,
} = value_descriptor;
match self.get_mut(name, position) {
Some(stream_map) => stream_map.insert(key, value, generation, source),
None => {
// streams could be created in three ways:
// - after met new instruction with stream name that isn't present in streams
// (it's the only way to create restricted streams)
// - by calling add_global_stream with generation that come from data
// for global streams
// - and by this function, and if there is no such a streams in streams,
// it means that a new global one should be created.
let stream_map = StreamMap::from_value(key, value);
let descriptor = StreamMapDescriptor::global(stream_map);
self.stream_maps.insert(name.to_string(), vec![descriptor]);
let generation = 0;
Ok(generation.into())
}
}
}
#[allow(dead_code)]
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: usize) {
let name = name.into();
let (prev_gens_count, current_gens_count) = self.stream_generation_from_data(&name, span.left, iteration);
let new_stream_map = StreamMap::from_generations_count(prev_gens_count, current_gens_count);
let new_descriptor = StreamMapDescriptor::restricted(new_stream_map, span);
match self.stream_maps.entry(name) {
Occupied(mut entry) => {
entry.get_mut().push(new_descriptor);
}
Vacant(entry) => {
entry.insert(vec![new_descriptor]);
}
}
}
#[allow(dead_code)]
pub(crate) fn meet_scope_end(
&mut self,
name: String,
position: AirPos,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
// unwraps are safe here because met_scope_end must be called after met_scope_start
let stream_map_descriptors = self.stream_maps.get_mut(&name).unwrap();
// delete a stream after exit from a scope
let last_descriptor = stream_map_descriptors.pop().unwrap();
if stream_map_descriptors.is_empty() {
// streams should contain only non-empty stream embodiments
self.stream_maps.remove(&name);
}
let gens_count = last_descriptor.stream_map.compactify(trace_ctx)?;
self.collect_stream_generation(name, position, gens_count);
Ok(())
}
#[allow(dead_code)]
pub(crate) fn into_streams_data(
self,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<(GlobalStreamGens, RestrictedStreamGens)> {
// since it's called at the end of execution, streams contains only global ones,
// because all private's been deleted after exiting a scope
let global_stream_maps = self
.stream_maps
.into_iter()
.map(|(name, mut descriptors)| -> Result<_, ExecutionError> {
// unwrap is safe here because of invariant that streams contains non-empty vectors,
// moreover it must contain only one value, because this method is called at the end
// of the execution
let stream_map = descriptors.pop().unwrap().stream_map;
let gens_count = stream_map.compactify(trace_ctx)?;
Ok((name, gens_count))
})
.collect::<Result<GlobalStreamGens, _>>()?;
Ok((global_stream_maps, self.new_restricted_stream_maps_gens))
}
fn stream_generation_from_data(
&self,
name: &str,
position: AirPos,
iteration: usize,
) -> (GenerationIdx, GenerationIdx) {
let previous_generation =
Self::restricted_stream_generation(&self.previous_restricted_stream_maps_gens, name, position, iteration)
.unwrap_or_default();
let current_generation =
Self::restricted_stream_generation(&self.current_restricted_stream_maps_gens, name, position, iteration)
.unwrap_or_default();
(previous_generation, current_generation)
}
fn restricted_stream_generation(
restricted_stream_maps_gens: &RestrictedStreamGens,
name: &str,
position: AirPos,
iteration: usize,
) -> Option<GenerationIdx> {
restricted_stream_maps_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()
}
fn collect_stream_generation(&mut self, name: String, position: AirPos, generation: GenerationIdx) {
match self.new_restricted_stream_maps_gens.entry(name) {
Occupied(mut streams) => match streams.get_mut().entry(position) {
Occupied(mut iterations) => iterations.get_mut().push(generation),
Vacant(entry) => {
entry.insert(vec![generation]);
}
},
Vacant(entry) => {
let iterations = maplit::hashmap! {
position => vec![generation],
};
entry.insert(iterations);
}
}
}
}
impl fmt::Display for StreamMaps {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
for (name, descriptors) in self.stream_maps.iter() {
if let Some(last_descriptor) = descriptors.last() {
writeln!(f, "{name} => {last_descriptor}")?;
}
}
Ok(())
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use thiserror::Error as ThisError;
/// Describes errors related to applying lambdas to values.
#[derive(Debug, Clone, ThisError)]
pub enum StreamMapError {
#[error("map {variable_name} key can not be float")]
FloatMapKeyIsUnsupported { variable_name: String },
#[error("unsupported type for {variable_name} map's key")]
UnsupportedMapKeyType { variable_name: String },
#[error("there must be a key to add a value into {variable_name} map")]
MapKeyIsAbsent { variable_name: String },
}

View File

@ -14,9 +14,9 @@
* limitations under the License. * limitations under the License.
*/ */
mod stream_descriptor; pub(crate) mod stream_descriptor;
mod stream_value_descriptor; pub(crate) mod stream_value_descriptor;
mod utils; pub(crate) mod utils;
use crate::execution_step::ExecutionResult; use crate::execution_step::ExecutionResult;
use crate::execution_step::Stream; use crate::execution_step::Stream;

View File

@ -21,20 +21,20 @@ use air_parser::AirPos;
use std::fmt; use std::fmt;
pub(super) struct StreamDescriptor { pub(crate) struct StreamDescriptor {
pub(super) span: Span, pub(crate) span: Span,
pub(super) stream: Stream, pub(crate) stream: Stream,
} }
impl StreamDescriptor { impl StreamDescriptor {
pub(super) fn global(stream: Stream) -> Self { pub(crate) fn global(stream: Stream) -> Self {
Self { Self {
span: Span::new(0.into(), usize::MAX.into()), span: Span::new(0.into(), usize::MAX.into()),
stream, stream,
} }
} }
pub(super) fn restricted(stream: Stream, span: Span) -> Self { pub(crate) fn restricted(stream: Stream, span: Span) -> Self {
Self { span, stream } Self { span, stream }
} }
} }
@ -45,7 +45,7 @@ impl fmt::Display for StreamDescriptor {
} }
} }
pub(super) fn find_closest<'d>( pub(crate) fn find_closest<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d StreamDescriptor>, descriptors: impl DoubleEndedIterator<Item = &'d StreamDescriptor>,
position: AirPos, position: AirPos,
) -> Option<&'d Stream> { ) -> Option<&'d Stream> {
@ -59,7 +59,7 @@ pub(super) fn find_closest<'d>(
None None
} }
pub(super) fn find_closest_mut<'d>( pub(crate) fn find_closest_mut<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamDescriptor>, descriptors: impl DoubleEndedIterator<Item = &'d mut StreamDescriptor>,
position: AirPos, position: AirPos,
) -> Option<&'d mut Stream> { ) -> Option<&'d mut Stream> {

View File

@ -0,0 +1,74 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use crate::execution_step::StreamMap;
use air_parser::ast::Span;
use air_parser::AirPos;
use std::fmt;
pub(super) struct StreamMapDescriptor {
pub(super) span: Span,
pub(super) stream_map: StreamMap,
}
impl StreamMapDescriptor {
pub(super) fn global(stream_map: StreamMap) -> Self {
Self {
span: Span::new(0.into(), usize::MAX.into()),
stream_map,
}
}
pub(super) fn restricted(stream_map: StreamMap, span: Span) -> Self {
Self { span, stream_map }
}
}
impl fmt::Display for StreamMapDescriptor {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, " <{}> - <{}>: {}", self.span.left, self.span.right, self.stream_map)
}
}
pub(super) fn find_closest<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d StreamMapDescriptor>,
position: AirPos,
) -> Option<&'d StreamMap> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&descriptor.stream_map);
}
}
None
}
pub(super) fn find_closest_mut<'d>(
descriptors: impl DoubleEndedIterator<Item = &'d mut StreamMapDescriptor>,
position: AirPos,
) -> Option<&'d mut StreamMap> {
// descriptors are placed in a order of decreasing scopes, so it's enough to get the latest suitable
for descriptor in descriptors.rev() {
if descriptor.span.contains_position(position) {
return Some(&mut descriptor.stream_map);
}
}
None
}

View File

@ -15,13 +15,15 @@
*/ */
use super::StreamDescriptor; use super::StreamDescriptor;
use crate::execution_step::boxed_value::StreamMap;
use crate::execution_step::execution_context::StreamMapDescriptor;
use crate::execution_step::Stream; use crate::execution_step::Stream;
use air_interpreter_data::GlobalStreamGens; use air_interpreter_data::{GlobalStreamGens, GlobalStreamMapGens};
use std::collections::HashMap; use std::collections::HashMap;
pub(super) fn merge_global_streams( pub(crate) fn merge_global_streams(
previous_global_streams: GlobalStreamGens, previous_global_streams: GlobalStreamGens,
current_global_streams: GlobalStreamGens, current_global_streams: GlobalStreamGens,
) -> HashMap<String, Vec<StreamDescriptor>> { ) -> HashMap<String, Vec<StreamDescriptor>> {
@ -47,3 +49,33 @@ pub(super) fn merge_global_streams(
global_streams global_streams
} }
pub(crate) fn merge_global_stream_like(
previous_global_streams: GlobalStreamMapGens,
current_global_streams: GlobalStreamMapGens,
) -> HashMap<String, Vec<StreamMapDescriptor>> {
let mut global_streams_like = previous_global_streams
.iter()
.map(|(stream_like_name, &prev_gens_count)| {
let current_gens_count = current_global_streams
.get(stream_like_name)
.cloned()
.unwrap_or_default();
let global_stream = StreamMap::from_generations_count(prev_gens_count, current_gens_count);
let descriptor = StreamMapDescriptor::global(global_stream);
(stream_like_name.to_string(), vec![descriptor])
})
.collect::<HashMap<_, _>>();
for (stream_like_name, current_gens_count) in current_global_streams {
if previous_global_streams.contains_key(&stream_like_name) {
continue;
}
let global_stream_map = StreamMap::from_generations_count(0.into(), current_gens_count);
let descriptor = StreamMapDescriptor::global(global_stream_map);
global_streams_like.insert(stream_like_name, vec![descriptor]);
}
global_streams_like
}

View File

@ -20,11 +20,17 @@ mod utils;
use super::ExecutionCtx; use super::ExecutionCtx;
use super::ExecutionResult; use super::ExecutionResult;
use super::TraceHandler; use super::TraceHandler;
use crate::execution_step::execution_context::errors::StreamMapError::FloatMapKeyIsUnsupported;
use crate::execution_step::execution_context::errors::StreamMapError::MapKeyIsAbsent;
use crate::execution_step::execution_context::errors::StreamMapError::UnsupportedMapKeyType;
use crate::execution_step::instructions::ValueAggregate; use crate::execution_step::instructions::ValueAggregate;
use crate::log_instruction; use crate::log_instruction;
use crate::trace_to_exec_err; use crate::trace_to_exec_err;
use crate::CatchableError;
use crate::JValue; use crate::JValue;
use crate::SecurityTetraplet; use crate::SecurityTetraplet;
use air_parser::ast::ApArgument;
use air_parser::ast::Number;
use apply_to_arguments::*; use apply_to_arguments::*;
use utils::*; use utils::*;
@ -46,7 +52,13 @@ impl<'i> super::ExecutableInstruction<'i> for Ap<'i> {
let result = apply_to_arg(&self.argument, exec_ctx, trace_ctx, should_touch_trace)?; let result = apply_to_arg(&self.argument, exec_ctx, trace_ctx, should_touch_trace)?;
let merger_ap_result = to_merger_ap_result(self, trace_ctx)?; let merger_ap_result = to_merger_ap_result(self, trace_ctx)?;
let maybe_generation = populate_context(&self.result, &merger_ap_result, result, exec_ctx)?; let maybe_generation = populate_context(
&self.result,
&merger_ap_result,
self.key_argument.as_ref(),
result,
exec_ctx,
)?;
maybe_update_trace(maybe_generation, trace_ctx); maybe_update_trace(maybe_generation, trace_ctx);
Ok(()) Ok(())
@ -56,13 +68,13 @@ impl<'i> super::ExecutableInstruction<'i> for Ap<'i> {
/// This function is intended to check whether a Ap instruction should produce /// This function is intended to check whether a Ap instruction should produce
/// a new state in data. /// a new state in data.
fn should_touch_trace(ap: &Ap<'_>) -> bool { fn should_touch_trace(ap: &Ap<'_>) -> bool {
matches!(ap.result, ast::ApResult::Stream(_)) matches!(ap.result, ast::ApResult::Stream(_) | ast::ApResult::StreamMap(_))
} }
fn to_merger_ap_result(instr: &Ap<'_>, trace_ctx: &mut TraceHandler) -> ExecutionResult<MergerApResult> { fn to_merger_ap_result(instr: &Ap<'_>, trace_ctx: &mut TraceHandler) -> ExecutionResult<MergerApResult> {
match instr.result { match instr.result {
ast::ApResult::Scalar(_) => Ok(MergerApResult::NotMet), ast::ApResult::Scalar(_) => Ok(MergerApResult::NotMet),
ast::ApResult::Stream(_) => { ast::ApResult::Stream(_) | ast::ApResult::StreamMap(_) => {
let merger_ap_result = trace_to_exec_err!(trace_ctx.meet_ap_start(), instr)?; let merger_ap_result = trace_to_exec_err!(trace_ctx.meet_ap_start(), instr)?;
Ok(merger_ap_result) Ok(merger_ap_result)
} }
@ -72,6 +84,7 @@ fn to_merger_ap_result(instr: &Ap<'_>, trace_ctx: &mut TraceHandler) -> Executio
fn populate_context<'ctx>( fn populate_context<'ctx>(
ap_result: &ast::ApResult<'ctx>, ap_result: &ast::ApResult<'ctx>,
merger_ap_result: &MergerApResult, merger_ap_result: &MergerApResult,
key_argument: Option<&ApArgument<'ctx>>,
result: ValueAggregate, result: ValueAggregate,
exec_ctx: &mut ExecutionCtx<'ctx>, exec_ctx: &mut ExecutionCtx<'ctx>,
) -> ExecutionResult<Option<GenerationIdx>> { ) -> ExecutionResult<Option<GenerationIdx>> {
@ -81,6 +94,32 @@ fn populate_context<'ctx>(
let value_descriptor = generate_value_descriptor(result, stream, merger_ap_result); let value_descriptor = generate_value_descriptor(result, stream, merger_ap_result);
exec_ctx.streams.add_stream_value(value_descriptor).map(Some) exec_ctx.streams.add_stream_value(value_descriptor).map(Some)
} }
ast::ApResult::StreamMap(stream_map) => {
let value_descriptor = generate_map_value_descriptor(result, stream_map, merger_ap_result);
match key_argument {
Some(key) => match key {
ApArgument::Literal(s) => exec_ctx.stream_maps.add_stream_map_value(s, value_descriptor).map(Some),
ApArgument::Number(n) => match n {
Number::Int(int) => exec_ctx
.stream_maps
.add_stream_map_value(int, value_descriptor)
.map(Some),
Number::Float(_) => Err(CatchableError::StreamMapError(FloatMapKeyIsUnsupported {
variable_name: String::from(stream_map.name),
})
.into()),
},
_ => Err(CatchableError::StreamMapError(UnsupportedMapKeyType {
variable_name: String::from(stream_map.name),
})
.into()),
},
None => Err(CatchableError::StreamMapError(MapKeyIsAbsent {
variable_name: String::from(stream_map.name),
})
.into()),
}
}
} }
} }

View File

@ -14,6 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
use crate::execution_step::execution_context::StreamMapValueDescriptor;
use crate::execution_step::execution_context::StreamValueDescriptor; use crate::execution_step::execution_context::StreamValueDescriptor;
use crate::execution_step::Generation; use crate::execution_step::Generation;
use crate::execution_step::ValueAggregate; use crate::execution_step::ValueAggregate;
@ -45,3 +46,28 @@ pub(super) fn generate_value_descriptor<'stream>(
), ),
} }
} }
pub(super) fn generate_map_value_descriptor<'stream>(
value: ValueAggregate,
stream: &'stream ast::StreamMap<'_>,
ap_result: &MergerApResult,
) -> StreamMapValueDescriptor<'stream> {
use air_trace_handler::merger::ValueSource;
match ap_result {
MergerApResult::NotMet => StreamMapValueDescriptor::new(
value,
stream.name,
ValueSource::PreviousData,
Generation::Last,
stream.position,
),
MergerApResult::Met(met_result) => StreamMapValueDescriptor::new(
value,
stream.name,
met_result.value_source,
Generation::Nth(met_result.generation),
stream.position,
),
}
}

View File

@ -115,6 +115,28 @@ pub(crate) fn construct_stream_iterable_values(
.collect::<Vec<_>>() .collect::<Vec<_>>()
} }
/// Constructs iterable value for given stream iterable.
pub(crate) fn construct_stream_map_iterable_values(
stream_map: &StreamMap,
start: Generation,
end: Generation,
) -> Vec<IterableValue> {
let stream_map_iter = match stream_map.slice_iter(start, end) {
Some(stream_map_iter) => stream_map_iter,
None => return vec![],
};
stream_map_iter
.filter(|iterable| !iterable.is_empty())
.map(|iterable| {
let call_results = iterable.to_vec();
let foldable = IterableVecResolvedCall::init(call_results);
let foldable: IterableValue = Box::new(foldable);
foldable
})
.collect::<Vec<_>>()
}
/// Constructs iterable value from resolved call result. /// Constructs iterable value from resolved call result.
fn from_value(call_result: ValueAggregate, variable_name: &str) -> ExecutionResult<FoldIterableScalar> { fn from_value(call_result: ValueAggregate, variable_name: &str) -> ExecutionResult<FoldIterableScalar> {
let len = match &call_result.result.deref() { let len = match &call_result.result.deref() {

View File

@ -14,7 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
mod completeness_updater; pub(crate) mod completeness_updater;
mod stream_cursor; mod stream_cursor;
use super::fold::*; use super::fold::*;

View File

@ -16,22 +16,22 @@
use super::ExecutionCtx; use super::ExecutionCtx;
pub(super) struct FoldGenerationObserver { pub(crate) struct FoldGenerationObserver {
subgraph_complete: bool, subgraph_complete: bool,
} }
impl FoldGenerationObserver { impl FoldGenerationObserver {
pub(super) fn new() -> Self { pub(crate) fn new() -> Self {
Self { Self {
subgraph_complete: false, subgraph_complete: false,
} }
} }
pub(super) fn observe_completeness(&mut self, completeness: bool) { pub(crate) fn observe_completeness(&mut self, completeness: bool) {
self.subgraph_complete |= completeness; self.subgraph_complete |= completeness;
} }
pub(super) fn update_completeness(self, exec_ctx: &mut ExecutionCtx<'_>) { pub(crate) fn update_completeness(self, exec_ctx: &mut ExecutionCtx<'_>) {
exec_ctx.set_subgraph_completeness(self.subgraph_complete); exec_ctx.set_subgraph_completeness(self.subgraph_complete);
} }
} }

View File

@ -0,0 +1,147 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod stream_map_cursor;
use super::fold::*;
use super::fold_scalar::fold;
use super::ExecutableInstruction;
use super::ExecutionCtx;
use super::ExecutionResult;
use super::TraceHandler;
use crate::execution_step::boxed_value::StreamMap;
use crate::execution_step::instructions::fold_stream::completeness_updater::FoldGenerationObserver;
use crate::execution_step::instructions::fold_stream_map::stream_map_cursor::StreamMapCursor;
use crate::log_instruction;
use crate::trace_to_exec_err;
use air_parser::ast;
use air_parser::ast::FoldStreamMap;
impl<'i> ExecutableInstruction<'i> for FoldStreamMap<'i> {
fn execute(&self, exec_ctx: &mut ExecutionCtx<'i>, trace_ctx: &mut TraceHandler) -> ExecutionResult<()> {
log_instruction!(fold, exec_ctx, trace_ctx);
exec_ctx.tracker.meet_fold_stream();
let iterable = &self.iterable;
let stream_map = match exec_ctx.stream_maps.get(iterable.name, iterable.position) {
Some(stream_map) => stream_map,
None => {
// having empty streams means that it haven't been met yet, and it's needed to wait
exec_ctx.make_subgraph_incomplete();
return Ok(());
}
};
let fold_id = exec_ctx.tracker.fold.seen_stream_count;
trace_to_exec_err!(trace_ctx.meet_fold_start(fold_id), self)?;
let mut stream_map_cursor = StreamMapCursor::new();
let mut stream_map_iterable = stream_map_cursor.construct_iterables(stream_map);
let mut observer = FoldGenerationObserver::new();
// this cycle manages recursive streams
while !stream_map_iterable.is_empty() {
// add a new generation to made all consequence "new" (meaning that they are just executed on this peer)
// write operation to this stream to write to this new generation
add_new_generation_if_non_empty(&self.iterable, exec_ctx);
execute_iterations(stream_map_iterable, self, fold_id, &mut observer, exec_ctx, trace_ctx)?;
// it's needed to get stream again, because RefCell allows only one mutable borrowing at time,
// and likely that stream could be mutably borrowed in execute_iterations
let stream_map = remove_new_generation_if_non_empty(&self.iterable, exec_ctx);
stream_map_iterable = stream_map_cursor.construct_iterables(stream_map)
}
observer.update_completeness(exec_ctx);
trace_to_exec_err!(trace_ctx.meet_fold_end(fold_id), self)?;
Ok(())
}
}
/// Executes fold iteration over all generation that stream had at the moment of call.
/// It must return only uncatchable errors (such as ones from TraceHandler), though
/// catchable errors are suppressed and not propagated from this function, because of determinism.
/// The issue with determinism here lies in invariant that all previous executed states
/// must be met.
fn execute_iterations<'i>(
iterables: Vec<IterableValue>,
fold_stream_map: &FoldStreamMap<'i>,
fold_id: u32,
generation_observer: &mut FoldGenerationObserver,
exec_ctx: &mut ExecutionCtx<'i>,
trace_ctx: &mut TraceHandler,
) -> ExecutionResult<()> {
for iterable in iterables.into_iter() {
let value = match iterable.peek() {
Some(value) => value,
// it's ok, because some generation level of a stream on some point inside execution
// flow could contain zero values
None => continue,
};
let value_pos = value.pos();
trace_to_exec_err!(trace_ctx.meet_iteration_start(fold_id, value_pos), fold_stream_map)?;
let result = fold(
iterable,
IterableType::Stream(fold_id),
fold_stream_map.iterator.name,
fold_stream_map.instruction.clone(),
fold_stream_map.last_instruction.clone(),
exec_ctx,
trace_ctx,
);
throw_error_if_not_catchable(result)?;
trace_to_exec_err!(trace_ctx.meet_generation_end(fold_id), fold_stream_map)?;
generation_observer.observe_completeness(exec_ctx.is_subgraph_complete());
}
Ok(())
}
/// Safety: this function should be called iff stream is present in context
fn add_new_generation_if_non_empty(stream_map: &ast::StreamMap<'_>, exec_ctx: &mut ExecutionCtx<'_>) {
let stream_map = exec_ctx
.stream_maps
.get_mut(stream_map.name, stream_map.position)
.unwrap();
stream_map.add_new_generation_if_non_empty();
}
/// Safety: this function should be called iff stream is present in context
fn remove_new_generation_if_non_empty<'ctx>(
stream_map: &ast::StreamMap<'_>,
exec_ctx: &'ctx mut ExecutionCtx<'_>,
) -> &'ctx StreamMap {
let stream_map = exec_ctx
.stream_maps
.get_mut(stream_map.name, stream_map.position)
.unwrap();
stream_map.remove_last_generation_if_empty();
stream_map
}
/// Fold over streams doesn't throw an error if it's a catchable one, because otherwise it would be
/// not deterministic.
fn throw_error_if_not_catchable(result: ExecutionResult<()>) -> ExecutionResult<()> {
match result {
Ok(_) => Ok(()),
Err(error) if error.is_catchable() => Ok(()),
error @ Err(_) => error,
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright 2023 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_interpreter_data::GenerationIdx;
use crate::execution_step::boxed_value::Generation;
use crate::execution_step::boxed_value::StreamMap;
use crate::execution_step::instructions::fold::construct_stream_map_iterable_values;
use crate::execution_step::instructions::fold::IterableValue;
pub(super) struct StreamMapCursor {
last_seen_generation: GenerationIdx,
}
impl StreamMapCursor {
pub(super) fn new() -> Self {
Self {
last_seen_generation: GenerationIdx::from(0),
}
}
pub(super) fn construct_iterables(&mut self, stream_map: &StreamMap) -> Vec<IterableValue> {
let iterables = construct_stream_map_iterable_values(
stream_map,
Generation::Nth(self.last_seen_generation),
Generation::Last,
);
self.last_seen_generation = stream_map.last_non_empty_generation();
iterables
}
}

View File

@ -22,6 +22,7 @@ mod fail;
mod fold; mod fold;
mod fold_scalar; mod fold_scalar;
mod fold_stream; mod fold_stream;
mod fold_stream_map;
mod match_; mod match_;
mod mismatch; mod mismatch;
mod never; mod never;
@ -81,6 +82,7 @@ impl<'i> ExecutableInstruction<'i> for Instruction<'i> {
Instruction::Fail(fail) => execute!(self, fail, exec_ctx, trace_ctx), Instruction::Fail(fail) => execute!(self, fail, exec_ctx, trace_ctx),
Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx), Instruction::FoldScalar(fold) => execute!(self, fold, exec_ctx, trace_ctx),
Instruction::FoldStream(fold) => execute!(self, fold, exec_ctx, trace_ctx), Instruction::FoldStream(fold) => execute!(self, fold, exec_ctx, trace_ctx),
Instruction::FoldStreamMap(fold) => execute!(self, fold, exec_ctx, trace_ctx),
Instruction::Never(never) => execute!(self, never, exec_ctx, trace_ctx), Instruction::Never(never) => execute!(self, never, exec_ctx, trace_ctx),
Instruction::New(new) => execute!(self, new, exec_ctx, trace_ctx), Instruction::New(new) => execute!(self, new, exec_ctx, trace_ctx),
Instruction::Next(next) => execute!(self, next, exec_ctx, trace_ctx), Instruction::Next(next) => execute!(self, next, exec_ctx, trace_ctx),

View File

@ -35,6 +35,7 @@ mod utils;
pub use air_interpreter_interface::InterpreterOutcome; pub use air_interpreter_interface::InterpreterOutcome;
pub use air_interpreter_interface::RunParameters; pub use air_interpreter_interface::RunParameters;
pub use air_interpreter_interface::INTERPRETER_SUCCESS; pub use air_interpreter_interface::INTERPRETER_SUCCESS;
pub use execution_step::execution_context::errors::StreamMapError;
pub use execution_step::execution_context::ExecutionCidState; pub use execution_step::execution_context::ExecutionCidState;
pub use execution_step::execution_context::LastError; pub use execution_step::execution_context::LastError;
pub use execution_step::CatchableError; pub use execution_step::CatchableError;

View File

@ -14,6 +14,7 @@
* limitations under the License. * limitations under the License.
*/ */
use air::ExecutionCidState;
use air_test_utils::prelude::*; use air_test_utils::prelude::*;
use std::cell::RefCell; use std::cell::RefCell;
@ -421,3 +422,67 @@ fn ap_canon_stream() {
let expected_tetraplet = RefCell::new(vec![vec![SecurityTetraplet::new(vm_1_peer_id, "", "", "")]]); let expected_tetraplet = RefCell::new(vec![vec![SecurityTetraplet::new(vm_1_peer_id, "", "", "")]]);
assert_eq!(tetraplet_checker.as_ref(), &expected_tetraplet); assert_eq!(tetraplet_checker.as_ref(), &expected_tetraplet);
} }
#[test]
fn ap_stream_map() {
let vm_1_peer_id = "vm_1_peer_id";
let mut vm_1 = create_avm(echo_call_service(), vm_1_peer_id);
let service_name1 = "serv1";
let service_name2 = "serv2";
let script = f!(r#"
(seq
(seq
(ap "{vm_1_peer_id}" "{service_name1}" %map)
(ap "{vm_1_peer_id}" "{service_name2}" %map)
)
(fold %map i
(seq
(call i.$.key (i.$.key i.$.value) [i] u)
(next i)
)
)
)
"#);
let result = checked_call_vm!(vm_1, <_>::default(), &script, "", "");
let actual_trace = trace_from_result(&result);
let generation_idx = 0;
let mut cid_tracker = ExecutionCidState::new();
let service_result1 = json!({
"key": vm_1_peer_id,
"value": service_name1,
});
let service_result2 = json!({
"key": vm_1_peer_id,
"value": service_name2,
});
let service_args1 = vec![service_result1.clone()];
let service_args2 = vec![service_result2.clone()];
let expected_state = ExecutionTrace::from(vec![
executed_state::ap(generation_idx),
executed_state::ap(generation_idx),
executed_state::fold(vec![
subtrace_lore(0, SubTraceDesc::new(3.into(), 1), SubTraceDesc::new(5.into(), 0)),
subtrace_lore(1, SubTraceDesc::new(4.into(), 1), SubTraceDesc::new(5.into(), 0)),
]),
scalar_tracked!(
service_result1,
cid_tracker,
peer = vm_1_peer_id,
service = vm_1_peer_id,
function = service_name1,
args = service_args1
),
scalar_tracked!(
service_result2,
cid_tracker,
peer = vm_1_peer_id,
service = vm_1_peer_id,
function = service_name2,
args = service_args2
),
]);
assert_eq!(actual_trace, expected_state);
}

View File

@ -16,6 +16,9 @@
use air::CatchableError; use air::CatchableError;
use air::LambdaError; use air::LambdaError;
use air::StreamMapError::FloatMapKeyIsUnsupported;
use air::StreamMapError::UnsupportedMapKeyType;
use air_test_utils::prelude::*; use air_test_utils::prelude::*;
#[test] #[test]
@ -542,3 +545,38 @@ fn canon_stream_not_have_enough_values_call_arg() {
CatchableError::LambdaApplierError(LambdaError::CanonStreamNotHaveEnoughValues { stream_size: 0, idx: 0 }); CatchableError::LambdaApplierError(LambdaError::CanonStreamNotHaveEnoughValues { stream_size: 0, idx: 0 });
assert!(check_error(&result, expected_error)); assert!(check_error(&result, expected_error));
} }
#[test]
fn float_map_key_is_unsupported() {
let mut local_vm = create_avm(echo_call_service(), "local_peer_id");
let map_name = "%map";
let join_stream_script = f!(r#"
(ap 0.5 "serv1" %map)
"#);
let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap();
let expected_error = CatchableError::StreamMapError(FloatMapKeyIsUnsupported {
variable_name: String::from(map_name),
});
assert!(check_error(&result, expected_error));
}
#[test]
fn unsupported_map_key_type() {
let mut local_vm = create_avm(echo_call_service(), "local_peer_id");
let map_name = "%map";
let join_stream_script = f!(r#"
(seq
(ap "a" some)
(ap some "serv1" %map)
)
"#);
let result = local_vm.call(&join_stream_script, "", "", <_>::default()).unwrap();
let expected_error = CatchableError::StreamMapError(UnsupportedMapKeyType {
variable_name: String::from(map_name),
});
assert!(check_error(&result, expected_error));
}

View File

@ -24,6 +24,7 @@ use super::ImmutableVariableWithLambda;
use super::Scalar; use super::Scalar;
use super::ScalarWithLambda; use super::ScalarWithLambda;
use super::Stream; use super::Stream;
use super::StreamMap;
use air_lambda_ast::LambdaAST; use air_lambda_ast::LambdaAST;
@ -109,6 +110,8 @@ pub enum ApResult<'i> {
Scalar(Scalar<'i>), Scalar(Scalar<'i>),
#[serde(borrow)] #[serde(borrow)]
Stream(Stream<'i>), Stream(Stream<'i>),
#[serde(borrow)]
StreamMap(StreamMap<'i>),
} }
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)] #[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]

View File

@ -19,6 +19,7 @@ use super::CallOutputValue;
use super::NewArgument; use super::NewArgument;
use super::Scalar; use super::Scalar;
use super::Stream; use super::Stream;
use super::StreamMap;
use crate::parser::lexer::AirPos; use crate::parser::lexer::AirPos;
impl<'i> NewArgument<'i> { impl<'i> NewArgument<'i> {
@ -40,10 +41,15 @@ impl<'i> ApResult<'i> {
Self::Stream(Stream { name, position }) Self::Stream(Stream { name, position })
} }
pub fn stream_map(name: &'i str, position: AirPos) -> Self {
Self::StreamMap(StreamMap { name, position })
}
pub fn name(&self) -> &'i str { pub fn name(&self) -> &'i str {
match self { match self {
Self::Scalar(scalar) => scalar.name, Self::Scalar(scalar) => scalar.name,
Self::Stream(stream) => stream.name, Self::Stream(stream) => stream.name,
Self::StreamMap(stream_map) => stream_map.name,
} }
} }
} }

View File

@ -25,6 +25,7 @@ impl fmt::Display for ApResult<'_> {
match self { match self {
Scalar(scalar) => write!(f, "{scalar}"), Scalar(scalar) => write!(f, "{scalar}"),
Stream(stream) => write!(f, "{stream}"), Stream(stream) => write!(f, "{stream}"),
StreamMap(stream_map) => write!(f, "{stream_map}"),
} }
} }
} }

View File

@ -38,6 +38,7 @@ pub enum Instruction<'i> {
Fail(Fail<'i>), Fail(Fail<'i>),
FoldScalar(FoldScalar<'i>), FoldScalar(FoldScalar<'i>),
FoldStream(FoldStream<'i>), FoldStream(FoldStream<'i>),
FoldStreamMap(FoldStreamMap<'i>),
Never(Never), Never(Never),
New(New<'i>), New(New<'i>),
Next(Next<'i>), Next(Next<'i>),
@ -56,6 +57,7 @@ pub struct Call<'i> {
/// (ap argument result) /// (ap argument result)
#[derive(Serialize, Debug, PartialEq)] #[derive(Serialize, Debug, PartialEq)]
pub struct Ap<'i> { pub struct Ap<'i> {
pub key_argument: Option<ApArgument<'i>>,
pub argument: ApArgument<'i>, pub argument: ApArgument<'i>,
pub result: ApResult<'i>, pub result: ApResult<'i>,
} }
@ -136,6 +138,18 @@ pub struct FoldStream<'i> {
pub span: Span, pub span: Span,
} }
/// (fold stream_iterable iterator instruction)
#[derive(Serialize, Debug, PartialEq)]
pub struct FoldStreamMap<'i> {
pub iterable: StreamMap<'i>,
#[serde(borrow)]
pub iterator: Scalar<'i>,
pub instruction: Rc<Instruction<'i>>,
// option is needed to provide a graceful period of adoption
pub last_instruction: Option<Rc<Instruction<'i>>>,
pub span: Span,
}
/// (fold stream_iterable iterator instruction) /// (fold stream_iterable iterator instruction)
#[derive(Serialize, Debug, PartialEq, Eq)] #[derive(Serialize, Debug, PartialEq, Eq)]
pub struct Next<'i> { pub struct Next<'i> {

View File

@ -17,8 +17,23 @@
use super::*; use super::*;
impl<'i> Ap<'i> { impl<'i> Ap<'i> {
pub fn new(argument: ApArgument<'i>, result: ApResult<'i>) -> Self { pub fn new(
Self { argument, result } key_option: Option<ApArgument<'i>>,
argument: ApArgument<'i>,
result: ApResult<'i>,
) -> Self {
match key_option {
Some(key) => Self {
key_argument: Some(key),
argument,
result,
},
None => Self {
key_argument: None,
argument,
result,
},
}
} }
} }
@ -141,6 +156,24 @@ impl<'i> FoldStream<'i> {
} }
} }
impl<'i> FoldStreamMap<'i> {
pub fn new(
iterable: StreamMap<'i>,
iterator: Scalar<'i>,
instruction: Instruction<'i>,
last_instruction: Option<Instruction<'i>>,
span: Span,
) -> Self {
Self {
iterable,
iterator,
instruction: Rc::new(instruction),
last_instruction: last_instruction.map(Rc::new),
span,
}
}
}
impl<'i> Next<'i> { impl<'i> Next<'i> {
pub fn new(iterator: Scalar<'i>) -> Self { pub fn new(iterator: Scalar<'i>) -> Self {
Self { iterator } Self { iterator }

View File

@ -34,6 +34,7 @@ impl fmt::Display for Instruction<'_> {
Fail(fail) => write!(f, "{fail}"), Fail(fail) => write!(f, "{fail}"),
FoldScalar(fold) => write!(f, "{fold}"), FoldScalar(fold) => write!(f, "{fold}"),
FoldStream(fold) => write!(f, "{fold}"), FoldStream(fold) => write!(f, "{fold}"),
FoldStreamMap(fold) => write!(f, "{fold}"),
Never(never) => write!(f, "{never}"), Never(never) => write!(f, "{never}"),
Next(next) => write!(f, "{next}"), Next(next) => write!(f, "{next}"),
New(new) => write!(f, "{new}"), New(new) => write!(f, "{new}"),
@ -97,6 +98,12 @@ impl fmt::Display for FoldStream<'_> {
} }
} }
impl fmt::Display for FoldStreamMap<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "fold {} {}", self.iterable, self.iterator)
}
}
impl fmt::Display for Seq<'_> { impl fmt::Display for Seq<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "seq") write!(f, "seq")

View File

@ -79,3 +79,10 @@ pub enum ImmutableVariableWithLambda<'i> {
#[serde(borrow)] #[serde(borrow)]
CanonStream(CanonStreamWithLambda<'i>), CanonStream(CanonStreamWithLambda<'i>),
} }
/// A map based on top of a stream.
#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
pub struct StreamMap<'i> {
pub name: &'i str,
pub position: AirPos,
}

View File

@ -119,3 +119,9 @@ impl<'i> ImmutableVariableWithLambda<'i> {
Self::Scalar(scalar) Self::Scalar(scalar)
} }
} }
impl<'i> StreamMap<'i> {
pub fn new(name: &'i str, position: AirPos) -> Self {
Self { name, position }
}
}

View File

@ -68,3 +68,9 @@ impl fmt::Display for ImmutableVariableWithLambda<'_> {
} }
} }
} }
impl fmt::Display for StreamMap<'_> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "{}", self.name)
}
}

View File

@ -36,7 +36,16 @@ Instr: Box<Instruction<'input>> = {
<left: @L> "(" ap <arg:ApArgument> <result:ApResult> ")" <right: @R> => { <left: @L> "(" ap <arg:ApArgument> <result:ApResult> ")" <right: @R> => {
let apply = Ap::new(arg, result); let apply = Ap::new(None, arg, result);
let span = Span::new(left, right);
validator.met_ap(&apply, span);
Box::new(Instruction::Ap(apply))
},
<left: @L> "(" ap <key:ApArgument> <arg:ApArgument> <result:ApResult> ")" <right: @R> => {
let apply = Ap::new(Some(key), arg, result);
let span = Span::new(left, right); let span = Span::new(left, right);
validator.met_ap(&apply, span); validator.met_ap(&apply, span);
@ -83,6 +92,17 @@ Instr: Box<Instruction<'input>> = {
Box::new(Instruction::FoldStream(fold)) Box::new(Instruction::FoldStream(fold))
}, },
<left: @L> "(" fold <stream_map:StreamMap> <iterator:Scalar> <instruction:Instr> <last_instruction:Instr?> ")" <right: @R> => {
let iterable = StreamMap::new(stream_map.0, stream_map.1);
let iterator = Scalar::new(iterator.0, iterator.1);
let span = Span::new(left, right);
let fold = FoldStreamMap::new(iterable, iterator, *instruction, last_instruction.map(|v| *v), span);
validator.meet_fold_stream_map(&fold, span);
Box::new(Instruction::FoldStreamMap(fold))
},
<left: @L> "(" next <iterator:Scalar> ")" <right: @R> => { <left: @L> "(" next <iterator:Scalar> ")" <right: @R> => {
let iterator = Scalar::new(iterator.0, iterator.1); let iterator = Scalar::new(iterator.0, iterator.1);
let next = Next::new(iterator); let next = Next::new(iterator);
@ -128,6 +148,7 @@ Triplet: Triplet<'input> = {
ApResult: ApResult<'input> = { ApResult: ApResult<'input> = {
<scalar:Scalar> => ApResult::scalar(scalar.0, scalar.1), <scalar:Scalar> => ApResult::scalar(scalar.0, scalar.1),
<stream:Stream> => ApResult::stream(stream.0, stream.1), <stream:Stream> => ApResult::stream(stream.0, stream.1),
<stream_map:StreamMap> => ApResult::stream_map(stream_map.0, stream_map.1),
}; };
CallOutput: CallOutputValue<'input> = { CallOutput: CallOutputValue<'input> = {
@ -240,6 +261,7 @@ extern {
Scalar => Token::Scalar { name:<&'input str>, position: <AirPos> }, Scalar => Token::Scalar { name:<&'input str>, position: <AirPos> },
ScalarWithLambda => Token::ScalarWithLambda { name: <&'input str>, lambda: <LambdaAST<'input>>, position: <AirPos> }, ScalarWithLambda => Token::ScalarWithLambda { name: <&'input str>, lambda: <LambdaAST<'input>>, position: <AirPos> },
Stream => Token::Stream { name: <&'input str>, position: <AirPos> }, Stream => Token::Stream { name: <&'input str>, position: <AirPos> },
StreamMap => Token::StreamMap { name: <&'input str>, position: <AirPos> },
CanonStream => Token::CanonStream { name: <&'input str>, position: <AirPos> }, CanonStream => Token::CanonStream { name: <&'input str>, position: <AirPos> },
CanonStreamWithLambda => Token::CanonStreamWithLambda {name: <&'input str>, lambda:<LambdaAST<'input>>, position: <AirPos>}, CanonStreamWithLambda => Token::CanonStreamWithLambda {name: <&'input str>, lambda:<LambdaAST<'input>>, position: <AirPos>},

File diff suppressed because it is too large Load Diff

View File

@ -46,6 +46,13 @@ pub enum ParserError {
"multiple next instructions for iterator '{iterator_name}' found for one fold, that is prohibited" "multiple next instructions for iterator '{iterator_name}' found for one fold, that is prohibited"
)] )]
MultipleNextInFold { span: Span, iterator_name: String }, MultipleNextInFold { span: Span, iterator_name: String },
#[error("unsupported variable key type in (ap {ap_key_type} value {ap_result_name})")]
UnsupportedMapKeyType {
span: Span,
ap_key_type: String,
ap_result_name: String,
},
} }
impl ParserError { impl ParserError {
@ -59,6 +66,7 @@ impl ParserError {
Self::IteratorRestrictionNotAllowed { span, .. } => *span, Self::IteratorRestrictionNotAllowed { span, .. } => *span,
Self::MultipleIterableValuesForOneIterator { span, .. } => *span, Self::MultipleIterableValuesForOneIterator { span, .. } => *span,
Self::MultipleNextInFold { span, .. } => *span, Self::MultipleNextInFold { span, .. } => *span,
Self::UnsupportedMapKeyType { span, .. } => *span,
} }
} }
@ -96,6 +104,18 @@ impl ParserError {
iterator_name: iterator_name.into(), iterator_name: iterator_name.into(),
} }
} }
pub fn unsupported_map_key_type(
span: Span,
ap_key_type: impl Into<String>,
ap_result_name: impl Into<String>,
) -> Self {
Self::UnsupportedMapKeyType {
span,
ap_key_type: ap_key_type.into(),
ap_result_name: ap_result_name.into(),
}
}
} }
impl From<std::convert::Infallible> for ParserError { impl From<std::convert::Infallible> for ParserError {

View File

@ -34,6 +34,7 @@ pub(super) fn try_parse_call_variable(
enum MetTag { enum MetTag {
None, None,
Stream, Stream,
StreamMap,
CanonStream, CanonStream,
} }
@ -291,6 +292,10 @@ impl<'input> CallVariableParser<'input> {
name, name,
position: self.start_pos, position: self.start_pos,
}, },
MetTag::StreamMap => Token::StreamMap {
name,
position: self.start_pos,
},
} }
} }
@ -311,6 +316,11 @@ impl<'input> CallVariableParser<'input> {
lambda, lambda,
position: self.start_pos, position: self.start_pos,
}, },
MetTag::StreamMap => Token::StreamMapWithLambda {
name,
lambda,
position: self.start_pos,
},
} }
} }
@ -378,6 +388,7 @@ impl MetTag {
match tag { match tag {
'$' => Self::Stream, '$' => Self::Stream,
'#' => Self::CanonStream, '#' => Self::CanonStream,
'%' => Self::StreamMap,
_ => Self::None, _ => Self::None,
} }
} }

View File

@ -198,6 +198,23 @@ fn stream() {
); );
} }
#[test]
fn stream_map() {
const STREAM_MAP: &str = "%stream_map____asdasd";
lexer_test(
STREAM_MAP,
Single(Ok((
0.into(),
Token::StreamMap {
name: STREAM_MAP,
position: 0.into(),
},
STREAM_MAP.len().into(),
))),
);
}
#[test] #[test]
fn canon_stream() { fn canon_stream() {
const CANON_STREAM: &str = "#stream____asdasd"; const CANON_STREAM: &str = "#stream____asdasd";

View File

@ -46,6 +46,11 @@ pub enum Token<'input> {
lambda: LambdaAST<'input>, lambda: LambdaAST<'input>,
position: AirPos, position: AirPos,
}, },
StreamMapWithLambda {
name: &'input str,
lambda: LambdaAST<'input>,
position: AirPos,
},
CanonStream { CanonStream {
name: &'input str, name: &'input str,
position: AirPos, position: AirPos,
@ -55,6 +60,10 @@ pub enum Token<'input> {
lambda: LambdaAST<'input>, lambda: LambdaAST<'input>,
position: AirPos, position: AirPos,
}, },
StreamMap {
name: &'input str,
position: AirPos,
},
StringLiteral(&'input str), StringLiteral(&'input str),
I64(i64), I64(i64),

View File

@ -184,3 +184,101 @@ fn ap_with_canon_stream_with_lambda() {
assert_eq!(actual, expected); assert_eq!(actual, expected);
} }
#[test]
fn ap_with_stream_map() {
// 4 variants
let var_name = "%stream_map";
let key_name = "keyo";
let value = "some_string";
let source_code = format!(
r#"
(ap "{key_name}" "{value}" %stream_map)
"#
);
let actual = parse(source_code.as_str());
let expected = ap_with_map(
ApArgument::Literal(key_name),
ApArgument::Literal(value),
ApResult::StreamMap(StreamMap::new(
var_name,
source_code.find(var_name).unwrap().into(),
)),
);
assert_eq!(actual, expected);
// It is possible to use Scalar as a key in the context of a parser
// but populate_context will raise an error
let source_code = format!(
r#"
(ap {key_name} "{value}" %stream_map)
"#
);
let actual = parse(source_code.as_str());
let expected = ap_with_map(
ApArgument::Scalar(Scalar::new(
key_name,
source_code.find(key_name).unwrap().into(),
)),
ApArgument::Literal(value),
ApResult::StreamMap(StreamMap::new(
var_name,
source_code.find(var_name).unwrap().into(),
)),
);
assert_eq!(actual, expected);
let source_code = format!(
r#"
(ap "{key_name}" {value} %stream_map)
"#
);
let actual = parse(source_code.as_str());
let expected = ap_with_map(
ApArgument::Literal(key_name),
ApArgument::Scalar(Scalar::new(value, source_code.find(value).unwrap().into())),
ApResult::StreamMap(StreamMap::new(
var_name,
source_code.find(var_name).unwrap().into(),
)),
);
assert_eq!(actual, expected);
// It is possible to use Scalar as a key in the context of a parser
// but populate_context will raise an error
let source_code = format!(
r#"
(ap {key_name} {value} %stream_map)
"#
);
let actual = parse(source_code.as_str());
let expected = ap_with_map(
ApArgument::Scalar(Scalar::new(
key_name,
source_code.find(key_name).unwrap().into(),
)),
ApArgument::Scalar(Scalar::new(value, source_code.find(value).unwrap().into())),
ApResult::StreamMap(StreamMap::new(
var_name,
source_code.find(var_name).unwrap().into(),
)),
);
assert_eq!(actual, expected);
let key_name = 123;
let source_code = format!(
r#"
(ap {key_name} {value} %stream_map)
"#
);
let actual = parse(source_code.as_str());
let expected = ap_with_map(
ApArgument::Number(Number::Int(key_name)),
ApArgument::Scalar(Scalar::new(value, source_code.find(value).unwrap().into())),
ApResult::StreamMap(StreamMap::new(
var_name,
source_code.find(var_name).unwrap().into(),
)),
);
assert_eq!(actual, expected);
}

View File

@ -196,7 +196,23 @@ pub(super) fn mismatch<'i>(
} }
pub(super) fn ap<'i>(argument: ApArgument<'i>, result: ApResult<'i>) -> Instruction<'i> { pub(super) fn ap<'i>(argument: ApArgument<'i>, result: ApResult<'i>) -> Instruction<'i> {
Instruction::Ap(Ap { argument, result }) Instruction::Ap(Ap {
key_argument: None,
argument,
result,
})
}
pub(super) fn ap_with_map<'i>(
key: ApArgument<'i>,
argument: ApArgument<'i>,
result: ApResult<'i>,
) -> Instruction<'i> {
Instruction::Ap(Ap {
key_argument: Some(key),
argument,
result,
})
} }
pub(super) fn canon<'i>( pub(super) fn canon<'i>(

View File

@ -58,6 +58,10 @@ pub struct VariableValidator<'i> {
/// Contains all names that should be checked that they are not iterators. /// Contains all names that should be checked that they are not iterators.
not_iterators_candidates: Vec<(&'i str, Span)>, not_iterators_candidates: Vec<(&'i str, Span)>,
// This contains info about unssuported map key arguments used with ap instruction,
// namely (key map ApArgument)
unsupported_map_keys: Vec<(String, &'i str, Span)>,
} }
impl<'i> VariableValidator<'i> { impl<'i> VariableValidator<'i> {
@ -112,6 +116,11 @@ impl<'i> VariableValidator<'i> {
self.met_iterator_definition(&fold.iterator, span); self.met_iterator_definition(&fold.iterator, span);
} }
pub(super) fn meet_fold_stream_map(&mut self, fold: &FoldStreamMap<'i>, span: Span) {
self.met_variable_name(fold.iterable.name, span);
self.met_iterator_definition(&fold.iterator, span);
}
pub(super) fn met_new(&mut self, new: &New<'i>, span: Span) { pub(super) fn met_new(&mut self, new: &New<'i>, span: Span) {
self.not_iterators_candidates self.not_iterators_candidates
.push((new.argument.name(), span)); .push((new.argument.name(), span));
@ -129,6 +138,21 @@ impl<'i> VariableValidator<'i> {
} }
pub(super) fn met_ap(&mut self, ap: &Ap<'i>, span: Span) { pub(super) fn met_ap(&mut self, ap: &Ap<'i>, span: Span) {
match &ap.key_argument {
Some(key) => match key {
ApArgument::Literal(_) | ApArgument::Number(_) => {}
ApArgument::Scalar(scalar) => self.met_scalar(scalar, span),
ApArgument::ScalarWithLambda(scalar) => self.met_scalar_wl(scalar, span),
ApArgument::CanonStreamWithLambda(stream) => self.met_canon_stream_wl(stream, span),
_ => {
let key_type = key.to_string();
self.unsupported_map_keys
.push((key_type, ap.result.name(), span));
}
},
None => (),
}
match &ap.argument { match &ap.argument {
ApArgument::Number(_) ApArgument::Number(_)
| ApArgument::Timestamp | ApArgument::Timestamp
@ -155,6 +179,7 @@ impl<'i> VariableValidator<'i> {
.check_multiple_next_in_fold() .check_multiple_next_in_fold()
.check_new_on_iterators() .check_new_on_iterators()
.check_iterator_for_multiple_definitions() .check_iterator_for_multiple_definitions()
.check_for_unsupported_map_keys()
.build() .build()
} }
@ -417,6 +442,19 @@ impl<'i> ValidatorErrorBuilder<'i> {
self self
} }
// Unsupported StreamMap keys check, e.g. Stream can not be a map key (ap $stream "value" %map)
fn check_for_unsupported_map_keys(mut self) -> Self {
for (arg_key_type, ap_result_name, span) in self.validator.unsupported_map_keys.iter_mut() {
let error = ParserError::unsupported_map_key_type(
*span,
arg_key_type.to_string(),
*ap_result_name,
);
add_to_errors(&mut self.errors, *span, Token::New, error);
}
self
}
fn build(self) -> Vec<ErrorRecovery<AirPos, Token<'i>, ParserError>> { fn build(self) -> Vec<ErrorRecovery<AirPos, Token<'i>, ParserError>> {
self.errors self.errors
} }

View File

@ -22,6 +22,7 @@ use std::collections::HashMap;
/// Mapping from a stream name to it's generation count. /// Mapping from a stream name to it's generation count.
/// Similar to pi-calculus non-restricted names/channels. /// Similar to pi-calculus non-restricted names/channels.
pub type GlobalStreamGens = HashMap<String, GenerationIdx>; pub type GlobalStreamGens = HashMap<String, GenerationIdx>;
pub type GlobalStreamMapGens = GlobalStreamGens;
/// Mapping from a stream name to /// Mapping from a stream name to
/// position of a new instruction in a script that creates a scope for a stream /// position of a new instruction in a script that creates a scope for a stream
@ -32,3 +33,4 @@ pub type GlobalStreamGens = HashMap<String, GenerationIdx>;
/// where it was met. /// where it was met.
/// Similar to pi-calculus restricted names/channels. /// Similar to pi-calculus restricted names/channels.
pub type RestrictedStreamGens = HashMap<String, HashMap<AirPos, Vec<GenerationIdx>>>; pub type RestrictedStreamGens = HashMap<String, HashMap<AirPos, Vec<GenerationIdx>>>;
pub type RestrictedStreamMapGens = RestrictedStreamGens;

View File

@ -138,6 +138,9 @@ impl<W: io::Write> Beautifier<W> {
ast::Instruction::FoldStream(fold_stream) => { ast::Instruction::FoldStream(fold_stream) => {
self.beautify_fold_stream(fold_stream, indent) self.beautify_fold_stream(fold_stream, indent)
} }
ast::Instruction::FoldStreamMap(fold_stream_map) => {
self.beautify_fold_stream_map(fold_stream_map, indent)
}
ast::Instruction::Never(never) => self.beautify_simple(never, indent), ast::Instruction::Never(never) => self.beautify_simple(never, indent),
ast::Instruction::New(new) => self.beautify_new(new, indent), ast::Instruction::New(new) => self.beautify_new(new, indent),
ast::Instruction::Next(next) => self.beautify_simple(next, indent), ast::Instruction::Next(next) => self.beautify_simple(next, indent),
@ -215,6 +218,14 @@ impl<W: io::Write> Beautifier<W> {
compound!(self, indent, fold) compound!(self, indent, fold)
} }
fn beautify_fold_stream_map(
&mut self,
fold: &ast::FoldStreamMap<'_>,
indent: usize,
) -> io::Result<()> {
compound!(self, indent, fold)
}
fn beautify_new(&mut self, new: &ast::New<'_>, indent: usize) -> io::Result<()> { fn beautify_new(&mut self, new: &ast::New<'_>, indent: usize) -> io::Result<()> {
compound!(self, indent, new) compound!(self, indent, new)
} }