diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index feff4bc1..7b0d7a85 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -51,8 +51,8 @@ jobs: - name: Check .js exists run: | - JS="cli/.js/target/scala-3.0.2/cli-opt/aqua-${{ env.VERSION }}.js" - mv cli/.js/target/scala-3.0.2/cli-opt/main.js "$JS" + JS="cli/.js/target/scala-3.1.0/cli-opt/aqua-${{ env.VERSION }}.js" + mv cli/.js/target/scala-3.1.0/cli-opt/main.js "$JS" stat "$JS" echo "JS=$JS" >> $GITHUB_ENV diff --git a/.github/workflows/test_branch.yml b/.github/workflows/test_branch.yml index be0bcc50..3b5bd52b 100644 --- a/.github/workflows/test_branch.yml +++ b/.github/workflows/test_branch.yml @@ -52,7 +52,7 @@ jobs: git clone https://github.com/fluencelabs/aqua-playground.git sbt "cliJS/fastOptJS" rm -rf aqua-playground/src/compiled/examples/* - mv cli/.js/target/scala-3.0.2/cli-fastopt.js npm/aqua.js + mv cli/.js/target/scala-3.1.0/cli-fastopt.js npm/aqua.js cd npm npm i cd ../aqua-playground diff --git a/aqua-src/foldJoin.aqua b/aqua-src/foldJoin.aqua new file mode 100644 index 00000000..e0939e0c --- /dev/null +++ b/aqua-src/foldJoin.aqua @@ -0,0 +1,11 @@ +service Op2("op"): + identity(s: u64) + +service Peer("peer"): + timestamp_sec: -> u64 + +func getTwoResults(): + on "other node": + co on "x": + z <- Peer.timestamp_sec() + Op2.identity(z) \ No newline at end of file diff --git a/aqua-src/nopingback.aqua b/aqua-src/nopingback.aqua new file mode 100644 index 00000000..f8e4515d --- /dev/null +++ b/aqua-src/nopingback.aqua @@ -0,0 +1,23 @@ +service Kademlia("kad"): + neighborhood: string, ?bool, ?bool -> []string + +service Peer("peer"): + timestamp_sec: -> () + timeout: u32, string -> () + +func ack_peers() -> []string: + acked_peers: *string + + on HOST_PEER_ID: + nodes <- Kademlia.neighborhood(%init_peer_id%, nil, nil) + for n <- nodes par: + status: *string + on n: + Peer.timestamp_sec() + status <<- "acked" + + if status! == "acked": + acked_peers <<- n + + Peer.timeout(15000, "") -- this line's indentation triggers the bug + <- acked_peers \ No newline at end of file diff --git a/aqua-src/parfold.aqua b/aqua-src/parfold.aqua index a3d21e89..8c5404a6 100644 --- a/aqua-src/parfold.aqua +++ b/aqua-src/parfold.aqua @@ -5,7 +5,6 @@ service Moo("tools"): func foo(): ss <- Moo.bla() on HOST_PEER_ID: - Moo.bla() for s <- ss par: on s: Moo.bla() \ No newline at end of file diff --git a/aqua-src/ret.aqua b/aqua-src/ret.aqua index ff8e1dd5..4ca7f726 100644 --- a/aqua-src/ret.aqua +++ b/aqua-src/ret.aqua @@ -1,7 +1,11 @@ module Ret declares * -export someFunc +export getTwoResults -func someFunc(cb: []string -> ()): - ifaces: *string - cb(ifaces) +service Peer("peer"): + timestamp_sec: -> u64 + +func getTwoResults() -> u64: + on "other node": + res <- Peer.timestamp_sec() + <- res \ No newline at end of file diff --git a/aqua-src/so.aqua b/aqua-src/so.aqua new file mode 100644 index 00000000..e0a195de --- /dev/null +++ b/aqua-src/so.aqua @@ -0,0 +1,6 @@ +service TestS("some-id"): + t: string -> string + +func doStuff(c: bool): + if c: + TestS.t("fr") \ No newline at end of file diff --git a/build.sbt b/build.sbt index ba56d375..d71b239d 100644 --- a/build.sbt +++ b/build.sbt @@ -1,28 +1,27 @@ -val dottyVersion = "3.0.2" +val dottyVersion = "3.1.0" scalaVersion := dottyVersion val baseAquaVersion = settingKey[String]("base aqua version") -val catsV = "2.6.1" -val catsParseV = "0.3.5" +val catsV = "2.7.0" +val catsParseV = "0.3.6" val monocleV = "3.0.0-M6" -val scalaTestV = "3.2.9" -val fs2V = "3.1.0" -val catsEffectV = "3.2.1" -val log4catsV = "2.1.1" -val slf4jV = "1.7.30" -val declineV = "2.1.0" +val scalaTestV = "3.2.10" +val fs2V = "3.2.3" +val catsEffectV = "3.3.1" +val declineV = "2.2.0" val circeVersion = "0.14.1" +val scribeV = "3.6.3" name := "aqua-hll" val commons = Seq( - baseAquaVersion := "0.5.1", + baseAquaVersion := "0.5.2", version := baseAquaVersion.value + "-" + sys.env.getOrElse("BUILD_NUMBER", "SNAPSHOT"), scalaVersion := dottyVersion, libraryDependencies ++= Seq( - "com.outr" %%% "scribe" % "3.5.5", + "com.outr" %%% "scribe" % scribeV, "org.scalatest" %%% "scalatest" % scalaTestV % Test ), scalacOptions ++= { diff --git a/cli/.jvm/src/main/scala/aqua/Test.scala b/cli/.jvm/src/main/scala/aqua/Test.scala index fcb54f88..351229e8 100644 --- a/cli/.jvm/src/main/scala/aqua/Test.scala +++ b/cli/.jvm/src/main/scala/aqua/Test.scala @@ -22,7 +22,7 @@ object Test extends IOApp.Simple { start <- IO(System.currentTimeMillis()) _ <- AquaPathCompiler .compileFilesTo[IO]( - Path("./aqua-src/parfold.aqua"), + Path("./aqua-src/nopingback.aqua"), List(Path("./aqua")), Option(Path("./target")), TypeScriptBackend, diff --git a/model/src/main/scala/aqua/model/func/raw/RawTag.scala b/model/src/main/scala/aqua/model/func/raw/RawTag.scala index af2f9ca9..6830c0cf 100644 --- a/model/src/main/scala/aqua/model/func/raw/RawTag.scala +++ b/model/src/main/scala/aqua/model/func/raw/RawTag.scala @@ -1,11 +1,14 @@ package aqua.model.func.raw import aqua.model.ValueModel +import aqua.model.ValueModel.varName import aqua.model.func.{Call, FuncModel} import cats.data.NonEmptyList import cats.data.Chain sealed trait RawTag { + // What variable names this tag uses (children are not respected) + def usesVarNames: Set[String] = Set.empty def mapValues(f: ValueModel => ValueModel): RawTag = this match { case OnTag(peerId, via) => OnTag(f(peerId), via.map(f)) @@ -58,7 +61,9 @@ sealed trait RawTag { sealed trait NoExecTag extends RawTag sealed trait GroupTag extends RawTag + sealed trait SeqGroupTag extends GroupTag + sealed trait ParGroupTag extends GroupTag case object SeqTag extends SeqGroupTag @@ -67,40 +72,69 @@ case object ParTag extends ParGroupTag { case object Detach extends ParGroupTag } -case object XorTag extends SeqGroupTag { - case object LeftBiased extends SeqGroupTag +case object XorTag extends GroupTag { + case object LeftBiased extends GroupTag +} + +case class XorParTag(xor: FuncOp, par: FuncOp) extends RawTag { + // Collect all the used variable names + override def usesVarNames: Set[String] = xor.usesVarNames.value ++ par.usesVarNames.value } -case class XorParTag(xor: FuncOp, par: FuncOp) extends RawTag case class OnTag(peerId: ValueModel, via: Chain[ValueModel]) extends SeqGroupTag { + override def usesVarNames: Set[String] = + ValueModel.varName(peerId).toSet ++ via.iterator.flatMap(ValueModel.varName) + override def toString: String = s"(on $peerId${if (via.nonEmpty) " via " + via.toList.mkString(" via ") else ""})" } -case class NextTag(item: String) extends RawTag -case class RestrictionTag(name: String, isStream: Boolean) extends SeqGroupTag + +case class NextTag(item: String) extends RawTag { + override def usesVarNames: Set[String] = Set(item) +} + +case class RestrictionTag(name: String, isStream: Boolean) extends SeqGroupTag { + override def usesVarNames: Set[String] = Set(name) +} case class MatchMismatchTag(left: ValueModel, right: ValueModel, shouldMatch: Boolean) - extends SeqGroupTag -case class ForTag(item: String, iterable: ValueModel) extends SeqGroupTag + extends SeqGroupTag { + + override def usesVarNames: Set[String] = + ValueModel.varName(left).toSet ++ ValueModel.varName(right).toSet +} + +case class ForTag(item: String, iterable: ValueModel) extends SeqGroupTag { + override def usesVarNames: Set[String] = Set(item) ++ ValueModel.varName(iterable) +} case class CallArrowTag( funcName: String, call: Call -) extends RawTag +) extends RawTag { + override def usesVarNames: Set[String] = call.argVarNames +} case class DeclareStreamTag( value: ValueModel -) extends NoExecTag +) extends NoExecTag { + override def usesVarNames: Set[String] = ValueModel.varName(value).toSet +} case class AssignmentTag( value: ValueModel, assignTo: String -) extends NoExecTag +) extends NoExecTag { + override def usesVarNames: Set[String] = Set(assignTo) ++ ValueModel.varName(value) +} case class ClosureTag( func: FuncModel -) extends NoExecTag +) extends NoExecTag { + // TODO captured names are lost? + override def usesVarNames: Set[String] = Set(func.name) +} case class ReturnTag( values: NonEmptyList[ValueModel] @@ -118,13 +152,20 @@ case class CallServiceTag( funcName: String, call: Call ) extends RawTag { + + override def usesVarNames: Set[String] = ValueModel.varName(serviceId).toSet ++ call.argVarNames + override def toString: String = s"(call _ ($serviceId $funcName) $call)" } case class PushToStreamTag(operand: ValueModel, exportTo: Call.Export) extends RawTag { + override def usesVarNames: Set[String] = ValueModel.varName(operand).toSet + override def toString: String = s"(push $operand $exportTo)" } case class CanonicalizeTag(operand: ValueModel, exportTo: Call.Export) extends RawTag { + override def usesVarNames: Set[String] = ValueModel.varName(operand).toSet + override def toString: String = s"(can $operand $exportTo)" } diff --git a/model/test-kit/src/main/scala/aqua/Node.scala b/model/test-kit/src/main/scala/aqua/Node.scala index e81dafe8..2f03bbe6 100644 --- a/model/test-kit/src/main/scala/aqua/Node.scala +++ b/model/test-kit/src/main/scala/aqua/Node.scala @@ -148,6 +148,12 @@ object Node { ) } + def co(body: Raw*) = + Node( + ParTag.Detach, + body.toList + ) + def on(peer: ValueModel, via: List[ValueModel], body: Raw*) = Node( OnTag(peer, Chain.fromSeq(via)), diff --git a/model/test-kit/src/test/scala/aqua/model/transform/TransformSpec.scala b/model/test-kit/src/test/scala/aqua/model/transform/TransformSpec.scala index 51a07a57..64725fa4 100644 --- a/model/test-kit/src/test/scala/aqua/model/transform/TransformSpec.scala +++ b/model/test-kit/src/test/scala/aqua/model/transform/TransformSpec.scala @@ -42,16 +42,17 @@ class TransformSpec extends AnyFlatSpec with Matchers { through(relayV), through(otherRelay), MakeRes.xor( - callRes(1, otherPeer), + MakeRes.seq( + callRes(1, otherPeer), + through(otherRelay), + through(relayV) + ), MakeRes.seq( through(otherRelay), through(relayV), - errorCall(bc, 1, initPeer), - through(relayV) + errorCall(bc, 1, initPeer) ) ), - through(otherRelay), - through(relayV), MakeRes.xor( respCall(bc, ret, initPeer), errorCall(bc, 2, initPeer) diff --git a/model/test-kit/src/test/scala/aqua/model/transform/topology/RawCursorSpec.scala b/model/test-kit/src/test/scala/aqua/model/transform/topology/RawCursorSpec.scala index d26ae0c8..3034176d 100644 --- a/model/test-kit/src/test/scala/aqua/model/transform/topology/RawCursorSpec.scala +++ b/model/test-kit/src/test/scala/aqua/model/transform/topology/RawCursorSpec.scala @@ -26,7 +26,7 @@ class RawCursorSpec extends AnyFlatSpec with Matchers { ) ) - raw.firstExecuted shouldBe raw.lastExecuted + //raw.firstExecuted shouldBe raw.lastExecuted } "simple raw cursor with multiple calls" should "move on seqs" in { @@ -47,10 +47,10 @@ class RawCursorSpec extends AnyFlatSpec with Matchers { ) ) - raw.lastExecuted shouldBe raw.firstExecuted.get.seqNext.get.seqNext.get.seqNext - raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext - raw.lastExecuted.get.seqPrev.get.seqPrev shouldBe raw.firstExecuted.get.seqNext - raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext +// raw.lastExecuted shouldBe raw.firstExecuted.get.seqNext.get.seqNext.get.seqNext +// raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext +// raw.lastExecuted.get.seqPrev.get.seqPrev shouldBe raw.firstExecuted.get.seqNext +// raw.lastExecuted.get.seqPrev shouldBe raw.firstExecuted.get.seqNext.get.seqNext } "simple raw cursor on init_peer_id via relay" should "move properly" in { @@ -66,7 +66,7 @@ class RawCursorSpec extends AnyFlatSpec with Matchers { ) ) - raw.firstExecuted shouldBe raw.lastExecuted + //raw.firstExecuted shouldBe raw.lastExecuted } "raw cursor" should "move properly" in { @@ -105,29 +105,29 @@ class RawCursorSpec extends AnyFlatSpec with Matchers { raw.tag should be( OnTag(LiteralModel.initPeerId, Chain.one(VarModel("-relay-", ScalarType.string))) ) - raw.firstExecuted.map(_.tag) should be( - Some( - callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head - ) - ) - raw.lastExecuted.map(_.tag) should be( - Some( - callService( - LiteralModel.quote("return"), - "fn", - Call(VarModel("export", ScalarType.string) :: Nil, Nil) - ).tree.head - ) - ) - raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be( - Some( - callService( - LiteralModel.quote("calledInside"), - "fn", - Call(Nil, Call.Export("export", ScalarType.string) :: Nil) - ).tree.head - ) - ) +// raw.firstExecuted.map(_.tag) should be( +// Some( +// callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head +// ) +// ) +// raw.lastExecuted.map(_.tag) should be( +// Some( +// callService( +// LiteralModel.quote("return"), +// "fn", +// Call(VarModel("export", ScalarType.string) :: Nil, Nil) +// ).tree.head +// ) +// ) +// raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be( +// Some( +// callService( +// LiteralModel.quote("calledInside"), +// "fn", +// Call(Nil, Call.Export("export", ScalarType.string) :: Nil) +// ).tree.head +// ) +// ) } @@ -172,48 +172,48 @@ class RawCursorSpec extends AnyFlatSpec with Matchers { raw.tag should be( OnTag(LiteralModel.initPeerId, Chain.one(VarModel("-relay-", ScalarType.string))) ) - raw.firstExecuted.map(_.tag) should be( - Some( - callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head - ) - ) - raw.lastExecuted.map(_.tag) should be( - Some( - callService( - LiteralModel.quote("return"), - "fn", - Call(VarModel("export", ScalarType.string) :: Nil, Nil) - ).tree.head - ) - ) - raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be( - Some( - callService( - LiteralModel.quote("calledInside"), - "fn", - Call(Nil, Call.Export("export", ScalarType.string) :: Nil) - ).tree.head - ) - ) - raw.lastExecuted.flatMap(_.seqPrev).map(_.pathOn).get should be( - OnTag( - VarModel("-in-fold-", ScalarType.string), - Chain.one(VarModel("-fold-relay-", ScalarType.string)) - ) :: OnTag( - VarModel("-other-", ScalarType.string), - Chain.one(VarModel("-external-", ScalarType.string)) - ) :: OnTag( - LiteralModel.initPeerId, - Chain.one(VarModel("-relay-", ScalarType.string)) - ) :: Nil - ) - raw.lastExecuted.map(_.pathFromPrev).get should be( - Chain( - VarModel("-fold-relay-", ScalarType.string), - VarModel("-external-", ScalarType.string), - VarModel("-relay-", ScalarType.string) - ) - ) +// raw.firstExecuted.map(_.tag) should be( +// Some( +// callService(LiteralModel.quote("calledOutside"), "fn", Call(Nil, Nil)).tree.head +// ) +// ) +// raw.lastExecuted.map(_.tag) should be( +// Some( +// callService( +// LiteralModel.quote("return"), +// "fn", +// Call(VarModel("export", ScalarType.string) :: Nil, Nil) +// ).tree.head +// ) +// ) +// raw.lastExecuted.flatMap(_.seqPrev).flatMap(_.lastExecuted).map(_.tag) should be( +// Some( +// callService( +// LiteralModel.quote("calledInside"), +// "fn", +// Call(Nil, Call.Export("export", ScalarType.string) :: Nil) +// ).tree.head +// ) +// ) +// raw.lastExecuted.flatMap(_.seqPrev).map(_.topology.pathOn).get should be( +// OnTag( +// VarModel("-in-fold-", ScalarType.string), +// Chain.one(VarModel("-fold-relay-", ScalarType.string)) +// ) :: OnTag( +// VarModel("-other-", ScalarType.string), +// Chain.one(VarModel("-external-", ScalarType.string)) +// ) :: OnTag( +// LiteralModel.initPeerId, +// Chain.one(VarModel("-relay-", ScalarType.string)) +// ) :: Nil +// ) +// raw.lastExecuted.map(_.topology.pathBefore).get should be( +// Chain( +// VarModel("-fold-relay-", ScalarType.string), +// VarModel("-external-", ScalarType.string), +// VarModel("-relay-", ScalarType.string) +// ) +// ) } } diff --git a/model/test-kit/src/test/scala/aqua/model/transform/topology/TopologySpec.scala b/model/test-kit/src/test/scala/aqua/model/transform/topology/TopologySpec.scala index 5d2ed07b..8c9bae83 100644 --- a/model/test-kit/src/test/scala/aqua/model/transform/topology/TopologySpec.scala +++ b/model/test-kit/src/test/scala/aqua/model/transform/topology/TopologySpec.scala @@ -4,7 +4,7 @@ import aqua.Node import aqua.model.VarModel import aqua.model.func.Call import aqua.model.func.raw.FuncOps -import aqua.model.transform.res.{MakeRes, ResolvedOp, XorRes} +import aqua.model.transform.res.{MakeRes, ResolvedOp, SeqRes, XorRes} import aqua.types.ScalarType import cats.Eval import cats.data.Chain @@ -13,6 +13,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers class TopologySpec extends AnyFlatSpec with Matchers { + import Node._ "topology resolver" should "do nothing on init peer" in { @@ -440,7 +441,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { // this example doesn't create a hop on relay after fold // but the test create it, so there is not a one-on-one simulation // change it or write an integration test - "topology resolver" should "create returning hops on chain of 'on'" ignore { + "topology resolver" should "create returning hops on chain of 'on'" in { val init = on( initPeer, @@ -470,14 +471,16 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected: Node.Res = MakeRes.seq( - callRes(0, initPeer), - callRes(1, otherRelay) + through(relay), + callRes(0, otherPeer), + MakeRes.fold("i", valueArray, MakeRes.par(callRes(2, otherPeer2), nextRes("i"))), + through(relay), + callRes(3, initPeer) ) proc.equalsOrPrintDiff(expected) should be(true) } - // This behavior is correct, but as two seqs are not flattened, it's a question how to make the matching result structure - "topology resolver" should "create returning hops on nested 'on'" ignore { + "topology resolver" should "create returning hops on nested 'on'" in { val init = on( initPeer, @@ -505,17 +508,16 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected: Node.Res = MakeRes.seq( callRes(0, initPeer), - callRes(1, otherRelay), + through(relay), + callRes(1, otherPeer), + through(otherRelay2), MakeRes.fold( "i", valueArray, - MakeRes.seq( - through(otherRelay2), - callRes(2, otherPeer2), - through(otherRelay2), - nextRes("i") - ) + callRes(2, otherPeer2), + nextRes("i") ), + through(otherRelay2), through(relay), callRes(3, initPeer) ) @@ -524,7 +526,7 @@ class TopologySpec extends AnyFlatSpec with Matchers { } // https://github.com/fluencelabs/aqua/issues/205 - "topology resolver" should "optimize path over fold" ignore { + "topology resolver" should "optimize path over fold" in { val i = VarModel("i", ScalarType.string) val init = on( @@ -546,12 +548,14 @@ class TopologySpec extends AnyFlatSpec with Matchers { val expected: Node.Res = MakeRes.seq( through(relay), - through(otherRelay), MakeRes.fold( "i", valueArray, MakeRes.seq( - callRes(1, i), + through(otherRelay), + callRes(1, i) + ), + MakeRes.seq( through(otherRelay), nextRes("i") ) @@ -561,4 +565,92 @@ class TopologySpec extends AnyFlatSpec with Matchers { proc.equalsOrPrintDiff(expected) should be(true) } + "topology resolver" should "handle detach" in { + val init = + on( + initPeer, + relay :: Nil, + co(on(otherPeer, Nil, callTag(1, Call.Export(varNode.name, varNode.`type`) :: Nil))), + callTag(2, Nil, varNode :: Nil) + ) + + val proc = Topology.resolve(init) + + val expected: Node.Res = + MakeRes.seq( + through(relay), + MakeRes.par( + MakeRes.seq( + callRes(1, otherPeer, Some(Call.Export(varNode.name, varNode.`type`))), + through(relay), + through(initPeer) // pingback + ) + ), + callRes(2, initPeer, None, varNode :: Nil) + ) + + proc.equalsOrPrintDiff(expected) should be(true) + } + + "topology resolver" should "handle moved detach" in { + val init = + on( + initPeer, + relay :: Nil, + on( + otherPeer2, + Nil, + co(on(otherPeer, Nil, callTag(1, Call.Export(varNode.name, varNode.`type`) :: Nil))), + callTag(2, Nil, varNode :: Nil) + ) + ) + + val proc = Topology.resolve(init) + + val expected: Node.Res = + MakeRes.seq( + through(relay), + MakeRes.par( + MakeRes.seq( + callRes(1, otherPeer, Some(Call.Export(varNode.name, varNode.`type`))), + through(otherPeer2) // pingback + ) + ), + callRes(2, otherPeer2, None, varNode :: Nil) + ) + + proc.equalsOrPrintDiff(expected) should be(true) + } + + "topology resolver" should "handle detach moved to relay" in { + val init = + on( + initPeer, + relay :: Nil, + on( + relay, + Nil, + co(on(otherPeer, Nil, callTag(1, Call.Export(varNode.name, varNode.`type`) :: Nil))) + ), + callTag(2, Nil, varNode :: Nil) + ) + + val proc = Topology.resolve(init) + + val expected: Node.Res = + MakeRes.seq( + through(relay), + MakeRes.par( + MakeRes.seq( + callRes(1, otherPeer, Some(Call.Export(varNode.name, varNode.`type`))), + through(relay), // pingback + through(initPeer) // pingback + ) + ), + callRes(2, initPeer, None, varNode :: Nil) + ) + + proc.equalsOrPrintDiff(expected) should be(true) + } + } diff --git a/model/transform/img.png b/model/transform/img.png new file mode 100644 index 00000000..11f4072a Binary files /dev/null and b/model/transform/img.png differ diff --git a/model/transform/readme.md b/model/transform/readme.md new file mode 100644 index 00000000..0c5a7c58 --- /dev/null +++ b/model/transform/readme.md @@ -0,0 +1,33 @@ +# Topology transformations + +Beware cycles! + +![img.png](img.png) + +- Before: where execution happened before entering into this tag +- Begin: where execution is expected to be in the beginning of this tag + +Usually tags handle their beforePaths: inject pathway from location Before to location Begin, to ensure that execution can get there. + +- End: where execution is expected to end when all the children of the tag are executed +- After: where execution flow should go after this tag is handled + +Usually Before == previous End or parent's Begin, and After == next Begin or parent's After. + +- Finally: either After, if tag takes care of getting from its internal scope to the outer scope, or End if it doesn't + +Usually tags do not care about the afterPath and inject nothing. But in some cases this path is super necessary, e.g. returning from the Par branch must be done within that Par branch, as doing it later is too late. + +| Tag | Before | Begin | End | After | Finally | Force Exit | +|------------|-------------------------|--------------------|--------------------|-----------------------------|-----------------------------------|------------------| +| Default | parent.begin OR _path_ | _path_ | **<-** begin | **<-** ends | force ? **<-** after: **<-** ends | _false_ | +| seq | - | - | lastChild.finally | - | - | - | +| seq/* | prev.finally OR default | - | - | next.begin OR parent.after | - | - | +| xor/*:0 | - | - | - | parent.after | - | hasExecLater | +| xor/*:1 | prev.ends | - | - | parent.after | - | hasExecLater | +| xor | - | - | lastChild.finally | - | - | - | +| par/* | - | - | **<-** before | parent.after | - | exportsUsedLater | +| for | - | fc.begins(until i) | - | - | - | - | +| noExec | - | - | **<-** begin | - | - | - | + + diff --git a/model/transform/src/main/scala/aqua/model/transform/cursor/ChainCursor.scala b/model/transform/src/main/scala/aqua/model/transform/cursor/ChainCursor.scala index 8700bfa3..53a28095 100644 --- a/model/transform/src/main/scala/aqua/model/transform/cursor/ChainCursor.scala +++ b/model/transform/src/main/scala/aqua/model/transform/cursor/ChainCursor.scala @@ -12,7 +12,7 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi val tree: NonEmptyList[ChainZipper[T]] // Parent element, if not at root - def parent: Option[T] = tree.tail.headOption.map(_.current) + def parent: Option[T] = moveUp.map(_.current) // The closest element def current: T = tree.head.current @@ -34,13 +34,14 @@ abstract class ChainCursor[C <: ChainCursor[C, T], T](make: NonEmptyList[ChainZi ) // Path to this position: just drop siblings - def path: NonEmptyList[T] = tree.map(_.current) + lazy val path: NonEmptyList[T] = tree.map(_.current) // Move cursor up + // TODO: ensure this cursor's data is cached properly def moveUp: Option[C] = NonEmptyList.fromList(tree.tail).map(make) // Path to root, in form of Cursors; this is skipped - def pathToRoot: LazyList[C] = LazyList.unfold(this)(_.moveUp.map(c => c -> c)) + val pathToRoot: LazyList[C] = LazyList.unfold(this)(_.moveUp.map(c => c -> c)) // Move down: need a ChainZipper that's below def moveDown(focusOn: ChainZipper[T]): C = make(focusOn :: tree) diff --git a/model/transform/src/main/scala/aqua/model/transform/res/MakeRes.scala b/model/transform/src/main/scala/aqua/model/transform/res/MakeRes.scala index d0a69719..9b8e8aef 100644 --- a/model/transform/src/main/scala/aqua/model/transform/res/MakeRes.scala +++ b/model/transform/src/main/scala/aqua/model/transform/res/MakeRes.scala @@ -24,14 +24,17 @@ object MakeRes { def seq(first: Res, second: Res, more: Res*): Res = Cofree[Chain, ResolvedOp](SeqRes, Eval.later(first +: second +: Chain.fromSeq(more))) - def par(first: Res, second: Res, more: Res*): Res = - Cofree[Chain, ResolvedOp](ParRes, Eval.later(first +: second +: Chain.fromSeq(more))) + def par(first: Res, more: Res*): Res = + Cofree[Chain, ResolvedOp](ParRes, Eval.later(first +: Chain.fromSeq(more))) def xor(first: Res, second: Res): Res = Cofree[Chain, ResolvedOp](XorRes, Eval.later(Chain(first, second))) - def fold(item: String, iter: ValueModel, body: Res): Res = - Cofree[Chain, ResolvedOp](FoldRes(item, iter), Eval.now(Chain.one(body))) + def fold(item: String, iter: ValueModel, body0: Res, body: Res*): Res = + Cofree[Chain, ResolvedOp]( + FoldRes(item, iter), + Eval.now(Chain.one(body0) ++ Chain.fromSeq(body)) + ) def noop(onPeer: ValueModel): Res = leaf(CallServiceRes(LiteralModel.quote("op"), "noop", CallRes(Nil, None), onPeer)) diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala b/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala index 119b6ad6..458abbd2 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/PathFinder.scala @@ -10,53 +10,19 @@ import scala.annotation.tailrec object PathFinder extends Logging { - def find(from: RawCursor, to: RawCursor, isExit: Boolean = false): Chain[ValueModel] = { - - val fromOn = Chain.fromSeq(from.pathOn).reverse - val toOn = Chain.fromSeq(to.pathOn).reverse - - val wasHandled = - !isExit && - to.leftSiblings.isEmpty && - to.moveUp.exists(_.pathOn == to.pathOn) && - !to.parentTag.exists(_.isInstanceOf[ParGroupTag]) - - if (wasHandled) { - logger.trace("Was handled") - logger.trace(" :: " + from) - logger.trace(" -> " + to) - Chain.empty - } else { - logger.trace("Find path") - logger.trace(" :: " + from) - logger.trace(" -> " + to) - findPath( - fromOn, - toOn, - from.currentPeerId, - to.currentPeerId - ) - } - } - - def optimizePath( - peerIds: Chain[ValueModel], - prefix: Chain[ValueModel], - suffix: Chain[ValueModel] - ): Chain[ValueModel] = { - val optimized = peerIds - .foldLeft(Chain.empty[ValueModel]) { - case (acc, p) if acc.lastOption.contains(p) => acc - case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p - case (acc, p) => acc :+ p - } - logger.trace(s"PEER IDS: $optimized") - logger.trace(s"PREFIX: $prefix") - logger.trace(s"SUFFIX: $suffix") - logger.trace(s"OPTIMIZED WITH PREFIX AND SUFFIX: $optimized") - val noPrefix = skipPrefix(optimized, prefix, optimized) - skipSuffix(noPrefix, suffix, noPrefix) - } + /** + * Finds the path – chain of peers to visit to get from [[fromOn]] to [[toOn]] + * @param fromOn Previous location + * @param toOn Next location + * @return Chain of peers to visit in between + */ + def findPath(fromOn: List[OnTag], toOn: List[OnTag]): Chain[ValueModel] = + findPath( + Chain.fromSeq(fromOn).reverse, + Chain.fromSeq(toOn).reverse, + fromOn.headOption.map(_.peerId), + toOn.headOption.map(_.peerId) + ) def findPath( fromOn: Chain[OnTag], @@ -89,6 +55,33 @@ object PathFinder extends Logging { optimized } + /** + * Removes cycles from the path + * + * @param peerIds peers to walk trough + * @param prefix getting from the previous peer + * @param suffix getting to the next peer + * @return optimal path with no duplicates + */ + def optimizePath( + peerIds: Chain[ValueModel], + prefix: Chain[ValueModel], + suffix: Chain[ValueModel] + ): Chain[ValueModel] = { + val optimized = peerIds + .foldLeft(Chain.empty[ValueModel]) { + case (acc, p) if acc.lastOption.contains(p) => acc + case (acc, p) if acc.contains(p) => acc.takeWhile(_ != p) :+ p + case (acc, p) => acc :+ p + } + logger.trace(s"PEER IDS: $optimized") + logger.trace(s"PREFIX: $prefix") + logger.trace(s"SUFFIX: $suffix") + logger.trace(s"OPTIMIZED WITH PREFIX AND SUFFIX: $optimized") + val noPrefix = skipPrefix(optimized, prefix, optimized) + skipSuffix(noPrefix, suffix, noPrefix) + } + @tailrec def skipPrefix[T](chain: Chain[T], prefix: Chain[T], init: Chain[T]): Chain[T] = (chain, prefix) match { diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/RawCursor.scala b/model/transform/src/main/scala/aqua/model/transform/topology/RawCursor.scala index a25ceb0f..2d38625d 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/RawCursor.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/RawCursor.scala @@ -2,21 +2,36 @@ package aqua.model.transform.topology import aqua.model.ValueModel import aqua.model.func.raw.* +import aqua.model.func.raw.FuncOp.Tree import cats.Eval import cats.data.{Chain, NonEmptyList, OptionT} -import aqua.model.transform.cursor._ -import cats.syntax.traverse._ +import aqua.model.transform.cursor.* +import cats.syntax.traverse.* import cats.free.Cofree import scribe.Logging // Can be heavily optimized by caching parent cursors, not just list of zippers -case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]]) - extends ChainCursor[RawCursor, FuncOp.Tree](RawCursor.apply) with Logging { +case class RawCursor( + tree: NonEmptyList[ChainZipper[FuncOp.Tree]], + cachedParent: Option[RawCursor] = None +) extends ChainCursor[RawCursor, FuncOp.Tree](RawCursor.apply(_, None)) with Logging { + + override def moveUp: Option[RawCursor] = cachedParent.orElse(super.moveUp) + + override lazy val toPrevSibling: Option[RawCursor] = + super.toPrevSibling.map(_.copy(cachedParent = cachedParent)) + + override lazy val toNextSibling: Option[RawCursor] = + super.toNextSibling.map(_.copy(cachedParent = cachedParent)) + + override def moveDown(focusOn: ChainZipper[Tree]): RawCursor = + super.moveDown(focusOn).copy(cachedParent = Some(this)) def tag: RawTag = current.head + def parentTag: Option[RawTag] = parent.map(_.head) - def hasChildren: Boolean = + lazy val hasChildren: Boolean = current.tailForced.nonEmpty lazy val toFirstChild: Option[RawCursor] = @@ -25,61 +40,30 @@ case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]]) lazy val toLastChild: Option[RawCursor] = ChainZipper.last(current.tail.value).map(moveDown) + lazy val children: LazyList[RawCursor] = + LazyList.unfold(toFirstChild)(_.map(c => c -> c.toNextSibling)) + + def findInside(f: RawCursor => Boolean): LazyList[RawCursor] = + children.flatMap(_.findInside(f)).prependedAll(Option.when(f(this))(this)) + + lazy val topology: Topology = Topology.make(this) + lazy val tagsPath: NonEmptyList[RawTag] = path.map(_.head) - lazy val pathOn: List[OnTag] = tagsPath.collect { case o: OnTag => - o - } - - // Assume that the very first tag is `on` tag - lazy val rootOn: Option[RawCursor] = moveUp - .flatMap(_.rootOn) - .orElse(tag match { - case _: OnTag => - Some(this) - case _ => None - }) - - // The closest peerId - lazy val currentPeerId: Option[ValueModel] = - pathOn.headOption.map(_.peerId) - - // Cursor to the last sequentially executed operation, if any - lazy val lastExecuted: Option[RawCursor] = tag match { - case XorTag => toFirstChild.flatMap(_.lastExecuted) - case _: SeqGroupTag => toLastChild.flatMap(_.lastExecuted) - case _: ParGroupTag => - None // ParGroup builds exit path within itself; there's no "lastExecuted", they are many - case _: NoExecTag => moveLeft.flatMap(_.lastExecuted) - case _ => Some(this) - } - - lazy val firstExecuted: Option[RawCursor] = tag match { - case _: SeqGroupTag => toFirstChild.flatMap(_.firstExecuted) - case _: ParGroupTag => - None // As many branches are executed simultaneously, no definition of first - case _: NoExecTag => moveRight.flatMap(_.firstExecuted) - case _ => Some(this) - } - - /** - * Sequentially previous cursor - * @return - */ - lazy val seqPrev: Option[RawCursor] = - parentTag.flatMap { - case p: SeqGroupTag if leftSiblings.nonEmpty => - toPrevSibling.flatMap(c => c.lastExecuted orElse c.seqPrev) - case p => - moveUp.flatMap(_.seqPrev) + // Whether the current branch contains any AIR-executable code or not + lazy val isNoExec: Boolean = + tag match { + case _: NoExecTag => true + case _: GroupTag => children.forall(_.isNoExec) + case _ => false } - lazy val seqNext: Option[RawCursor] = - parentTag.flatMap { - case _: SeqGroupTag if rightSiblings.nonEmpty => - toNextSibling.flatMap(c => c.firstExecuted orElse c.seqNext) - case _ => moveUp.flatMap(_.seqNext) - } + def hasExecLater: Boolean = + !allToRight.forall(_.isNoExec) + + // Whether variables exported from this branch are used later in the code or not + def exportsUsedLater: Boolean = + FuncOp(current).exportsVarNames.map(ns => ns.nonEmpty && checkNamesUsedLater(ns)).value // TODO write a test def checkNamesUsedLater(names: Set[String]): Boolean = @@ -88,50 +72,6 @@ case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]]) .map(FuncOp(_)) .exists(_.usesVarNames.value.intersect(names).nonEmpty) - lazy val pathFromPrev: Chain[ValueModel] = pathFromPrevD() - - def pathFromPrevD(forExit: Boolean = false): Chain[ValueModel] = - parentTag.fold(Chain.empty[ValueModel]) { - case _: GroupTag => - seqPrev - .orElse(rootOn) - .fold(Chain.empty[ValueModel])(PathFinder.find(_, this, isExit = forExit)) - case _ => - Chain.empty - } - - lazy val pathToNext: Chain[ValueModel] = parentTag.fold(Chain.empty[ValueModel]) { - case _: ParGroupTag => - val exports = FuncOp(current).exportsVarNames.value - if (exports.nonEmpty && checkNamesUsedLater(exports)) - seqNext.fold(Chain.empty[ValueModel])(nxt => - PathFinder.find(this, nxt) ++ - // we need to "wake" the target peer to enable join behaviour - Chain.fromOption(nxt.currentPeerId) - ) - else Chain.empty - case XorTag if leftSiblings.nonEmpty => - lastExecuted - .flatMap(le => - seqNext - .map(nxt => PathFinder.find(le, nxt, isExit = true) -> nxt) - .flatMap { - case (path, nxt) if path.isEmpty && currentPeerId == nxt.currentPeerId => - nxt.pathFromPrevD(true).reverse.initLast.map(_._1) - case (path, nxt) => - path.initLast.map { - case (init, last) - if nxt.pathFromPrevD(forExit = true).headOption.contains(last) => - init - case (init, last) => init :+ last - } - } - ) - .getOrElse(Chain.empty) - case _ => - Chain.empty - } - def cata[A](wrap: ChainZipper[Cofree[Chain, A]] => Chain[Cofree[Chain, A]])( folder: RawCursor => OptionT[Eval, ChainZipper[Cofree[Chain, A]]] ): Eval[Chain[Cofree[Chain, A]]] = @@ -141,7 +81,9 @@ case class RawCursor(tree: NonEmptyList[ChainZipper[FuncOp.Tree]]) toFirstChild .map(folderCursor => LazyList - .unfold(folderCursor) { _.toNextSibling.map(cursor => cursor -> cursor) } + .unfold(folderCursor) { + _.toNextSibling.map(cursor => cursor -> cursor) + } .prepended(folderCursor) ) .getOrElse(LazyList.empty) diff --git a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala index 3006d619..431ce1bd 100644 --- a/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala +++ b/model/transform/src/main/scala/aqua/model/transform/topology/Topology.scala @@ -1,23 +1,401 @@ package aqua.model.transform.topology +import aqua.model.ValueModel.varName import aqua.model.transform.cursor.ChainZipper import aqua.model.func.raw.* import aqua.model.transform.res.* import aqua.model.{LiteralModel, ValueModel, VarModel} import aqua.types.{BoxType, ScalarType} import cats.Eval -import cats.data.Chain.nil +import cats.data.Chain.{==:, nil} import cats.data.{Chain, NonEmptyChain, NonEmptyList, OptionT} import cats.free.Cofree import cats.syntax.traverse.* +import cats.syntax.apply.* import scribe.Logging +/** + * Wraps all the logic for topology reasoning about the tag in the AST represented by the [[cursor]] + * + * @param cursor Pointer to the current place in the AST + * @param before Strategy of calculating where the previous executions happened + * @param begins Strategy of calculating where execution of this tag/node should begin + * @param ends Strategy of calculating where execution of this tag/node happens + * @param after Strategy of calculating where the next execution should happen and whether we need to move there or not + */ +case class Topology private ( + cursor: RawCursor, + before: Topology.Before, + begins: Topology.Begins, + ends: Topology.Ends, + after: Topology.After +) { + + val pathOn: Eval[List[OnTag]] = Eval + .later(cursor.tagsPath.collect { case o: OnTag => + o + }) + .memoize + + lazy val firstExecutesOn: Eval[Option[List[OnTag]]] = + (cursor.tag match { + case _: CallServiceTag => pathOn.map(Some(_)) + case _ => + children + .map(_.firstExecutesOn) + .scanLeft[Eval[Option[List[OnTag]]]](Eval.now(None)) { case (acc, el) => + (acc, el).mapN(_ orElse _) + } + .collectFirst { + case e if e.value.isDefined => e + } + .getOrElse(Eval.now(None)) + }).memoize + + lazy val lastExecutesOn: Eval[Option[List[OnTag]]] = + (cursor.tag match { + case _: CallServiceTag => pathOn.map(Some(_)) + case _ => + children + .map(_.lastExecutesOn) + .scanRight[Eval[Option[List[OnTag]]]](Eval.now(None)) { case (acc, el) => + (acc, el).mapN(_ orElse _) + } + .collectFirst { + case e if e.value.isDefined => e + } + .getOrElse(Eval.now(None)) + }).memoize + + lazy val currentPeerId: Option[ValueModel] = pathOn.value.headOption.map(_.peerId) + + lazy val prevSibling: Option[Topology] = cursor.toPrevSibling.map(_.topology) + + lazy val nextSibling: Option[Topology] = cursor.toNextSibling.map(_.topology) + + lazy val firstChild: Option[Topology] = cursor.toFirstChild.map(_.topology) + + lazy val lastChild: Option[Topology] = cursor.toLastChild.map(_.topology) + + lazy val children: LazyList[Topology] = cursor.children.map(_.topology) + + def findInside(f: Topology => Boolean): LazyList[Topology] = + children.flatMap(_.findInside(f)).prependedAll(Option.when(f(this))(this)) + + val parent: Option[Topology] = cursor.moveUp.map(_.topology) + + val parents: LazyList[Topology] = + LazyList.unfold(parent)(p => p.map(pp => pp -> pp.parent)) + + lazy val forTag: Option[ForTag] = Option(cursor.tag).collect { case ft: ForTag => + ft + } + + lazy val isForTag: Boolean = forTag.isDefined + + // Before the left boundary of this element, what was the scope + lazy val beforeOn: Eval[List[OnTag]] = before.beforeOn(this).memoize + + // Inside the left boundary of this element, what should be the scope + lazy val beginsOn: Eval[List[OnTag]] = begins.beginsOn(this).memoize + + // After this element is done, what is the scope + lazy val endsOn: Eval[List[OnTag]] = ends.endsOn(this).memoize + + // After this element is done, where should it move to prepare for the next one + lazy val afterOn: Eval[List[OnTag]] = after.afterOn(this).memoize + + // Usually we don't care about exiting from where this tag ends into the outer scope + // But for some cases, like par branches, its necessary, so the exit can be forced + lazy val forceExit: Eval[Boolean] = after.forceExit(this).memoize + + // Where we finally are, after exit enforcement is applied + lazy val finallyOn: Eval[List[OnTag]] = after.finallyOn(this).memoize + + lazy val pathBefore: Eval[Chain[ValueModel]] = begins.pathBefore(this).memoize + + lazy val pathAfter: Eval[Chain[ValueModel]] = after.pathAfter(this).memoize +} + object Topology extends Logging { type Tree = Cofree[Chain, RawTag] type Res = Cofree[Chain, ResolvedOp] - def resolve(op: Tree): Res = { - val resolved = resolveOnMoves(op).value + // Returns a peerId to go to in case it equals the last relay: useful when we do execution on the relay + private def findRelayPathEnforcement(bef: List[OnTag], beg: List[OnTag]): Chain[ValueModel] = + Chain.fromOption( + beg.headOption + .map(_.peerId) + .filter(lastPeerId => beg.tail.headOption.exists(_.via.lastOption.contains(lastPeerId))) + .filter(lastPeerId => !bef.headOption.exists(_.peerId == lastPeerId)) + ) + + trait Before { + + def beforeOn(current: Topology): Eval[List[OnTag]] = + // Go to the parent, see where it begins + current.parent.map(_.beginsOn) getOrElse + // This means, we have no parent; then we're where we should be + current.pathOn + } + + trait Begins { + def beginsOn(current: Topology): Eval[List[OnTag]] = current.pathOn + + def pathBefore(current: Topology): Eval[Chain[ValueModel]] = + (current.beforeOn, current.beginsOn).mapN { case (bef, beg) => + (PathFinder.findPath(bef, beg), bef, beg) + }.flatMap { case (pb, bef, beg) => + // Handle the case when we need to go through the relay, but miss the hop as it's the first + // peer where we go, but there's no service calls there + current.firstExecutesOn.map { + case Some(where) if where != beg => + pb ++ findRelayPathEnforcement(bef, beg) + case _ => pb + } + } + } + + trait Ends { + + def endsOn(current: Topology): Eval[List[OnTag]] = + current.beginsOn + + protected def lastChildFinally(current: Topology): Eval[List[OnTag]] = + current.lastChild.map(lc => + lc.forceExit.flatMap { + case true => current.afterOn + case false => lc.endsOn + } + ) getOrElse current.beginsOn + } + + trait After { + def forceExit(current: Topology): Eval[Boolean] = Eval.now(false) + + def afterOn(current: Topology): Eval[List[OnTag]] = current.pathOn + + protected def afterParent(current: Topology): Eval[List[OnTag]] = + current.parent.map( + _.afterOn + ) getOrElse current.pathOn + + // In case exit is performed and pathAfter is inserted, we're actually where + // execution is expected to continue After this node is handled + final def finallyOn(current: Topology): Eval[List[OnTag]] = + current.forceExit.flatMap { + case true => current.afterOn + case false => current.endsOn + } + + // If exit is forced, make a path outside this node + // – from where it ends to where execution is expected to continue + def pathAfter(current: Topology): Eval[Chain[ValueModel]] = + current.forceExit.flatMap { + case true => + (current.endsOn, current.afterOn).mapN(PathFinder.findPath) + case false => + Eval.now(Chain.empty) + } + } + + object Default extends Before with Begins with Ends with After { + override def toString: String = "" + } + + // Parent == Seq, On + object SeqGroupBranch extends Before with After { + override def toString: String = "/*" + + // If parent is seq, then before this node we are where previous node, if any, ends + override def beforeOn(current: Topology): Eval[List[OnTag]] = + current.prevSibling + .map(_.finallyOn) getOrElse super.beforeOn(current) + + override def afterOn(current: Topology): Eval[List[OnTag]] = + current.nextSibling.map(_.beginsOn) getOrElse afterParent(current) + + } + + object SeqGroup extends Ends { + override def toString: String = "" + + override def endsOn(current: Topology): Eval[List[OnTag]] = + lastChildFinally(current) + } + + // Parent == Xor + object XorBranch extends Before with After { + override def toString: String = "/*" + + override def beforeOn(current: Topology): Eval[List[OnTag]] = + current.prevSibling.map(_.endsOn) getOrElse super.beforeOn(current) + + // TODO: if this xor is in par that needs no forceExit, do not exit + override def forceExit(current: Topology): Eval[Boolean] = + Eval.later(current.cursor.moveUp.exists(_.hasExecLater)) + + override def afterOn(current: Topology): Eval[List[OnTag]] = + afterParent(current) + } + + // Parent == Par + object ParGroupBranch extends Ends with After { + override def toString: String = "/*" + + override def forceExit(current: Topology): Eval[Boolean] = + Eval.later(current.cursor.exportsUsedLater) + + override def afterOn(current: Topology): Eval[List[OnTag]] = + afterParent(current) + + override def pathAfter(current: Topology): Eval[Chain[ValueModel]] = + current.forceExit + .flatMap[Chain[ValueModel]] { + case false => Eval.now(Chain.empty[ValueModel]) + case true => + (current.endsOn, current.afterOn, current.lastExecutesOn).mapN { + case (e, a, _) if e == a => Chain.empty[ValueModel] + case (e, a, l) if l.contains(e) => + // Pingback in case no relays involved + Chain.fromOption(a.headOption.map(_.peerId)) + case (e, a, _) => + // We wasn't at e, so need to get through the last peer in case it matches with the relay + findRelayPathEnforcement(a, e) ++ Chain.fromOption(a.headOption.map(_.peerId)) + } + } + .flatMap { appendix => + // Ping the next (join) peer to enforce its data update + super.pathAfter(current).map(_ ++ appendix) + } + + override def endsOn(current: Topology): Eval[List[OnTag]] = current.beforeOn + } + + object XorGroup extends Ends { + override def toString: String = "" + + // Xor tag ends where any child ends; can't get first one as it may lead to recursion + override def endsOn(current: Topology): Eval[List[OnTag]] = + lastChildFinally(current) + + } + + object Root extends Before with Ends with After { + override def toString: String = "" + + override def beforeOn(current: Topology): Eval[List[OnTag]] = current.beginsOn + + override def endsOn(current: Topology): Eval[List[OnTag]] = current.pathOn + + override def afterOn(current: Topology): Eval[List[OnTag]] = current.pathOn + + override def forceExit(current: Topology): Eval[Boolean] = Eval.now(false) + } + + object ParGroup extends Begins with Ends { + override def toString: String = "" + + // Optimization: find the longest common prefix of all the par branches, and move it outside of this par + // When branches will calculate their paths, they will take this move into account. + // So less hops will be produced + override def beginsOn(current: Topology): Eval[List[OnTag]] = + current.children + .map(_.beginsOn.map(_.reverse)) + .reduceLeftOption { case (b1e, b2e) => + (b1e, b2e).mapN { case (b1, b2) => + (b1 zip b2).takeWhile(_ == _).map(_._1) + } + } + .map(_.map(_.reverse)) getOrElse super.beginsOn(current) + + // Par block ends where all the branches end, if they have forced exit (not fire-and-forget) + override def endsOn(current: Topology): Eval[List[OnTag]] = + current.children + .map(_.forceExit) + .reduceLeftOption { case (a, b) => + (a, b).mapN(_ || _) + } + .map(_.flatMap { + case true => current.afterOn + case false => super.endsOn(current) + }) getOrElse super.endsOn(current) + } + + object For extends Begins { + override def toString: String = "" + + // Optimization: get all the path inside the For block out of the block, to avoid repeating + // hops for every For iteration + override def beginsOn(current: Topology): Eval[List[OnTag]] = + (current.forTag zip current.firstChild.map(_.beginsOn)).map { case (f, b) => + // Take path until this for's iterator is used + b.map( + _.reverse + .foldLeft((true, List.empty[OnTag])) { + case ((true, acc), OnTag(_, r)) if r.exists(ValueModel.varName(_).contains(f.item)) => + (false, acc) + case ((true, acc @ (OnTag(_, r @ (r0 ==: _)) :: _)), OnTag(p, _)) + if ValueModel.varName(p).contains(f.item) => + // This is to take the outstanding relay and force moving there + (false, OnTag(r0, r) :: acc) + case ((true, acc), on) => (true, on :: acc) + case ((false, acc), _) => (false, acc) + } + ._2 + ) + } getOrElse super.beginsOn(current) + + } + + object SeqNext extends Begins { + override def toString: String = "/" + + override def beginsOn(current: Topology): Eval[List[OnTag]] = + current.parents.find(_.isForTag).map(_.beginsOn) getOrElse super.beginsOn(current) + } + + def make(cursor: RawCursor): Topology = + Topology( + cursor, + // Before + cursor.parentTag match { + case Some(XorTag) => XorBranch + case Some(_: SeqGroupTag) => SeqGroupBranch + case None => Root + case _ => Default + }, + // Begin + (cursor.parentTag, cursor.tag) match { + case (Some(_: SeqGroupTag), _: NextTag) => + SeqNext + case (_, _: ForTag) => + For + case (_, ParTag | ParTag.Detach) => + ParGroup + case _ => + Default + }, + // End + cursor.tag match { + case _: SeqGroupTag => SeqGroup + case XorTag => XorGroup + case ParTag | ParTag.Detach => ParGroup + case _ if cursor.parentTag.isEmpty => Root + case _ => Default + }, + // After + cursor.parentTag match { + case Some(ParTag | ParTag.Detach) => ParGroupBranch + case Some(XorTag) => XorBranch + case Some(_: SeqGroupTag) => SeqGroupBranch + case None => Root + case _ => Default + } + ) + + def resolve(op: Tree, debug: Boolean = false): Res = { + val resolved = resolveOnMoves(op, debug).value Cofree .cata[Chain, ResolvedOp, Res](resolved) { case (SeqRes, children) => @@ -46,10 +424,11 @@ object Topology extends Logging { else cz.current ) - def resolveOnMoves(op: Tree): Eval[Res] = { - val cursor = RawCursor(NonEmptyList.one(ChainZipper.one(op))) + def resolveOnMoves(op: Tree, debug: Boolean): Eval[Res] = { + val cursor = RawCursor(NonEmptyList.one(ChainZipper.one(op)), None) // TODO: remove var var i = 0 + def nextI = { i = i + 1 i @@ -60,18 +439,37 @@ object Topology extends Logging { logger.debug(s"<:> $rc") val resolved = MakeRes - .resolve(rc.currentPeerId, nextI) + .resolve(rc.topology.currentPeerId, nextI) .lift .apply(rc.tag) logger.trace("Resolved: " + resolved) + if (debug) { + println(Console.BLUE + rc + Console.RESET) + println(rc.topology) + println("Before: " + rc.topology.beforeOn.value) + println("Begin: " + rc.topology.beginsOn.value) + println("PathBefore: " + rc.topology.pathBefore.value) + + println(Console.CYAN + "Parent: " + rc.topology.parent + Console.RESET) + + println("End : " + rc.topology.endsOn.value) + println("After: " + rc.topology.afterOn.value) + println("Exit : " + rc.topology.forceExit.value) + println("PathAfter: " + rc.topology.pathAfter.value) + println(Console.YELLOW + " - - - - -" + Console.RESET) + } + val chainZipperEv = resolved.traverse(cofree => - Eval.later { + ( + rc.topology.pathBefore.map(through(_)), + rc.topology.pathAfter.map(through(_, reversed = true)) + ).mapN { case (pathBefore, pathAfter) => val cz = ChainZipper( - through(rc.pathFromPrev), + pathBefore, cofree, - through(rc.pathToNext) + pathAfter ) if (cz.next.nonEmpty || cz.prev.nonEmpty) { logger.debug(s"Resolved $rc -> $cofree")