Populate restricted streams generations from data (#301)

This commit is contained in:
Mike Voronov
2022-09-05 17:57:26 +03:00
committed by GitHub
parent 2d400b4028
commit 1cb6901caa
5 changed files with 77 additions and 23 deletions

View File

@@ -20,6 +20,7 @@ use super::Scalars;
use super::Streams;
use air_execution_info_collector::InstructionTracker;
use air_interpreter_data::InterpreterData;
use air_interpreter_interface::*;
use std::rc::Rc;
@@ -65,14 +66,16 @@ pub(crate) struct ExecutionCtx<'i> {
}
impl<'i> ExecutionCtx<'i> {
pub(crate) fn new(run_parameters: RunParameters, call_results: CallResults, last_call_request_id: u32) -> Self {
pub(crate) fn new(prev_data: &InterpreterData, call_results: CallResults, run_parameters: RunParameters) -> Self {
let run_parameters = RcRunParameters::from_run_parameters(run_parameters);
let streams = Streams::from_data(&prev_data.global_streams, prev_data.restricted_streams.clone());
Self {
run_parameters,
subgraph_complete: true,
last_call_request_id,
last_call_request_id: prev_data.last_call_request_id,
call_results,
streams,
..<_>::default()
}
}

View File

@@ -33,7 +33,7 @@ pub(crate) struct Streams {
streams: HashMap<String, Vec<StreamDescriptor>>,
/// Contains stream generation that private stream should have at the scope start.
data_restr_stream_generations: RestrictedStreamGens,
data_restricted_stream_gens: RestrictedStreamGens,
/// Contains stream generations that each private stream had at the scope end.
/// Then it's placed into data
@@ -46,6 +46,26 @@ struct StreamDescriptor {
}
impl Streams {
pub(crate) fn from_data(
global_streams: &GlobalStreamGens,
data_restricted_stream_gens: RestrictedStreamGens,
) -> Self {
let global_streams = global_streams
.iter()
.map(|(stream_name, &generations_count)| {
let global_stream = Stream::from_generations_count(generations_count as usize);
let descriptor = StreamDescriptor::global(global_stream);
(stream_name.to_string(), vec![descriptor])
})
.collect::<HashMap<_, _>>();
Self {
streams: global_streams,
data_restricted_stream_gens,
collected_restricted_stream_gens: <_>::default(),
}
}
pub(crate) fn get(&self, name: &str, position: usize) -> Option<&Stream> {
self.streams
.get(name)
@@ -76,18 +96,14 @@ impl Streams {
// - and by this function, and if there is no such a streams in streams,
// it means that a new global one should be created.
let stream = Stream::from_value(value);
self.add_global_stream(stream_name.to_string(), stream);
let descriptor = StreamDescriptor::global(stream);
self.streams.insert(stream_name.to_string(), vec![descriptor]);
let generation = 0;
Ok(generation)
}
}
}
pub(crate) fn add_global_stream(&mut self, name: String, stream: Stream) {
let descriptor = StreamDescriptor::global(stream);
self.streams.insert(name, vec![descriptor]);
}
pub(crate) fn meet_scope_start(&mut self, name: impl Into<String>, span: Span, iteration: u32) {
let name = name.into();
let generations_count = self
@@ -140,7 +156,7 @@ impl Streams {
}
fn stream_generation_from_data(&self, name: &str, position: u32, iteration: usize) -> Option<u32> {
self.data_restr_stream_generations
self.data_restricted_stream_gens
.get(name)
.and_then(|scopes| scopes.get(&position).and_then(|iterations| iterations.get(iteration)))
.copied()

View File

@@ -16,7 +16,6 @@
use super::PreparationError;
use crate::execution_step::ExecutionCtx;
use crate::execution_step::Stream;
use crate::execution_step::TraceHandler;
use air_interpreter_data::InterpreterData;
@@ -73,17 +72,6 @@ fn make_exec_ctx(
let call_results = serde_json::from_slice(call_results)
.map_err(|e| PreparationError::CallResultsDeFailed(e, call_results.to_vec()))?;
let mut ctx = ExecutionCtx::new(run_parameters, call_results, prev_data.last_call_request_id);
create_streams(&mut ctx, prev_data);
let ctx = ExecutionCtx::new(prev_data, call_results, run_parameters);
Ok(ctx)
}
fn create_streams(ctx: &mut ExecutionCtx<'_>, prev_data: &InterpreterData) {
for (stream_name, generation_count) in prev_data.global_streams.iter() {
let new_stream = Stream::from_generations_count(*generation_count as usize);
// it's impossible to have duplicates of streams in data because of HashMap in data
ctx.streams.add_global_stream(stream_name.to_string(), new_stream);
}
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2022 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use air_test_utils::prelude::*;
#[test]
// test for github.com/fluencelabs/aquavm/issues/300
fn issue_300() {
let peer_id_1 = "peer_id_1";
let mut peer_vm_1 = create_avm(echo_call_service(), peer_id_1);
let peer_id_2 = "peer_id_2";
let mut peer_vm_2 = create_avm(echo_call_service(), peer_id_2);
let script = f!(r#"
(new $stream
(par
(call "{peer_id_1}" ("" "") [2] $stream)
(call "{peer_id_2}" ("" "") [1] $stream)
)
)
"#);
let result_1 = checked_call_vm!(peer_vm_2, <_>::default(), &script, "", "");
let result_2 = checked_call_vm!(peer_vm_1, <_>::default(), &script, "", result_1.data.clone());
let actual_trace = trace_from_result(&result_2);
let expected_trace = vec![
executed_state::par(1, 1),
executed_state::stream_number(2, 1),
executed_state::stream_number(1, 0),
];
assert_eq!(actual_trace, expected_trace);
}

View File

@@ -27,3 +27,4 @@ mod issue_221;
mod issue_222;
mod issue_241;
mod issue_295;
mod issue_300;