diff --git a/cli/.js/src/main/scala/aqua/CommandBuilder.scala b/cli/.js/src/main/scala/aqua/CommandBuilder.scala index 37f23086..3e78db43 100644 --- a/cli/.js/src/main/scala/aqua/CommandBuilder.scala +++ b/cli/.js/src/main/scala/aqua/CommandBuilder.scala @@ -2,7 +2,7 @@ package aqua import aqua.builder.{ArgumentGetter, Service} import aqua.raw.value.{ValueRaw, VarRaw} -import aqua.run.{GeneralRunOptions, JsonService, RunCommand, RunOpts} +import aqua.run.{GeneralOptions, JsonService, RunCommand, RunOpts} import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel} import cats.data.{NonEmptyList, Validated, ValidatedNec} import cats.effect.ExitCode @@ -28,7 +28,7 @@ case class RelativePath(path: Path) extends AquaPath // All info to run any aqua function case class RunInfo( - common: GeneralRunOptions, + common: GeneralOptions, func: CliFunc, input: AquaPath, imports: List[Path] = Nil, @@ -102,7 +102,7 @@ object SubCommandBuilder { .valid( name, header, - GeneralRunOptions.commonGeneralOpt.map { c => + GeneralOptions.opt.map { c => RunInfo(c, CliFunc(funcName), path) } ) diff --git a/cli/.js/src/main/scala/aqua/ipfs/IpfsOpts.scala b/cli/.js/src/main/scala/aqua/ipfs/IpfsOpts.scala index 44e0b0e5..672bf6e9 100644 --- a/cli/.js/src/main/scala/aqua/ipfs/IpfsOpts.scala +++ b/cli/.js/src/main/scala/aqua/ipfs/IpfsOpts.scala @@ -21,7 +21,7 @@ import aqua.builder.IPFSUploader import aqua.ipfs.js.IpfsApi import aqua.model.LiteralModel import aqua.raw.value.LiteralRaw -import aqua.run.{GeneralRunOptions, RunCommand, RunConfig, RunOpts} +import aqua.run.{GeneralOptions, RunCommand, RunConfig, RunOpts} import cats.effect.{Concurrent, ExitCode, Resource, Sync} import cats.syntax.flatMap.* import cats.syntax.functor.* @@ -56,7 +56,7 @@ object IpfsOpts extends Logging { SubCommandBuilder.valid( "upload", "Upload a file to IPFS", - (GeneralRunOptions.commonGeneralOpt, pathOpt).mapN { (common, path) => + (GeneralOptions.opt, pathOpt).mapN { (common, path) => RunInfo( common, CliFunc(UploadFuncName, LiteralRaw.quote(path) :: Nil), diff --git a/cli/.js/src/main/scala/aqua/remote/DistOpts.scala b/cli/.js/src/main/scala/aqua/remote/DistOpts.scala index 36535433..380c814d 100644 --- a/cli/.js/src/main/scala/aqua/remote/DistOpts.scala +++ b/cli/.js/src/main/scala/aqua/remote/DistOpts.scala @@ -3,7 +3,7 @@ package aqua.remote import aqua.ArgOpts.jsonFromFileOpt import aqua.builder.ArgumentGetter import aqua.raw.value.{LiteralRaw, VarRaw} -import aqua.run.GeneralRunOptions +import aqua.run.GeneralOptions import aqua.types.{ArrayType, ScalarType, StructType} import aqua.* import aqua.json.JsonEncoder @@ -60,7 +60,7 @@ object DistOpts extends Logging { SubCommandBuilder.valid( "remove_service", "Remove service", - (GeneralRunOptions.commonGeneralOpt, srvIdOpt).mapN { (common, srvId) => + (GeneralOptions.opt, srvIdOpt).mapN { (common, srvId) => RunInfo( common, CliFunc(RemoveFuncName, LiteralRaw.quote(srvId) :: Nil), @@ -73,7 +73,7 @@ object DistOpts extends Logging { SubCommandBuilder.valid( "create_service", "Deploy service from existing blueprint", - (GeneralRunOptions.commonGeneralOpt, blueprintIdOpt).mapN { (common, blueprintId) => + (GeneralOptions.opt, blueprintIdOpt).mapN { (common, blueprintId) => RunInfo( common, CliFunc(CreateServiceFuncName, LiteralRaw.quote(blueprintId) :: Nil), @@ -86,7 +86,7 @@ object DistOpts extends Logging { SubCommandBuilder.valid( "add_blueprint", "Add blueprint to a peer", - (GeneralRunOptions.commonGeneralOpt, blueprintNameOpt, dependencyOpt).mapN { + (GeneralOptions.opt, blueprintNameOpt, dependencyOpt).mapN { (common, blueprintName, dependencies) => val depsWithHash = dependencies.map { d => if (d.startsWith("hash:")) @@ -129,7 +129,7 @@ object DistOpts extends Logging { "deploy_service", "Deploy service from WASM modules", ( - GeneralRunOptions.commonGeneralOptWithSecretKey, + GeneralOptions.optWithSecretKeyCustomTimeout(60000), configFromFileOpt[F], srvNameOpt ).mapN { (common, configFromFileF, srvName) => @@ -147,12 +147,9 @@ object DistOpts extends Logging { val srvArg = VarRaw(srvName, configType) val args = LiteralRaw.quote(srvName) :: srvArg :: Nil // if we have default timeout, increase it - val commonWithTimeout = if (common.timeout.isEmpty) { - common.copy(timeout = Some(60000)) - } else common validNec( RunInfo( - commonWithTimeout, + common, CliFunc(DeployFuncName, args), PackagePath(DistAqua), Nil, diff --git a/cli/.js/src/main/scala/aqua/remote/RemoteInfoOpts.scala b/cli/.js/src/main/scala/aqua/remote/RemoteInfoOpts.scala index 24ed4ab9..dd6ec763 100644 --- a/cli/.js/src/main/scala/aqua/remote/RemoteInfoOpts.scala +++ b/cli/.js/src/main/scala/aqua/remote/RemoteInfoOpts.scala @@ -6,7 +6,7 @@ import aqua.ipfs.IpfsOpts.{pathOpt, UploadFuncName} import aqua.js.FluenceEnvironment import aqua.model.{LiteralModel, ValueModel} import aqua.raw.value.{LiteralRaw, ValueRaw} -import aqua.run.{GeneralRunOptions, RunCommand, RunConfig, RunOpts} +import aqua.run.{GeneralOptions, RunCommand, RunConfig, RunOpts} import aqua.* import cats.Applicative import cats.data.{NonEmptyList, Validated} @@ -68,7 +68,7 @@ object RemoteInfoOpts { SubCommandBuilder.valid( "list_interfaces", "List all service interfaces on a peer by a given owner", - (GeneralRunOptions.commonGeneralOpt, AppOpts.wrapWithOption(ownerOpt), allFlag).mapN { + (GeneralOptions.opt, AppOpts.wrapWithOption(ownerOpt), allFlag).mapN { (common, peer, printAll) => if (printAll) RunInfo( @@ -95,7 +95,7 @@ object RemoteInfoOpts { SubCommandBuilder.valid( GetInterfaceFuncName, "Show interface of a service", - (GeneralRunOptions.commonGeneralOpt, idOpt).mapN { (common, serviceId) => + (GeneralOptions.opt, idOpt).mapN { (common, serviceId) => RunInfo( common, CliFunc(GetInterfaceFuncName, LiteralRaw.quote(serviceId) :: Nil), @@ -108,7 +108,7 @@ object RemoteInfoOpts { SubCommandBuilder.valid( GetModuleInterfaceFuncName, "Print a module interface", - (GeneralRunOptions.commonGeneralOpt, idOpt).mapN { (common, serviceId) => + (GeneralOptions.opt, idOpt).mapN { (common, serviceId) => RunInfo( common, CliFunc(GetModuleInterfaceFuncName, LiteralRaw.quote(serviceId) :: Nil), diff --git a/cli/.js/src/main/scala/aqua/run/FuncCaller.scala b/cli/.js/src/main/scala/aqua/run/FuncCaller.scala index 962d3f0b..265120a5 100644 --- a/cli/.js/src/main/scala/aqua/run/FuncCaller.scala +++ b/cli/.js/src/main/scala/aqua/run/FuncCaller.scala @@ -16,6 +16,7 @@ import cats.syntax.applicative.* import cats.syntax.flatMap.* import cats.syntax.show.* +import scala.concurrent.duration.Duration import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException} import scala.scalajs.js import scala.scalajs.js.{JSON, JavaScriptException, timers} @@ -28,6 +29,7 @@ object FuncCaller { * @return */ def funcCall[F[_]: Async]( + name: String, air: String, functionDef: FunctionDef, config: RunConfig, @@ -36,7 +38,9 @@ object FuncCaller { getters: List[ArgumentGetter] ): F[ValidatedNec[String, Unit]] = { - FluenceUtils.setLogLevel(LogLevelTransformer.logLevelToFluenceJS(config.common.logLevel.fluencejs)) + FluenceUtils.setLogLevel( + LogLevelTransformer.logLevelToFluenceJS(config.common.logLevel.fluencejs) + ) // stops peer in any way at the end of execution val resource = Resource.make(Fluence.getPeer().pure[F]) { peer => @@ -48,12 +52,14 @@ object FuncCaller { Async[F].fromFuture { (for { keyPair <- createKeyPair(config.common.secretKey) - logLevel: js.UndefOr[aqua.js.LogLevel] = LogLevelTransformer.logLevelToAvm(config.common.logLevel.aquavm) + logLevel: js.UndefOr[aqua.js.LogLevel] = LogLevelTransformer.logLevelToAvm( + config.common.logLevel.aquavm + ) _ <- Fluence .start( PeerConfig( config.common.multiaddr, - config.common.timeout.getOrElse(scalajs.js.undefined), + config.common.timeout.toMillis.toInt : js.UndefOr[Int], keyPair, Debug(printParticleId = config.common.flags.verbose, marineLogLevel = logLevel) ) @@ -63,7 +69,7 @@ object FuncCaller { if (config.common.flags.showConfig) { val configJson = KeyPairOp.toDynamicJSON(keyPair) configJson.updateDynamic("relay")(config.common.multiaddr) - config.common.timeout.foreach(t => configJson.updateDynamic("timeout")(t)) + configJson.updateDynamic("timeout")(config.common.timeout.toMillis) configJson.updateDynamic("log-level")(config.common.logLevel.compiler.name) OutputPrinter.print(JSON.stringify(configJson, null, 4)) } @@ -82,23 +88,25 @@ object FuncCaller { _ <- callFuture finisherFuture = finisherService.promise.future // use a timeout in finisher if we have an async function and it hangs on node's side - finisher = config.common.timeout.map { t => - setTimeout(finisherFuture, t) - }.getOrElse(finisherFuture) + finisher = setTimeout(name, finisherFuture, config.common.timeout) _ <- finisher - } yield validNec(())).recover(handleFuncCallErrors).pure[F] + } yield validNec(())) + .recover(handleFuncCallErrors(name, config.common.timeout)) + .pure[F] } } } } - private def setTimeout[T](f: Future[T], timeout: Int)(implicit + private def setTimeout[T](funcName: String, f: Future[T], timeout: Duration)(implicit ec: ExecutionContext ): Future[T] = { val p = Promise[T]() val timeoutHandle = - timers.setTimeout(timeout)(p.tryFailure(new TimeoutException(TimeoutErrorMessage))) + timers.setTimeout(timeout.toMillis)( + p.tryFailure(new TimeoutException(timeoutErrorMessage(funcName, timeout, None))) + ) f.onComplete { result => timers.clearTimeout(timeoutHandle) p.tryComplete(result) @@ -106,21 +114,30 @@ object FuncCaller { p.future } - val TimeoutErrorMessage = - "Function execution failed by timeout. You can increase the timeout with '--timeout' option in milliseconds or check if your code can hang while executing." + private def timeoutErrorMessage(funcName: String, timeout: Duration, pid: Option[String]) = { + val pidStr = pid.map(s => " " + s).getOrElse("") + s"Function '$funcName' timed out after ${timeout.toMillis} milliseconds. Increase the timeout with '--timeout' option or check if your code can hang while executing$pidStr." + } - private def handleFuncCallErrors: PartialFunction[Throwable, ValidatedNec[String, Unit]] = { t => + private def handleFuncCallErrors( + funcName: String, + timeout: Duration + ): PartialFunction[Throwable, ValidatedNec[String, Unit]] = { t => val message = t match { case te: TimeoutException => te.getMessage + case t if t.getMessage.contains("Request timed out after") => + val msg = t.getMessage + timeoutErrorMessage( + funcName, + timeout, + Some(msg.substring(msg.indexOf("particle id") - 1, msg.length)) + ) case tjs: JavaScriptException => val msg = tjs.exception.asInstanceOf[js.Dynamic].selectDynamic("message") if (scalajs.js.isUndefined(msg)) JSON.stringify(tjs.exception.asInstanceOf[js.Any]) else msg.toString - case _ => - if (t.getMessage.contains("Request timed out after")) { - TimeoutErrorMessage - } else JSON.stringify(t.toString) + case _ => t.toString } invalidNec(message) diff --git a/cli/.js/src/main/scala/aqua/run/RunConfig.scala b/cli/.js/src/main/scala/aqua/run/RunConfig.scala index 6b62b444..0330f17e 100644 --- a/cli/.js/src/main/scala/aqua/run/RunConfig.scala +++ b/cli/.js/src/main/scala/aqua/run/RunConfig.scala @@ -14,6 +14,8 @@ import cats.syntax.apply.* import com.monovore.decline.Opts import scribe.Level +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration import scala.scalajs.js import scala.util.Try @@ -25,8 +27,8 @@ case class Flags( noRelay: Boolean ) -case class GeneralRunOptions( - timeout: Option[Int], +case class GeneralOptions( + timeout: Duration, logLevel: LogLevels, multiaddr: String, on: Option[String], @@ -35,7 +37,7 @@ case class GeneralRunOptions( constants: List[ConstantRaw] ) -object GeneralRunOptions { +object GeneralOptions { val multiaddrOpt: Opts[String] = Opts @@ -84,10 +86,11 @@ object GeneralRunOptions { def commonOpt( isRun: Boolean, withSecret: Boolean, - withConstants: Boolean - ): Opts[GeneralRunOptions] = + withConstants: Boolean, + defaultTimeout: Duration = Duration(7000, TimeUnit.MILLISECONDS) + ): Opts[GeneralOptions] = ( - AppOpts.wrapWithOption(timeoutOpt), + timeoutOpt.withDefault(defaultTimeout), logLevelOpt, multiaddrOpt, onOpt, @@ -95,16 +98,17 @@ object GeneralRunOptions { if (withSecret) { secretKeyOpt.map(Some.apply) } else { AppOpts.wrapWithOption(secretKeyOpt) }, if (withConstants) AppOpts.constantOpts else Nil.pure[Opts] - ).mapN(GeneralRunOptions.apply) + ).mapN(GeneralOptions.apply) - val commonGeneralOpt: Opts[GeneralRunOptions] = commonOpt(false, false, false) - val commonGeneralRunOpt: Opts[GeneralRunOptions] = commonOpt(true, false, true) - val commonGeneralOptWithSecretKey: Opts[GeneralRunOptions] = commonOpt(false, true, false) + val opt: Opts[GeneralOptions] = commonOpt(false, false, false) + val runOpt: Opts[GeneralOptions] = commonOpt(true, false, true) + val optWithSecretKey: Opts[GeneralOptions] = commonOpt(false, true, false) + def optWithSecretKeyCustomTimeout(timeoutMs: Int): Opts[GeneralOptions] = commonOpt(false, true, false, Duration(timeoutMs, TimeUnit.MILLISECONDS)) } // `run` command configuration case class RunConfig( - common: GeneralRunOptions, + common: GeneralOptions, // services that will pass arguments to air argumentGetters: Map[String, VarJson], // builtin services for aqua run, for example: Console, FileSystem, etc diff --git a/cli/.js/src/main/scala/aqua/run/RunOpts.scala b/cli/.js/src/main/scala/aqua/run/RunOpts.scala index d3fead41..a95778ec 100644 --- a/cli/.js/src/main/scala/aqua/run/RunOpts.scala +++ b/cli/.js/src/main/scala/aqua/run/RunOpts.scala @@ -80,7 +80,7 @@ object RunOpts extends Logging { name = "run", header = "Run Aqua code", ( - GeneralRunOptions.commonGeneralRunOpt, + GeneralOptions.runOpt, runOptsCompose[F] ).mapN { case ( diff --git a/cli/.js/src/main/scala/aqua/run/Runner.scala b/cli/.js/src/main/scala/aqua/run/Runner.scala index d83baf64..df0a2b20 100644 --- a/cli/.js/src/main/scala/aqua/run/Runner.scala +++ b/cli/.js/src/main/scala/aqua/run/Runner.scala @@ -117,6 +117,7 @@ class Runner( } FuncCaller.funcCall[F]( + func.name, air, definitions, config, diff --git a/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala b/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala index dade80f7..1f4dc774 100644 --- a/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala +++ b/cli/.js/src/main/scala/aqua/script/ScriptOpts.scala @@ -16,7 +16,7 @@ import aqua.raw.ops.{Call, CallArrowRawTag} import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw} import aqua.res.{AquaRes, FuncRes} import aqua.run.RunOpts.logger -import aqua.run.{GeneralRunOptions, RunCommand, RunConfig, RunOpts} +import aqua.run.{GeneralOptions, RunCommand, RunConfig, RunOpts} import aqua.types.{ArrowType, LiteralType, NilType, ScalarType} import cats.data.* import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel} @@ -114,7 +114,7 @@ object ScriptOpts extends Logging { AirGen(funcRes.body).generate.show } - private def commonScriptOpts = GeneralRunOptions.commonOpt(false, true, true) + private def commonScriptOpts = GeneralOptions.commonOpt(false, true, true) private def compileAir[F[_]: Async: AquaIO]( input: Path, diff --git a/cli/src/main/scala/aqua/FluenceOpts.scala b/cli/src/main/scala/aqua/FluenceOpts.scala index 55170f0b..27fb3ebe 100644 --- a/cli/src/main/scala/aqua/FluenceOpts.scala +++ b/cli/src/main/scala/aqua/FluenceOpts.scala @@ -7,6 +7,8 @@ import cats.syntax.traverse.* import cats.data.Validated.{invalid, invalidNec, invalidNel, valid, validNec, validNel} import java.util.Base64 +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration case class LogLevels( compiler: Level = Level.Error, @@ -80,9 +82,10 @@ object LogLevels { object FluenceOpts { - val timeoutOpt: Opts[Int] = + val timeoutOpt: Opts[Duration] = Opts .option[Int]("timeout", "Request timeout in milliseconds", "t") + .map(i => Duration(i, TimeUnit.MILLISECONDS)) val onOpt: Opts[Option[String]] = AppOpts.wrapWithOption(