mirror of
https://github.com/fluencelabs/aqua.git
synced 2025-04-24 22:42:13 +00:00
Make the timeout error message clearer (#548)
This commit is contained in:
parent
af64da90bd
commit
2daf6ca422
@ -2,7 +2,7 @@ package aqua
|
||||
|
||||
import aqua.builder.{ArgumentGetter, Service}
|
||||
import aqua.raw.value.{ValueRaw, VarRaw}
|
||||
import aqua.run.{GeneralRunOptions, JsonService, RunCommand, RunOpts}
|
||||
import aqua.run.{GeneralOptions, JsonService, RunCommand, RunOpts}
|
||||
import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel}
|
||||
import cats.data.{NonEmptyList, Validated, ValidatedNec}
|
||||
import cats.effect.ExitCode
|
||||
@ -28,7 +28,7 @@ case class RelativePath(path: Path) extends AquaPath
|
||||
|
||||
// All info to run any aqua function
|
||||
case class RunInfo(
|
||||
common: GeneralRunOptions,
|
||||
common: GeneralOptions,
|
||||
func: CliFunc,
|
||||
input: AquaPath,
|
||||
imports: List[Path] = Nil,
|
||||
@ -102,7 +102,7 @@ object SubCommandBuilder {
|
||||
.valid(
|
||||
name,
|
||||
header,
|
||||
GeneralRunOptions.commonGeneralOpt.map { c =>
|
||||
GeneralOptions.opt.map { c =>
|
||||
RunInfo(c, CliFunc(funcName), path)
|
||||
}
|
||||
)
|
||||
|
@ -21,7 +21,7 @@ import aqua.builder.IPFSUploader
|
||||
import aqua.ipfs.js.IpfsApi
|
||||
import aqua.model.LiteralModel
|
||||
import aqua.raw.value.LiteralRaw
|
||||
import aqua.run.{GeneralRunOptions, RunCommand, RunConfig, RunOpts}
|
||||
import aqua.run.{GeneralOptions, RunCommand, RunConfig, RunOpts}
|
||||
import cats.effect.{Concurrent, ExitCode, Resource, Sync}
|
||||
import cats.syntax.flatMap.*
|
||||
import cats.syntax.functor.*
|
||||
@ -56,7 +56,7 @@ object IpfsOpts extends Logging {
|
||||
SubCommandBuilder.valid(
|
||||
"upload",
|
||||
"Upload a file to IPFS",
|
||||
(GeneralRunOptions.commonGeneralOpt, pathOpt).mapN { (common, path) =>
|
||||
(GeneralOptions.opt, pathOpt).mapN { (common, path) =>
|
||||
RunInfo(
|
||||
common,
|
||||
CliFunc(UploadFuncName, LiteralRaw.quote(path) :: Nil),
|
||||
|
@ -3,7 +3,7 @@ package aqua.remote
|
||||
import aqua.ArgOpts.jsonFromFileOpt
|
||||
import aqua.builder.ArgumentGetter
|
||||
import aqua.raw.value.{LiteralRaw, VarRaw}
|
||||
import aqua.run.GeneralRunOptions
|
||||
import aqua.run.GeneralOptions
|
||||
import aqua.types.{ArrayType, ScalarType, StructType}
|
||||
import aqua.*
|
||||
import aqua.json.JsonEncoder
|
||||
@ -60,7 +60,7 @@ object DistOpts extends Logging {
|
||||
SubCommandBuilder.valid(
|
||||
"remove_service",
|
||||
"Remove service",
|
||||
(GeneralRunOptions.commonGeneralOpt, srvIdOpt).mapN { (common, srvId) =>
|
||||
(GeneralOptions.opt, srvIdOpt).mapN { (common, srvId) =>
|
||||
RunInfo(
|
||||
common,
|
||||
CliFunc(RemoveFuncName, LiteralRaw.quote(srvId) :: Nil),
|
||||
@ -73,7 +73,7 @@ object DistOpts extends Logging {
|
||||
SubCommandBuilder.valid(
|
||||
"create_service",
|
||||
"Deploy service from existing blueprint",
|
||||
(GeneralRunOptions.commonGeneralOpt, blueprintIdOpt).mapN { (common, blueprintId) =>
|
||||
(GeneralOptions.opt, blueprintIdOpt).mapN { (common, blueprintId) =>
|
||||
RunInfo(
|
||||
common,
|
||||
CliFunc(CreateServiceFuncName, LiteralRaw.quote(blueprintId) :: Nil),
|
||||
@ -86,7 +86,7 @@ object DistOpts extends Logging {
|
||||
SubCommandBuilder.valid(
|
||||
"add_blueprint",
|
||||
"Add blueprint to a peer",
|
||||
(GeneralRunOptions.commonGeneralOpt, blueprintNameOpt, dependencyOpt).mapN {
|
||||
(GeneralOptions.opt, blueprintNameOpt, dependencyOpt).mapN {
|
||||
(common, blueprintName, dependencies) =>
|
||||
val depsWithHash = dependencies.map { d =>
|
||||
if (d.startsWith("hash:"))
|
||||
@ -129,7 +129,7 @@ object DistOpts extends Logging {
|
||||
"deploy_service",
|
||||
"Deploy service from WASM modules",
|
||||
(
|
||||
GeneralRunOptions.commonGeneralOptWithSecretKey,
|
||||
GeneralOptions.optWithSecretKeyCustomTimeout(60000),
|
||||
configFromFileOpt[F],
|
||||
srvNameOpt
|
||||
).mapN { (common, configFromFileF, srvName) =>
|
||||
@ -147,12 +147,9 @@ object DistOpts extends Logging {
|
||||
val srvArg = VarRaw(srvName, configType)
|
||||
val args = LiteralRaw.quote(srvName) :: srvArg :: Nil
|
||||
// if we have default timeout, increase it
|
||||
val commonWithTimeout = if (common.timeout.isEmpty) {
|
||||
common.copy(timeout = Some(60000))
|
||||
} else common
|
||||
validNec(
|
||||
RunInfo(
|
||||
commonWithTimeout,
|
||||
common,
|
||||
CliFunc(DeployFuncName, args),
|
||||
PackagePath(DistAqua),
|
||||
Nil,
|
||||
|
@ -6,7 +6,7 @@ import aqua.ipfs.IpfsOpts.{pathOpt, UploadFuncName}
|
||||
import aqua.js.FluenceEnvironment
|
||||
import aqua.model.{LiteralModel, ValueModel}
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw}
|
||||
import aqua.run.{GeneralRunOptions, RunCommand, RunConfig, RunOpts}
|
||||
import aqua.run.{GeneralOptions, RunCommand, RunConfig, RunOpts}
|
||||
import aqua.*
|
||||
import cats.Applicative
|
||||
import cats.data.{NonEmptyList, Validated}
|
||||
@ -68,7 +68,7 @@ object RemoteInfoOpts {
|
||||
SubCommandBuilder.valid(
|
||||
"list_interfaces",
|
||||
"List all service interfaces on a peer by a given owner",
|
||||
(GeneralRunOptions.commonGeneralOpt, AppOpts.wrapWithOption(ownerOpt), allFlag).mapN {
|
||||
(GeneralOptions.opt, AppOpts.wrapWithOption(ownerOpt), allFlag).mapN {
|
||||
(common, peer, printAll) =>
|
||||
if (printAll)
|
||||
RunInfo(
|
||||
@ -95,7 +95,7 @@ object RemoteInfoOpts {
|
||||
SubCommandBuilder.valid(
|
||||
GetInterfaceFuncName,
|
||||
"Show interface of a service",
|
||||
(GeneralRunOptions.commonGeneralOpt, idOpt).mapN { (common, serviceId) =>
|
||||
(GeneralOptions.opt, idOpt).mapN { (common, serviceId) =>
|
||||
RunInfo(
|
||||
common,
|
||||
CliFunc(GetInterfaceFuncName, LiteralRaw.quote(serviceId) :: Nil),
|
||||
@ -108,7 +108,7 @@ object RemoteInfoOpts {
|
||||
SubCommandBuilder.valid(
|
||||
GetModuleInterfaceFuncName,
|
||||
"Print a module interface",
|
||||
(GeneralRunOptions.commonGeneralOpt, idOpt).mapN { (common, serviceId) =>
|
||||
(GeneralOptions.opt, idOpt).mapN { (common, serviceId) =>
|
||||
RunInfo(
|
||||
common,
|
||||
CliFunc(GetModuleInterfaceFuncName, LiteralRaw.quote(serviceId) :: Nil),
|
||||
|
@ -16,6 +16,7 @@ import cats.syntax.applicative.*
|
||||
import cats.syntax.flatMap.*
|
||||
import cats.syntax.show.*
|
||||
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.{ExecutionContext, Future, Promise, TimeoutException}
|
||||
import scala.scalajs.js
|
||||
import scala.scalajs.js.{JSON, JavaScriptException, timers}
|
||||
@ -28,6 +29,7 @@ object FuncCaller {
|
||||
* @return
|
||||
*/
|
||||
def funcCall[F[_]: Async](
|
||||
name: String,
|
||||
air: String,
|
||||
functionDef: FunctionDef,
|
||||
config: RunConfig,
|
||||
@ -36,7 +38,9 @@ object FuncCaller {
|
||||
getters: List[ArgumentGetter]
|
||||
): F[ValidatedNec[String, Unit]] = {
|
||||
|
||||
FluenceUtils.setLogLevel(LogLevelTransformer.logLevelToFluenceJS(config.common.logLevel.fluencejs))
|
||||
FluenceUtils.setLogLevel(
|
||||
LogLevelTransformer.logLevelToFluenceJS(config.common.logLevel.fluencejs)
|
||||
)
|
||||
|
||||
// stops peer in any way at the end of execution
|
||||
val resource = Resource.make(Fluence.getPeer().pure[F]) { peer =>
|
||||
@ -48,12 +52,14 @@ object FuncCaller {
|
||||
Async[F].fromFuture {
|
||||
(for {
|
||||
keyPair <- createKeyPair(config.common.secretKey)
|
||||
logLevel: js.UndefOr[aqua.js.LogLevel] = LogLevelTransformer.logLevelToAvm(config.common.logLevel.aquavm)
|
||||
logLevel: js.UndefOr[aqua.js.LogLevel] = LogLevelTransformer.logLevelToAvm(
|
||||
config.common.logLevel.aquavm
|
||||
)
|
||||
_ <- Fluence
|
||||
.start(
|
||||
PeerConfig(
|
||||
config.common.multiaddr,
|
||||
config.common.timeout.getOrElse(scalajs.js.undefined),
|
||||
config.common.timeout.toMillis.toInt : js.UndefOr[Int],
|
||||
keyPair,
|
||||
Debug(printParticleId = config.common.flags.verbose, marineLogLevel = logLevel)
|
||||
)
|
||||
@ -63,7 +69,7 @@ object FuncCaller {
|
||||
if (config.common.flags.showConfig) {
|
||||
val configJson = KeyPairOp.toDynamicJSON(keyPair)
|
||||
configJson.updateDynamic("relay")(config.common.multiaddr)
|
||||
config.common.timeout.foreach(t => configJson.updateDynamic("timeout")(t))
|
||||
configJson.updateDynamic("timeout")(config.common.timeout.toMillis)
|
||||
configJson.updateDynamic("log-level")(config.common.logLevel.compiler.name)
|
||||
OutputPrinter.print(JSON.stringify(configJson, null, 4))
|
||||
}
|
||||
@ -82,23 +88,25 @@ object FuncCaller {
|
||||
_ <- callFuture
|
||||
finisherFuture = finisherService.promise.future
|
||||
// use a timeout in finisher if we have an async function and it hangs on node's side
|
||||
finisher = config.common.timeout.map { t =>
|
||||
setTimeout(finisherFuture, t)
|
||||
}.getOrElse(finisherFuture)
|
||||
finisher = setTimeout(name, finisherFuture, config.common.timeout)
|
||||
_ <- finisher
|
||||
} yield validNec(())).recover(handleFuncCallErrors).pure[F]
|
||||
} yield validNec(()))
|
||||
.recover(handleFuncCallErrors(name, config.common.timeout))
|
||||
.pure[F]
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private def setTimeout[T](f: Future[T], timeout: Int)(implicit
|
||||
private def setTimeout[T](funcName: String, f: Future[T], timeout: Duration)(implicit
|
||||
ec: ExecutionContext
|
||||
): Future[T] = {
|
||||
val p = Promise[T]()
|
||||
val timeoutHandle =
|
||||
timers.setTimeout(timeout)(p.tryFailure(new TimeoutException(TimeoutErrorMessage)))
|
||||
timers.setTimeout(timeout.toMillis)(
|
||||
p.tryFailure(new TimeoutException(timeoutErrorMessage(funcName, timeout, None)))
|
||||
)
|
||||
f.onComplete { result =>
|
||||
timers.clearTimeout(timeoutHandle)
|
||||
p.tryComplete(result)
|
||||
@ -106,21 +114,30 @@ object FuncCaller {
|
||||
p.future
|
||||
}
|
||||
|
||||
val TimeoutErrorMessage =
|
||||
"Function execution failed by timeout. You can increase the timeout with '--timeout' option in milliseconds or check if your code can hang while executing."
|
||||
private def timeoutErrorMessage(funcName: String, timeout: Duration, pid: Option[String]) = {
|
||||
val pidStr = pid.map(s => " " + s).getOrElse("")
|
||||
s"Function '$funcName' timed out after ${timeout.toMillis} milliseconds. Increase the timeout with '--timeout' option or check if your code can hang while executing$pidStr."
|
||||
}
|
||||
|
||||
private def handleFuncCallErrors: PartialFunction[Throwable, ValidatedNec[String, Unit]] = { t =>
|
||||
private def handleFuncCallErrors(
|
||||
funcName: String,
|
||||
timeout: Duration
|
||||
): PartialFunction[Throwable, ValidatedNec[String, Unit]] = { t =>
|
||||
val message =
|
||||
t match {
|
||||
case te: TimeoutException => te.getMessage
|
||||
case t if t.getMessage.contains("Request timed out after") =>
|
||||
val msg = t.getMessage
|
||||
timeoutErrorMessage(
|
||||
funcName,
|
||||
timeout,
|
||||
Some(msg.substring(msg.indexOf("particle id") - 1, msg.length))
|
||||
)
|
||||
case tjs: JavaScriptException =>
|
||||
val msg = tjs.exception.asInstanceOf[js.Dynamic].selectDynamic("message")
|
||||
if (scalajs.js.isUndefined(msg)) JSON.stringify(tjs.exception.asInstanceOf[js.Any])
|
||||
else msg.toString
|
||||
case _ =>
|
||||
if (t.getMessage.contains("Request timed out after")) {
|
||||
TimeoutErrorMessage
|
||||
} else JSON.stringify(t.toString)
|
||||
case _ => t.toString
|
||||
}
|
||||
|
||||
invalidNec(message)
|
||||
|
@ -14,6 +14,8 @@ import cats.syntax.apply.*
|
||||
import com.monovore.decline.Opts
|
||||
import scribe.Level
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.scalajs.js
|
||||
import scala.util.Try
|
||||
|
||||
@ -25,8 +27,8 @@ case class Flags(
|
||||
noRelay: Boolean
|
||||
)
|
||||
|
||||
case class GeneralRunOptions(
|
||||
timeout: Option[Int],
|
||||
case class GeneralOptions(
|
||||
timeout: Duration,
|
||||
logLevel: LogLevels,
|
||||
multiaddr: String,
|
||||
on: Option[String],
|
||||
@ -35,7 +37,7 @@ case class GeneralRunOptions(
|
||||
constants: List[ConstantRaw]
|
||||
)
|
||||
|
||||
object GeneralRunOptions {
|
||||
object GeneralOptions {
|
||||
|
||||
val multiaddrOpt: Opts[String] =
|
||||
Opts
|
||||
@ -84,10 +86,11 @@ object GeneralRunOptions {
|
||||
def commonOpt(
|
||||
isRun: Boolean,
|
||||
withSecret: Boolean,
|
||||
withConstants: Boolean
|
||||
): Opts[GeneralRunOptions] =
|
||||
withConstants: Boolean,
|
||||
defaultTimeout: Duration = Duration(7000, TimeUnit.MILLISECONDS)
|
||||
): Opts[GeneralOptions] =
|
||||
(
|
||||
AppOpts.wrapWithOption(timeoutOpt),
|
||||
timeoutOpt.withDefault(defaultTimeout),
|
||||
logLevelOpt,
|
||||
multiaddrOpt,
|
||||
onOpt,
|
||||
@ -95,16 +98,17 @@ object GeneralRunOptions {
|
||||
if (withSecret) { secretKeyOpt.map(Some.apply) }
|
||||
else { AppOpts.wrapWithOption(secretKeyOpt) },
|
||||
if (withConstants) AppOpts.constantOpts else Nil.pure[Opts]
|
||||
).mapN(GeneralRunOptions.apply)
|
||||
).mapN(GeneralOptions.apply)
|
||||
|
||||
val commonGeneralOpt: Opts[GeneralRunOptions] = commonOpt(false, false, false)
|
||||
val commonGeneralRunOpt: Opts[GeneralRunOptions] = commonOpt(true, false, true)
|
||||
val commonGeneralOptWithSecretKey: Opts[GeneralRunOptions] = commonOpt(false, true, false)
|
||||
val opt: Opts[GeneralOptions] = commonOpt(false, false, false)
|
||||
val runOpt: Opts[GeneralOptions] = commonOpt(true, false, true)
|
||||
val optWithSecretKey: Opts[GeneralOptions] = commonOpt(false, true, false)
|
||||
def optWithSecretKeyCustomTimeout(timeoutMs: Int): Opts[GeneralOptions] = commonOpt(false, true, false, Duration(timeoutMs, TimeUnit.MILLISECONDS))
|
||||
}
|
||||
|
||||
// `run` command configuration
|
||||
case class RunConfig(
|
||||
common: GeneralRunOptions,
|
||||
common: GeneralOptions,
|
||||
// services that will pass arguments to air
|
||||
argumentGetters: Map[String, VarJson],
|
||||
// builtin services for aqua run, for example: Console, FileSystem, etc
|
||||
|
@ -80,7 +80,7 @@ object RunOpts extends Logging {
|
||||
name = "run",
|
||||
header = "Run Aqua code",
|
||||
(
|
||||
GeneralRunOptions.commonGeneralRunOpt,
|
||||
GeneralOptions.runOpt,
|
||||
runOptsCompose[F]
|
||||
).mapN {
|
||||
case (
|
||||
|
@ -117,6 +117,7 @@ class Runner(
|
||||
}
|
||||
|
||||
FuncCaller.funcCall[F](
|
||||
func.name,
|
||||
air,
|
||||
definitions,
|
||||
config,
|
||||
|
@ -16,7 +16,7 @@ import aqua.raw.ops.{Call, CallArrowRawTag}
|
||||
import aqua.raw.value.{LiteralRaw, ValueRaw, VarRaw}
|
||||
import aqua.res.{AquaRes, FuncRes}
|
||||
import aqua.run.RunOpts.logger
|
||||
import aqua.run.{GeneralRunOptions, RunCommand, RunConfig, RunOpts}
|
||||
import aqua.run.{GeneralOptions, RunCommand, RunConfig, RunOpts}
|
||||
import aqua.types.{ArrowType, LiteralType, NilType, ScalarType}
|
||||
import cats.data.*
|
||||
import cats.data.Validated.{invalid, invalidNec, valid, validNec, validNel}
|
||||
@ -114,7 +114,7 @@ object ScriptOpts extends Logging {
|
||||
AirGen(funcRes.body).generate.show
|
||||
}
|
||||
|
||||
private def commonScriptOpts = GeneralRunOptions.commonOpt(false, true, true)
|
||||
private def commonScriptOpts = GeneralOptions.commonOpt(false, true, true)
|
||||
|
||||
private def compileAir[F[_]: Async: AquaIO](
|
||||
input: Path,
|
||||
|
@ -7,6 +7,8 @@ import cats.syntax.traverse.*
|
||||
import cats.data.Validated.{invalid, invalidNec, invalidNel, valid, validNec, validNel}
|
||||
|
||||
import java.util.Base64
|
||||
import java.util.concurrent.TimeUnit
|
||||
import scala.concurrent.duration.Duration
|
||||
|
||||
case class LogLevels(
|
||||
compiler: Level = Level.Error,
|
||||
@ -80,9 +82,10 @@ object LogLevels {
|
||||
|
||||
object FluenceOpts {
|
||||
|
||||
val timeoutOpt: Opts[Int] =
|
||||
val timeoutOpt: Opts[Duration] =
|
||||
Opts
|
||||
.option[Int]("timeout", "Request timeout in milliseconds", "t")
|
||||
.map(i => Duration(i, TimeUnit.MILLISECONDS))
|
||||
|
||||
val onOpt: Opts[Option[String]] =
|
||||
AppOpts.wrapWithOption(
|
||||
|
Loading…
x
Reference in New Issue
Block a user