From 79ad43315b5b858be9ce5970e7fc592a8ab190a5 Mon Sep 17 00:00:00 2001 From: Pavel Date: Thu, 21 Oct 2021 17:56:21 +0300 Subject: [PATCH] FluencePeer: add option to specify default TTL for all new particles (#91) --- src/__test__/integration/peer.spec.ts | 31 ++++++++++------ src/internal/FluencePeer.ts | 53 +++++++++++++++++---------- src/internal/Particle.ts | 4 +- src/internal/utils.ts | 8 +++- 4 files changed, 61 insertions(+), 35 deletions(-) diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index cde1859f..69a62f29 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -189,7 +189,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('address as multiaddr', async () => { @@ -201,7 +201,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('address as node', async () => { @@ -213,7 +213,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('peerid as peer id', async () => { @@ -225,7 +225,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('peerid as seed', async () => { @@ -237,7 +237,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: dialTimeout', async () => { @@ -249,7 +249,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: skipCheckConnection', async () => { @@ -261,7 +261,7 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); }); it('With connection options: checkConnectionTTL', async () => { @@ -273,7 +273,19 @@ describe('Typescript usage suite', () => { const isConnected = await checkConnection(anotherPeer); // assert - expect(isConnected).toBeTruthy; + expect(isConnected).toBeTruthy(); + }); + + it('With connection options: defaultTTL', async () => { + // arrange + const addr = nodes[0]; + + // act + await anotherPeer.start({ connectTo: addr, defaultTtlMs: 1 }); + const isConnected = await checkConnection(anotherPeer); + + // assert + expect(isConnected).toBeFalsy(); }); }); @@ -297,9 +309,6 @@ describe('Typescript usage suite', () => { resolve(res); }, }, - // op: { - // identity: (req) => {}, - // }, _timeout: reject, }); diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 71035b2f..2f8717da 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -94,6 +94,14 @@ export interface PeerConfig { * The dialing timeout in milliseconds */ dialTimeoutMs?: number; + + /** + * Sets the default TTL for all particles originating from the peer with no TTL specified. + * If the originating particle's TTL is defined then that value will be used + * If the option is not set default TTL will be 7000 + * Value 0 (zero) is treated as if the option was not set + */ + defaultTtlMs?: number; } /** @@ -169,6 +177,8 @@ export class FluencePeer { this._keyPair = await KeyPair.randomEd25519(); } + this._defaultTTL = config?.defaultTtlMs || DEFAULT_TTL; + this._interpreter = await createInterpreter(config?.avmLogLevel || 'off'); if (config?.connectTo) { @@ -234,7 +244,7 @@ export class FluencePeer { } if (particle.ttl === undefined) { - particle.ttl = DEFAULT_TTL; + particle.ttl = this._defaultTTL; } this._incomingParticles.next(particle); @@ -333,37 +343,29 @@ export class FluencePeer { // Internal peer state + private _defaultTTL: number; private _relayPeerId: PeerIdB58 | null = null; private _keyPair: KeyPair; private _connection: FluenceConnection; private _interpreter: AirInterpreter; private _timeouts: Array = []; + private _particleQueues = new Map>(); private _startParticleProcessing() { - const particleQueues = new Map>(); - this._incomingParticles .pipe( tap((x) => x.logTo('debug', 'particle received:')), - filterExpiredParticles(), + filterExpiredParticles(this._expireParticle.bind(this)), ) .subscribe((p) => { - let particlesQueue = particleQueues.get(p.id); + let particlesQueue = this._particleQueues.get(p.id); if (!particlesQueue) { particlesQueue = this._createParticlesProcessingQueue(); - particleQueues.set(p.id, particlesQueue); + this._particleQueues.set(p.id, particlesQueue); const timeout = setTimeout(() => { - log.debug(`particle ${p.id} has expired. Deleting particle-related queues and handlers`); - - particleQueues.delete(p.id); - const timeoutHandler = this._timeoutHandlers.get(p.id); - if (timeoutHandler) { - timeoutHandler(); - } - this._particleSpecificHandlers.delete(p.id); - this._timeoutHandlers.delete(p.id); + this._expireParticle(p.id); }, p.actualTtl()); this._timeouts.push(timeout); @@ -377,6 +379,18 @@ export class FluencePeer { }); } + private _expireParticle(particleId: string) { + log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`); + + this._particleQueues.delete(particleId); + const timeoutHandler = this._timeoutHandlers.get(particleId); + if (timeoutHandler) { + timeoutHandler(); + } + this._particleSpecificHandlers.delete(particleId); + this._timeoutHandlers.delete(particleId); + } + private _createParticlesProcessingQueue() { let particlesQueue = new Subject(); let prevData: Uint8Array = Buffer.from([]); @@ -384,7 +398,7 @@ export class FluencePeer { particlesQueue .pipe( // force new line - filterExpiredParticles(), + filterExpiredParticles(this._expireParticle.bind(this)), ) .subscribe((x) => { const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData); @@ -510,6 +524,7 @@ export class FluencePeer { for (let item of this._timeouts) { clearTimeout(item); } + this._particleQueues.clear(); } /** @@ -569,11 +584,11 @@ function runInterpreter( return interpreterResult; } -function filterExpiredParticles() { +function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) { return pipe( tap((p: Particle) => { - if (p.hasExpired) { - log.debug(`particle ${p.id} has expired`); + if (p.hasExpired()) { + onParticleExpiration(p.id); } }), filter((x: Particle) => !x.hasExpired()), diff --git a/src/internal/Particle.ts b/src/internal/Particle.ts index c472b6a7..519ec66b 100644 --- a/src/internal/Particle.ts +++ b/src/internal/Particle.ts @@ -21,8 +21,6 @@ import log from 'loglevel'; import { ParticleContext } from './commonTypes'; import { dataToString } from './utils'; -const DefaultTTL = 7000; - export class Particle { id: string; initPeerId: string; @@ -37,7 +35,7 @@ export class Particle { const res = new Particle(); res.id = genUUID(); res.script = script; - res.ttl = ttlMs || DefaultTTL; + res.ttl = ttlMs; res.data = Buffer.from([]); res.timestamp = Date.now(); diff --git a/src/internal/utils.ts b/src/internal/utils.ts index 3f8173d2..aa3b198b 100644 --- a/src/internal/utils.ts +++ b/src/internal/utils.ts @@ -54,7 +54,7 @@ export const MakeServiceCall = (fn: (args: any[]) => CallServiceResultType) => { }; /** - * Checks the network connection by sending a ping-like request to relat node + * Checks the network connection by sending a ping-like request to relay node * @param { FluenceClient } peer - The Fluence Client instance. */ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise => { @@ -127,11 +127,15 @@ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise< }), ); + peer.internals.regHandler.timeout(particle.id, () => { + reject('particle timed out'); + }); + peer.internals.initiateParticle(particle); }); try { - const [result] = await promise; + const result = await promise; if (result != msg) { log.warn("unexpected behavior. 'identity' must return the passed arguments."); }