Decouple trace handler (#150)

This commit is contained in:
Mike Voronov
2021-10-05 14:07:38 +03:00
committed by GitHub
parent 37ed77cb81
commit adba9e8e65
47 changed files with 212 additions and 117 deletions

View File

@ -0,0 +1 @@
max_width = 120

View File

@ -0,0 +1,23 @@
[package]
name = "air-trace-handler"
version = "0.1.0"
description = "Implementation of AIR trace handler"
authors = ["Fluence Labs"]
edition = "2018"
license = "Apache-2.0"
repository = "https://github.com/fluencelabs/air/crates/trace-handler"
publish = false
keywords = ["fluence", "air", "webassembly", "programming-language"]
categories = ["wasm"]
[lib]
name = "air_trace_handler"
path = "src/lib.rs"
[dependencies]
air-interpreter-data = { path = "../interpreter-data" }
air-parser = { path = "../air-parser" }
serde_json = "1.0.68"
log = "0.4.14"
thiserror = "1.0.29"

View File

@ -0,0 +1,41 @@
## AIR trace handler
This crate contains implementation of the CRDT-based merging data algorithm. It exposes the `TraceHandler` struct that based on the visitor pattern and has public methods that should be called in certain places of AIR instructions execution. Internally `TraceHandler` contains several FSM and each such public methods do state transitioning of one or more these FSMs. Below are state transition sequences for all instructions that caller must follow.
### Ap instruction
Expected sequence of `TraceHandler` calls for the `ap` instruction:
```
meet_ap_start
-> meet_ap_end
```
### Call instruction
Expected sequence of `TraceHandler` calls for the `call` instruction:
```
meet_call_start
-> meet_call_end
```
### Par instruction
Expected sequence of `TraceHandler` calls for the `par` instruction:
```
meet_par_start
-> meet_par_subtree_end(..., SubtreeType::Left)
-> meet_par_subtree_end(..., SubtreeType::Right)
```
### Fold instruction
Expected sequence of `TraceHandler` calls for the `fold` instruction:
```
meet_fold_start.1 ->
meet_generation_start.N ->
meet_next.M ->
meet_prev.M ->
meet_generation_end.N ->
meet_fold_end.1
```
where .T means that this function should be called exactly T times.

View File

@ -0,0 +1,55 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutedState;
use thiserror::Error as ThisError;
/// Errors arose out while accessing various interpreter data.
#[derive(ThisError, Debug, PartialEq, Eq)]
pub enum KeeperError {
/// Errors occurred when trace_len - trace_position < requested_subtrace_len.
#[error(
"executed trace has {trace_len} elements and current position is {trace_position}, \
but tried to set {requested_subtrace_len} subtrace_len"
)]
SetSubtraceLenFailed {
requested_subtrace_len: usize,
trace_position: usize,
trace_len: usize,
},
/// Errors occurred when
/// requested_subtrace_len != 0 && requested_pos + requested_subtrace_len > trace_len.
#[error(
"executed trace has {trace_len} elements, \
but tried to set {requested_subtrace_len} subtrace_len and {requested_pos} position"
)]
SetSubtraceLenAndPosFailed {
requested_pos: usize,
requested_subtrace_len: usize,
trace_len: usize,
},
/// Errors occurred when Fold FSM tries to obtain stream generation by value_pos from a trace,
/// but this value_pos is bigger than the trace length.
#[error("requested an element at position '{position}', but executed trace contains only '{trace_len}' elements")]
NoElementAtPosition { position: usize, trace_len: usize },
/// Errors occurred when Fold FSM tries to obtain stream generation by value_pos from a trace,
/// but such state doesn't belong to values in streams (it doesn't contain a generation).
#[error("expected a state of CallResult(Value::Stream) or Ap types but '{state}' obtained")]
NoStreamState { state: ExecutedState },
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionTrace;
use super::MergeCtx;
use super::TraceSlider;
use air_interpreter_data::InterpreterData;
use std::collections::HashMap;
/// Keeps all necessary data for merging.
#[derive(Debug, Default, PartialEq)]
pub(crate) struct DataKeeper {
pub(crate) prev_ctx: MergeCtx,
pub(crate) current_ctx: MergeCtx,
pub(crate) new_to_old_pos: HashMap<usize, DataPositions>,
pub(crate) result_trace: ExecutionTrace,
}
impl DataKeeper {
pub(crate) fn from_data(prev_data: InterpreterData, current_data: InterpreterData) -> Self {
let prev_ctx = MergeCtx::from_data(prev_data);
let current_ctx = MergeCtx::from_data(current_data);
Self {
prev_ctx,
current_ctx,
new_to_old_pos: <_>::default(),
result_trace: <_>::default(),
}
}
pub(crate) fn result_states_count(&self) -> usize {
self.result_trace.len()
}
pub(crate) fn prev_slider(&self) -> &TraceSlider {
&self.prev_ctx.slider
}
pub(crate) fn prev_slider_mut(&mut self) -> &mut TraceSlider {
&mut self.prev_ctx.slider
}
pub(crate) fn current_slider(&self) -> &TraceSlider {
&self.current_ctx.slider
}
pub(crate) fn current_slider_mut(&mut self) -> &mut TraceSlider {
&mut self.current_ctx.slider
}
}
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct DataPositions {
pub(crate) prev_pos: Option<usize>,
pub(crate) current_pos: Option<usize>,
}

View File

