mirror of
https://github.com/fluencelabs/aqua.git
synced 2025-04-24 22:42:13 +00:00
Fix empty streams as result (#582)
This commit is contained in:
parent
67fe5a8c7e
commit
61f483e16b
@ -26,57 +26,65 @@ object ArrowInliner extends Logging {
|
||||
// Apply a callable function, get its fully resolved body & optional value, if any
|
||||
private def inline[S: Mangler: Arrows: Exports](
|
||||
fn: FuncArrow,
|
||||
call: CallModel,
|
||||
outsideDeclaredStreams: Map[String, ValueModel]
|
||||
call: CallModel
|
||||
): State[S, (OpModel.Tree, List[ValueModel])] =
|
||||
// Function's internal variables will not be available outside, hence the scope
|
||||
Exports[S].scope(
|
||||
for {
|
||||
// Process renamings, prepare environment
|
||||
tr <- prelude[S](fn, call)
|
||||
(tree, result) = tr
|
||||
Exports[S].exports
|
||||
.map(exports =>
|
||||
exports.collect { case e @ (_, VarModel(_, StreamType(_), _)) =>
|
||||
e
|
||||
}
|
||||
)
|
||||
.flatMap { outsideDeclaredStreams =>
|
||||
// Function's internal variables will not be available outside, hence the scope
|
||||
Exports[S].scope(
|
||||
for {
|
||||
// Process renamings, prepare environment
|
||||
tr <- prelude[S](fn, call)
|
||||
(tree, result) = tr
|
||||
|
||||
// Register captured values as available exports
|
||||
_ <- Exports[S].resolved(fn.capturedValues)
|
||||
_ <- Mangler[S].forbid(fn.capturedValues.keySet)
|
||||
// Register captured values as available exports
|
||||
_ <- Exports[S].resolved(fn.capturedValues)
|
||||
_ <- Mangler[S].forbid(fn.capturedValues.keySet)
|
||||
|
||||
// Now, substitute the arrows that were received as function arguments
|
||||
// Use the new op tree (args are replaced with values, names are unique & safe)
|
||||
callableFuncBodyNoTopology <- TagInliner.handleTree(tree, fn.funcName)
|
||||
callableFuncBody =
|
||||
fn.capturedTopology
|
||||
.fold[OpModel](SeqModel)(ApplyTopologyModel.apply)
|
||||
.wrap(callableFuncBodyNoTopology)
|
||||
// Now, substitute the arrows that were received as function arguments
|
||||
// Use the new op tree (args are replaced with values, names are unique & safe)
|
||||
callableFuncBodyNoTopology <- TagInliner.handleTree(tree, fn.funcName)
|
||||
callableFuncBody =
|
||||
fn.capturedTopology
|
||||
.fold[OpModel](SeqModel)(ApplyTopologyModel.apply)
|
||||
.wrap(callableFuncBodyNoTopology)
|
||||
|
||||
// Fix return values with exports collected in the body
|
||||
resolvedResult <- RawValueInliner.valueListToModel(result)
|
||||
|
||||
// Fix the return values
|
||||
(ops, rets) = (call.exportTo zip resolvedResult)
|
||||
.map[(List[OpModel.Tree], ValueModel)] {
|
||||
case (CallModel.Export(n, StreamType(_)), (res@VarModel(_, StreamType(_), _), resDesugar))
|
||||
if !outsideDeclaredStreams.contains(n) =>
|
||||
resDesugar.toList -> res
|
||||
case (CallModel.Export(exp, st @ StreamType(_)), (res, resDesugar)) =>
|
||||
// pass nested function results to a stream
|
||||
(resDesugar.toList :+ PushToStreamModel(
|
||||
res,
|
||||
CallModel.Export(exp, st)
|
||||
).leaf) -> VarModel(
|
||||
exp,
|
||||
st,
|
||||
Chain.empty
|
||||
)
|
||||
case (_, (res, resDesugar)) =>
|
||||
resDesugar.toList -> res
|
||||
}
|
||||
.foldLeft[(List[OpModel.Tree], List[ValueModel])](
|
||||
(callableFuncBody :: Nil, Nil)
|
||||
) { case ((ops, rets), (fo, r)) =>
|
||||
(fo ::: ops, r :: rets)
|
||||
}
|
||||
} yield SeqModel.wrap(ops.reverse: _*) -> rets.reverse
|
||||
)
|
||||
// Fix return values with exports collected in the body
|
||||
resolvedResult <- RawValueInliner.valueListToModel(result)
|
||||
// Fix the return values
|
||||
(ops, rets) = (call.exportTo zip resolvedResult)
|
||||
.map[(List[OpModel.Tree], ValueModel)] {
|
||||
case (
|
||||
CallModel.Export(n, StreamType(_)),
|
||||
(res @ VarModel(_, StreamType(_), _), resDesugar)
|
||||
) if !outsideDeclaredStreams.contains(n) =>
|
||||
resDesugar.toList -> res
|
||||
case (CallModel.Export(exp, st @ StreamType(_)), (res, resDesugar)) =>
|
||||
// pass nested function results to a stream
|
||||
(resDesugar.toList :+ PushToStreamModel(
|
||||
res,
|
||||
CallModel.Export(exp, st)
|
||||
).leaf) -> VarModel(
|
||||
exp,
|
||||
st,
|
||||
Chain.empty
|
||||
)
|
||||
case (_, (res, resDesugar)) =>
|
||||
resDesugar.toList -> res
|
||||
}
|
||||
.foldLeft[(List[OpModel.Tree], List[ValueModel])](
|
||||
(callableFuncBody :: Nil, Nil)
|
||||
) { case ((ops, rets), (fo, r)) =>
|
||||
(fo ::: ops, r :: rets)
|
||||
}
|
||||
} yield SeqModel.wrap(ops.reverse: _*) -> rets.reverse
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Prepare the state context for this function call
|
||||
@ -173,16 +181,10 @@ object ArrowInliner extends Logging {
|
||||
for {
|
||||
passArrows <- Arrows[S].pickArrows(call.arrowArgNames)
|
||||
|
||||
exports <- Exports[S].exports
|
||||
declaredStreams = exports.filter {
|
||||
case (_, VarModel(_, StreamType(_), _)) => true
|
||||
case _ => false
|
||||
}
|
||||
|
||||
av <- Arrows[S].scope(
|
||||
for {
|
||||
_ <- Arrows[S].resolved(passArrows)
|
||||
av <- ArrowInliner.inline(arrow, call, declaredStreams)
|
||||
av <- ArrowInliner.inline(arrow, call)
|
||||
} yield av
|
||||
)
|
||||
(appliedOp, value) = av
|
||||
|
@ -90,14 +90,12 @@ object RawValueInliner extends Logging {
|
||||
}
|
||||
}.map(inline.predo.toList ::: _)
|
||||
|
||||
def valueToModel[S: Mangler: Exports: Arrows](
|
||||
value: ValueRaw
|
||||
private def toModel[S: Mangler: Exports: Arrows](
|
||||
unfoldF: State[S, (ValueModel, Inline)]
|
||||
): State[S, (ValueModel, Option[OpModel.Tree])] =
|
||||
for {
|
||||
vmp <- unfold(value)
|
||||
vmp <- unfoldF
|
||||
(vm, map) = vmp
|
||||
|
||||
_ = logger.trace("RAW " + value)
|
||||
_ = logger.trace("MOD " + vm)
|
||||
dc <- Exports[S].exports
|
||||
_ = logger.trace("DEC " + dc)
|
||||
@ -107,6 +105,21 @@ object RawValueInliner extends Logging {
|
||||
_ = logger.trace("map was: " + map)
|
||||
} yield vm -> parDesugarPrefix(ops)
|
||||
|
||||
def collectionToModel[S: Mangler: Exports: Arrows](
|
||||
value: CollectionRaw,
|
||||
assignTo: Option[String]
|
||||
): State[S, (ValueModel, Option[OpModel.Tree])] = {
|
||||
logger.trace("RAW COLLECTION " + value)
|
||||
toModel(CollectionRawInliner.unfoldCollection(value, assignTo))
|
||||
}
|
||||
|
||||
def valueToModel[S: Mangler: Exports: Arrows](
|
||||
value: ValueRaw
|
||||
): State[S, (ValueModel, Option[OpModel.Tree])] = {
|
||||
logger.trace("RAW " + value)
|
||||
toModel(unfold(value))
|
||||
}
|
||||
|
||||
def valueListToModel[S: Mangler: Exports: Arrows](
|
||||
values: List[ValueRaw]
|
||||
): State[S, List[(ValueModel, Option[OpModel.Tree])]] =
|
||||
|
@ -2,7 +2,8 @@ package aqua.model.inline
|
||||
|
||||
import aqua.model.inline.state.{Arrows, Counter, Exports, Mangler}
|
||||
import aqua.model.*
|
||||
import aqua.model.inline.raw.CallArrowRawInliner
|
||||
import aqua.model.inline.RawValueInliner.collectionToModel
|
||||
import aqua.model.inline.raw.{CallArrowRawInliner, CollectionRawInliner}
|
||||
import aqua.raw.ops.*
|
||||
import aqua.raw.value.*
|
||||
import aqua.types.{ArrayType, BoxType, CanonStreamType, StreamType}
|
||||
@ -10,7 +11,7 @@ import cats.syntax.traverse.*
|
||||
import cats.syntax.applicative.*
|
||||
import cats.instances.list.*
|
||||
import cats.data.{Chain, State, StateT}
|
||||
import scribe.{log, Logging}
|
||||
import scribe.{Logging, log}
|
||||
|
||||
/**
|
||||
* [[TagInliner]] prepares a [[RawTag]] for futher processing by converting [[ValueRaw]]s into [[ValueModel]]s.
|
||||
@ -179,10 +180,17 @@ object TagInliner extends Logging {
|
||||
}
|
||||
|
||||
case AssignmentTag(value, assignTo) =>
|
||||
for {
|
||||
cd <- valueToModel(value)
|
||||
_ <- Exports[S].resolved(assignTo, cd._1)
|
||||
} yield Some(SeqModel) -> cd._2
|
||||
(value match {
|
||||
// if we assign collection to a stream, we must use it's name, because it is already created with 'new'
|
||||
case c@CollectionRaw(_, _: StreamType) =>
|
||||
collectionToModel(c, Some(assignTo))
|
||||
case v =>
|
||||
valueToModel(v)
|
||||
}).flatMap { cd =>
|
||||
for {
|
||||
_ <- Exports[S].resolved(assignTo, cd._1)
|
||||
} yield Some(SeqModel) -> cd._2
|
||||
}
|
||||
|
||||
case ClosureTag(arrow, detach) =>
|
||||
if (detach) Arrows[S].resolved(arrow, None).map(_ => None -> None)
|
||||
|
@ -1,6 +1,16 @@
|
||||
package aqua.model.inline.raw
|
||||
|
||||
import aqua.model.{CallModel, CanonicalizeModel, NullModel, PushToStreamModel, RestrictionModel, SeqModel, ValueModel, VarModel, XorModel}
|
||||
import aqua.model.{
|
||||
CallModel,
|
||||
CanonicalizeModel,
|
||||
NullModel,
|
||||
PushToStreamModel,
|
||||
RestrictionModel,
|
||||
SeqModel,
|
||||
ValueModel,
|
||||
VarModel,
|
||||
XorModel
|
||||
}
|
||||
import aqua.model.inline.Inline
|
||||
import aqua.model.inline.RawValueInliner.valueToModel
|
||||
import aqua.model.inline.state.{Arrows, Exports, Mangler}
|
||||
@ -13,18 +23,20 @@ object CollectionRawInliner extends RawInliner[CollectionRaw] {
|
||||
override def apply[S: Mangler: Exports: Arrows](
|
||||
raw: CollectionRaw,
|
||||
propertiesAllowed: Boolean
|
||||
): State[S, (ValueModel, Inline)] = unfoldCollection(raw)
|
||||
|
||||
def unfoldCollection[S: Mangler: Exports: Arrows](
|
||||
raw: CollectionRaw,
|
||||
assignToName: Option[String] = None
|
||||
): State[S, (ValueModel, Inline)] =
|
||||
for {
|
||||
streamName <- Mangler[S].findAndForbidName(
|
||||
(
|
||||
raw.boxType match {
|
||||
case _: StreamType => "stream"
|
||||
case _: CanonStreamType => "canon_stream"
|
||||
case _: ArrayType => "array"
|
||||
case _: OptionType => "option"
|
||||
}
|
||||
) + "-inline"
|
||||
)
|
||||
streamName <-
|
||||
raw.boxType match {
|
||||
case _: StreamType => assignToName.map(s => State.pure(s)).getOrElse(Mangler[S].findAndForbidName("stream-inline"))
|
||||
case _: CanonStreamType => Mangler[S].findAndForbidName("canon_stream-inline")
|
||||
case _: ArrayType => Mangler[S].findAndForbidName("array-inline")
|
||||
case _: OptionType => Mangler[S].findAndForbidName("option-inline")
|
||||
}
|
||||
|
||||
stream = VarModel(streamName, StreamType(raw.elementType))
|
||||
streamExp = CallModel.Export(stream.name, stream.`type`)
|
||||
|
Loading…
x
Reference in New Issue
Block a user