Editor state synchronization (WIP)

This commit is contained in:
Pavel Murygin 2021-01-14 13:17:48 +03:00
parent 735c5cbe0c
commit e225a7818d
6 changed files with 180 additions and 322 deletions

View File

@ -16,3 +16,7 @@ textarea {
width: 500px;
height: 200px;
}
.wrapper {
display: flex;
}

View File

@ -1,241 +0,0 @@
import * as Automerge from 'automerge';
import DiffMatchPatch from 'diff-match-patch';
import React, { useEffect, useState } from 'react';
import { connect, fluenceClient } from 'src/fluence';
import * as calls from 'src/fluence/calls';
import { User } from 'src/fluence/calls';
import { fluentPadServiceId } from 'src/fluence/constants';
import './App.scss';
const dmp = new DiffMatchPatch();
const withErrorHandling = (action: Function) => {
return () => {
try {
action();
} catch (e) {
console.log('Error occured: ', e);
}
};
};
const broadcastChanges = async (changes: Automerge.Change[]) => {
const obj = {
fluentPadChanges: changes,
};
const result = await calls.addMessage(JSON.stringify(obj));
console.log(`${changes.length} changes written with result: `, result);
};
const parseState = (message: calls.Message) => {
try {
const obj = JSON.parse(message.body);
if (obj.fluentPadState) {
return Automerge.load(obj.fluentPadState);
}
} catch (e) {
console.log('couldnt parse state format: ' + message.body);
return undefined;
}
};
const applyStates = (startingDoc, messages: calls.Message[]) => {
let res = startingDoc;
for (let m of messages) {
const state = parseState(m) as any;
if (state) {
res = Automerge.merge(res, state);
}
}
return res;
};
const App = () => {
const [isConnected, setIsConnected] = useState(false);
const [isInRoom, setIsInRoom] = useState(false);
const [nickName, setNickName] = useState('myNickName');
const [users, setUsers] = useState<User[]>([]);
const addUserToList = (user: User) => {};
const removeUser = (user: User) => {};
const [editorTextDoc, setEditorTextDoc] = useState(Automerge.from({ value: new Automerge.Text() }));
const amHistory = Automerge.getHistory(editorTextDoc).map((x) => {
return x.snapshot.value;
});
const handleTextUpdate = (e: React.ChangeEvent<HTMLTextAreaElement>) => {
const prevText = editorTextDoc.value.toString();
const newText = e.target.value;
const diff = dmp.diff_main(prevText, newText);
dmp.diff_cleanupSemantic(diff);
const patches = dmp.patch_make(prevText, diff);
const newDoc = Automerge.change(editorTextDoc, (doc) => {
patches.forEach((patch) => {
let idx = patch.start1;
patch.diffs.forEach(([operation, changeText]) => {
switch (operation) {
case 1: // Insertion
doc.value.insertAt!(idx, ...changeText.split(''));
break;
case 0: // No Change
idx += changeText.length;
break;
case -1: // Deletion
for (let i = 0; i < changeText.length; i++) {
doc.value.deleteAt!(idx);
}
break;
}
});
});
});
setImmediate(async () => {
const message = {
fluentPadState: Automerge.save(editorTextDoc),
};
const messageStr = JSON.stringify(message);
const result = await calls.addMessage(messageStr);
console.log(`state written with result: `, result);
});
setEditorTextDoc(newDoc);
};
const joinRoom = withErrorHandling(async () => {
// await calls.joinRoom(nickName);
const users = await calls.getCurrentUsers();
setUsers(users);
const currentUser: User = {
peer_id: fluenceClient.selfPeerId.toB58String(),
relay_id: fluenceClient.relayPeerID.toB58String(),
name: nickName,
};
calls.notifyPeers(users, fluentPadServiceId, 'userJoined', currentUser);
// const history = await calls.getHistory();
// if (history) {
// const newDoc = applyStates(editorTextDoc, history);
// setEditorTextDoc(newDoc);
// }
setIsInRoom(true);
});
const leaveRoom = async () => {
await calls.leaveRoom;
const currentUser: User = {
peer_id: fluenceClient.selfPeerId.toB58String(),
relay_id: fluenceClient.relayPeerID.toB58String(),
name: nickName,
};
calls.notifyPeers(users, fluentPadServiceId, 'userLeft', currentUser);
setIsInRoom(false);
};
const clean = async () => {
for (let u of users) {
const res = await calls.removeUser(u.peer_id);
console.log(res);
}
setIsInRoom(false);
};
useEffect(() => {
const fn = async () => {
await connect();
setIsConnected(true);
fluenceClient.subscribe(fluentPadServiceId, (evt) => {
console.log('got notification: ', evt);
switch (evt.type) {
case 'userJoined':
addUserToList(evt.args[0]);
break;
case 'userLeft':
removeUser(evt.args[0]);
break;
case 'textUpdated':
break;
}
});
};
fn();
return () => {};
}, []);
return (
<div className="App">
<div>
<div>Connection status: {isConnected ? 'connected' : 'disconnected'}</div>
<div>
<label>Nickname: </label>
<input
type="text"
value={nickName}
disabled={isInRoom}
onChange={(e) => {
const name = e.target.value;
setNickName(name);
}}
/>
</div>
<div>
<button disabled={isInRoom} onClick={joinRoom}>
Join Room
</button>
</div>
<div>
<button disabled={!isInRoom} onClick={leaveRoom}>
Leave Room
</button>
</div>
<div>
<button onClick={clean}>Clean</button>
</div>
</div>
<br />
{isInRoom && (
<>
<div>
Users:
<ul>
{users.map((value, index) => (
<li key={value.peer_id}>
{value.name}: {value.peer_id}
</li>
))}
</ul>
</div>
<div>
Editor
<br />
<textarea
disabled={!isInRoom}
onChange={handleTextUpdate}
value={editorTextDoc.value.toString()}
/>
</div>
<div>
History:
<ul>
{amHistory.map((value, index) => (
<li key={index}>{value}</li>
))}
</ul>
</div>
</>
)}
</div>
);
};
export default App;

