commit 8d4983a636fcca65865e02f725330785d64cf144 Author: Dmitry Kurinskiy Date: Wed Dec 27 15:02:38 2017 +0300 Codec in subproject (#5) * codec in separate module * lisence headers * codec.direct and codec.inverse as Kleisli instances * test compilation error fixed * fork and not execute in parallel * better Codec; some dataset-grpc server implementations * license headers * compile bug fixed * kademlia's Key invariants checking * ContractsAllocatorApiServer implementation * node test fixed * better errors catching in KryoCodecs & disable logging in simulation tests diff --git a/core/src/main/scala/fluence/codec/Codec.scala b/core/src/main/scala/fluence/codec/Codec.scala new file mode 100644 index 0000000..c1d284f --- /dev/null +++ b/core/src/main/scala/fluence/codec/Codec.scala @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2017 Fluence Labs Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package fluence.codec + +import cats.data.Kleisli +import cats.{ Applicative, FlatMap, Traverse } +import cats.syntax.applicative._ + +import scala.language.{ higherKinds, implicitConversions } + +/** + * Base trait for serialize/deserialize objects. + * + * @tparam A The type of plain object representation + * @tparam B The type of binary representation + * @tparam F Encoding/decoding effect + */ +final case class Codec[F[_], A, B](encode: A ⇒ F[B], decode: B ⇒ F[A]) { + self ⇒ + + val direct: Kleisli[F, A, B] = Kleisli(encode) + + val inverse: Kleisli[F, B, A] = Kleisli(decode) + + def andThen[C](other: Codec[F, B, C])(implicit F: FlatMap[F]): Codec[F, A, C] = + Codec((self.direct andThen other.direct).run, (other.inverse andThen self.inverse).run) +} + +object Codec { + implicit def identityCodec[F[_] : Applicative, T]: Codec[F, T, T] = + Codec(_.pure[F], _.pure[F]) + + implicit def traverseCodec[F[_] : Applicative, G[_] : Traverse, O, B](implicit codec: Codec[F, O, B]): Codec[F, G[O], G[B]] = + Codec[F, G[O], G[B]](Traverse[G].traverse[F, O, B](_)(codec.encode), Traverse[G].traverse[F, B, O](_)(codec.decode)) + + def codec[F[_], O, B](implicit codec: Codec[F, O, B]): Codec[F, O, B] = codec + + /** + * Constructs a Codec from pure encode/decode functions and an Applicative + * + * @param encodeFn Encode function that never fail + * @param decodeFn Decode function that never fail + * @tparam F Applicative effect + * @tparam O Raw type + * @tparam B Encoded type + * @return New codec for O and B + */ + def pure[F[_] : Applicative, O, B](encodeFn: O ⇒ B, decodeFn: B ⇒ O): Codec[F, O, B] = + Codec(encodeFn(_).pure[F], decodeFn(_).pure[F]) +} diff --git a/kryo/src/main/scala/fluence/codec/kryo/KryoCodecs.scala b/kryo/src/main/scala/fluence/codec/kryo/KryoCodecs.scala new file mode 100644 index 0000000..64b53cd --- /dev/null +++ b/kryo/src/main/scala/fluence/codec/kryo/KryoCodecs.scala @@ -0,0 +1,105 @@ +/* + * Copyright (C) 2017 Fluence Labs Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package fluence.codec.kryo + +import cats.MonadError +import cats.syntax.flatMap._ +import com.twitter.chill.KryoPool +import fluence.codec.Codec +import shapeless._ + +import scala.language.higherKinds +import scala.reflect.ClassTag + +/** + * Wrapper for a KryoPool with a list of registered classes + * @param pool Pre-configured KryoPool + * @param F Applicative error + * @tparam L List of classes registered with kryo + * @tparam F Effect + */ +class KryoCodecs[F[_], L <: HList] private (pool: KryoPool)(implicit F: MonadError[F, Throwable]) { + + /** + * Returns a codec for any registered type + * @param sel Shows the presence of type T within list L + * @tparam T Object type + * @return Freshly created Codec with Kryo inside + */ + implicit def codec[T](implicit sel: ops.hlist.Selector[L, T]): Codec[F, T, Array[Byte]] = + Codec( + obj ⇒ Option(obj) match { + case Some(o) ⇒ + F.catchNonFatal(Option(pool.toBytesWithClass(o))).flatMap { + case Some(v) ⇒ F.pure(v) + case None ⇒ F.raiseError(new NullPointerException("Obj is encoded into null")) + } + case None ⇒ + F.raiseError[Array[Byte]](new NullPointerException("Obj is null, encoding is impossible")) + }, binary ⇒ F.catchNonFatal(pool.fromBytes(binary).asInstanceOf[T])) +} + +object KryoCodecs { + + /** + * Builder for Kryo codecs + * @param klasses Classes to register with Kryo + * @tparam L List of registered classes + */ + class Builder[L <: HList] private[KryoCodecs] (klasses: Seq[Class[_]]) { + /** + * Register a new case class T to Kryo + * @tparam T Type to add + * @tparam S Generic representation of T + * @param gen Generic representation of case type T + * @param sa Presence of all types of S inside L + * @return Extended builder + */ + def addCase[T, S <: HList](klass: Class[T])(implicit gen: Generic.Aux[T, S], sa: ops.hlist.SelectAll[L, S]): Builder[T :: L] = + new Builder[T :: L](klasses :+ klass) + + /** + * Register a primitive type T to Kryo + * @tparam T Type to add + * @return Extended builder + */ + def add[T : ClassTag]: Builder[T :: L] = + new Builder[T :: L](klasses :+ implicitly[ClassTag[T]].runtimeClass) + + /** + * Build a new instance of KryoCodecs with the given poolSize and F effect + * @param poolSize Kryo pool size + * @param F ApplicativeError for catching serialization errors + * @tparam F Effect type + * @return Configured instance of KryoCodecs + */ + def build[F[_]](poolSize: Int = Runtime.getRuntime.availableProcessors)(implicit F: MonadError[F, Throwable]): KryoCodecs[F, L] = + new KryoCodecs[F, L]( + KryoPool.withByteArrayOutputStream( + poolSize, + KryoFactory(klasses, registrationRequired = true) // registrationRequired should never be needed, as codec derivation is typesafe + ) + ) + } + + /** + * Prepares a fresh builder + */ + def apply(): Builder[Array[Byte] :: Long :: String :: HNil] = + new Builder[HNil](Vector.empty).add[String].add[Long].add[Array[Byte]] +} diff --git a/kryo/src/main/scala/fluence/codec/kryo/KryoFactory.scala b/kryo/src/main/scala/fluence/codec/kryo/KryoFactory.scala new file mode 100644 index 0000000..8c6cb39 --- /dev/null +++ b/kryo/src/main/scala/fluence/codec/kryo/KryoFactory.scala @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2017 Fluence Labs Limited + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package fluence.codec.kryo + +import com.twitter.chill.{ AllScalaRegistrar, KryoBase, KryoInstantiator } +import org.objenesis.strategy.StdInstantiatorStrategy + +/** + * This Instantiator enable compulsory class registration, registers all java and scala main classes. + * This class required for [[com.twitter.chill.KryoPool]]. + * @param classesToReg additional classes for registration + * @param registrationRequired if true, an exception is thrown when an unregistered class is encountered. + */ +private[kryo] case class KryoFactory(classesToReg: Seq[Class[_]], registrationRequired: Boolean) extends KryoInstantiator { + + override def newKryo(): KryoBase = { + val kryo = new KryoBase() + kryo.setRegistrationRequired(registrationRequired) + kryo.setInstantiatorStrategy(new StdInstantiatorStrategy()) + new AllScalaRegistrar()(kryo) + // in future will be able to create specific fast serializer for each class + kryo.registerClasses(classesToReg) + kryo + } +} diff --git a/kryo/src/test/scala/fluence/codec/kryo/KryoCodecsSpec.scala b/kryo/src/test/scala/fluence/codec/kryo/KryoCodecsSpec.scala new file mode 100644 index 0000000..63569e8 --- /dev/null +++ b/kryo/src/test/scala/fluence/codec/kryo/KryoCodecsSpec.scala @@ -0,0 +1,55 @@ +package fluence.codec.kryo + +import cats.instances.try_._ +import org.scalatest.{ Matchers, WordSpec } + +import scala.util.Try + +class KryoCodecsSpec extends WordSpec with Matchers { + + val testBlob = Array("3".getBytes(), "4".getBytes(), "5".getBytes()) + val testClass = TestClass("one", 2, testBlob) + + private val testCodecs = + KryoCodecs() + .add[Array[Array[Byte]]] + .addCase(classOf[TestClass]).build[Try]() + + "encode and decode" should { + "be inverse functions" when { + "object defined" in { + + val codec = testCodecs.codec[TestClass] + + val result = codec.encode(testClass).flatMap(codec.decode).get + + result.str shouldBe "one" + result.num shouldBe 2 + result.blob should contain theSameElementsAs testBlob + } + + "object is null" in { + val codec = testCodecs.codec[TestClass] + val result = codec.encode(null).flatMap(codec.decode) + result.isFailure shouldBe true + } + } + } + + "encode" should { + "don't write full class name to binary representation" when { + "class registered" in { + //val codec = KryoCodec[TestClass](Seq(classOf[TestClass], classOf[Array[Byte]], classOf[Array[Array[Byte]]]), registerRequired = true) + val codec = testCodecs.codec[TestClass] + val encoded = codec.encode(testClass).map(new String(_)).get + val reasonableMaxSize = 20 // bytes + encoded should not contain "TestClass" + encoded.length should be < reasonableMaxSize + } + } + + } +} + +case class TestClass(str: String, num: Long, blob: Array[Array[Byte]]) +