Decouple log targets to a separate crate (#152)

This commit is contained in:
Mike Voronov
2021-10-05 16:55:04 +03:00
committed by GitHub
parent adba9e8e65
commit e99c352a95
91 changed files with 63 additions and 36 deletions

View File

@ -0,0 +1,133 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod impls;
mod se_de;
use se_de::par_serializer;
use se_de::sender_serializer;
use serde::Deserialize;
use serde::Serialize;
use serde_json::Value as JValue;
use std::fmt::Formatter;
use std::rc::Rc;
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct ParResult {
pub left_size: u32,
pub right_size: u32,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Sender {
PeerId(Rc<String>),
PeerIdWithCallId { peer_id: Rc<String>, call_id: u32 },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum CallResult {
/// Request was sent to a target node by node with such public key and it shouldn't be called again.
#[serde(with = "sender_serializer")]
#[serde(rename = "sent_by")]
RequestSentBy(Sender),
/// A corresponding call's been already executed with such value as a result.
Executed(Value),
/// call_service ended with a service error.
#[serde(rename = "failed")]
CallServiceFailed(i32, Rc<String>),
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Value {
Scalar(Rc<JValue>),
Stream { value: Rc<JValue>, generation: u32 },
}
/// Let's consider an example of trace that could be produces by the following fold:
/// (fold $stream v
/// (call 1)
/// (call 2)
/// (next v)
/// (call 3)
/// (call 4)
/// )
///
/// Having started with stream with two elements {v1, v2} the resulted trace would looks like
/// [(1) (2)] [(1) (2)] [(3) (4)] [(3) (4)] <--- the sequence of call states
/// v1 v2 v2 v1 <---- corresponding values from $stream that
/// the iterable v had at the moment of call
///
/// From this example, it could be seen that each instruction sequence inside fold is divided into
/// two intervals (left and right), each of these intervals has borders [begin, end).
/// So, this struct describes position inside overall execution_step trace belongs to one fold iteration.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FoldSubTraceLore {
/// Position of current value in a trace.
#[serde(rename = "pos")]
pub value_pos: u32,
/// Descriptors of a subtrace that are corresponded to the current value. Technically, now
/// it always contains two values, and Vec here is used to have a possibility to handle more
/// than one next inside fold in future.
#[serde(rename = "desc")]
pub subtraces_desc: Vec<SubTraceDesc>,
}
/// Descriptor of a subtrace inside execution trace.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct SubTraceDesc {
/// Start position in a trace of this subtrace.
#[serde(rename = "pos")]
pub begin_pos: u32,
/// Length of the subtrace.
#[serde(rename = "len")]
pub subtrace_len: u32,
}
/// This type represents all information in an execution trace about states executed during
/// a fold execution.
pub type FoldLore = Vec<FoldSubTraceLore>;
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct FoldResult {
pub lore: FoldLore,
}
/// Describes result of applying functor `apply` to streams.
#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub struct ApResult {
#[serde(rename = "gens")]
pub res_generations: Vec<u32>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ExecutedState {
#[serde(with = "par_serializer")]
Par(ParResult),
Call(CallResult),
Fold(FoldResult),
Ap(ApResult),
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
impl ParResult {
pub fn new(left_size: u32, right_size: u32) -> Self {
Self {
left_size,
right_size,
}
}
/// Returns a size of subtrace that this par describes in execution_step trace.
pub fn size(&self) -> Option<usize> {
self.left_size
.checked_add(self.right_size)
.map(|v| v as usize)
}
}
impl CallResult {
pub fn sent_peer_id(peer_id: Rc<String>) -> CallResult {
CallResult::RequestSentBy(Sender::PeerId(peer_id))
}
pub fn sent_peer_id_with_call_id(peer_id: Rc<String>, call_id: u32) -> CallResult {
CallResult::RequestSentBy(Sender::PeerIdWithCallId { peer_id, call_id })
}
pub fn executed_scalar(value: Rc<JValue>) -> CallResult {
let value = Value::Scalar(value);
CallResult::Executed(value)
}
pub fn executed_stream(value: Rc<JValue>, generation: u32) -> CallResult {
let value = Value::Stream { value, generation };
CallResult::Executed(value)
}
pub fn failed(ret_code: i32, error_msg: impl Into<String>) -> CallResult {
CallResult::CallServiceFailed(ret_code, Rc::new(error_msg.into()))
}
}
impl SubTraceDesc {
pub fn new(begin_pos: usize, subtrace_len: usize) -> Self {
Self {
begin_pos: begin_pos as _,
subtrace_len: subtrace_len as _,
}
}
}
impl ExecutedState {
pub fn par(left_subtree_size: usize, right_subtree_size: usize) -> Self {
let par_result = ParResult {
left_size: left_subtree_size as _,
right_size: right_subtree_size as _,
};
Self::Par(par_result)
}
}
impl ApResult {
pub fn new(res_gens: Vec<u32>) -> Self {
Self {
res_generations: res_gens,
}
}
}
impl std::fmt::Display for ExecutedState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
use CallResult::*;
use ExecutedState::*;
match self {
Par(ParResult {
left_size: left_subtree_size,
right_size: right_subtree_size,
}) => write!(f, "par({}, {})", left_subtree_size, right_subtree_size),
Call(RequestSentBy(sender)) => write!(f, r"{}", sender),
Call(Executed(value)) => {
write!(f, "executed({})", value)
}
Call(CallServiceFailed(ret_code, err_msg)) => {
write!(f, r#"call_service_failed({}, "{}")"#, ret_code, err_msg)
}
Fold(FoldResult { lore }) => {
writeln!(f, "fold(",)?;
for sublore in lore {
writeln!(
f,
" {} - [{}, {}], [{}, {}]",
sublore.value_pos,
sublore.subtraces_desc[0].begin_pos,
sublore.subtraces_desc[0].subtrace_len,
sublore.subtraces_desc[1].begin_pos,
sublore.subtraces_desc[1].subtrace_len
)?;
}
write!(f, " )")
}
Ap(ap) => {
write!(f, "ap: _ -> {:?}", ap.res_generations)
}
}
}
}
impl std::fmt::Display for Value {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
Value::Scalar(value) => write!(f, "scalar: {}", value),
Value::Stream { value, generation } => {
write!(f, "stream: {} generation: {}", value, generation)
}
}
}
}
impl std::fmt::Display for Sender {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Sender::PeerId(peer_id) => write!(f, "request_sent_by({})", peer_id),
Sender::PeerIdWithCallId { peer_id, call_id } => {
write!(f, "request_sent_by({}: {})", peer_id, call_id)
}
}
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::*;
use serde::de::SeqAccess;
use serde::de::Visitor;
use serde::ser::SerializeSeq;
use serde::Deserializer;
use serde::Serializer;
use std::fmt;
pub mod par_serializer {
use super::*;
pub fn serialize<S>(value: &ParResult, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = serializer.serialize_seq(Some(2))?;
seq.serialize_element(&value.left_size)?;
seq.serialize_element(&value.right_size)?;
seq.end()
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<ParResult, D::Error>
where
D: Deserializer<'de>,
{
struct ParVisitor;
impl<'de> Visitor<'de> for ParVisitor {
type Value = ParResult;
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("[left_size, right_size]")
}
fn visit_seq<A: SeqAccess<'de>>(self, mut seq: A) -> Result<Self::Value, A::Error> {
let left_size = seq.next_element::<u32>()?;
let right_size = seq.next_element::<u32>()?;
let (left_size, right_size) = match (left_size, right_size) {
(Some(left_size), Some(right_size)) => (left_size, right_size),
_ => return Err(serde::de::Error::custom(
"failed to deserialize ParResult, not enough elements in serialized array",
)),
};
let par_result = ParResult::new(left_size, right_size);
Ok(par_result)
}
}
deserializer.deserialize_seq(ParVisitor {})
}
}
pub mod sender_serializer {
use super::*;
pub fn serialize<S>(value: &Sender, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
match value {
Sender::PeerId(peer_id) => serializer.serialize_str(peer_id.as_str()),
Sender::PeerIdWithCallId { peer_id, call_id } => {
let result = format!("{}: {}", peer_id, call_id);
serializer.serialize_str(&result)
}
}
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Sender, D::Error>
where
D: Deserializer<'de>,
{
struct SenderVisitor;
impl<'de> Visitor<'de> for SenderVisitor {
type Value = Sender;
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
formatter.write_str("call sender")
}
fn visit_str<E: serde::de::Error>(self, raw_sender: &str) -> Result<Self::Value, E> {
let sender = match raw_sender.find(": ") {
None => Sender::PeerId(Rc::new(raw_sender.to_string())),
Some(pos) => {
let peer_id = raw_sender[..pos].to_string();
let call_id = &raw_sender[pos + 2..];
let call_id = call_id.parse::<u32>().map_err(|e| {
serde::de::Error::custom(format!(
"failed to parse call_id of a sender {}: {}",
call_id, e
))
})?;
Sender::PeerIdWithCallId {
peer_id: Rc::new(peer_id),
call_id,
}
}
};
Ok(sender)
}
}
deserializer.deserialize_str(SenderVisitor {})
}
}