View File

@ -6,6 +6,7 @@ import './App.scss';
import { FluenceClientContext, useFluenceClient } from './FluenceClientContext';
import { UserList } from './UserList';
import * as calls from 'src/fluence/calls';
import { CollaborativeEditor } from './CollaborativeEditor';
const App = () => {
const [client, setClient] = useState<FluenceClient | null>(null);
@ -67,7 +68,10 @@ const App = () => {
</div>
</div>
<div>{isInRoom && client && <UserList selfName={nickName} />}</div>
<div className="wrapper">
<div>{isInRoom && client && <CollaborativeEditor />}</div>
{/* <div>{isInRoom && client && <UserList selfName={nickName} />}</div> */}
</div>
</div>
</FluenceClientContext.Provider>
);

View File

@ -0,0 +1,130 @@
import * as Automerge from 'automerge';
import DiffMatchPatch from 'diff-match-patch';
import { useEffect, useState } from 'react';
import { fluentPadServiceId, notifyTextUpdateFnName } from 'src/fluence/constants';
import { subscribeToEvent } from 'src/fluence/exApi';
import { useFluenceClient } from './FluenceClientContext';
import * as calls from 'src/fluence/calls';
interface TextDoc {
value: Automerge.Text;
}
const dmp = new DiffMatchPatch();
const getUpdatedDocFromText = (oldDoc: TextDoc | null, newText: string) => {
const prevText = oldDoc ? oldDoc.value.toString() : '';
const diff = dmp.diff_main(prevText, newText);
dmp.diff_cleanupSemantic(diff);
const patches = dmp.patch_make(prevText, diff);
const newDoc = Automerge.change(oldDoc, (doc) => {
patches.forEach((patch) => {
let idx = patch.start1;
patch.diffs.forEach(([operation, changeText]) => {
switch (operation) {
case 1: // Insertion
doc.value.insertAt!(idx, ...changeText.split(''));
break;
case 0: // No Change
idx += changeText.length;
break;
case -1: // Deletion
for (let i = 0; i < changeText.length; i++) {
doc.value.deleteAt!(idx);
}
break;
}
});
});
});
return newDoc;
};
const parseState = (message: calls.Message) => {
try {
const obj = JSON.parse(message.body);
if (obj.fluentPadState) {
return Automerge.load(obj.fluentPadState) as TextDoc;
}
return null;
} catch (e) {
console.log('couldnt parse state format: ' + message.body);
return null;
}
};
const applyStates = (startingDoc: TextDoc | null, messages: calls.Message[]) => {
let res = startingDoc;
for (let m of messages) {
const state = parseState(m) as TextDoc;
if (state) {
if (!res) {
res = state;
} else {
res = Automerge.merge(res, state);
}
}
}
return res;
};
export const CollaborativeEditor = () => {
const client = useFluenceClient()!;
const [text, setText] = useState<TextDoc | null>(null);
useEffect(() => {
const unsub1 = subscribeToEvent(client, fluentPadServiceId, notifyTextUpdateFnName, (args, tetraplets) => {
console.log(args, tetraplets);
// TODO
});
// don't block
calls.getHistory(client).then((res) => {
const newDoc = applyStates(text, res);
setText(newDoc);
});
return () => {
unsub1();
};
}, []);
const amHistory = text
? Automerge.getHistory(text).map((x) => {
return x.snapshot.value;
})
: [];
const textValue = text ? text.value.toString() : '';
const handleTextUpdate = (e: React.ChangeEvent<HTMLTextAreaElement>) => {
const newDoc = getUpdatedDocFromText(text, e.target.value)!;
setText(newDoc);
// don't block
setImmediate(async () => {
const message = {
fluentPadState: Automerge.save(newDoc),
};
const messageStr = JSON.stringify(message);
await calls.addMessage(client, messageStr);
});
};
return (
<div>
<textarea value={textValue} disabled={!text} onChange={handleTextUpdate} />
Automerge changes:
<ul>
{amHistory.map((value, index) => (
<li key={index}>{value}</li>
))}
</ul>
</div>
);
};

View File

@ -4,6 +4,7 @@ import {
fluentPadServiceId,
historyServiceId,
notifyOnlineFnName,
notifyTextUpdateFnName,
notifyUserAddedFnName,
notifyUserRemovedFnName,
servicesNodePid,
@ -202,22 +203,7 @@ export const leaveRoom = async (client: FluenceClient) => {
await sendParticle(client, particle);
};
export const removeUser = async (userPeerId: string) => {
let removeUserAir = `
(call node (userlist "leave") [userPeerId] callResult)
`;
const data = new Map();
data.set('userPeerId', userPeerId);
data.set('userlist', userListServiceId);
data.set('node', servicesNodePid);
const [result] = await fluenceClient.fetch<[ServiceResult]>(removeUserAir, ['callResult'], data);
throwIfError(result);
return result;
};
export const getHistory = async () => {
export const getHistory = async (client: FluenceClient) => {
let getHistoryAir = `
(seq
(call node (userlist "is_authenticated") [] token)
@ -230,73 +216,47 @@ export const getHistory = async () => {
data.set('history', historyServiceId);
data.set('node', servicesNodePid);
const [result] = await fluenceClient.fetch<[GetMessagesResult]>(getHistoryAir, ['messages'], data);
const [result] = await client.fetch<[GetMessagesResult]>(getHistoryAir, ['messages'], data);
throwIfError(result);
return result.messages;
};
export const getCurrentUsers = async () => {
let getUsersAir = `
(call node (userlist "get_users") [] currentUsers)
`;
const data = new Map();
data.set('userlist', userListServiceId);
data.set('node', servicesNodePid);
const [result] = await fluenceClient.fetch<[GetUsersResult]>(getUsersAir, ['currentUsers'], data);
throwIfError(result);
return result.users;
};
export const addMessage = async (messageBody: string) => {
let addMessageAir = `
export const addMessage = async (client: FluenceClient, messageBody: string) => {
const particle = new Particle(
`
(seq
(call myRelay ("op" "identity") [])
(seq
(call node (userlist "is_authenticated") [] token)
(call node (history "add") [message token.$.["is_authenticated"]] callResult)
)
`;
const data = new Map();
data.set('message', messageBody);
data.set('userlist', userListServiceId);
data.set('history', historyServiceId);
data.set('node', servicesNodePid);
const [result] = await fluenceClient.fetch<[ServiceResult]>(addMessageAir, ['callResult'], data);
if (result.ret_code !== 0) {
throw new Error(result.err_msg);
}
return result;
};
export const notifyPeer = async <T>(peerId: string, peerRelayId: string, channel: string, event: string, data?: T) => {
let addMessageAir = `
(seq
(call peerRelayId ("op" "identity") [])
(call peerId (channel event) [${data ? 'data' : ''}])
(call node (history "add") [message token.$.["is_authenticated"]])
(seq
(call node (userlist "get_users") [] allUsers)
(fold allUsers.$.users! u
(par
(seq
(call u.$.relay_id ("op" "identity") [])
(call u.$.peer_id (fluentPadServiceId notifyTextUpdate) [message token.$.["is_authenticated"]])
)
`;
(next u)
)
)
)
)
)
)
`,
{
node: servicesNodePid,
message: messageBody,
userlist: userListServiceId,
history: historyServiceId,
myRelay: client.relayPeerID.toB58String(),
myPeerId: client.selfPeerId.toB58String(),
fluentPadServiceId: fluentPadServiceId,
notifyTextUpdate: notifyTextUpdateFnName,
},
);
const particleData = new Map();
particleData.set('peerId', peerId);
particleData.set('peerRelayId', peerRelayId);
particleData.set('channel', channel);
particleData.set('event', event);
if (data) {
particleData.set('data', data);
}
await fluenceClient.fireAndForget(addMessageAir, particleData);
};
export const notifyPeers = async <T>(
peers: Array<{ peer_id: string; relay_id: string; name: string }>,
channel: string,
event: string,
data?: T,
) => {
for (let p of peers) {
notifyPeer(p.peer_id, p.relay_id, channel, event, data);
}
await sendParticle(client, particle);
};

View File

@ -5,6 +5,7 @@ export const fluentPadServiceId = 'fluence/fluent-pad';
export const notifyOnlineFnName = 'notifyOnline';
export const notifyUserAddedFnName = 'notifyUserAdded';
export const notifyUserRemovedFnName = 'notifyUserRemoved';
export const notifyTextUpdateFnName = 'notifyTextUpdate';
export const userListServiceId = 'd4506f7d-be4a-4332-87b2-eb530f350861';
export const historyServiceId = 'd9abbacf-6ee2-49e5-9683-536a5c931fa1';