FluencePeer: add option to specify default TTL for all new particles (#91)

This commit is contained in:
Pavel
2021-10-21 17:56:21 +03:00
committed by GitHub
parent 9d00b70897
commit 79ad43315b
4 changed files with 61 additions and 35 deletions

View File

@ -94,6 +94,14 @@ export interface PeerConfig {
* The dialing timeout in milliseconds
*/
dialTimeoutMs?: number;
/**
* Sets the default TTL for all particles originating from the peer with no TTL specified.
* If the originating particle's TTL is defined then that value will be used
* If the option is not set default TTL will be 7000
* Value 0 (zero) is treated as if the option was not set
*/
defaultTtlMs?: number;
}
/**
@ -169,6 +177,8 @@ export class FluencePeer {
this._keyPair = await KeyPair.randomEd25519();
}
this._defaultTTL = config?.defaultTtlMs || DEFAULT_TTL;
this._interpreter = await createInterpreter(config?.avmLogLevel || 'off');
if (config?.connectTo) {
@ -234,7 +244,7 @@ export class FluencePeer {
}
if (particle.ttl === undefined) {
particle.ttl = DEFAULT_TTL;
particle.ttl = this._defaultTTL;
}
this._incomingParticles.next(particle);
@ -333,37 +343,29 @@ export class FluencePeer {
// Internal peer state
private _defaultTTL: number;
private _relayPeerId: PeerIdB58 | null = null;
private _keyPair: KeyPair;
private _connection: FluenceConnection;
private _interpreter: AirInterpreter;
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<Particle>>();
private _startParticleProcessing() {
const particleQueues = new Map<string, Subject<Particle>>();
this._incomingParticles
.pipe(
tap((x) => x.logTo('debug', 'particle received:')),
filterExpiredParticles(),
filterExpiredParticles(this._expireParticle.bind(this)),
)
.subscribe((p) => {
let particlesQueue = particleQueues.get(p.id);
let particlesQueue = this._particleQueues.get(p.id);
if (!particlesQueue) {
particlesQueue = this._createParticlesProcessingQueue();
particleQueues.set(p.id, particlesQueue);
this._particleQueues.set(p.id, particlesQueue);
const timeout = setTimeout(() => {
log.debug(`particle ${p.id} has expired. Deleting particle-related queues and handlers`);
particleQueues.delete(p.id);
const timeoutHandler = this._timeoutHandlers.get(p.id);
if (timeoutHandler) {
timeoutHandler();
}
this._particleSpecificHandlers.delete(p.id);
this._timeoutHandlers.delete(p.id);
this._expireParticle(p.id);
}, p.actualTtl());
this._timeouts.push(timeout);
@ -377,6 +379,18 @@ export class FluencePeer {
});
}
private _expireParticle(particleId: string) {
log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`);
this._particleQueues.delete(particleId);
const timeoutHandler = this._timeoutHandlers.get(particleId);
if (timeoutHandler) {
timeoutHandler();
}
this._particleSpecificHandlers.delete(particleId);
this._timeoutHandlers.delete(particleId);
}
private _createParticlesProcessingQueue() {
let particlesQueue = new Subject<Particle>();
let prevData: Uint8Array = Buffer.from([]);
@ -384,7 +398,7 @@ export class FluencePeer {
particlesQueue
.pipe(
// force new line
filterExpiredParticles(),
filterExpiredParticles(this._expireParticle.bind(this)),
)
.subscribe((x) => {
const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData);
@ -510,6 +524,7 @@ export class FluencePeer {
for (let item of this._timeouts) {
clearTimeout(item);
}
this._particleQueues.clear();
}
/**
@ -569,11 +584,11 @@ function runInterpreter(
return interpreterResult;
}
function filterExpiredParticles() {
function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) {
return pipe(
tap((p: Particle) => {
if (p.hasExpired) {
log.debug(`particle ${p.id} has expired`);
if (p.hasExpired()) {
onParticleExpiration(p.id);
}
}),
filter((x: Particle) => !x.hasExpired()),