@ -0,0 +1,79 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutionTrace;
use super::KeeperError;
use super::KeeperResult;
use super::TraceSlider;
use air_interpreter_data::InterpreterData;
use air_interpreter_data::StreamGenerations;
use std::collections::HashMap;
/// Contains all necessary information about data.
#[derive(Debug, Default, PartialEq)]
pub struct MergeCtx {
pub slider: TraceSlider,
pub streams: StreamGenerations,
}
impl MergeCtx {
#[allow(dead_code)]
pub(crate) fn from_trace(trace: ExecutionTrace) -> Self {
let slider = TraceSlider::new(trace);
Self {
slider,
streams: HashMap::new(),
}
}
pub(crate) fn from_data(data: InterpreterData) -> Self {
let slider = TraceSlider::new(data.trace);
Self {
slider,
streams: data.streams,
}
}
pub(crate) fn try_get_generation(&self, position: u32) -> KeeperResult<u32> {
use air_interpreter_data::*;
let position = position as usize;
let state = self
.slider
.state_at_position(position)
.ok_or_else(|| KeeperError::NoElementAtPosition {
position,
trace_len: self.slider.trace_len(),
})?;
match state {
ExecutedState::Call(CallResult::Executed(Value::Stream { generation, .. })) => Ok(*generation),
// such Aps are always preceded by Fold where corresponding stream could be used,
// so it's been already checked that res_generation is well-formed
// and accessing 0th element is safe here
ExecutedState::Ap(ap_result) => Ok(ap_result.res_generations[0]),
state => Err(KeeperError::NoStreamState { state: state.clone() }),
}
}
pub(crate) fn stream_generation(&self, stream_name: &str) -> Option<u32> {
self.streams.get(stream_name).copied()
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod errors;
mod keeper;
mod merge_ctx;
mod trace_slider;
pub use errors::KeeperError;
pub use merge_ctx::MergeCtx;
pub use trace_slider::TraceSlider;
pub(crate) use keeper::DataKeeper;
pub(crate) use keeper::DataPositions;
pub(self) type KeeperResult<T> = std::result::Result<T, KeeperError>;
use super::ExecutedState;
use super::ExecutionTrace;

View File

@ -0,0 +1,113 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutedState;
use super::ExecutionTrace;
use super::KeeperError::*;
use super::KeeperResult;
/// This slider is intended to slide on a subtrace inside provided trace. This subtrace
/// is identified by position and len.
// TODO: check for overflow
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TraceSlider {
/// Trace that slider slide on.
trace: ExecutionTrace,
/// Position of current subtrace inside trace.
position: usize,
/// Length of a current subtrace.
subtrace_len: usize,
/// Count of seen elements since the last position update.
seen_elements: usize,
}
impl TraceSlider {
pub(super) fn new(trace: ExecutionTrace) -> Self {
let subtrace_len = trace.len();
Self {
trace,
subtrace_len,
..<_>::default()
}
}
/// Returns the next state if current interval length hasn't been reached
/// and None otherwise.
#[allow(clippy::suspicious_operation_groupings)]
pub(crate) fn next_state(&mut self) -> Option<ExecutedState> {
if self.seen_elements >= self.subtrace_len || self.position >= self.trace.len() {
return None;
}
let result = self.trace[self.position].clone();
self.position += 1;
self.seen_elements += 1;
Some(result)
}
pub(crate) fn set_position_and_len(&mut self, position: usize, subtrace_len: usize) -> KeeperResult<()> {
// it's possible to set empty subtrace_len and inconsistent position
if subtrace_len != 0 && position + subtrace_len > self.trace.len() {
return Err(SetSubtraceLenAndPosFailed {
requested_pos: position,
requested_subtrace_len: subtrace_len,
trace_len: self.trace.len(),
});
}
self.position = position;
self.subtrace_len = subtrace_len;
self.seen_elements = 0;
Ok(())
}
pub(crate) fn set_subtrace_len(&mut self, subtrace_len: usize) -> KeeperResult<()> {
let trace_remainder = self.trace.len() - self.position;
if trace_remainder < subtrace_len {
return Err(SetSubtraceLenFailed {
requested_subtrace_len: subtrace_len,
trace_position: self.position,
trace_len: self.trace.len(),
});
}
self.seen_elements = 0;
self.subtrace_len = subtrace_len;
Ok(())
}
pub(crate) fn position(&self) -> usize {
self.position
}
pub(crate) fn subtrace_len(&self) -> usize {
self.subtrace_len - self.seen_elements
}
pub(super) fn state_at_position(&self, position: usize) -> Option<&ExecutedState> {
self.trace.get(position)
}
pub(super) fn trace_len(&self) -> usize {
self.trace.len()
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::data_keeper::KeeperError;
use super::merger::MergeError;
use super::state_automata::StateFSMError;
use thiserror::Error as ThisError;
/// Errors arose out of merging previous data with a new.
#[derive(ThisError, Debug)]
#[allow(clippy::enum_variant_names)]
pub enum TraceHandlerError {
#[error(transparent)]
KeeperError(#[from] KeeperError),
#[error(transparent)]
MergeError(#[from] MergeError),
#[error(transparent)]
StateFSMError(#[from] StateFSMError),
}

View File

@ -0,0 +1,166 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use merger::*;
use air_interpreter_data::InterpreterData;
use air_parser::ast::CallOutputValue;
#[derive(Debug, Default)]
pub struct TraceHandler {
data_keeper: DataKeeper,
fsm_keeper: FSMKeeper,
}
impl TraceHandler {
pub fn from_data(prev_data: InterpreterData, current_data: InterpreterData) -> Self {
let data_keeper = DataKeeper::from_data(prev_data, current_data);
Self {
data_keeper,
fsm_keeper: <_>::default(),
}
}
/// Returns size of elements inside result trace and intended to provide
/// a position of next inserted elements.
pub fn trace_pos(&self) -> usize {
self.data_keeper.result_trace.len()
}
pub fn into_result_trace(self) -> ExecutionTrace {
self.data_keeper.result_trace
}
pub fn as_result_trace(&self) -> &ExecutionTrace {
&self.data_keeper.result_trace
}
pub fn subtree_sizes(&self) -> (usize, usize) {
let prev_len = self.data_keeper.prev_slider().subtrace_len();
let current_len = self.data_keeper.current_slider().subtrace_len();
(prev_len, current_len)
}
}
impl TraceHandler {
/// Should be called at the beginning of a call execution.
pub fn meet_call_start(&mut self, output_value: &CallOutputValue<'_>) -> TraceHandlerResult<MergerCallResult> {
try_merge_next_state_as_call(&mut self.data_keeper, output_value).map_err(Into::into)
}
/// Should be called when a call instruction was executed successfully. It adds the supplied
/// state to the result trace.
pub fn meet_call_end(&mut self, call_result: CallResult) {
log::trace!(
target: crate::EXECUTED_STATE_CHANGING,
" adding new call executed state {:?}",
call_result
);
self.data_keeper.result_trace.push(ExecutedState::Call(call_result));
}
}
impl TraceHandler {
pub fn meet_ap_start(&mut self) -> TraceHandlerResult<MergerApResult> {
try_merge_next_state_as_ap(&mut self.data_keeper).map_err(Into::into)
}
pub fn meet_ap_end(&mut self, ap_result: ApResult) {
self.data_keeper.result_trace.push(ExecutedState::Ap(ap_result));
}
}
impl TraceHandler {
pub fn meet_par_start(&mut self) -> TraceHandlerResult<()> {
let ingredients = merger::try_merge_next_state_as_par(&mut self.data_keeper)?;
let par_fsm = ParFSM::from_left_started(ingredients, &mut self.data_keeper)?;
self.fsm_keeper.push_par(par_fsm);
Ok(())
}
pub fn meet_par_subtree_end(&mut self, subtree_type: SubtreeType) -> TraceHandlerResult<()> {
match subtree_type {
SubtreeType::Left => {
let par_fsm = self.fsm_keeper.last_par()?;
par_fsm.left_completed(&mut self.data_keeper);
}
SubtreeType::Right => {
let par_fsm = self.fsm_keeper.pop_par()?;
par_fsm.right_completed(&mut self.data_keeper);
}
}
Ok(())
}
}
impl TraceHandler {
pub fn meet_fold_start(&mut self, fold_id: u32) -> TraceHandlerResult<()> {
let ingredients = try_merge_next_state_as_fold(&mut self.data_keeper)?;
let fold_fsm = FoldFSM::from_fold_start(ingredients, &mut self.data_keeper)?;
self.fsm_keeper.add_fold(fold_id, fold_fsm);
Ok(())
}
pub fn meet_iteration_start(&mut self, fold_id: u32, value_pos: usize) -> TraceHandlerResult<()> {
let fold_fsm = self.fsm_keeper.fold_mut(fold_id)?;
fold_fsm.meet_iteration_start(value_pos, &mut self.data_keeper)?;
Ok(())
}
pub fn meet_iteration_end(&mut self, fold_id: u32) -> TraceHandlerResult<()> {
let fold_fsm = self.fsm_keeper.fold_mut(fold_id)?;
fold_fsm.meet_iteration_end(&mut self.data_keeper);
Ok(())
}
pub fn meet_back_iterator(&mut self, fold_id: u32) -> TraceHandlerResult<()> {
let fold_fsm = self.fsm_keeper.fold_mut(fold_id)?;
fold_fsm.meet_back_iterator(&mut self.data_keeper)?;
Ok(())
}
pub fn meet_generation_end(&mut self, fold_id: u32) -> TraceHandlerResult<()> {
let fold_fsm = self.fsm_keeper.fold_mut(fold_id)?;
fold_fsm.meet_generation_end(&mut self.data_keeper);
Ok(())
}
pub fn meet_fold_end(&mut self, fold_id: u32) -> TraceHandlerResult<()> {
let fold_fsm = self.fsm_keeper.extract_fold(fold_id)?;
fold_fsm.meet_fold_end(&mut self.data_keeper);
Ok(())
}
pub fn fold_end_with_error(&mut self, fold_id: u32) {
let fold_fsm = match self.fsm_keeper.extract_fold(fold_id) {
Ok(fold_fsm) => fold_fsm,
// just passing here is ok, because error could be produced while fold initialization
Err(_) => return,
};
fold_fsm.fold_end_with_error(&mut self.data_keeper);
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod data_keeper;
mod errors;
mod handler;
mod merger;
mod state_automata;
pub use errors::TraceHandlerError;
pub use handler::TraceHandler;
pub use merger::ApResultError;
pub use merger::CallResultError;
pub use merger::FoldResultError;
pub use merger::MergeCtxType;
pub use merger::MergeError;
pub use merger::MergerApResult;
pub use merger::MergerCallResult;
pub use state_automata::SubtreeType;
pub type TraceHandlerResult<T> = std::result::Result<T, TraceHandlerError>;
pub const EXECUTED_STATE_CHANGING: &str = "executed_state_changing";
use air_interpreter_data::*;
use data_keeper::DataKeeper;
use data_keeper::MergeCtx;
use merger::MergerFoldResult;
use merger::ResolvedFold;
use merger::ResolvedSubTraceDescs;
use state_automata::FSMKeeper;
use state_automata::FoldFSM;
use state_automata::ParFSM;

View File

@ -0,0 +1,66 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
#[derive(Debug, Clone)]
pub enum MergerApResult {
/// There is no corresponding state in a trace for this call.
Empty,
/// There was a state in at least one of the contexts. If there were two states in
/// both contexts, they were successfully merged.
ApResult { res_generation: Option<u32> },
}
pub(crate) fn try_merge_next_state_as_ap(data_keeper: &mut DataKeeper) -> MergeResult<MergerApResult> {
use ExecutedState::Ap;
let prev_state = data_keeper.prev_slider_mut().next_state();
let current_state = data_keeper.current_slider_mut().next_state();
let ap = match (prev_state, current_state) {
(Some(Ap(prev_ap)), _) => prev_ap,
// check that current state is Ap, but it's impossible to use it, because prev_data
// could not have streams with such generations
(None, Some(Ap(_))) => return Ok(MergerApResult::Empty),
(None, None) => return Ok(MergerApResult::Empty),
(prev_state, current_state) => return Err(MergeError::incompatible_states(prev_state, current_state, "ap")),
};
to_merger_result(ap)
}
macro_rules! to_maybe_generation {
($ap_result:ident, $generations:expr, $error_ty:ident) => {
match $generations.len() {
0 => None,
1 => Some($generations[0]),
_ => {
let ap_error = super::ApResultError::$error_ty($ap_result);
return Err(super::MergeError::IncorrectApResult(ap_error));
}
}
};
}
fn to_merger_result(ap_result: ApResult) -> MergeResult<MergerApResult> {
let res_generation = to_maybe_generation!(ap_result, &ap_result.res_generations, TooManyDstGenerations);
let ap_result = MergerApResult::ApResult { res_generation };
Ok(ap_result)
}

View File

@ -0,0 +1,119 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod call_result_constructor;
mod utils;
use super::*;
use air_parser::ast::CallOutputValue;
use call_result_constructor::*;
use utils::*;
#[derive(Debug, Clone)]
pub enum MergerCallResult {
/// There is no corresponding state in a trace for this call.
Empty,
/// There was a state in at least one of the contexts. If there were two states in
/// both contexts, they were successfully merged.
CallResult { value: CallResult, trace_pos: usize },
}
pub(crate) fn try_merge_next_state_as_call(
data_keeper: &mut DataKeeper,
output_value: &CallOutputValue<'_>,
) -> MergeResult<MergerCallResult> {
use ExecutedState::Call;
use PrepareScheme::*;
let prev_state = data_keeper.prev_slider_mut().next_state();
let current_state = data_keeper.current_slider_mut().next_state();
let value_type = ValueType::from_output_value(output_value);
let (prev_call, current_call) = match (prev_state, current_state) {
(Some(Call(prev_call)), Some(Call(current_call))) => (prev_call, current_call),
// this special case is needed to merge stream generation in a right way
(None, Some(Call(CallResult::Executed(value)))) => {
let call_result = merge_current_executed(value, value_type, data_keeper)?;
return Ok(prepare_call_result(call_result, Current, data_keeper));
}
(None, Some(Call(current_call))) => return Ok(prepare_call_result(current_call, Current, data_keeper)),
(Some(Call(prev_call)), None) => return Ok(prepare_call_result(prev_call, Previous, data_keeper)),
(None, None) => return Ok(MergerCallResult::Empty),
(prev_state, current_state) => return Err(MergeError::incompatible_states(prev_state, current_state, "call")),
};
let merged_call = merge_call_result(prev_call, current_call, value_type, data_keeper)?;
let call_result = prepare_call_result(merged_call, Both, data_keeper);
try_match_value_type(&call_result, value_type)?;
Ok(call_result)
}
fn merge_call_result(
prev_call: CallResult,
current_call: CallResult,
value_type: ValueType<'_>,
data_keeper: &DataKeeper,
) -> MergeResult<CallResult> {
use CallResult::*;
let merged_state = match (prev_call, current_call) {
(prev @ CallServiceFailed(..), current @ CallServiceFailed(..)) => {
check_equal(&prev, &current)?;
prev
}
(RequestSentBy(_), current @ CallServiceFailed(..)) => current,
(prev @ CallServiceFailed(..), RequestSentBy(_)) => prev,
// senders shouldn't be checked for equality, for more info please look at
// github.com/fluencelabs/aquavm/issues/137
(prev @ RequestSentBy(_), RequestSentBy(_)) => prev,
// this special case is needed to merge stream generation in a right way
(RequestSentBy(_), Executed(value)) => merge_current_executed(value, value_type, data_keeper)?,
(prev @ Executed(..), RequestSentBy(_)) => prev,
(Executed(prev_value), Executed(current_value)) => merge_executed(prev_value, current_value)?,
(prev_call, current_call) => return Err(CallResultError::incompatible_calls(prev_call, current_call)),
};
Ok(merged_state)
}
#[derive(Debug, Copy, Clone)]
pub(crate) enum ValueType<'i> {
Scalar,
Stream(&'i str),
}
impl<'i> ValueType<'i> {
pub(self) fn from_output_value(output_value: &'i CallOutputValue<'_>) -> Self {
use air_parser::ast::AstVariable;
match output_value {
CallOutputValue::Variable(AstVariable::Stream(stream_name)) => ValueType::Stream(stream_name),
_ => ValueType::Scalar,
}
}
}
use std::fmt;
impl fmt::Display for ValueType<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ValueType::Scalar => write!(f, "scalar"),
ValueType::Stream(stream_name) => write!(f, "${}", stream_name),
}
}
}

View File

@ -0,0 +1,46 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
pub(super) enum PrepareScheme {
Previous,
Current,
Both,
}
pub(super) fn prepare_call_result(
value: CallResult,
scheme: PrepareScheme,
data_keeper: &mut DataKeeper,
) -> MergerCallResult {
let prev_pos = match scheme {
PrepareScheme::Previous | PrepareScheme::Both => Some(data_keeper.prev_slider().position() - 1),
PrepareScheme::Current => None,
};
let current_pos = match scheme {
PrepareScheme::Current | PrepareScheme::Both => Some(data_keeper.current_slider().position() - 1),
PrepareScheme::Previous => None,
};
let data_positions = DataPositions { prev_pos, current_pos };
let trace_pos = data_keeper.result_states_count();
data_keeper.new_to_old_pos.insert(trace_pos, data_positions);
MergerCallResult::CallResult { value, trace_pos }
}

View File

@ -0,0 +1,109 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
type JValue = serde_json::Value;
use std::rc::Rc;
pub(super) fn merge_executed(prev_value: Value, current_value: Value) -> MergeResult<CallResult> {
match (&prev_value, &current_value) {
(Value::Scalar(_), Value::Scalar(_)) => {
are_scalars_equal(&prev_value, &current_value)?;
Ok(CallResult::Executed(prev_value))
}
(Value::Stream { value: pr, .. }, Value::Stream { value: cr, .. }) => {
are_streams_equal(pr, cr, &prev_value, &current_value)?;
Ok(CallResult::Executed(prev_value))
}
_ => Err(CallResultError::not_equal_values(prev_value, current_value)),
}
}
fn are_scalars_equal(prev_value: &Value, current_value: &Value) -> MergeResult<()> {
if prev_value == current_value {
return Ok(());
}
Err(CallResultError::not_equal_values(
prev_value.clone(),
current_value.clone(),
))
}
fn are_streams_equal(
prev_result_value: &Rc<JValue>,
current_result_value: &Rc<JValue>,
prev_value: &Value,
current_value: &Value,
) -> MergeResult<()> {
// values from streams could have different generations and it's ok
if prev_result_value == current_result_value {
return Ok(());
}
Err(CallResultError::not_equal_values(
prev_value.clone(),
current_value.clone(),
))
}
/// Merging of value from only current data to a stream is a something special, because it's
/// needed to choose generation not from current data, but a maximum from streams on a current peer.
/// Maximum versions are tracked in data in a special field called streams.
pub(super) fn merge_current_executed(
value: Value,
value_type: ValueType<'_>,
data_keeper: &DataKeeper,
) -> MergeResult<CallResult> {
match (value, value_type) {
(scalar @ Value::Scalar(_), ValueType::Scalar) => Ok(CallResult::Executed(scalar)),
(Value::Stream { value, .. }, ValueType::Stream(stream_name)) => {
let generation = data_keeper.prev_ctx.stream_generation(stream_name).unwrap_or_default();
let stream = Value::Stream { value, generation };
Ok(CallResult::Executed(stream))
}
(value, value_type) => Err(CallResultError::data_not_match(value, value_type)),
}
}
pub(super) fn check_equal(prev_call: &CallResult, current_call: &CallResult) -> MergeResult<()> {
if prev_call != current_call {
Err(CallResultError::incompatible_calls(
prev_call.clone(),
current_call.clone(),
))
} else {
Ok(())
}
}
pub(super) fn try_match_value_type(merged_call: &MergerCallResult, value_type: ValueType<'_>) -> MergeResult<()> {
if let MergerCallResult::CallResult { value, .. } = merged_call {
return match (value, value_type) {
(CallResult::Executed(value @ Value::Scalar(_)), ValueType::Stream(_)) => {
Err(CallResultError::data_not_match(value.clone(), value_type))
}
(CallResult::Executed(value @ Value::Stream { .. }), ValueType::Scalar) => {
Err(CallResultError::data_not_match(value.clone(), value_type))
}
_ => Ok(()),
};
}
Ok(())
}

View File

@ -0,0 +1,155 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::call_merger::ValueType;
use super::ApResult;
use super::CallResult;
use super::ExecutedState;
use super::FoldResult;
use super::KeeperError;
use super::Value;
use thiserror::Error as ThisError;
/// Errors arose out of merging previous data with a new.
#[derive(ThisError, Debug)]
pub enum MergeError {
/// Errors occurred when previous and current executed states are incompatible.
#[error("previous and current data have incompatible states: '{0:?}' '{1:?}'")]
IncompatibleExecutedStates(ExecutedState, ExecutedState),
/// Merger was expected to see other state that was obtained from one of traces
/// (the other state was absent).
#[error("state from {1} `{0:?}` is incompatible with expected {2}")]
DifferentExecutedStateExpected(ExecutedState, DataType, &'static str),
#[error(transparent)]
KeeperError(#[from] KeeperError),
#[error(transparent)]
IncorrectApResult(#[from] ApResultError),
#[error(transparent)]
IncorrectCallResult(#[from] CallResultError),
#[error(transparent)]
IncorrectFoldResult(#[from] FoldResultError),
}
#[derive(ThisError, Debug)]
pub enum ApResultError {
/// Error occurred when Ap results contains more then 1 generation in destination.
#[error("{0:?} ap result contains too many generations in destination")]
TooManyDstGenerations(ApResult),
}
#[derive(ThisError, Debug)]
pub enum CallResultError {
#[error("values in call results are not equal: {prev_value:?} != {current_value:?}")]
ValuesNotEqual { prev_value: Value, current_value: Value },
/// Errors occurred when previous and current call results are incompatible.
#[error("previous and current call results are incompatible: '{prev_call:?}' '{current_call:?}'")]
IncompatibleCallResults {
prev_call: CallResult,
current_call: CallResult,
},
#[error("air scripts has the following value type '{air_type}' while data other '{data_value:?}'")]
DataNotMatchAIR { air_type: String, data_value: Value },
}
#[derive(ThisError, Debug)]
pub enum FoldResultError {
#[error("the first {count} subtrace descriptors lens of fold {fold_result:?} overflows")]
SubtraceLenOverflow { fold_result: FoldResult, count: usize },
/// There are several lores with the same value_pos.
#[error("{0:?} contains several subtraces with the same value_pos {1}")]
SeveralRecordsWithSamePos(FoldResult, usize),
/// Errors occurred when one of the fold subtrace lore doesn't contain 2 descriptors.
#[error("fold contains {0} sublore descriptors, but 2 is expected")]
FoldIncorrectSubtracesCount(usize),
}
impl MergeError {
// shouldn't be called with both Nones
pub(crate) fn incompatible_states(
prev_state: Option<ExecutedState>,
current_state: Option<ExecutedState>,
expected_state: &'static str,
) -> Self {
match (prev_state, current_state) {
(Some(prev_state), Some(current_state)) => {
MergeError::IncompatibleExecutedStates(prev_state, current_state)
}
(None, Some(current_state)) => {
MergeError::DifferentExecutedStateExpected(current_state, DataType::Current, expected_state)
}
(Some(prev_state), None) => {
MergeError::DifferentExecutedStateExpected(prev_state, DataType::Previous, expected_state)
}
(None, None) => unreachable!("shouldn't be called with both None"),
}
}
}
// these impl methods allow construction of MergeError and are used to make code more clean
impl CallResultError {
pub(crate) fn not_equal_values(prev_value: Value, current_value: Value) -> MergeError {
let call_result_error = CallResultError::ValuesNotEqual {
prev_value,
current_value,
};
MergeError::IncorrectCallResult(call_result_error)
}
pub(crate) fn incompatible_calls(prev_call: CallResult, current_call: CallResult) -> MergeError {
let call_result_error = CallResultError::IncompatibleCallResults {
prev_call,
current_call,
};
MergeError::IncorrectCallResult(call_result_error)
}
pub(crate) fn data_not_match(data_value: Value, air_type: ValueType<'_>) -> MergeError {
let air_type = air_type.to_string();
let call_result_error = CallResultError::DataNotMatchAIR { air_type, data_value };
MergeError::IncorrectCallResult(call_result_error)
}
}
#[derive(Clone, Copy, Debug)]
pub enum DataType {
Previous,
Current,
}
use std::fmt;
impl fmt::Display for DataType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DataType::Previous => write!(f, "previous"),
DataType::Current => write!(f, "current"),
}
}
}

View File

@ -0,0 +1,91 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod fold_lore_resolver;
use super::*;
pub use fold_lore_resolver::*;
#[derive(Debug, Default, Clone)]
pub struct MergerFoldResult {
pub prev_fold_lore: ResolvedFold,
pub current_fold_lore: ResolvedFold,
}
pub(crate) fn try_merge_next_state_as_fold(data_keeper: &mut DataKeeper) -> MergeResult<MergerFoldResult> {
use ExecutedState::Fold;
let prev_state = data_keeper.prev_slider_mut().next_state();
let current_state = data_keeper.current_slider_mut().next_state();
let fold_result = match (prev_state, current_state) {
(Some(Fold(prev_fold)), Some(Fold(current_fold))) => {
MergerFoldResult::from_fold_results(&prev_fold, &current_fold, data_keeper)
}
(None, Some(Fold(current_fold))) => {
MergerFoldResult::from_fold_result(&current_fold, MergeCtxType::Current, data_keeper)
}
(Some(Fold(prev_fold)), None) => {
MergerFoldResult::from_fold_result(&prev_fold, MergeCtxType::Previous, data_keeper)
}
(None, None) => return Ok(MergerFoldResult::default()),
(prev_state, current_state) => return Err(MergeError::incompatible_states(prev_state, current_state, "fold")),
}?;
Ok(fold_result)
}
impl MergerFoldResult {
pub(self) fn from_fold_result(
fold: &FoldResult,
ctx_type: MergeCtxType,
data_keeper: &DataKeeper,
) -> MergeResult<Self> {
let (prev_fold_lore, current_fold_lore) = match ctx_type {
MergeCtxType::Previous => {
let fold_lore = resolve_fold_lore(fold, &data_keeper.prev_ctx)?;
(fold_lore, <_>::default())
}
MergeCtxType::Current => {
let fold_lore = resolve_fold_lore(fold, &data_keeper.current_ctx)?;
(<_>::default(), fold_lore)
}
};
let merge_result = Self {
prev_fold_lore,
current_fold_lore,
};
Ok(merge_result)
}
pub(self) fn from_fold_results(
prev_fold: &FoldResult,
current_fold: &FoldResult,
data_keeper: &DataKeeper,
) -> MergeResult<Self> {
let prev_fold_lore = resolve_fold_lore(prev_fold, &data_keeper.prev_ctx)?;
let current_fold_lore = resolve_fold_lore(current_fold, &data_keeper.current_ctx)?;
let merge_result = Self {
prev_fold_lore,
current_fold_lore,
};
Ok(merge_result)
}
}

View File

@ -0,0 +1,186 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use crate::data_keeper::MergeCtx;
use air_interpreter_data::FoldSubTraceLore;
use air_interpreter_data::SubTraceDesc;
use std::collections::HashMap;
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct ResolvedFold {
pub lore: HashMap<usize, ResolvedSubTraceDescs>,
pub fold_states_count: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedSubTraceDescs {
pub before_subtrace: SubTraceDesc,
pub after_subtrace: SubTraceDesc,
}
pub(super) fn resolve_fold_lore(fold: &FoldResult, merge_ctx: &MergeCtx) -> MergeResult<ResolvedFold> {
let (fold_states_count, lens) = compute_lens_convolution(fold, merge_ctx)?;
let lore = fold.lore.iter().zip(lens).try_fold::<_, _, MergeResult<_>>(
HashMap::with_capacity(fold.lore.len()),
|mut resolved_lore, (lore, lens)| {
let before_subtrace = SubTraceDesc::new(lore.subtraces_desc[0].begin_pos as _, lens.before_len as _);
let after_subtrace = SubTraceDesc::new(lore.subtraces_desc[1].begin_pos as _, lens.after_len as _);
let resolved_descs = ResolvedSubTraceDescs::new(before_subtrace, after_subtrace);
match resolved_lore.insert(lore.value_pos as usize, resolved_descs) {
Some(_) => Err(FoldResultError::SeveralRecordsWithSamePos(
fold.clone(),
lore.value_pos as usize,
))
.map_err(Into::into),
None => Ok(resolved_lore),
}
},
)?;
let resolved_fold_lore = ResolvedFold::new(lore, fold_states_count);
Ok(resolved_fold_lore)
}
/// This function does conversion subtrace_lens of a fold result, it's better to explain it on
/// examples.
///
/// Imagine a fold on stream with 3 elements that have the same generation, in this case the
/// conversion will look like this:
/// [1, 1] [2, 2] [3, 3] => [6, 1] [5, 3] [3, 6]
/// g0 g0 g0
/// here a number before comma represents count of elements before next, and after the comma - after
///
/// For fold with 5 elements of two generations:
/// [1, 1] [2, 2] [3, 3] [4, 4] [5, 5] [1, 1] => [6, 1] [5, 3] [3, 6] [9, 4] [5, 9] [1, 1]
/// g0 g0 g0 g1 g1 g2
///
/// It could be seen that this function does a convolution of lens with respect to generations.
/// This is needed to handle (fold (par (next ... cases, because of subtrace_len of a Fold state
/// describes only states inside this iteration without states that next brings, however a Par
/// lens describe the whole subtree, where "next" states are included.
// TODO: in future it's possible to change a format of a Fold state to one behaves like Par,
// because this function adds some overhead
fn compute_lens_convolution(fold: &FoldResult, merge_ctx: &MergeCtx) -> MergeResult<(usize, Vec<LoresLen>)> {
let subtraces_count = fold.lore.len();
let mut lens = Vec::with_capacity(subtraces_count);
let mut fold_states_count: usize = 0;
let mut last_seen_generation = 0;
let mut last_seen_generation_pos = 0;
let mut cum_after_len = 0;
for subtrace_id in 0..subtraces_count {
let subtrace_lore = &fold.lore[subtrace_id];
check_subtrace_lore(subtrace_lore)?;
let current_generation = merge_ctx.try_get_generation(subtrace_lore.value_pos)?;
// TODO: check sequence for monotone
if last_seen_generation != current_generation {
if subtrace_id > 0 {
// do a back traversal for
compute_before_lens(&mut lens, last_seen_generation_pos, subtrace_id - 1);
}
last_seen_generation = current_generation;
last_seen_generation_pos = subtrace_id;
cum_after_len = 0;
}
let before_len = subtrace_lore.subtraces_desc[0].subtrace_len;
let after_len = subtrace_lore.subtraces_desc[1].subtrace_len;
// this checks for overflow both cum_before_len and cum_after_len
fold_states_count = fold_states_count
.checked_add(before_len as usize)
.and_then(|v| v.checked_add(after_len as usize))
.ok_or_else(|| FoldResultError::SubtraceLenOverflow {
fold_result: fold.clone(),
count: subtrace_id,
})?;
cum_after_len += after_len;
// temporary set not cumulative before len
let new_lens = LoresLen::new(before_len, cum_after_len);
lens.push(new_lens);
}
if subtraces_count > 0 {
compute_before_lens(&mut lens, last_seen_generation_pos, subtraces_count - 1);
}
Ok((fold_states_count, lens))
}
fn compute_before_lens(lore_lens: &mut [LoresLen], begin_pos: usize, end_pos: usize) {
let mut cum_before_len = 0;
for subtrace_id in (begin_pos..=end_pos).rev() {
let lens = &mut lore_lens[subtrace_id];
let current_before_len = lens.before_len;
cum_before_len += current_before_len;
lens.before_len = cum_before_len;
}
}
fn check_subtrace_lore(subtrace_lore: &FoldSubTraceLore) -> MergeResult<()> {
// this limitation is due to current constraint on count of next inside one fold,
// for more info please see comments in the interpreter-data crate
const SUBTRACE_DESC_COUNT: usize = 2;
if subtrace_lore.subtraces_desc.len() != SUBTRACE_DESC_COUNT {
return Err(FoldResultError::FoldIncorrectSubtracesCount(
subtrace_lore.subtraces_desc.len(),
))
.map_err(Into::into);
}
Ok(())
}
impl ResolvedFold {
pub(crate) fn new(lore: HashMap<usize, ResolvedSubTraceDescs>, fold_states_count: usize) -> Self {
Self {
lore,
fold_states_count,
}
}
}
impl ResolvedSubTraceDescs {
pub(self) fn new(before_subtrace: SubTraceDesc, after_subtrace: SubTraceDesc) -> Self {
Self {
before_subtrace,
after_subtrace,
}
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct LoresLen {
pub(self) before_len: u32,
pub(self) after_len: u32,
}
impl LoresLen {
pub(self) fn new(before_len: u32, after_len: u32) -> Self {
Self { before_len, after_len }
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod ap_merger;
mod call_merger;
mod errors;
mod fold_merger;
mod par_merger;
pub use ap_merger::MergerApResult;
pub use call_merger::MergerCallResult;
pub use fold_merger::MergerFoldResult;
pub use par_merger::MergerParResult;
pub use errors::ApResultError;
pub use errors::CallResultError;
pub use errors::FoldResultError;
pub use errors::MergeError;
pub use fold_merger::ResolvedFold;
pub use fold_merger::ResolvedSubTraceDescs;
pub(super) use ap_merger::try_merge_next_state_as_ap;
pub(super) use call_merger::try_merge_next_state_as_call;
pub(crate) use fold_merger::try_merge_next_state_as_fold;
pub(crate) use par_merger::try_merge_next_state_as_par;
type MergeResult<T> = std::result::Result<T, MergeError>;
use super::data_keeper::DataPositions;
use super::data_keeper::KeeperError;
use super::DataKeeper;
use air_interpreter_data::*;
#[derive(Debug, Copy, Clone)]
pub enum MergeCtxType {
Current,
Previous,
}
use std::fmt;
impl fmt::Display for MergeCtxType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MergeCtxType::Previous => write!(f, "previous"),
MergeCtxType::Current => write!(f, "current"),
}
}
}

View File

@ -0,0 +1,62 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use ExecutedState::Par;
#[derive(Default, Debug, Copy, Clone)]
pub struct MergerParResult {
pub prev_par: Option<ParResult>,
pub current_par: Option<ParResult>,
}
pub(crate) fn try_merge_next_state_as_par(data_keeper: &mut DataKeeper) -> MergeResult<MergerParResult> {
let prev_state = data_keeper.prev_slider_mut().next_state();
let current_state = data_keeper.current_slider_mut().next_state();
let result = match (prev_state, current_state) {
(Some(Par(prev_par)), Some(Par(current_par))) => MergerParResult::from_pars(prev_par, current_par),
(None, Some(Par(current_par))) => MergerParResult::from_current_par(current_par),
(Some(Par(prev_par)), None) => MergerParResult::from_prev_par(prev_par),
(None, None) => MergerParResult::default(),
(prev_state, current_state) => return Err(MergeError::incompatible_states(prev_state, current_state, "par")),
};
Ok(result)
}
impl MergerParResult {
pub(self) fn from_pars(prev_par: ParResult, current_par: ParResult) -> Self {
Self {
prev_par: Some(prev_par),
current_par: Some(current_par),
}
}
pub(self) fn from_prev_par(prev_par: ParResult) -> Self {
Self {
prev_par: Some(prev_par),
current_par: None,
}
}
pub(self) fn from_current_par(current_par: ParResult) -> Self {
Self {
prev_par: None,
current_par: Some(current_par),
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::KeeperError;
use super::ParResult;
use crate::MergeCtxType;
use crate::ResolvedFold;
use thiserror::Error as ThisError;
/// Errors arose out of merging previous data with a new.
#[derive(ThisError, Debug)]
pub enum StateFSMError {
/// Error occurred while trying to access or pop elements from an empty par queue.
#[error("par queue is empty, while par FSM is requested")]
ParQueueIsEmpty,
/// Errors occurred while trying to access or pop elements from queue,
/// which contains element of different type.
#[error("fold FSM for fold id {0} wasn't found")]
FoldFSMNotFound(u32),
/// Errors occurred when ParResult.0 + ParResult.1 overflows.
#[error("overflow is occurred while calculating the entire len occupied by executed states corresponded to current par: '{0:?}'")]
ParLenOverflow(ParResult),
/// Errors occurred when slider.position() + ParResult.0 + ParResult.1 overflows.
#[error("overflow is occurred while calculating the new position of a {2} slider for resolved par {0:?} and current position {1}'")]
ParPosOverflow(ParResult, usize, MergeCtxType),
/// Errors occurred when ParResult.0 + ParResult.1 value is bigger than current subtree size.
#[error("underflow is occurred while calculating the new position of a {2} slider for resolved par {0:?} and current subtrace len {1}'")]
ParLenUnderflow(ParResult, usize, MergeCtxType),
/// Errors occurred when {0}.fold_states_count + {1} overflows.
#[error("overflow is occurred while calculating the new position of a {2} slider for resolved fold {0:?} and current position {1}'")]
FoldPosOverflow(ResolvedFold, usize, MergeCtxType),
/// Errors occurred when {1} - 1{0}.fold_states_count underflows.
#[error("underflow is occurred while calculating the new position of a {2} slider for resolved fold {0:?} and current subtrace len {1}'")]
FoldLenUnderflow(ResolvedFold, usize, MergeCtxType),
/// Errors bubbled from DataKeeper.
#[error(transparent)]
KeeperError(#[from] KeeperError),
}

View File

@ -0,0 +1,157 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod lore_applier;
mod lore_ctor;
mod lore_ctor_queue;
mod state_handler;
use super::*;
use lore_applier::*;
use lore_ctor::*;
use lore_ctor_queue::*;
use state_handler::CtxStateHandler;
use air_interpreter_data::FoldLore;
/// This FSM manages fold and keeps internally queue of lore ctors.
/// State transitioning functions must work in the following way:
/// meet_fold_start.1 ->
/// meet_generation_start.N ->
/// meet_next.M ->
/// meet_prev.M ->
/// meet_generation_end.N ->
/// meet_fold_end.1
/// where .T means that this function should be called exactly T times.
#[derive(Debug, Default, Clone)]
pub(crate) struct FoldFSM {
prev_fold: ResolvedFold,
current_fold: ResolvedFold,
state_inserter: StateInserter,
ctor_queue: SubTraceLoreCtorQueue,
result_lore: FoldLore,
state_handler: CtxStateHandler,
}
impl FoldFSM {
pub(crate) fn from_fold_start(fold_result: MergerFoldResult, data_keeper: &mut DataKeeper) -> FSMResult<Self> {
let state_inserter = StateInserter::from_keeper(data_keeper);
let state_handler =
CtxStateHandler::prepare(&fold_result.prev_fold_lore, &fold_result.current_fold_lore, data_keeper)?;
let fold_fsm = Self {
prev_fold: fold_result.prev_fold_lore,
current_fold: fold_result.current_fold_lore,
state_inserter,
state_handler,
..<_>::default()
};
Ok(fold_fsm)
}
pub(crate) fn meet_iteration_start(&mut self, value_pos: usize, data_keeper: &mut DataKeeper) -> FSMResult<()> {
let (prev_pos, current_pos) = match data_keeper.new_to_old_pos.get(&value_pos) {
Some(DataPositions { prev_pos, current_pos }) => (prev_pos, current_pos),
None => return self.prepare(None, None, value_pos, data_keeper),
};
let prev_lore = prev_pos.map(|pos| self.prev_fold.lore.remove(&pos)).flatten();
let current_lore = current_pos.map(|pos| self.current_fold.lore.remove(&pos)).flatten();
self.prepare(prev_lore, current_lore, value_pos, data_keeper)
}
fn prepare(
&mut self,
prev_lore: Option<ResolvedSubTraceDescs>,
current_lore: Option<ResolvedSubTraceDescs>,
value_pos: usize,
data_keeper: &mut DataKeeper,
) -> FSMResult<()> {
apply_fold_lore_before(data_keeper, &prev_lore, &current_lore)?;
let ctor = SubTraceLoreCtor::from_before_start(value_pos, data_keeper);
self.ctor_queue.add_element(ctor, prev_lore, current_lore);
Ok(())
}
pub(crate) fn meet_iteration_end(&mut self, data_keeper: &mut DataKeeper) {
self.ctor_queue.current().ctor.before_end(data_keeper);
}
pub(crate) fn meet_back_iterator(&mut self, data_keeper: &mut DataKeeper) -> FSMResult<()> {
let back_traversal_started = self.ctor_queue.back_traversal_started();
let LoreCtorDesc {
ctor,
prev_lore,
current_lore,
} = self.ctor_queue.current();
if !back_traversal_started {
ctor.maybe_before_end(data_keeper);
ctor.after_start(data_keeper);
apply_fold_lore_after(data_keeper, prev_lore, current_lore)?;
self.ctor_queue.start_back_traverse();
} else {
ctor.after_end(data_keeper);
self.ctor_queue.traverse_back();
let LoreCtorDesc {
ctor,
prev_lore,
current_lore,
} = self.ctor_queue.current();
ctor.after_start(data_keeper);
apply_fold_lore_after(data_keeper, prev_lore, current_lore)?;
}
Ok(())
}
pub(crate) fn meet_generation_end(&mut self, data_keeper: &mut DataKeeper) {
self.ctor_queue.finish(data_keeper);
self.ctor_queue.end_back_traverse();
let fold_lore = self.ctor_queue.transform_to_lore();
self.result_lore.extend(fold_lore);
}
pub(crate) fn meet_fold_end(self, data_keeper: &mut DataKeeper) {
// TODO: check for prev and current lore emptiness
let fold_result = FoldResult { lore: self.result_lore };
let state = ExecutedState::Fold(fold_result);
self.state_inserter.insert(data_keeper, state);
self.state_handler.set_final_states(data_keeper);
}
pub(crate) fn fold_end_with_error(mut self, data_keeper: &mut DataKeeper) {
self.meet_generation_end(data_keeper);
self.meet_fold_end(data_keeper);
}
}
#[derive(Clone, Copy)]
pub(self) enum ByNextPosition {
/// Represents executed states before next.
Before,
/// Represents executed states after next.
After,
}

View File

@ -0,0 +1,72 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use ByNextPosition::*;
use MergeCtxType::*;
/// Adjusts sliders accordingly to a before fold lore state.
pub(super) fn apply_fold_lore_before(
data_keeper: &mut DataKeeper,
prev_fold_lore: &Option<ResolvedSubTraceDescs>,
current_fold_lore: &Option<ResolvedSubTraceDescs>,
) -> FSMResult<()> {
apply_fold_lore(data_keeper, prev_fold_lore, Previous, Before)?;
apply_fold_lore(data_keeper, current_fold_lore, Current, Before)
}
/// Adjusts sliders accordingly to an after fold lore state.
pub(super) fn apply_fold_lore_after(
data_keeper: &mut DataKeeper,
prev_fold_lore: &Option<ResolvedSubTraceDescs>,
current_fold_lore: &Option<ResolvedSubTraceDescs>,
) -> FSMResult<()> {
apply_fold_lore(data_keeper, prev_fold_lore, Previous, After)?;
apply_fold_lore(data_keeper, current_fold_lore, Current, After)
}
fn apply_fold_lore(
data_keeper: &mut DataKeeper,
fold_lore: &Option<ResolvedSubTraceDescs>,
ctx_type: MergeCtxType,
next_position: ByNextPosition,
) -> FSMResult<()> {
let fold_lore = match fold_lore {
Some(fold_lore) => fold_lore,
None => return Ok(()),
};
let slider = match ctx_type {
Previous => data_keeper.prev_slider_mut(),
Current => data_keeper.current_slider_mut(),
};
match next_position {
Before => {
slider.set_position_and_len(
fold_lore.before_subtrace.begin_pos as _,
fold_lore.before_subtrace.subtrace_len as _,
)?;
}
After => {
slider.set_position_and_len(
fold_lore.after_subtrace.begin_pos as _,
fold_lore.after_subtrace.subtrace_len as _,
)?;
}
}
Ok(())
}

View File

@ -0,0 +1,146 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use air_interpreter_data::SubTraceDesc;
/// This struct is a form of FSM and is intended to construct a fold subtrace lore element.
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
pub(super) struct SubTraceLoreCtor {
value_pos: usize,
before_tracker: PositionsTracker,
after_tracker: PositionsTracker,
state: CtorState,
}
#[derive(Debug, Default, PartialEq, Eq, Clone, Copy)]
struct PositionsTracker {
pub(self) start_pos: usize,
pub(self) end_pos: usize,
}
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum CtorState {
BeforeStarted,
BeforeCompleted,
AfterStarted,
AfterCompleted,
}
impl SubTraceLoreCtor {
pub(super) fn from_before_start(value_pos: usize, data_keeper: &DataKeeper) -> Self {
let before_tracker = PositionsTracker {
start_pos: data_keeper.result_states_count(),
end_pos: 0,
};
Self {
value_pos,
before_tracker,
..<_>::default()
}
}
pub(super) fn before_end(&mut self, data_keeper: &DataKeeper) {
self.before_tracker.end_pos = data_keeper.result_states_count();
self.state.next();
}
pub(super) fn maybe_before_end(&mut self, data_keeper: &DataKeeper) {
if !matches!(self.state, CtorState::BeforeStarted) {
return;
}
self.before_tracker.end_pos = data_keeper.result_states_count();
self.state.next();
}
pub(super) fn after_start(&mut self, data_keeper: &DataKeeper) {
self.after_tracker.start_pos = data_keeper.result_states_count();
self.state.next();
}
pub(super) fn after_end(&mut self, data_keeper: &DataKeeper) {
self.after_tracker.end_pos = data_keeper.result_states_count();
self.state.next();
}
pub(super) fn into_subtrace_lore(self) -> FoldSubTraceLore {
let before = SubTraceDesc {
begin_pos: self.before_tracker.start_pos as _,
subtrace_len: self.before_tracker.len() as _,
};
let after = SubTraceDesc {
begin_pos: self.after_tracker.start_pos as _,
subtrace_len: self.after_tracker.len() as _,
};
FoldSubTraceLore {
value_pos: self.value_pos as _,
subtraces_desc: vec![before, after],
}
}
// this function should be called in a situation of early exit from fold,
// for more details see the comment above SubTraceLoreCtorQueue::finish().
pub(super) fn finish(&mut self, data_keeper: &DataKeeper) {
use CtorState::*;
match self.state {
BeforeStarted => {
self.before_end(data_keeper);
self.after_start(data_keeper);
self.after_end(data_keeper);
}
BeforeCompleted => {
self.after_start(data_keeper);
self.after_end(data_keeper);
}
AfterStarted => {
self.after_end(data_keeper);
}
AfterCompleted => {}
}
}
}
impl PositionsTracker {
pub(self) fn len(&self) -> usize {
self.end_pos - self.start_pos
}
}
impl Default for CtorState {
fn default() -> Self {
Self::BeforeStarted
}
}
impl CtorState {
pub(self) fn next(&mut self) {
use CtorState::*;
let next_state = match self {
BeforeStarted => BeforeCompleted,
BeforeCompleted => AfterStarted,
AfterStarted => AfterCompleted,
AfterCompleted => AfterCompleted,
};
*self = next_state;
}
}

View File

@ -0,0 +1,103 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::DataKeeper;
use super::FoldLore;
use super::ResolvedSubTraceDescs;
use super::SubTraceLoreCtor;
/// This queue emulates behaviour of fold states traversal:
/// - at first states are traversal forward until the end of states
/// - then states are traversal backward the same times
#[derive(Debug, Default, Clone)]
pub(super) struct SubTraceLoreCtorQueue {
queue: Vec<LoreCtorDesc>,
back_traversal_pos: usize,
back_traversal_started: bool,
}
impl SubTraceLoreCtorQueue {
pub(super) fn current(&mut self) -> &mut LoreCtorDesc {
&mut self.queue[self.back_traversal_pos - 1]
}
pub(super) fn add_element(
&mut self,
ctor: SubTraceLoreCtor,
prev_lore: Option<ResolvedSubTraceDescs>,
current_lore: Option<ResolvedSubTraceDescs>,
) {
let new_element = LoreCtorDesc {
ctor,
prev_lore,
current_lore,
};
self.queue.push(new_element);
self.back_traversal_pos += 1;
}
pub(super) fn traverse_back(&mut self) {
self.back_traversal_pos -= 1;
}
pub(super) fn start_back_traverse(&mut self) {
self.back_traversal_started = true;
}
pub(super) fn end_back_traverse(&mut self) {
self.back_traversal_started = false;
}
pub(super) fn back_traversal_started(&self) -> bool {
self.back_traversal_started
}
pub(super) fn transform_to_lore(&mut self) -> FoldLore {
self.queue
.drain(..)
.map(|l| l.ctor.into_subtrace_lore())
.collect::<Vec<_>>()
}
// this function should be called in a case of early exit from fold, f.e.
// in last error bubbling or join behaviour in the following situations:
// (fold iterable iterator
// (seq
// (call .. [joined_variable])
// (next iterator)
// )
// )
//
// In such example next wouldn't be called and correspondingly all pushed to
// ctor queue states wouldn't be properly finished. This function serves such
// situations, having called from generation_end.
pub(super) fn finish(&mut self, data_keeper: &DataKeeper) {
// TODO: optimize it
for ctor in self.queue.iter_mut() {
ctor.ctor.finish(data_keeper);
}
// set this to zero to correspond that all states were "observed" with back traversal
self.back_traversal_pos = 0;
}
}
#[derive(Debug, Clone)]
pub(super) struct LoreCtorDesc {
pub(super) ctor: SubTraceLoreCtor,
pub(super) prev_lore: Option<ResolvedSubTraceDescs>,
pub(super) current_lore: Option<ResolvedSubTraceDescs>,
}

View File

@ -0,0 +1,65 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use crate::MergeCtxType;
use crate::ResolvedFold;
/// This state updater manage to do the same thing as SubTreeStateUpdater in ParFSM,
/// for details please see its detailed comment.
#[derive(Debug, Default, Clone)]
pub(super) struct CtxStateHandler {
state_pair: CtxStatesPair,
}
impl CtxStateHandler {
pub(super) fn prepare(
prev_fold: &ResolvedFold,
current_fold: &ResolvedFold,
data_keeper: &DataKeeper,
) -> FSMResult<Self> {
let prev_state = compute_new_state(prev_fold, data_keeper, MergeCtxType::Previous)?;
let current_state = compute_new_state(current_fold, data_keeper, MergeCtxType::Current)?;
let state_pair = CtxStatesPair::new(prev_state, current_state);
let updater = Self { state_pair };
Ok(updater)
}
pub(super) fn set_final_states(self, data_keeper: &mut DataKeeper) {
update_ctx_states(self.state_pair, data_keeper)
}
}
fn compute_new_state(fold: &ResolvedFold, data_keeper: &DataKeeper, ctx_type: MergeCtxType) -> FSMResult<CtxState> {
let ctx = match ctx_type {
MergeCtxType::Previous => &data_keeper.prev_ctx,
MergeCtxType::Current => &data_keeper.current_ctx,
};
let current_position = ctx.slider.position();
let pos = current_position
.checked_add(fold.fold_states_count)
.ok_or_else(|| StateFSMError::FoldPosOverflow(fold.clone(), current_position, ctx_type))?;
let current_len = ctx.slider.subtrace_len();
let subtrace_len = current_len
.checked_sub(fold.fold_states_count)
.ok_or_else(|| StateFSMError::FoldLenUnderflow(fold.clone(), current_position, ctx_type))?;
let state = CtxState::new(pos, subtrace_len);
Ok(state)
}

View File

@ -0,0 +1,58 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::FSMResult;
use super::FoldFSM;
use super::ParFSM;
use super::StateFSMError;
use std::collections::HashMap;
#[derive(Debug, Default)]
pub(crate) struct FSMKeeper {
par_stack: Vec<ParFSM>,
fold_map: HashMap<u32, FoldFSM>,
}
impl FSMKeeper {
pub(crate) fn push_par(&mut self, par_fsm: ParFSM) {
self.par_stack.push(par_fsm);
}
pub(crate) fn add_fold(&mut self, fold_id: u32, fold_fsm: FoldFSM) {
self.fold_map.insert(fold_id, fold_fsm);
}
pub(crate) fn last_par(&mut self) -> FSMResult<&mut ParFSM> {
self.par_stack.last_mut().ok_or(StateFSMError::ParQueueIsEmpty)
}
pub(crate) fn pop_par(&mut self) -> FSMResult<ParFSM> {
self.par_stack.pop().ok_or(StateFSMError::ParQueueIsEmpty)
}
pub(crate) fn fold_mut(&mut self, fold_id: u32) -> FSMResult<&mut FoldFSM> {
self.fold_map
.get_mut(&fold_id)
.ok_or(StateFSMError::FoldFSMNotFound(fold_id))
}
pub(crate) fn extract_fold(&mut self, fold_id: u32) -> FSMResult<FoldFSM> {
self.fold_map
.remove(&fold_id)
.ok_or(StateFSMError::FoldFSMNotFound(fold_id))
}
}

View File

@ -0,0 +1,47 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod errors;
mod fold_fsm;
mod fsm_queue;
mod par_fsm;
mod state_inserter;
mod utils;
pub use errors::StateFSMError;
pub use par_fsm::SubtreeType;
pub(crate) type FSMResult<T> = std::result::Result<T, StateFSMError>;
pub(super) use fold_fsm::FoldFSM;
pub(super) use fsm_queue::FSMKeeper;
pub(super) use par_fsm::ParFSM;
use super::data_keeper::DataPositions;
use super::data_keeper::KeeperError;
use super::merger::MergerParResult;
use super::DataKeeper;
use super::ExecutedState;
use super::FoldResult;
use super::FoldSubTraceLore;
use super::MergeCtx;
use super::MergeCtxType;
use super::MergerFoldResult;
use super::ParResult;
use super::ResolvedFold;
use super::ResolvedSubTraceDescs;
use state_inserter::StateInserter;
use utils::*;

View File

@ -0,0 +1,104 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod par_builder;
mod state_handler;
use super::*;
use par_builder::ParBuilder;
use state_handler::CtxStateHandler;
/// Manages a par state, its state transitioning functions must be called in the following way:
/// from_left_started
/// -> left_completed
/// -> right_completed
#[derive(Debug, Default, Clone)]
pub(crate) struct ParFSM {
prev_par: ParResult,
current_par: ParResult,
state_inserter: StateInserter,
state_handler: CtxStateHandler,
par_builder: ParBuilder,
}
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum SubtreeType {
Left,
Right,
}
impl ParFSM {
pub(crate) fn from_left_started(ingredients: MergerParResult, data_keeper: &mut DataKeeper) -> FSMResult<Self> {
// default is a par with empty left and right subtrees
let prev_par = ingredients.prev_par.unwrap_or_default();
let current_par = ingredients.current_par.unwrap_or_default();
let state_inserter = StateInserter::from_keeper(data_keeper);
let state_handler = CtxStateHandler::prepare(prev_par, current_par, data_keeper)?;
let par_builder = ParBuilder::from_keeper(data_keeper, &state_inserter);
let par_fsm = Self {
prev_par,
current_par,
state_inserter,
state_handler,
par_builder,
};
par_fsm.prepare_sliders(data_keeper, SubtreeType::Left)?;
Ok(par_fsm)
}
pub(crate) fn left_completed(&mut self, data_keeper: &mut DataKeeper) {
self.par_builder.track(data_keeper, SubtreeType::Left);
self.state_handler.handle_subtree_end(data_keeper, SubtreeType::Left);
// all invariants were checked in the ctor
let _ = self.prepare_sliders(data_keeper, SubtreeType::Right);
}
pub(crate) fn right_completed(mut self, data_keeper: &mut DataKeeper) {
self.par_builder.track(data_keeper, SubtreeType::Right);
let state = self.par_builder.build();
self.state_inserter.insert(data_keeper, state);
self.state_handler.handle_subtree_end(data_keeper, SubtreeType::Right);
}
fn prepare_sliders(&self, data_keeper: &mut DataKeeper, subtree_type: SubtreeType) -> FSMResult<()> {
let (prev_len, current_len) = match subtree_type {
SubtreeType::Left => (self.prev_par.left_size, self.current_par.left_size),
SubtreeType::Right => (self.prev_par.right_size, self.current_par.right_size),
};
data_keeper.prev_slider_mut().set_subtrace_len(prev_len as _)?;
data_keeper.current_slider_mut().set_subtrace_len(current_len as _)?;
Ok(())
}
}
use std::fmt;
impl fmt::Display for SubtreeType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SubtreeType::Left => write!(f, "left"),
SubtreeType::Right => write!(f, "right"),
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
/// Tracks lens of data_keeper.result_trace to build resulted Par state at the end.
#[derive(Debug, Default, Clone)]
pub(super) struct ParBuilder {
saved_states_count: usize,
left_subtree_size: usize,
right_subtree_size: usize,
}
impl ParBuilder {
// StateInserter here needs to guaranteed that ParBuilder creates after it,
// it must be so to right track a left subtree size
pub(super) fn from_keeper(data_keeper: &DataKeeper, _: &StateInserter) -> Self {
let saved_states_count = data_keeper.result_states_count();
Self {
saved_states_count,
left_subtree_size: 0,
right_subtree_size: 0,
}
}
pub(super) fn track(&mut self, data_keeper: &DataKeeper, subtree_type: SubtreeType) {
let prev_states_count = self.saved_states_count;
let states_count = data_keeper.result_states_count();
let resulted_states_count = states_count - prev_states_count;
match subtree_type {
SubtreeType::Left => self.left_subtree_size = resulted_states_count,
SubtreeType::Right => self.right_subtree_size = resulted_states_count,
}
self.saved_states_count = data_keeper.result_trace.len();
}
pub(super) fn build(self) -> ExecutedState {
// TODO: check that usize could be converted into u32
ExecutedState::par(self.left_subtree_size, self.right_subtree_size)
}
}

View File

@ -0,0 +1,88 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod new_states_calculation;
use super::*;
use new_states_calculation::compute_new_states;
/// At the end of a Par execution it's needed to update subtrace_len and positions of both sliders.
///
/// To see why it's really needed, imagine the following trace:
/// [par 9, 3]
/// [par 3, 5] <- left subtree of [par 9, 3]
/// [call rs 1] [call rs 2] [call rs 3] <- left subtree of [par 3, 5]
/// [call rs 4] [call rs 5] [call rs 6] [call rs 7] [call rs 8] <- right subtree of [par 3, 5]
/// [par 1, 1] <- right subtree of [par 9, 3]
/// [call e 9] <- left subtree of [par 1, 1]
/// [call e 10] <- right subtree of [par 1, 1]
///
/// where
/// call rs N - request sent state of Nth call
/// call e N - executed state of Nth call
///
/// and the following script:
/// (par
/// (xor
/// (par
/// (call 1-3)
/// (call 4-8)
/// )
/// (null) <- here could be any non-fallible set of instructions
/// )
/// (par
/// (call 9)
/// (call 10)
/// )
/// )
///
/// Suppose that call 5 (corresponds to [call rs 5]) will fail (f.e. call_service returns a service
/// error). Since it's wrapped with xor, then right subtree of xor (null) will be executed.
/// After that next par will be executed. This par has corresponding state [par 1, 1] in a trace,
/// and to allow slider to pop it it's needed to set updated position in a proper way, because
/// otherwise [call rs 6] will be returned.
///
/// This struct manages to save the updated lens and pos and update slider states to prevent
/// such situations.
///
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct CtxStateHandler {
left_pair: CtxStatesPair,
right_pair: CtxStatesPair,
}
impl CtxStateHandler {
/// Prepare new states that sliders will have after finishing executing of each subtree.
pub(super) fn prepare(
prev_par: ParResult,
current_par: ParResult,
data_keeper: &mut DataKeeper,
) -> FSMResult<Self> {
let left_pair = compute_new_states(data_keeper, prev_par, current_par, SubtreeType::Left)?;
let right_pair = compute_new_states(data_keeper, prev_par, current_par, SubtreeType::Right)?;
let handler = Self { left_pair, right_pair };
Ok(handler)
}
pub(super) fn handle_subtree_end(self, data_keeper: &mut DataKeeper, subtree_type: SubtreeType) {
match subtree_type {
SubtreeType::Left => update_ctx_states(self.left_pair, data_keeper),
SubtreeType::Right => update_ctx_states(self.right_pair, data_keeper),
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use crate::data_keeper::TraceSlider;
pub(super) fn compute_new_states(
data_keeper: &DataKeeper,
prev_par: ParResult,
current_par: ParResult,
subtree_type: SubtreeType,
) -> FSMResult<CtxStatesPair> {
let (prev_len, current_len) = match subtree_type {
SubtreeType::Left => (prev_par.left_size, current_par.left_size),
SubtreeType::Right => {
let prev_par_size = prev_par.size().ok_or(StateFSMError::ParLenOverflow(prev_par))?;
let current_par_size = current_par.size().ok_or(StateFSMError::ParLenOverflow(current_par))?;
(prev_par_size as u32, current_par_size as u32)
}
};
let prev_state = compute_new_state(prev_len as usize, data_keeper.prev_slider(), prev_par)?;
let current_state = compute_new_state(current_len as usize, data_keeper.current_slider(), current_par)?;
let pair = CtxStatesPair::new(prev_state, current_state);
Ok(pair)
}
fn compute_new_state(par_subtree_len: usize, slider: &TraceSlider, par: ParResult) -> FSMResult<CtxState> {
let pos = slider
.position()
.checked_add(par_subtree_len)
.ok_or_else(|| StateFSMError::ParPosOverflow(par, slider.position(), MergeCtxType::Previous))?;
let subtrace_len = slider
.subtrace_len()
.checked_sub(par_subtree_len)
.ok_or_else(|| StateFSMError::ParLenUnderflow(par, slider.subtrace_len(), MergeCtxType::Current))?;
let new_state = CtxState::new(pos, subtrace_len);
Ok(new_state)
}

View File

@ -0,0 +1,40 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::DataKeeper;
use super::ExecutedState;
/// This one is intended to optimize insertion in data to avoid insertion in a middle of it.
/// This is achieved by inserting a temporary state, track insert position and insert there
/// the final state.
#[derive(Debug, Default, Clone)]
pub(super) struct StateInserter {
position: usize,
}
impl StateInserter {
pub(super) fn from_keeper(data_keeper: &mut DataKeeper) -> Self {
let position = data_keeper.result_trace.len();
// this par is a temporary state
data_keeper.result_trace.push(ExecutedState::par(0, 0));
Self { position }
}
pub(super) fn insert(self, data_keeper: &mut DataKeeper, state: ExecutedState) {
data_keeper.result_trace[self.position] = state;
}
}

View File

@ -0,0 +1,61 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::DataKeeper;
use super::FSMResult;
use super::MergeCtx;
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct CtxState {
pub(super) pos: usize,
pub(super) subtrace_len: usize,
}
#[derive(Debug, Default, Clone, Copy)]
pub(super) struct CtxStatesPair {
pub(super) prev_state: CtxState,
pub(super) current_state: CtxState,
}
impl CtxState {
pub(super) fn new(pos: usize, subtrace_len: usize) -> Self {
Self { pos, subtrace_len }
}
pub(super) fn update_ctx_state(self, ctx: &mut MergeCtx) -> FSMResult<()> {
ctx.slider
.set_position_and_len(self.pos, self.subtrace_len)
.map_err(Into::into)
}
}
impl CtxStatesPair {
pub(super) fn new(prev_state: CtxState, current_state: CtxState) -> Self {
Self {
prev_state,
current_state,
}
}
}
pub(super) fn update_ctx_states(state_pair: CtxStatesPair, data_keeper: &mut DataKeeper) {
// these calls shouldn't produce a error, because sizes become less and
// they have been already checked in a state updater ctor. It's important
// to make it in a such way, because this function could be called from
// error_exit that shouldn't fail.
let _ = state_pair.prev_state.update_ctx_state(&mut data_keeper.prev_ctx);
let _ = state_pair.current_state.update_ctx_state(&mut data_keeper.current_ctx);
}