View File

@ -0,0 +1,122 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use super::ExecutedState;
use super::DATA_FORMAT_VERSION;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::ops::Deref;
pub type StreamGenerations = HashMap<String, u32>;
pub type ExecutionTrace = Vec<ExecutedState>;
/// The AIR interpreter could be considered as a function
/// f(prev_data: InterpreterData, current_data: InterpreterData, ... ) -> (result_data: InterpreterData, ...).
/// This function receives prev and current data and produces a result data. All these data
/// have the following format.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InterpreterData {
/// Trace of AIR execution, which contains executed call, par and fold states.
pub trace: ExecutionTrace,
/// Contains maximum generation for each stream. This info will be used while merging
/// values in streams.
pub streams: StreamGenerations,
/// Version of this data format.
pub version: semver::Version,
/// Last exposed to a peer call request id. All next call request ids will be bigger than this.
#[serde(default)]
#[serde(rename = "lcid")]
pub last_call_request_id: u32,
}
impl InterpreterData {
pub fn new() -> Self {
Self {
trace: <_>::default(),
streams: <_>::default(),
version: DATA_FORMAT_VERSION.deref().clone(),
last_call_request_id: 0,
}
}
pub fn from_execution_result(
trace: ExecutionTrace,
streams: StreamGenerations,
last_call_request_id: u32,
) -> Self {
Self {
trace,
streams,
version: DATA_FORMAT_VERSION.deref().clone(),
last_call_request_id,
}
}
/// Tries to de InterpreterData from slice according to the data version.
pub fn try_from_slice(slice: &[u8]) -> Result<Self, serde_json::Error> {
// treat empty slice as an empty interpreter data allows abstracting from
// the internal format for empty data.
if slice.is_empty() {
return Ok(Self::default());
}
serde_json::from_slice(slice)
}
}
impl Default for InterpreterData {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::Deserialize;
use serde::Serialize;
#[test]
fn compatible_with_0_2_0_version() {
#[derive(Serialize, Deserialize)]
struct InterpreterData0_2_0 {
pub trace: ExecutionTrace,
pub streams: StreamGenerations,
pub version: semver::Version,
}
// test 0.2.0 to 0.2.1 conversion
let data_0_2_0 = InterpreterData0_2_0 {
trace: ExecutionTrace::default(),
streams: StreamGenerations::default(),
version: semver::Version::new(0, 2, 0),
};
let data_0_2_0_se = serde_json::to_vec(&data_0_2_0).unwrap();
let data_0_2_1 = serde_json::from_slice::<InterpreterData>(&data_0_2_0_se);
assert!(data_0_2_1.is_ok());
// test 0.2.1 to 0.2.1 conversion
let data_0_2_1 = InterpreterData::default();
let data_0_2_1_se = serde_json::to_vec(&data_0_2_1).unwrap();
let data_0_2_0 = serde_json::from_slice::<InterpreterData0_2_0>(&data_0_2_1_se);
assert!(data_0_2_0.is_ok());
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
mod executed_state;
mod interpreter_data;
pub use executed_state::*;
pub use interpreter_data::*;
use once_cell::sync::Lazy;
use std::str::FromStr;
pub static DATA_FORMAT_VERSION: Lazy<semver::Version> = Lazy::new(|| {
semver::Version::from_str("0.2.1").expect("invalid data format version specified")
});