Codec in subproject (#5)

* codec in separate module

* lisence headers

* codec.direct and codec.inverse as Kleisli instances

* test compilation error fixed

* fork and not execute in parallel

* better Codec; some dataset-grpc server implementations

* license headers

* compile bug fixed

* kademlia's Key invariants checking

* ContractsAllocatorApiServer implementation

* node test fixed

* better errors catching in KryoCodecs & disable logging in simulation tests
This commit is contained in:
Dmitry Kurinskiy 2017-12-27 15:02:38 +03:00 committed by GitHub
commit 8d4983a636
4 changed files with 265 additions and 0 deletions

View File

@ -0,0 +1,65 @@
/*
* Copyright (C) 2017 Fluence Labs Limited
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package fluence.codec
import cats.data.Kleisli
import cats.{ Applicative, FlatMap, Traverse }
import cats.syntax.applicative._
import scala.language.{ higherKinds, implicitConversions }
/**
* Base trait for serialize/deserialize objects.
*
* @tparam A The type of plain object representation
* @tparam B The type of binary representation
* @tparam F Encoding/decoding effect
*/
final case class Codec[F[_], A, B](encode: A F[B], decode: B F[A]) {
self
val direct: Kleisli[F, A, B] = Kleisli(encode)
val inverse: Kleisli[F, B, A] = Kleisli(decode)
def andThen[C](other: Codec[F, B, C])(implicit F: FlatMap[F]): Codec[F, A, C] =
Codec((self.direct andThen other.direct).run, (other.inverse andThen self.inverse).run)
}
object Codec {
implicit def identityCodec[F[_] : Applicative, T]: Codec[F, T, T] =
Codec(_.pure[F], _.pure[F])
implicit def traverseCodec[F[_] : Applicative, G[_] : Traverse, O, B](implicit codec: Codec[F, O, B]): Codec[F, G[O], G[B]] =
Codec[F, G[O], G[B]](Traverse[G].traverse[F, O, B](_)(codec.encode), Traverse[G].traverse[F, B, O](_)(codec.decode))
def codec[F[_], O, B](implicit codec: Codec[F, O, B]): Codec[F, O, B] = codec
/**
* Constructs a Codec from pure encode/decode functions and an Applicative
*
* @param encodeFn Encode function that never fail
* @param decodeFn Decode function that never fail
* @tparam F Applicative effect
* @tparam O Raw type
* @tparam B Encoded type
* @return New codec for O and B
*/
def pure[F[_] : Applicative, O, B](encodeFn: O B, decodeFn: B O): Codec[F, O, B] =
Codec(encodeFn(_).pure[F], decodeFn(_).pure[F])
}

View File

@ -0,0 +1,105 @@
/*
* Copyright (C) 2017 Fluence Labs Limited
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package fluence.codec.kryo
import cats.MonadError
import cats.syntax.flatMap._
import com.twitter.chill.KryoPool
import fluence.codec.Codec
import shapeless._
import scala.language.higherKinds
import scala.reflect.ClassTag
/**
* Wrapper for a KryoPool with a list of registered classes
* @param pool Pre-configured KryoPool
* @param F Applicative error
* @tparam L List of classes registered with kryo
* @tparam F Effect
*/
class KryoCodecs[F[_], L <: HList] private (pool: KryoPool)(implicit F: MonadError[F, Throwable]) {
/**
* Returns a codec for any registered type
* @param sel Shows the presence of type T within list L
* @tparam T Object type
* @return Freshly created Codec with Kryo inside
*/
implicit def codec[T](implicit sel: ops.hlist.Selector[L, T]): Codec[F, T, Array[Byte]] =
Codec(
obj Option(obj) match {
case Some(o)
F.catchNonFatal(Option(pool.toBytesWithClass(o))).flatMap {
case Some(v) F.pure(v)
case None F.raiseError(new NullPointerException("Obj is encoded into null"))
}
case None
F.raiseError[Array[Byte]](new NullPointerException("Obj is null, encoding is impossible"))
}, binary F.catchNonFatal(pool.fromBytes(binary).asInstanceOf[T]))
}
object KryoCodecs {
/**
* Builder for Kryo codecs
* @param klasses Classes to register with Kryo
* @tparam L List of registered classes
*/
class Builder[L <: HList] private[KryoCodecs] (klasses: Seq[Class[_]]) {
/**
* Register a new case class T to Kryo
* @tparam T Type to add
* @tparam S Generic representation of T
* @param gen Generic representation of case type T
* @param sa Presence of all types of S inside L
* @return Extended builder
*/
def addCase[T, S <: HList](klass: Class[T])(implicit gen: Generic.Aux[T, S], sa: ops.hlist.SelectAll[L, S]): Builder[T :: L] =
new Builder[T :: L](klasses :+ klass)
/**
* Register a primitive type T to Kryo
* @tparam T Type to add
* @return Extended builder
*/
def add[T : ClassTag]: Builder[T :: L] =
new Builder[T :: L](klasses :+ implicitly[ClassTag[T]].runtimeClass)
/**
* Build a new instance of KryoCodecs with the given poolSize and F effect
* @param poolSize Kryo pool size
* @param F ApplicativeError for catching serialization errors
* @tparam F Effect type
* @return Configured instance of KryoCodecs
*/
def build[F[_]](poolSize: Int = Runtime.getRuntime.availableProcessors)(implicit F: MonadError[F, Throwable]): KryoCodecs[F, L] =
new KryoCodecs[F, L](
KryoPool.withByteArrayOutputStream(
poolSize,
KryoFactory(klasses, registrationRequired = true) // registrationRequired should never be needed, as codec derivation is typesafe
)
)
}
/**
* Prepares a fresh builder
*/
def apply(): Builder[Array[Byte] :: Long :: String :: HNil] =
new Builder[HNil](Vector.empty).add[String].add[Long].add[Array[Byte]]
}

View File

@ -0,0 +1,40 @@
/*
* Copyright (C) 2017 Fluence Labs Limited
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package fluence.codec.kryo
import com.twitter.chill.{ AllScalaRegistrar, KryoBase, KryoInstantiator }
import org.objenesis.strategy.StdInstantiatorStrategy
/**
* This Instantiator enable compulsory class registration, registers all java and scala main classes.
* This class required for [[com.twitter.chill.KryoPool]].
* @param classesToReg additional classes for registration
* @param registrationRequired if true, an exception is thrown when an unregistered class is encountered.
*/
private[kryo] case class KryoFactory(classesToReg: Seq[Class[_]], registrationRequired: Boolean) extends KryoInstantiator {
override def newKryo(): KryoBase = {
val kryo = new KryoBase()
kryo.setRegistrationRequired(registrationRequired)
kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
new AllScalaRegistrar()(kryo)
// in future will be able to create specific fast serializer for each class
kryo.registerClasses(classesToReg)
kryo
}
}

View File

@ -0,0 +1,55 @@
package fluence.codec.kryo
import cats.instances.try_._
import org.scalatest.{ Matchers, WordSpec }
import scala.util.Try
class KryoCodecsSpec extends WordSpec with Matchers {
val testBlob = Array("3".getBytes(), "4".getBytes(), "5".getBytes())
val testClass = TestClass("one", 2, testBlob)
private val testCodecs =
KryoCodecs()
.add[Array[Array[Byte]]]
.addCase(classOf[TestClass]).build[Try]()
"encode and decode" should {
"be inverse functions" when {
"object defined" in {
val codec = testCodecs.codec[TestClass]
val result = codec.encode(testClass).flatMap(codec.decode).get
result.str shouldBe "one"
result.num shouldBe 2
result.blob should contain theSameElementsAs testBlob
}
"object is null" in {
val codec = testCodecs.codec[TestClass]
val result = codec.encode(null).flatMap(codec.decode)
result.isFailure shouldBe true
}
}
}
"encode" should {
"don't write full class name to binary representation" when {
"class registered" in {
//val codec = KryoCodec[TestClass](Seq(classOf[TestClass], classOf[Array[Byte]], classOf[Array[Array[Byte]]]), registerRequired = true)
val codec = testCodecs.codec[TestClass]
val encoded = codec.encode(testClass).map(new String(_)).get
val reasonableMaxSize = 20 // bytes
encoded should not contain "TestClass"
encoded.length should be < reasonableMaxSize
}
}
}
}
case class TestClass(str: String, num: Long, blob: Array[Array[Byte]])