tmdemo-initial

This commit is contained in:
Dmitry Sergeev 2018-06-05 13:15:12 +05:00
parent 0fa27f5139
commit 08dc8574f5
14 changed files with 826 additions and 1 deletions

101
.gitignore vendored Normal file
View File

@ -0,0 +1,101 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# dotenv
.env
# virtualenv
.venv
venv/
ENV/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/

View File

@ -1,2 +1,4 @@
# tendermint_demo
# Tendermint research
Research artifacts and tools for Tendermint
[Tendermint demo application](https://github.com/fluencelabs/tendermint_demo/tree/master/tmdemoapp)

101
parse/parse_block.py Normal file
View File

@ -0,0 +1,101 @@
import sys, urllib, json, datetime, time
from parse_common import uvarint, parseutc, formatbytes, readjson, getmaxheight
if len(sys.argv) < 2:
print "usage: python parse_block.py host:port minheight [maxheight]"
sys.exit()
tmaddress = sys.argv[1]
minheight = int(sys.argv[2])
maxheight = int(sys.argv[3]) if len(sys.argv) > 3 else getmaxheight(tmaddress)
# Running totals collected while scanning blocks minheight..maxheight.
# The 1e20 / 0 pairs are sentinels that the min()/max() calls below replace.
accsize = 0
acclatency = 0
minlatency = 1e20
maxlatency = 0
txcount = 0
blockcount = 0
firsttx = 1e20
lasttx = 0
firstblock = 1e20
lastblock = 0
maxblocksize = 0
# (timestamp, +1) when a tx was sent and (timestamp, -1) at its block time;
# consumed later to reconstruct the backlog over time.
txstat = []
# Fetch every block in the requested range through the /block RPC endpoint.
for height in range(minheight, maxheight + 1):
data = readjson(tmaddress + "/block?height=%d" % height)
numtxs = data["result"]["block"]["header"]["num_txs"]
blocktimetxt = data["result"]["block"]["header"]["time"]
blocktime = parseutc(blocktimetxt)
# Block-level statistics for blocks that carry transactions.
if numtxs > 0:
firstblock = min(firstblock, blocktime)
lastblock = max(lastblock, blocktime)
blockcount += 1
maxblocksize = max(maxblocksize, numtxs)
print height, numtxs, blocktimetxt
txs = data["result"]["block"]["data"]["txs"]
if txs:
for index, txhex in enumerate(txs):
# NOTE(review): txs are parsed as hex here, while parse_chain.py decodes
# them as base64 - confirm which encoding the node actually returns.
txbytes = bytearray.fromhex(txhex)
# Byte layout read below: [0] key char, [1] '=', [2:8] and [8:16] uvarints,
# [32:40] uvarint send time. Presumably this mirrors the load generator's
# tx format - TODO confirm.
key = chr(txbytes[0]) if chr(txbytes[1]) == '=' else "*"
connindex = uvarint(txbytes[2:8])
txnumber = uvarint(txbytes[8:16])
# Slice of the hex text (not of txbytes): hex chars 32..63.
hostnamehash = txhex[32:64]
txtime = uvarint(txbytes[32:40]) / 1e6
if txtime < 1e9:
txtime *= 1e6 # legacy support
# Latency is the block timestamp minus the send time embedded in the tx.
latency = blocktime - txtime
accsize += len(txbytes)
acclatency += latency
minlatency = min(minlatency, latency)
maxlatency = max(maxlatency, latency)
txcount += 1
firsttx = min(firsttx, txtime)
lasttx = max(lasttx, txtime)
txtimetxt = datetime.datetime.fromtimestamp(txtime)
txstat.append((txtime, 1))
txstat.append((blocktime, -1))
# Only the first five txs of a block are echoed.
if index < 5:
print key, connindex, txnumber, hostnamehash, txtimetxt, latency
print "Transactions: ", txcount, "=", formatbytes(accsize)
print " ", "%.3f s" % (lasttx - firsttx), "from", datetime.datetime.fromtimestamp(firsttx), "to", datetime.datetime.fromtimestamp(lasttx)
print "Blocks: ", blockcount
print " ", "%.3f s" % (lastblock - firstblock), "from", datetime.datetime.fromtimestamp(firstblock), "to", datetime.datetime.fromtimestamp(lastblock)
print "Tx send rate: ", "%.3f tx/s" % (txcount / (lasttx - firsttx)), "=", formatbytes(accsize / (lasttx - firsttx)) + "/s"
print "Tx throughput: ", "%.3f tx/s" % (txcount / (lastblock - firsttx)), "=", formatbytes(accsize / (lastblock - firsttx)) + "/s"
print "Block throughput:", "%.3f block/s" % (blockcount / (lastblock - firsttx))
print "Avg tx latency: ", "%.3f s" % (acclatency / txcount)
print "Min tx latency: ", "%.3f s" % minlatency
print "Max tx latency: ", "%.3f s" % maxlatency
# Sample the backlog (txs sent but not yet in a block) at steps + 1 evenly
# spaced instants between the first tx and the last block, by sweeping the
# time-ordered +1/-1 events once.
txstat.sort()
steps = 1000
stepstat = []
step_width = (lastblock - firsttx) / steps
backlog = 0
cursor = 0
event_count = len(txstat)
for i in range(steps + 1):
    sample_time = firsttx + step_width * i
    # Apply every event that happened up to and including this instant.
    while cursor < event_count and txstat[cursor][0] <= sample_time:
        backlog += txstat[cursor][1]
        cursor += 1
    stepstat.append(backlog)
# Plot the sampled backlog, save it as a PDF named after the run parameters,
# then show it interactively.
import matplotlib.pyplot as plt

send_span = lasttx - firsttx      # duration of the sending phase
total_span = lastblock - firsttx  # first tx to last block

f = plt.figure(figsize=(15, 5))
seconds_axis = [i * total_span / steps for i in range(steps + 1)]
plt.plot(seconds_axis, stepstat)

title_values = (
    send_span, formatbytes(accsize / txcount),
    txcount / send_span, formatbytes(accsize / send_span),
    txcount / total_span, formatbytes(accsize / total_span))
plt.title("Duration: %.1f s, Tx size: %s, Tx send rate: %.3f tx/s = %s/s, Tx throughput: %.3f tx/s = %s/s" % title_values)
plt.xlabel("seconds from first tx")
plt.ylabel("txs in backlog")

name_values = (minheight, maxheight, maxblocksize, send_span, accsize / txcount, txcount / send_span)
f.savefig("tdmnt-stat-%d-%d-%d-%.1f-%.0f-%.0f.pdf" % name_values, bbox_inches='tight')
plt.show(block=True)

46
parse/parse_chain.py Normal file
View File

@ -0,0 +1,46 @@
import sys, urllib, json, datetime, time
from parse_common import parseutc, readjson, getmaxheight
if len(sys.argv) < 2:
print "usage: python parse_chain.py host:port [minheight]"
sys.exit()
blocks_fetch = 20 # tendermint can't return more blocks
tmaddress = sys.argv[1]
maxheight = getmaxheight(tmaddress)
minheight = int(sys.argv[2]) if len(sys.argv) > 2 else max(1, maxheight - 49)
lastempty = -1
last_fetched_height = minheight - 1
# Walk the chain from minheight to maxheight and print one line per block of
# interest; block headers are fetched in batches of up to blocks_fetch.
for height in range(minheight, maxheight + 1):
# Fetch the next batch of headers once the previous batch is exhausted.
if height > last_fetched_height:
last_fetched_height = min(height + blocks_fetch - 1, maxheight)
bulk_data = (readjson(tmaddress + "/blockchain?minHeight=%d&maxHeight=%d" % (height, last_fetched_height)))["result"]["block_metas"]
# The index counts back from the batch's top height - presumably block_metas
# are returned highest block first; TODO confirm against the RPC.
data = bulk_data[last_fetched_height - height]["header"]
numtxs = data["num_txs"]
totaltxs = data["total_txs"]
app_hash = data["app_hash"]
blocktimetxt = data["time"]
blocktime = parseutc(blocktimetxt)
# The full block is fetched only for blocks with txs and for the last block.
if numtxs > 0 or height == maxheight:
blockdata = readjson(tmaddress + "/block?height=%d" % height)
txs = blockdata["result"]["block"]["data"]["txs"]
txsummary = ""
if txs:
# Summarize at most five txs, each cut or padded to a 30-character column.
for tx in txs[0:5]:
txstr = tx.decode('base64')
if len(txstr) > 30:
txsummary += "%27s... " % txstr[0:27]
else:
txsummary += "%30s " % txstr
if len(txs) > 5:
txsummary += "..."
print "%5s: %s %7d %7d %s... %s" % (height, datetime.datetime.fromtimestamp(blocktime), numtxs, totaltxs, app_hash[0:6], txsummary)
else:
# Blocks that are skipped print "..." only when the previous height was
# not skipped too, so a run of them produces a single marker.
if lastempty < height - 1:
print "..."
lastempty = height

46
parse/parse_common.py Normal file
View File

@ -0,0 +1,46 @@
import sys, urllib, json, datetime, time
def uvarint(buf):
    """Decode an unsigned base-128 varint from the start of `buf`.

    Each byte contributes its low 7 bits, least-significant group first; a
    byte below 0x80 terminates the number.

    :param buf: iterable of integer byte values (e.g. a bytearray slice)
    :return: the decoded non-negative integer, or 0 when `buf` ends before a
             terminating byte is seen (including an empty `buf`)
    """
    # Plain int arithmetic: Python 2 promotes to long automatically, and the
    # Python-2-only `long` builtin is avoided so this also runs on Python 3.
    result = 0
    shift = 0
    for b in buf:
        if b < 0x80:
            return result | (int(b) << shift)
        result |= int(b & 0x7f) << shift
        shift += 7
    return 0
def parseutc(utctxt):
    """Convert a UTC timestamp such as '2018-06-05T08:15:12.123456789Z' to
    seconds since the Unix epoch.

    :param utctxt: 'YYYY-MM-DDTHH:MM:SS' followed by an optional '.' plus
                   fractional digits, and a trailing 'Z'
    :return: float epoch seconds; the fraction is read as nanoseconds after
             right-padding to nine digits
    :raises ValueError: if the date/time part does not match the format
    """
    # Local import keeps this function self-contained.
    import calendar

    dt, _, tail = utctxt.partition(".")
    if tail == "":
        # No fractional part: strip the trailing 'Z' and use zero nanoseconds.
        dt, _, _ = utctxt.partition("Z")
        tail = "0Z"
    parsed = datetime.datetime.strptime(dt, '%Y-%m-%dT%H:%M:%S')
    # timegm() interprets the fields as UTC directly. The previous approach
    # shifted by *today's* local offset and used the non-portable
    # strftime("%s"), which is off by an hour across DST changes.
    pure = calendar.timegm(parsed.timetuple())
    ns = int(tail.rstrip("Z").ljust(9, "0"), 10)
    return pure + ns / 1e9
def formatbytes(value):
    """Render a byte count as 'N B', 'N.NNN KiB' or 'N.NNN MiB'.

    Values below 1024 are shown in bytes without decimals, values below
    1024 * 1024 in KiB, and everything else in MiB.
    """
    if value < 1024:
        return "%.0f B" % value
    kib = value / 1024.0
    if value < 1024 * 1024:
        return "%.3f KiB" % kib
    return "%.3f MiB" % (kib / 1024.0)
def readjson(url):
    """Fetch `http://<url>` and return the response body parsed as JSON.

    :param url: address without the scheme, e.g. 'host:port/status'
    :return: the decoded JSON document
    """
    response = urllib.urlopen("http://" + url)
    try:
        return json.loads(response.read())
    finally:
        # The scripts call this once per block; close each connection
        # explicitly instead of leaving it to garbage collection.
        response.close()
def getsyncinfo(tmaddress):
    """Return the sync information from the node's /status RPC result.

    When the result has a "sync_info" entry that entry is returned; otherwise
    the result itself is returned (compatibility with responses that have no
    such entry).
    """
    status = readjson(tmaddress + "/status")["result"]
    return status.get("sync_info", status)
def getmaxheight(tmaddress):
    """Return the "latest_block_height" reported by the node's sync info."""
    syncinfo = getsyncinfo(tmaddress)
    return syncinfo["latest_block_height"]

53
parse/query.py Normal file
View File

@ -0,0 +1,53 @@
import sys, urllib, json, datetime, time
from parse_common import readjson, getsyncinfo
# Command names accepted as the second command-line argument.
# "tx" broadcasts a transaction; "txverify" additionally queries the written
# key at the returned height; "query" runs an ABCI query on the latest state.
CMD_TX = "tx"
CMD_TX_VERIFY = "txverify"
CMD_QUERY = "query"
def abci_query(tmaddress, height, query):
    """Run an `abci_query` RPC for `query` at `height`.

    :return: tuple (value, proof); each element is the base64-decoded field
             of the response, or None when the field is absent
    """
    url = tmaddress + '/abci_query?height=' + str(height) + '&data="' + query + '"'
    response = readjson(url)["result"]["response"]

    def decoded(field):
        # Fields that are missing from the response are reported as None.
        return response[field].decode('base64') if field in response else None

    return (decoded("value"), decoded("proof"))
if len(sys.argv) < 3:
print "usage: python query.py host:port command arg"
sys.exit()
tmaddress = sys.argv[1]
command = sys.argv[2]
arg = sys.argv[3]
if command in {CMD_TX, CMD_TX_VERIFY}:
response = readjson(tmaddress + '/broadcast_tx_commit?tx="' + arg + '"')
if "error" in response:
print "ERROR :", response["error"]["data"]
else:
height = response["result"]["height"]
if response["result"].get("deliver_tx", {}).get("code", "0") != "0":
log = response["result"].get("deliver_tx", {}).get("log")
print "BAD"
print "HEIGHT:", height
print "LOG: ", log or "EMPTY"
else:
info = response["result"].get("deliver_tx", {}).get("info")
print "OK"
print "HEIGHT:", height
print "INFO: ", info or "EMPTY"
if command == CMD_TX_VERIFY and info is not None:
query_key = arg.split("=")[0]
query_response = abci_query(tmaddress, height, "get:" + query_key)
print "VERIFY:", query_response[0] or "EMPTY"
print "PROOF :", query_response[1] or "NO_PROOF"
elif command == CMD_QUERY:
syncinfo = getsyncinfo(tmaddress)
height = syncinfo["latest_block_height"]
apphash = syncinfo["latest_app_hash"]
print "HEIGHT:", height
print "HASH :", apphash
query_response = abci_query(tmaddress, height, arg)
print "RESULT:", query_response[0] or "EMPTY"
print "PROOF :", query_response[1] or "NO_PROOF"

132
tmdemoapp/README.md Normal file
View File

@ -0,0 +1,132 @@
# Tendermint Demo ABCI KVStore on Scala
This is a demo application implementing the Tendermint ABCI interface. It models an in-memory key-value string storage. Keys are hierarchical and `/`-separated. The key hierarchy is *merkelized*: every node stores the Merkle hash of its associated value (if present) and its children.
The application is compatible with `Tendermint v0.19.x` and uses `com.github.jtendermint.jabci` for Java ABCI definitions.
## Installation and running
For single-node run just launch the application:
```bash
sbt run
```
And launch Tendermint:
```bash
# uncomment line below to initialize Tendermint
#tendermint init
# uncomment line below to clear all Tendermint data
#tendermint unsafe_reset_all
tendermint node --consensus.create_empty_blocks=false
```
If Tendermint is launched first, it periodically tries to connect to the app until the app has started.
## Changing and observing the application state: transactions and queries
Tendermint offers two main ways of interaction with the app: transactions and queries.
Transactions are treated by Tendermint just like arrays of bytes and stored in the blockchain after block formation also just like arrays of bytes. The transaction semantics only make sense for the application once Tendermint delivers a transaction to it. A transaction could (and usually does) change the application state upon being committed and could provide some metadata to verify that it's actually added to the blockchain and applied to the state. However in order to get some trustful information about the committed transaction result one needs to query the blockchain explicitly.
Queries, in contrast to transactions, do not change the state and are not stored in the blockchain. Queries can only be applied to an already committed state; that is why they can be used to obtain trustworthy information (signed by a quorum while voting for one of the existing blocks) by requesting just a single node.
For working with transactions and queries use Python scripts in [`parse`](https://github.com/fluencelabs/tendermint_research/tree/master/parse) directory.
## Making transactions
To set a new key-value mapping use:
```bash
python query.py localhost:46657 tx a/b=10
...
OK
HEIGHT: 2
INFO: 10
```
This would create hierarchical key `a/b` (if necessary) and map it to `10`. `HEIGHT` value could be used later to verify the `INFO` by querying the blockchain.
This script would output the height value corresponding to provided transaction. The height is available upon executing because `query.py` script uses `broadcast_tx_commit` RPC to send transactions to Tendermint. You can later find the latest transactions by running:
```bash
python parse_chain.py localhost:46657
```
By default this command outputs the last 50 blocks of the chain (runs of empty blocks are collapsed into `...`) with a short summary of their transactions. Here you can make sure that the submitted transaction is indeed included in the block with the height from the response. This fact verifies that the Tendermint majority (more than 2/3 of the configured validator nodes) agreed on including this transaction in the mentioned block, which is certified by their signatures. Signature details (including information about all consensus rounds and phases) can be found by requesting the Tendermint RPC:
```bash
curl -s 'localhost:46657/block?height=_' # replace _ with actual height number
```
`get` transaction allows to copy a value from one key to another:
```bash
python query.py localhost:46657 tx a/c=get:a/b
...
INFO: 10
```
Submitting an `increment` transaction would increment the referenced key value and copy the old referenced key value to target key:
```bash
python query.py localhost:46657 tx a/d=increment:a/c
...
INFO: 10
```
To prevent Tendermint from declining a transaction that repeats one of the previously applied transactions, it is possible to put any characters after `###` at the end of the transaction string; this part of the string is ignored:
```bash
python query.py localhost:46657 tx a/d=increment:a/c###again
...
INFO: 11
```
`sum` transaction would sum the values of the referenced keys and assign the result to the target key:
```bash
python query.py localhost:46657 tx a/e=sum:a/c,a/d
...
INFO: 23
```
`factorial` transaction would calculate the factorial of the referenced key value:
```bash
python query.py localhost:46657 tx a/f=factorial:a/b
...
INFO: 3628800
```
`hiersum` transaction would calculate the sum of non-empty values for the referenced key and its descendants by hierarchy (all non-empty values should be integer):
```bash
python query.py localhost:46657 tx c/asum=hiersum:a
...
INFO: 3628856
```
Transactions are not applied in case of wrong arguments (non-integer values to `increment`, `sum`, `factorial` or wrong number of arguments). Transactions with a target key like `get`, `increment`, `sum`, `factorial` return the new value of the target key as `INFO`, but these values cannot be trusted if the serving node is not reliable. To verify the returned `INFO` one needs to `query` the target key explicitly.
In case of massive broadcasting of multiple transactions via `broadcast_tx_sync` or `broadcast_tx_async` RPC, the app would not calculate Merkle hashes during `DeliverTx` processing. Instead it would modify key tree and mark changed paths by clearing Merkle hashes until ABCI `Commit` processing. On `Commit` the app would recalculate Merkle hash along changed paths only. Finally the app would return the resulting root Merkle hash to Tendermint and this hash would be stored as `app_hash` for corresponding height in the blockchain.
Note that the described merkelized structure is just for demo purposes and is not self-balancing; it remains efficient only as long as user transactions keep it relatively balanced. Something like a [Patricia tree](https://github.com/ethereum/wiki/wiki/Patricia-Tree) would be more appropriate to achieve self-balancing.
## Making queries
Use `get:` queries to read values from KVStore:
```bash
python query.py localhost:46657 query get:a/e
...
RESULT: 23
```
Use `ls:` queries to read key hierarchy:
```bash
python query.py localhost:46657 query ls:a
...
RESULT: e f b c d
```
These commands are implemented by requesting the `abci_query` RPC (which immediately proxies to ABCI `Query` in the app). Together with the requested information the app method returns a Merkle proof of this information. This Merkle proof is a comma-separated list (`<level-1-proof>,<level-2-proof>,...`) of level proofs along the path to the requested key. For this implementation the SHA-3 hash of a level in the list is exactly:
* either one of the space-separated item from the upper (the previous in comma-separated list) level proof;
* or the root app hash for the uppermost (the first) level proof.
The app stores historical changes and handles queries for any particular height. The requested height (the latest by default) and the corresponding `app_hash` are also printed by the `query` Python script. This combination (result, Merkle proof and `app_hash` from the blockchain) verifies the correctness of the result (because this `app_hash` could only appear in the blockchain as a result of a consistent decision of the Tendermint quorum).
## Heavy-weight transactions
Applying simple transactions with different target keys makes the sizes of the blockchain (which contains the transaction list) and the app state relatively close to each other. If target keys are often repeated, the blockchain size would become much larger than the app state size. To demonstrate the opposite situation (the app state much larger than the blockchain) *range* transactions are supported:
```bash
python query.py localhost:46657 tx 0-200:b/@1/@0=1
...
INFO: 1
```
Here `0-200:` prefix means that this transaction should consist of 200 subsequent key-value mappings, each of them obtained by applying a template `b/@1/@0=1` to a counter from 0 to 199, inclusive. `@0` and `@1` are substitution markers for the two lowermost hexadecimal digits of the counter. I. e. this transaction would create 200 keys: `b/0/0`, `b/0/1`, ..., `b/c/7` and put `1` to each of them.
We can check the result by querying the hierarchical sum of `b` children:
```bash
python query.py localhost:46657 tx c/bsum=hiersum:b
...
INFO: 200
```

9
tmdemoapp/build.sbt Normal file
View File

@ -0,0 +1,9 @@
name := "tmdemoapp"
version := "0.1"
scalaVersion := "2.12.6"
libraryDependencies += "com.github.jtendermint" % "jabci" % "0.17.1"
libraryDependencies += "org.bouncycastle" % "bcpkix-jdk15on" % "1.56"

View File

@ -0,0 +1 @@
sbt.version = 1.1.5

View File

@ -0,0 +1,131 @@
package kvstore
import java.nio.ByteBuffer
import com.github.jtendermint.jabci.api._
import com.github.jtendermint.jabci.socket.TSocket
import com.github.jtendermint.jabci.types.{ResponseCheckTx, _}
import com.google.protobuf.ByteString
import scala.collection.mutable.ArrayBuffer
/**
 * ABCI application exposing the key-value tree to Tendermint.
 *
 * DeliverTx applies a transaction to the working tree (`consensusRoot`),
 * Commit merkelizes that tree and appends the resulting root to `storage`
 * (one root per commit), and Query reads from one of the stored roots.
 */
object KVStoreServerRunner extends IDeliverTx with ICheckTx with ICommit with IQuery {

  def main(args: Array[String]): Unit = {
    KVStoreServerRunner.start()
  }

  // Transaction and query grammars, compiled once instead of on every request.
  private val binaryOpPattern = "(.+)=(.+):(.*),(.*)".r
  private val unaryOpPattern = "(.+)=(.+):(.*)".r
  private val plainValuePattern = "(.+)=(.*)".r
  private val getPattern = "get:(.*)".r
  private val lsPattern = "ls:(.*)".r

  // Committed roots in commit order: the root of height N is at index N - 1.
  private val storage: ArrayBuffer[Node] = new ArrayBuffer[Node]()

  // Working tree modified by DeliverTx between commits.
  private var consensusRoot: Node = Node.emptyNode

  @volatile
  private var mempoolRoot: Node = Node.emptyNode

  /** Starts the ABCI socket server on port 46658 and keeps the calling thread alive. */
  def start(): Unit = {
    System.out.println("starting KVStore")
    val socket = new TSocket
    socket.registerListener(this)

    val t = new Thread(() => socket.start(46658))
    t.setName("KVStore server Main Thread")
    t.start()
    while (true) {
      Thread.sleep(1000L)
    }
  }

  /**
   * Parses and applies one transaction to `consensusRoot`.
   *
   * Everything from the first `###` on is ignored. The remaining payload is
   * `key=op:arg1,arg2`, `key=op:arg`, `key=value`, or a bare string that is
   * stored under itself.
   *
   * @return an OK response carrying the operation's info, or a BAD response
   *         carrying the failure message as log
   */
  override def receivedDeliverTx(req: RequestDeliverTx): ResponseDeliverTx = {
    val tx = req.getTx.toStringUtf8
    // A limit of 2 keeps trailing empty parts, so a tx consisting only of
    // "###" yields an empty payload instead of an empty array (which made
    // the previous `split("###")(0)` throw).
    val txPayload = tx.split("###", 2)(0)

    val result = txPayload match {
      case "BAD_DELIVER" => Left("BAD_DELIVER")
      case binaryOpPattern(key, op, arg1, arg2) =>
        op match {
          case "sum" => SumOperation(arg1, arg2)(consensusRoot, key)
          case _ => Left("Unknown binary op")
        }
      case unaryOpPattern(key, op, arg) =>
        op match {
          case "get" => GetOperation(arg)(consensusRoot, key)
          case "increment" => IncrementOperation(arg)(consensusRoot, key)
          case "factorial" => FactorialOperation(arg)(consensusRoot, key)
          case "hiersum" => HierarchicalSumOperation(arg)(consensusRoot, key)
          case _ => Left("Unknown unary op")
        }
      case plainValuePattern(key, value) => SetValueOperation(value)(consensusRoot, key)
      case key => SetValueOperation(key)(consensusRoot, key)
    }

    result match {
      case Right((newRoot, info)) =>
        consensusRoot = newRoot
        ResponseDeliverTx.newBuilder.setCode(CodeType.OK).setInfo(info).build
      case Left(message) =>
        ResponseDeliverTx.newBuilder.setCode(CodeType.BAD).setLog(message).build
    }
  }

  /** Accepts every transaction except the literal `BAD_CHECK`. */
  override def requestCheckTx(req: RequestCheckTx): ResponseCheckTx = {
    // check mempoolRoot
    val tx = req.getTx.toStringUtf8
    if (tx == "BAD_CHECK") {
      System.out.println(s"CheckTx: $tx BAD")
      ResponseCheckTx.newBuilder.setCode(CodeType.BAD).setLog("BAD_CHECK").build
    } else {
      System.out.println(s"CheckTx: $tx OK")
      ResponseCheckTx.newBuilder.setCode(CodeType.OK).build
    }
  }

  /**
   * Merkelizes the working tree, records it as the next committed root and
   * returns its root hash as the commit data.
   */
  override def requestCommit(requestCommit: RequestCommit): ResponseCommit = {
    consensusRoot = consensusRoot.merkelize()

    val buf = ByteBuffer.allocate(MerkleUtil.merkleSize)
    // merkelize() always returns a node with a defined hash.
    buf.put(consensusRoot.merkleHash.get)
    buf.rewind

    storage.append(consensusRoot)
    mempoolRoot = consensusRoot

    ResponseCommit.newBuilder.setData(ByteString.copyFrom(buf)).build
  }

  /**
   * Answers `get:<key>` and `ls:<key>` queries against a committed root.
   *
   * A request height of 0 selects the latest committed root. A height with no
   * committed root (nothing committed yet, or out of range) yields a BAD
   * response instead of an IndexOutOfBoundsException.
   */
  override def requestQuery(req: RequestQuery): ResponseQuery = {
    val query = req.getData.toStringUtf8
    val index = if (req.getHeight != 0) req.getHeight.toInt - 1 else storage.size - 1

    storage.lift(index) match {
      case None =>
        ResponseQuery.newBuilder.setCode(CodeType.BAD)
          .setLog(s"No committed state for height ${req.getHeight}")
          .build
      case Some(root) =>
        query match {
          case getPattern(key) =>
            val result = root.getValue(key)
            okResponse(result.getOrElse(""), proofFor(root, key, result.isDefined && req.getProve))
          case lsPattern(key) =>
            val result = root.listChildren(key)
            okResponse(result.map(_.mkString(" ")).getOrElse(""), proofFor(root, key, result.isDefined && req.getProve))
          case _ =>
            ResponseQuery.newBuilder.setCode(CodeType.BAD).setLog("Invalid query path. Got " + query).build
        }
    }
  }

  // Proof text for `key`, or "" when no proof is wanted or the key is absent.
  private def proofFor(root: Node, key: String, wanted: Boolean): String =
    if (wanted) twoLevelMerkleListToString(root.getProof(key)) else ""

  // Builds the OK query response shared by `get:` and `ls:`.
  private def okResponse(value: String, proof: String): ResponseQuery =
    ResponseQuery.newBuilder.setCode(CodeType.OK)
      .setValue(ByteString.copyFromUtf8(value))
      .setProof(ByteString.copyFromUtf8(proof))
      .build

  // Hashes within a level are joined by " ", levels are joined by ", ".
  private def twoLevelMerkleListToString(list: List[List[MerkleHash]]): String =
    list.map(level => level.map(MerkleUtil.merkleHashToHex).mkString(" ")).mkString(", ")
}

View File

@ -0,0 +1,22 @@
package kvstore
import org.bouncycastle.jcajce.provider.digest.SHA3
/** Selects how `MerkleUtil.mergeMerkle` combines several hashes into one. */
sealed trait MerkleMergeRule
/** Hash the concatenation of the raw hash bytes. */
case object BINARY_BASED_MERKLE_MERGE extends MerkleMergeRule
/** Hash the space-separated two-digit-hex rendering of the hashes. */
case object HEX_BASED_MERKLE_MERGE extends MerkleMergeRule
/** SHA3-256 based hashing helpers for the merkelized key-value tree. */
object MerkleUtil {
  import java.nio.charset.StandardCharsets.UTF_8

  /** Size of a hash in bytes (SHA3-256). */
  val merkleSize: Int = 32

  /**
   * Hash of a single string.
   *
   * The bytes are taken in UTF-8 explicitly: the platform default charset
   * could make the same data hash differently on differently configured
   * machines.
   */
  def singleMerkle(data: String): MerkleHash = new SHA3.Digest256().digest(data.getBytes(UTF_8))

  /**
   * Combines `parts` into one hash according to `mergeRule`.
   *
   * @param parts     hashes to combine, in order
   * @param mergeRule binary merge hashes the concatenated raw bytes; hex merge
   *                  hashes the space-separated hex strings
   */
  def mergeMerkle(parts: List[MerkleHash], mergeRule: MerkleMergeRule): MerkleHash =
    mergeRule match {
      case BINARY_BASED_MERKLE_MERGE =>
        // One allocation instead of re-copying the accumulator for every part.
        new SHA3.Digest256().digest(Array.concat(parts: _*))
      case HEX_BASED_MERKLE_MERGE =>
        new SHA3.Digest256().digest(parts.map(merkleHashToHex).mkString(" ").getBytes(UTF_8))
    }

  /** Renders a hash as two hex digits per byte. */
  def merkleHashToHex(merkleHash: MerkleHash): String =
    merkleHash.map("%02x".format(_)).mkString
}

View File

@ -0,0 +1,101 @@
package kvstore
import kvstore.MerkleUtil._
import scala.collection.immutable.HashMap
import scala.util.Try
/**
 * Immutable node of the hierarchical key-value tree. Keys are `/`-separated
 * paths; every modification returns a new node.
 *
 * @param children   child nodes by path segment
 * @param value      value stored at this node, if any
 * @param merkleHash hash of this subtree as set by `merkelize()`; nodes created
 *                   by modifications carry `None`
 */
case class Node(children: NodeStorage, value: Option[String], merkleHash: Option[MerkleHash]) {

  /** Returns this subtree with hashes filled in for every node lacking one. */
  def merkelize(): Node =
    if (merkleHash.isDefined)
      this
    else {
      // Built strictly on purpose: in Scala 2.12 `mapValues` returns a lazy
      // view over the original map, which would keep the un-merkelized
      // children and re-run `merkelize()` on every later lookup or iteration.
      val newChildren = children.map { case (name, child) => name -> child.merkelize() }
      val withNewChildren = Node(newChildren, value, None)
      Node(newChildren, value, Some(mergeMerkle(withNewChildren.merkleItems(), HEX_BASED_MERKLE_MERGE)))
    }

  // Hash of the own value ("" when absent) followed by, for each child, the
  // hash of its name and its subtree hash. Requires merkelized children.
  private def merkleItems(): List[MerkleHash] =
    singleMerkle(value.getOrElse("")) ::
      children.toList.flatMap { case (name, child) => List(singleMerkle(name), child.merkleHash.get) }

  /**
   * Merkle items of every node on the path from this node down to `key`,
   * this node first.
   *
   * Must be called on a merkelized tree and with an existing `key`: a missing
   * path segment throws `NoSuchElementException`.
   */
  def getProof(key: String): List[List[MerkleHash]] =
    if (key.isEmpty)
      List(merkleItems())
    else {
      val (next, rest) = splitPath(key)
      merkleItems() :: children(next).getProof(rest)
    }

  /** The own value parsed as a `Long`, if present and numeric. */
  def longValue: Option[Long] = value.flatMap(x => Try(x.toLong).toOption)

  /**
   * Stores `value` under `key`.
   *
   * A key of the form `<from>-<to>:<pattern>` stores one entry per counter in
   * `[from, to)`; `@0`..`@6` in both the pattern and the value are replaced by
   * the counter's hex digits, `@0` being the lowest.
   */
  def add(key: String, value: String): Node =
    key match {
      case Node.RangeKeyValuePattern(rangeStartStr, rangeEndStr, keyPattern) =>
        val rangeStart = rangeStartStr.toInt
        val rangeEnd = rangeEndStr.toInt
        System.out.println(s"setting range from=$rangeStart to=$rangeEnd keyPattern=$keyPattern valuePattern=$value")

        (rangeStart until rangeEnd).foldLeft(this) { (node, index) =>
          val (effectiveKey, effectiveValue) = (0 to 6).foldLeft((keyPattern, value)) {
            case ((k, v), hexPosition) =>
              val target = "@" + hexPosition
              val replacement = ((index >> (hexPosition * 4)) & 0xf).toHexString
              (k.replace(target, replacement), v.replace(target, replacement))
          }
          System.out.println(s"setting key=$effectiveKey value=$effectiveValue")
          node.addValue(effectiveKey, effectiveValue)
        }
      case _ =>
        System.out.println(s"setting key=$key value=$value")
        addValue(key, value)
    }

  // Sets the value at `key`, creating missing nodes; every node on the path
  // is rebuilt with its hash cleared.
  private def addValue(key: String, value: String): Node =
    if (key.isEmpty)
      Node(children, Some(value), None)
    else {
      val (next, rest) = splitPath(key)
      val updatedChild = children.getOrElse(next, Node.emptyNode).addValue(rest, value)
      Node(children + (next -> updatedChild), this.value, None)
    }

  /** The node at `key`, if the whole path exists; the empty key is this node. */
  def getNode(key: String): Option[Node] =
    if (key.isEmpty)
      Some(this)
    else {
      val (next, rest) = splitPath(key)
      children.get(next).flatMap(_.getNode(rest))
    }

  /** The value stored at `key`, if the node exists and has one. */
  def getValue(key: String): Option[String] = getNode(key).flatMap(_.value)

  /** The value stored at `key` as a `Long`, if it exists and is numeric. */
  def getLongValue(key: String): Option[Long] = getNode(key).flatMap(_.longValue)

  /** Names of the direct children of the node at `key`, if that node exists. */
  def listChildren(key: String): Option[List[String]] =
    getNode(key).map(_.children.keys.toList)

  // Splits "a/b/c" into ("a", "b/c"); a path without '/' yields (path, "").
  private def splitPath(path: String): (String, String) = {
    val (next, rest) = path.span(_ != '/')
    (next, rest.stripPrefix("/"))
  }
}

object Node {
  /** A node with no children, no value and no hash. */
  val emptyNode: Node = Node(HashMap.empty[String, Node], None, None)

  // `<from>-<to>:<key pattern>`, compiled once.
  private val RangeKeyValuePattern = "(\\d{1,8})-(\\d{1,8}):(.+)".r
}

View File

@ -0,0 +1,74 @@
package kvstore
/** A transaction that, applied to a tree, produces a new tree and a result string. */
trait Operation {
/**
 * Applies the operation.
 *
 * @param root      tree to apply the operation to
 * @param targetKey key that receives the operation's result
 * @return `Right((newRoot, info))` on success, `Left(message)` on failure
 */
def apply(root: Node, targetKey: String): Either[String, (Node, String)]
}
/** Stores `value` under the target key and reports the stored value. */
case class SetValueOperation(value: String) extends Operation {
  override def apply(root: Node, targetKey: String): Either[String, (Node, String)] = {
    System.out.println(s"process set value=$value")
    val updatedRoot = root.add(targetKey, value)
    Right((updatedRoot, value))
  }
}
/** Copies the value stored at `arg` to the target key; fails when `arg` has no value. */
case class GetOperation(arg: String) extends Operation {
  override def apply(root: Node, targetKey: String): Either[String, (Node, String)] = {
    System.out.println(s"process get arg=$arg")
    root.getValue(arg) match {
      case Some(found) => Right((root.add(targetKey, found), found))
      case None => Left("Wrong argument")
    }
  }
}
/**
 * Writes the current numeric value of `arg` to the target key, then stores that
 * value plus one back under `arg`. Fails when `arg` has no numeric value.
 */
case class IncrementOperation(arg: String) extends Operation {
  override def apply(root: Node, targetKey: String): Either[String, (Node, String)] = {
    System.out.println(s"process increment arg=$arg")
    root.getLongValue(arg) match {
      case Some(current) =>
        val previous = current.toString
        val incremented = (current + 1).toString
        // Target first, then the argument, so the argument wins if both are the same key.
        Right((root.add(targetKey, previous).add(arg, incremented), previous))
      case None =>
        Left("Wrong argument")
    }
  }
}
/**
 * Stores the factorial of the numeric value at `arg` under the target key.
 *
 * Fails with "Wrong argument" when `arg` has no numeric value, when the value
 * is negative, or when it exceeds 20. 20! is the largest factorial that fits
 * in a `Long`, so larger arguments would silently wrap around, and very large
 * ones would keep the node multiplying practically forever.
 */
case class FactorialOperation(arg: String) extends Operation {
  override def apply(root: Node, targetKey: String): Either[String, (Node, String)] = {
    System.out.println(s"process factorial arg=$arg")
    val maxArgument = 20L
    root.getLongValue(arg)
      .filter(value => value >= 0 && value <= maxArgument)
      .map(value => (1L to value).product.toString)
      .toRight("Wrong argument")
      .map(factorial => (root.add(targetKey, factorial), factorial))
  }
}
/**
 * Stores the sum of all values in the subtree rooted at `arg` under the target
 * key. Fails when `arg` does not exist or any present value is not numeric.
 */
case class HierarchicalSumOperation(arg: String) extends Operation {
  override def apply(root: Node, targetKey: String): Either[String, (Node, String)] = {
    System.out.println(s"process hierarchical sum arg=$arg")
    root.getNode(arg).flatMap(calculate).map(_.toString) match {
      case Some(sum) => Right((root.add(targetKey, sum), sum))
      case None => Left("Wrong argument")
    }
  }

  // Sum of the node's own value (0 when absent) and all descendants' sums;
  // None as soon as one present value is not a Long.
  private def calculate(node: Node): Option[Long] = {
    val own: Option[Long] = if (node.value.isDefined) node.longValue else Some(0L)
    val descendants: Option[Long] = node.children.values.foldLeft(Option(0L)) { (acc, child) =>
      for {
        subtotal <- acc
        childSum <- calculate(child)
      } yield childSum + subtotal
    }
    for {
      ownValue <- own
      childrenValue <- descendants
    } yield ownValue + childrenValue
  }
}
/**
 * Stores the sum of the numeric values at `arg1` and `arg2` under the target
 * key. Fails when either key has no numeric value.
 */
case class SumOperation(arg1: String, arg2: String) extends Operation {
  override def apply(root: Node, targetKey: String): Either[String, (Node, String)] = {
    System.out.println(s"process sum arg1=$arg1 arg2=$arg2")
    val total = root.getLongValue(arg1).flatMap(first => root.getLongValue(arg2).map(second => first + second))
    total match {
      case Some(number) =>
        val sum = number.toString
        Right((root.add(targetKey, sum), sum))
      case None =>
        Left("Wrong arguments")
    }
  }
}

View File

@ -0,0 +1,6 @@
/** Type aliases shared across the `kvstore` package. */
package object kvstore {
// Children of a tree node, keyed by name.
type NodeStorage = Map[String, Node]
// Raw bytes of a Merkle hash.
type MerkleHash = Array[Byte]
}