mirror of
https://github.com/fluencelabs/aqua.git
synced 2025-04-24 22:42:13 +00:00
427 topology bug (#433)
* fix path * topology bug 427 test * test with join, it works * improve topology test, still works. add compiler test * broken compiler test * CompilerSpec works fine without `wrapWithXor` * add xor to topology test, it becomes broken * XOR topology fixed * ForceExecModel * Debugging topology WIP * Fixed Co-authored-by: DieMyst <dmitry.shakhtarin@fluence.ai>
This commit is contained in:
parent
1a8af46b51
commit
c74eb06499
@ -1,6 +1,22 @@
|
||||
service TestS("some-id"):
|
||||
t: string -> string
|
||||
service Kademlia("kad"):
|
||||
neighborhood: -> []string
|
||||
|
||||
service Peer("peer"):
|
||||
timestamp_sec: -> u64
|
||||
|
||||
service Op2("op"):
|
||||
identity: u64 -> u64
|
||||
|
||||
func getTwoResults(node: string) -> []u64:
|
||||
on node:
|
||||
nodes <- Kademlia.neighborhood()
|
||||
res: *u64
|
||||
for n <- nodes par:
|
||||
on n:
|
||||
try:
|
||||
res <- Peer.timestamp_sec()
|
||||
Op2.identity(res!)
|
||||
Op2.identity(res!1)
|
||||
Op2.identity(res!2)
|
||||
<- res
|
||||
|
||||
func doStuff(c: bool):
|
||||
if c:
|
||||
TestS.t("fr")
|
@ -1,14 +1,12 @@
|
||||
package aqua
|
||||
|
||||
import cats.effect.ExitCode
|
||||
import cats.effect.kernel.Async
|
||||
import com.monovore.decline.Opts
|
||||
import fs2.io.file.{Files, Path}
|
||||
import cats.syntax.applicative._
|
||||
import fs2.io.file.Path
|
||||
|
||||
// Scala-specific options and subcommands
|
||||
object PlatformOpts {
|
||||
def opts[F[_]]: Opts[F[ExitCode]] = Opts.never
|
||||
def getGlobalNodeModulePath: Option[Path] = None
|
||||
def getPackagePath[F[_]: Files: Async](path: String): F[Path] = Path(path).pure[F]
|
||||
def getGlobalNodeModulePath: List[Path] = Nil
|
||||
def getPackagePath: Option[Path] = None
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ object Test extends IOApp.Simple {
|
||||
start <- IO(System.currentTimeMillis())
|
||||
_ <- AquaPathCompiler
|
||||
.compileFilesTo[IO](
|
||||
Path("./aqua-src/hack.aqua"),
|
||||
Path("./aqua-src/so.aqua"),
|
||||
List(Path("./aqua")),
|
||||
Option(Path("./target")),
|
||||
TypeScriptBackend,
|
||||
|
@ -1,21 +1,31 @@
|
||||
package aqua.compiler
|
||||
|
||||
import aqua.model.{CallModel, LiteralModel, VarModel}
|
||||
import aqua.model.{CallModel, IntoIndexModel, LiteralModel, ValueModel, VarModel}
|
||||
import aqua.model.transform.TransformConfig
|
||||
import aqua.model.transform.Transform
|
||||
import aqua.parser.ParserError
|
||||
import aqua.parser.Ast
|
||||
import aqua.parser.Parser
|
||||
import aqua.parser.lift.Span
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw}
|
||||
import aqua.res.{ApRes, CallRes, CallServiceRes, RestrictionRes, SeqRes}
|
||||
import aqua.types.{ArrayType, LiteralType, ScalarType, StreamType}
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.res.{
|
||||
ApRes,
|
||||
CallRes,
|
||||
CallServiceRes,
|
||||
FoldRes,
|
||||
MakeRes,
|
||||
NextRes,
|
||||
ParRes,
|
||||
RestrictionRes,
|
||||
SeqRes
|
||||
}
|
||||
import aqua.types.{ArrayType, LiteralType, ScalarType, StreamType, Type}
|
||||
import org.scalatest.flatspec.AnyFlatSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import cats.Id
|
||||
import cats.data.{Chain, NonEmptyChain, Validated, ValidatedNec}
|
||||
import cats.instances.string.*
|
||||
import cats.syntax.show._
|
||||
import cats.syntax.show.*
|
||||
|
||||
class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
@ -76,6 +86,118 @@ class AquaCompilerSpec extends AnyFlatSpec with Matchers {
|
||||
|
||||
}
|
||||
|
||||
def through(peer: ValueModel, log: String = null) =
|
||||
MakeRes.noop(peer, log)
|
||||
|
||||
val relay = VarRaw("-relay-", ScalarType.string)
|
||||
|
||||
def getDataSrv(name: String, t: Type) = {
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("getDataSrv")),
|
||||
name,
|
||||
CallRes(Nil, Some(CallModel.Export(name, t))),
|
||||
LiteralModel.fromRaw(ValueRaw.InitPeerId)
|
||||
).leaf
|
||||
}
|
||||
|
||||
"aqua compiler" should "create right topology" in {
|
||||
|
||||
val res = compileToContext(
|
||||
Map(
|
||||
"index.aqua" ->
|
||||
"""service Op("op"):
|
||||
| identity(s: string) -> string
|
||||
|
|
||||
|func exec(peers: []string) -> []string:
|
||||
| results: *string
|
||||
| for peer <- peers par:
|
||||
| on peer:
|
||||
| results <- Op.identity("hahahahah")
|
||||
|
|
||||
| join results[2]
|
||||
| <- results""".stripMargin
|
||||
),
|
||||
Map.empty
|
||||
)
|
||||
|
||||
res.isValid should be(true)
|
||||
val Validated.Valid(ctxs) = res
|
||||
|
||||
ctxs.length should be(1)
|
||||
val ctx = ctxs.headOption.get
|
||||
|
||||
val aquaRes =
|
||||
Transform.contextRes(ctx, TransformConfig(wrapWithXor = false))
|
||||
|
||||
val Some(exec) = aquaRes.funcs.find(_.funcName == "exec")
|
||||
|
||||
val peers = VarModel("peers", ArrayType(ScalarType.string))
|
||||
val peer = VarModel("peer", ScalarType.string)
|
||||
val results = VarModel("results", StreamType(ScalarType.string))
|
||||
val initPeer = LiteralModel.fromRaw(ValueRaw.InitPeerId)
|
||||
|
||||
val expected =
|
||||
SeqRes.wrap(
|
||||
getDataSrv("-relay-", ScalarType.string),
|
||||
getDataSrv(peers.name, peers.`type`),
|
||||
RestrictionRes("results", true).wrap(
|
||||
SeqRes.wrap(
|
||||
ParRes.wrap(
|
||||
FoldRes("peer", peers).wrap(
|
||||
ParRes.wrap(
|
||||
// better if first relay will be outside `for`
|
||||
SeqRes.wrap(
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("op")),
|
||||
"identity",
|
||||
CallRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("hahahahah")) :: Nil,
|
||||
Some(CallModel.Export(results.name, results.`type`))
|
||||
),
|
||||
peer
|
||||
).leaf,
|
||||
through(ValueModel.fromRaw(relay)),
|
||||
through(initPeer)
|
||||
),
|
||||
NextRes("peer").leaf
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("op")),
|
||||
"noop",
|
||||
CallRes(
|
||||
results.copy(lambda = Chain.one(IntoIndexModel("2", ScalarType.string))) :: Nil,
|
||||
None
|
||||
),
|
||||
initPeer
|
||||
).leaf,
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("op")),
|
||||
"identity",
|
||||
CallRes(
|
||||
results :: Nil,
|
||||
Some(CallModel.Export("results-fix", ArrayType(ScalarType.string)))
|
||||
),
|
||||
initPeer
|
||||
).leaf
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel.fromRaw(LiteralRaw.quote("callbackSrv")),
|
||||
"response",
|
||||
CallRes(
|
||||
VarModel("results-fix", ArrayType(ScalarType.string)) :: Nil,
|
||||
None
|
||||
),
|
||||
initPeer
|
||||
).leaf
|
||||
)
|
||||
|
||||
exec.body.equalsOrShowDiff(expected) shouldBe (true)
|
||||
}
|
||||
|
||||
"aqua compiler" should "compile with imports" in {
|
||||
|
||||
val res = compileToContext(
|
||||
|
@ -40,6 +40,8 @@ object OpModel extends TreeNodeCompanion[OpModel] {
|
||||
|
||||
sealed trait NoExecModel extends OpModel
|
||||
|
||||
sealed trait ForceExecModel extends OpModel
|
||||
|
||||
sealed trait GroupOpModel extends OpModel
|
||||
|
||||
sealed trait SeqGroupModel extends GroupOpModel
|
||||
@ -86,12 +88,16 @@ case class RestrictionModel(name: String, isStream: Boolean) extends SeqGroupMod
|
||||
case class MatchMismatchModel(left: ValueModel, right: ValueModel, shouldMatch: Boolean)
|
||||
extends SeqGroupModel {
|
||||
|
||||
override def toString: String = s"if $left ${if (shouldMatch) "==" else "!="} $right"
|
||||
|
||||
override def usesVarNames: Set[String] =
|
||||
left.usesVarNames ++ right.usesVarNames
|
||||
}
|
||||
|
||||
case class ForModel(item: String, iterable: ValueModel) extends SeqGroupModel {
|
||||
|
||||
override def toString: String = s"for $item <- $iterable"
|
||||
|
||||
override def restrictsVarNames: Set[String] = Set(item)
|
||||
|
||||
override def usesVarNames: Set[String] = iterable.usesVarNames
|
||||
@ -99,6 +105,7 @@ case class ForModel(item: String, iterable: ValueModel) extends SeqGroupModel {
|
||||
}
|
||||
|
||||
case class DeclareStreamModel(value: ValueModel) extends NoExecModel {
|
||||
override def toString: String = s"declare $value"
|
||||
|
||||
override def usesVarNames: Set[String] = value.usesVarNames
|
||||
}
|
||||
@ -117,21 +124,26 @@ case class PushToStreamModel(value: ValueModel, exportTo: CallModel.Export) exte
|
||||
}
|
||||
|
||||
case class CallServiceModel(serviceId: ValueModel, funcName: String, call: CallModel)
|
||||
extends OpModel {
|
||||
extends ForceExecModel {
|
||||
|
||||
override def toString: String = s"(call _ ($serviceId $funcName) $call)"
|
||||
|
||||
override lazy val usesVarNames: Set[String] = serviceId.usesVarNames ++ call.usesVarNames
|
||||
|
||||
override def exportsVarNames: Set[String] = call.exportTo.map(_.name).toSet
|
||||
}
|
||||
|
||||
case class CanonicalizeModel(operand: ValueModel, exportTo: CallModel.Export) extends OpModel {
|
||||
case class CanonicalizeModel(operand: ValueModel, exportTo: CallModel.Export)
|
||||
extends ForceExecModel {
|
||||
|
||||
override def exportsVarNames: Set[String] = Set(exportTo.name)
|
||||
|
||||
override def usesVarNames: Set[String] = operand.usesVarNames
|
||||
}
|
||||
|
||||
case class JoinModel(operands: NonEmptyList[ValueModel]) extends OpModel {
|
||||
case class JoinModel(operands: NonEmptyList[ValueModel]) extends ForceExecModel {
|
||||
|
||||
override def toString: String = s"join ${operands.toList.mkString(", ")}"
|
||||
|
||||
override lazy val usesVarNames: Set[String] =
|
||||
operands.toList.flatMap(_.usesVarNames).toSet
|
||||
|
@ -65,17 +65,25 @@ object LambdaModel {
|
||||
}
|
||||
|
||||
case class IntoFieldModel(field: String, `type`: Type) extends LambdaModel {
|
||||
override def toString: String = s".$field:${`type`}"
|
||||
|
||||
override def toRaw: LambdaRaw = IntoFieldRaw(field, `type`)
|
||||
}
|
||||
|
||||
case class IntoIndexModel(idx: String, `type`: Type) extends LambdaModel {
|
||||
override lazy val usesVarNames: Set[String] = Set(idx).filterNot(_.forall(Character.isDigit))
|
||||
|
||||
override def toRaw: LambdaRaw = IntoIndexRaw(if (idx.forall(Character.isDigit)) LiteralRaw(idx, LiteralType.number) else VarRaw(idx, LiteralType.number), `type`)
|
||||
override def toString: String = s"[$idx -> ${`type`}]"
|
||||
|
||||
override def toRaw: LambdaRaw = IntoIndexRaw(
|
||||
if (idx.forall(Character.isDigit)) LiteralRaw(idx, LiteralType.number)
|
||||
else VarRaw(idx, LiteralType.number),
|
||||
`type`
|
||||
)
|
||||
}
|
||||
|
||||
case class VarModel(name: String, baseType: Type, lambda: Chain[LambdaModel] = Chain.empty)
|
||||
extends ValueModel with Logging {
|
||||
extends ValueModel with Logging {
|
||||
|
||||
override lazy val usesVarNames: Set[String] =
|
||||
lambda.toList.map(_.usesVarNames).foldLeft(Set(name))(_ ++ _)
|
||||
@ -109,7 +117,7 @@ case class VarModel(name: String, baseType: Type, lambda: Chain[LambdaModel] = C
|
||||
res <- two(variable)
|
||||
<- variable
|
||||
*/
|
||||
case vm@VarModel(nn, _, _) if nn == name => deriveFrom(vm)
|
||||
case vm @ VarModel(nn, _, _) if nn == name => deriveFrom(vm)
|
||||
// it couldn't go to a cycle as long as the semantics protects it
|
||||
case _ =>
|
||||
n.resolveWith(vals) match {
|
||||
|
@ -44,8 +44,7 @@ case class Topology private (
|
||||
|
||||
lazy val firstExecutesOn: Eval[Option[List[OnModel]]] =
|
||||
(cursor.op match {
|
||||
case _: CallServiceModel => pathOn.map(Some(_))
|
||||
case _: JoinModel => pathOn.map(Some(_))
|
||||
case _: ForceExecModel => pathOn.map(Some(_))
|
||||
case _ =>
|
||||
children
|
||||
.map(_.firstExecutesOn)
|
||||
@ -60,8 +59,7 @@ case class Topology private (
|
||||
|
||||
lazy val lastExecutesOn: Eval[Option[List[OnModel]]] =
|
||||
(cursor.op match {
|
||||
case _: CallServiceModel => pathOn.map(Some(_))
|
||||
case _: JoinModel => pathOn.map(Some(_))
|
||||
case _: ForceExecModel => pathOn.map(Some(_))
|
||||
case _ =>
|
||||
children
|
||||
.map(_.lastExecutesOn)
|
||||
@ -185,13 +183,22 @@ object Topology extends Logging {
|
||||
def endsOn(current: Topology): Eval[List[OnModel]] =
|
||||
current.beginsOn
|
||||
|
||||
protected def lastChildFinally(current: Topology): Eval[List[OnModel]] =
|
||||
current.lastChild.map(lc =>
|
||||
private def childFinally(
|
||||
current: Topology,
|
||||
child: Topology => Option[Topology]
|
||||
): Eval[List[OnModel]] =
|
||||
child(current).map(lc =>
|
||||
lc.forceExit.flatMap {
|
||||
case true => current.afterOn
|
||||
case false => lc.endsOn
|
||||
}
|
||||
) getOrElse current.beginsOn
|
||||
|
||||
protected def lastChildFinally(current: Topology): Eval[List[OnModel]] =
|
||||
childFinally(current, _.lastChild)
|
||||
|
||||
protected def firstChildFinally(current: Topology): Eval[List[OnModel]] =
|
||||
childFinally(current, _.firstChild)
|
||||
}
|
||||
|
||||
trait After {
|
||||
@ -286,7 +293,8 @@ object Topology extends Logging {
|
||||
.map(t => t -> t.parent.map(_.cursor.op))
|
||||
.takeWhile {
|
||||
case (t, Some(_: ParGroupModel)) => true
|
||||
case (t, _) => t.nextSibling.isEmpty
|
||||
case (t, Some(_: SeqGroupModel)) => t.nextSibling.isEmpty
|
||||
case _ => false
|
||||
}
|
||||
.map(_._1)
|
||||
.map(t => t -> t.cursor.op)
|
||||
@ -300,7 +308,11 @@ object Topology extends Logging {
|
||||
.fold(Eval.later(current.cursor.moveUp.exists(_.hasExecLater)))(_.forceExit)
|
||||
|
||||
override def afterOn(current: Topology): Eval[List[OnModel]] =
|
||||
closestParExit(current).fold(afterParent(current))(_.afterOn)
|
||||
current.forceExit.flatMap {
|
||||
case true =>
|
||||
closestParExit(current).fold(afterParent(current))(_.afterOn)
|
||||
case false => super.afterOn(current)
|
||||
}
|
||||
|
||||
// Parent of this branch's parent xor – fixes the case when this xor is in par
|
||||
override def pathAfter(current: Topology): Eval[Chain[ValueModel]] =
|
||||
@ -328,7 +340,7 @@ object Topology extends Logging {
|
||||
|
||||
// Xor tag ends where any child ends; can't get first one as it may lead to recursion
|
||||
override def endsOn(current: Topology): Eval[List[OnModel]] =
|
||||
lastChildFinally(current)
|
||||
firstChildFinally(current)
|
||||
|
||||
}
|
||||
|
||||
@ -496,7 +508,7 @@ object Topology extends Logging {
|
||||
|
||||
logger.trace("Resolved: " + resolved)
|
||||
|
||||
if (debug) {
|
||||
if (debug /*|| currI == 11 || currI == 12 || currI == 14*/ ) {
|
||||
println(Console.BLUE + rc + Console.RESET)
|
||||
println(currI + " : " + rc.topology)
|
||||
println("Before: " + rc.topology.beforeOn.value)
|
||||
@ -510,7 +522,10 @@ object Topology extends Logging {
|
||||
|
||||
println("End : " + rc.topology.endsOn.value)
|
||||
println("After: " + rc.topology.afterOn.value)
|
||||
println("Exit : " + rc.topology.forceExit.value)
|
||||
println(
|
||||
"Exit : " + (if (rc.topology.forceExit.value) Console.MAGENTA + "true" + Console.RESET
|
||||
else "false")
|
||||
)
|
||||
println(
|
||||
(if (rc.topology.pathAfter.value.nonEmpty) Console.YELLOW
|
||||
else "") + "PathAfter: " + Console.RESET + rc.topology.pathAfter.value
|
||||
|
@ -39,6 +39,7 @@ object ModelBuilder {
|
||||
val otherRelay = LiteralRaw("other-relay", ScalarType.string)
|
||||
val otherPeer2 = LiteralRaw("other-peer-2", ScalarType.string)
|
||||
val otherRelay2 = LiteralRaw("other-relay-2", ScalarType.string)
|
||||
val iRelay = VarRaw("i", ScalarType.string)
|
||||
val varNode = VarRaw("node-id", ScalarType.string)
|
||||
val viaList = VarRaw("other-relay-2", ArrayType(ScalarType.string))
|
||||
val valueArray = VarRaw("array", ArrayType(ScalarType.string))
|
||||
|
@ -420,6 +420,153 @@ class TopologySpec extends AnyFlatSpec with Matchers {
|
||||
proc.equalsOrShowDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
// https://github.com/fluencelabs/aqua/issues/427
|
||||
"topology resolver" should "create returning hops after for-par with inner `on` and xor" in {
|
||||
|
||||
val streamRaw = VarRaw("stream", StreamType(ScalarType.string))
|
||||
val streamRawEl = VarRaw(
|
||||
"stream",
|
||||
StreamType(ScalarType.string),
|
||||
Chain.one(IntoIndexRaw(LiteralRaw("2", ScalarType.u32), ScalarType.string))
|
||||
)
|
||||
val stream = ValueModel.fromRaw(streamRaw)
|
||||
val streamEl = ValueModel.fromRaw(streamRawEl)
|
||||
|
||||
val init =
|
||||
SeqModel.wrap(
|
||||
DeclareStreamModel(stream).leaf,
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
foldPar(
|
||||
"i",
|
||||
valueArray,
|
||||
OnModel(iRelay, Chain.empty).wrap(
|
||||
XorModel.wrap(
|
||||
callModel(2, CallModel.Export(streamRaw.name, streamRaw.`type`) :: Nil),
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
callModel(4, Nil, Nil)
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
JoinModel(NonEmptyList.one(streamEl)).leaf,
|
||||
callModel(3, Nil, streamRaw :: Nil)
|
||||
)
|
||||
)
|
||||
|
||||
val proc = Topology.resolve(init).value
|
||||
|
||||
val expected =
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
ParRes.wrap(
|
||||
FoldRes("i", valueArray).wrap(
|
||||
ParRes.wrap(
|
||||
// better if first relay will be outside `for`
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
XorRes.wrap(
|
||||
callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`))),
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
callRes(4, initPeer)
|
||||
)
|
||||
),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
),
|
||||
NextRes("i").leaf
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel(s"\"op\"", LiteralType.string),
|
||||
s"noop",
|
||||
CallRes(streamEl :: Nil, None),
|
||||
initPeer
|
||||
).leaf,
|
||||
callRes(3, initPeer, None, stream :: Nil)
|
||||
)
|
||||
proc.equalsOrShowDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
// https://github.com/fluencelabs/aqua/issues/427
|
||||
"topology resolver" should "create returning hops after for-par with inner `on` and xor, version 2" in {
|
||||
|
||||
val streamRaw = VarRaw("stream", StreamType(ScalarType.string))
|
||||
val streamRawEl = VarRaw(
|
||||
"stream",
|
||||
StreamType(ScalarType.string),
|
||||
Chain.one(IntoIndexRaw(LiteralRaw("2", ScalarType.u32), ScalarType.string))
|
||||
)
|
||||
val stream = ValueModel.fromRaw(streamRaw)
|
||||
val streamEl = ValueModel.fromRaw(streamRawEl)
|
||||
|
||||
val init =
|
||||
SeqModel.wrap(
|
||||
DeclareStreamModel(stream).leaf,
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
foldPar(
|
||||
"i",
|
||||
valueArray,
|
||||
OnModel(iRelay, Chain.empty).wrap(
|
||||
XorModel.wrap(
|
||||
XorModel.wrap(
|
||||
callModel(2, CallModel.Export(streamRaw.name, streamRaw.`type`) :: Nil)
|
||||
),
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
callModel(4, Nil, Nil)
|
||||
)
|
||||
)
|
||||
)
|
||||
),
|
||||
JoinModel(NonEmptyList.one(streamEl)).leaf,
|
||||
callModel(3, Nil, streamRaw :: Nil)
|
||||
)
|
||||
)
|
||||
|
||||
val proc = Topology.resolve(init).value
|
||||
|
||||
val expected =
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
ParRes.wrap(
|
||||
FoldRes("i", valueArray).wrap(
|
||||
ParRes.wrap(
|
||||
// better if first relay will be outside `for`
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
XorRes.wrap(
|
||||
XorRes.wrap(
|
||||
callRes(2, iRelay, Some(CallModel.Export(streamRaw.name, streamRaw.`type`)))
|
||||
),
|
||||
SeqRes.wrap(
|
||||
through(relay),
|
||||
callRes(4, initPeer)
|
||||
)
|
||||
),
|
||||
through(relay),
|
||||
through(initPeer)
|
||||
),
|
||||
NextRes("i").leaf
|
||||
)
|
||||
)
|
||||
),
|
||||
CallServiceRes(
|
||||
LiteralModel(s"\"op\"", LiteralType.string),
|
||||
s"noop",
|
||||
CallRes(streamEl :: Nil, None),
|
||||
initPeer
|
||||
).leaf,
|
||||
callRes(3, initPeer, None, stream :: Nil)
|
||||
)
|
||||
|
||||
// println(Console.MAGENTA + init.show + Console.RESET)
|
||||
// println(Console.YELLOW + proc.show + Console.RESET)
|
||||
// println(Console.BLUE + expected.show + Console.RESET)
|
||||
|
||||
proc.equalsOrShowDiff(expected) should be(true)
|
||||
}
|
||||
|
||||
"topology resolver" should "create returning hops on nested 'on'" in {
|
||||
val init =
|
||||
OnModel(initPeer, Chain.one(relay)).wrap(
|
||||
|
Loading…
x
Reference in New Issue
Block a user