Merge branch 'master' into mirror

# Conflicts:
#	package.json
This commit is contained in:
Mitra Ardron 2019-05-05 21:25:33 +10:00
commit c126d58652
15 changed files with 982 additions and 169 deletions

1
.gitignore vendored
View File

@ -2,3 +2,4 @@ node_modules
/package-lock.json
data.json
/radata/
yarn.lock

60
API.md
View File

@ -147,6 +147,16 @@ sig Signature data structure (see below - contains url, date, signedby, s
signature - verifiable signature of date+urls
signedby - url of data structure (typically CommonList) holding public key used for the signature
```
##### seed({directoryPath=undefined, fileRelativePath=undefined, ipfsHash=undefined, urlToFile=undefined}, cb)
Seed the file to any transports that can handle it.
```
ipfsHash: When passed as a parameter, its checked against whatever IPFS calculates.
Its reported, but not an error if it doesn't match. (the cases are complex, for example the file might have been updated).
urlFile: The URL where that file is available, this is to enable transports (e.g. IPFS) that just map an internal id to a URL.
directoryPath: Absolute path to the directory, for transports that think in terms of directories (e.g. WebTorrent)
this is the unit corresponding to a torrent, and should be where the torrent file will be found or should be built
fileRelativePath: Path (relative to directoryPath) to the file to be seeded.
```
##### p_rawlist(url)
Fetch all the objects in a list, these are identified by the url of the public key used for signing.
@ -239,14 +249,26 @@ returns: Dictionary of Key:Value pairs, note take care if this could
```
### Transports - other functions
##### static async p_f_createReadStream(url, {wanturl})
##### static async p_f_createReadStream(url, {wanturl, preferredTransports=[] })
Provide a function of the form needed by <VIDEO> tag and renderMedia library etc
```
url Urls of stream
wanturl True if want the URL of the stream (for service workers)
preferredTransports: preferred order to select stream transports (usually determined by application)
returns f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
```
##### static createReadStream(urls, opts, cb)
Different interface, more suitable when just want a stream, now.
```
urls: Url or [urls] of the stream
opts{
start, end: First and last byte wanted (default to 0...last)
preferredTransports: preferred order to select stream transports (usually determined by application)
}
returns open readable stream from the net via cb or promise
```
##### supports(url, funcl)
Determines if the Transport supports urls of this form. For example TransportIPFS supports URLs starting ipfs:
```
@ -419,6 +441,8 @@ static async p_connection(urls)||Tries all parallel
static monitor(urls, cb, { current})||Tries all sequentially
##### static async p_rawfetch(urls, {timeoutMS, start, end, relay})
FOR NEW CODE USE `fetch` instead of p_rawfetch
Tries to fetch on all valid transports until successful. See Transport.p_rawfetch
```
timeoutMS: Max time to wait on transports that support it (IPFS for fetch)
@ -426,6 +450,9 @@ start,end Inclusive byte range wanted - passed to
relay If first transport fails, try and retrieve on 2nd, then store on 1st, and so on.
```
##### fetch(url, {timeoutMS, start, end, relay}, cb)
As for p_rawfetch but returns either via callback or Promise
## httptools
A utility class to support HTTP with or without TransportHTTP
e.g. `httptools.http().p_httpfetch("http://foo.com/bar", {method: 'GET'} )`
@ -441,20 +468,23 @@ returns: Depends on mime type;
If text/* returns text
Oherwise Buffer
##### p_GET(url, {start, end})
##### p_GET(url, {start, end, retries})
Shortcut to do a HTTP/POST get, sets `mode: cors, redirect: follow, keepalive: true, cache: default`
start: First byte to retrieve
end: Last byte to retrieve (undefined means end of file)
start: First byte to retrieve
end: Last byte to retrieve (undefined means end of file)
wantstream: Return a stream rather than data
retries: How may times to retry if fails at the network layer (i.e. 404 is a success)
Note that it passes start and end as the Range header, most servers support it,
but it does not (yet) explicitly check the result.
##### p_POST(url, type, data)
##### p_POST(url, type, data, {retries})
Shortcut to do a HTTP/HTTPS POST. sets same options as p_GET
data: Data to send to fetch, typically the body,
type: Currently not passed as header{Content-type} because fetch appears to ignore it.
contenttype: Currently not passed as header{Content-type} because fetch appears to ignore it.
retries: How may times to retry if fails at the network layer (i.e. 404 is a success)
## TransportHTTP
@ -497,7 +527,8 @@ SupportFunctions (note YJS uses IPFS and supports some other functions):
SupportFeatures:
fetch.range Not supported (currently April 2018))
Currently there is code for p_f_createReadStream. It works but because of some other IPFS issues is disabled.
Currently there is code for p_f_createReadStream. It works but because IPFS cannot return an error even if it
cannot open the stream, IPFS is usually set as the last choice transport for streams.
## TransportYJS
A subclass of Transport for handling YJS connections.
@ -517,8 +548,10 @@ When used with a SW, it will attempt to retrieve from the http backup URL that i
In the SW it will also generate errors about trackers because the only reason to use trackers is to get the WebRTC links.
supportURLS = `magnet:*` (TODO: may in the future support `dweb:/magnet/*`)
supportFunctions:
`fetch, createReadStream`
`fetch`, `createReadStream`
supportFeatures:
fetch.range Not supported (currently April 2018)
@ -526,10 +559,17 @@ supportFeatures:
A subclass of Transport for handling GUN connections (decentralized database)
supportURLS = `gun:*` (TODO: may in the future support `dweb:/gun/*`)
supportFunctions
`add, list, listmonitor, newlisturls, connection, get, set, getall, keys, newdatabase, newtable, monitor`
supportFunctions = `add`, `list`, `listmonitor`, `newlisturls`, `connection`, `get`, `set`, `getall`, `keys`, `newdatabase`, `newtable`, `monitor`
supportFeatures:
## TransportWOLK
A subclass of Transport for handling the WOLK transport layer (decentralized, block chain based, incentivised storage)
supportURLs = ['wolk'];
supportFunctions = [ 'fetch', 'connection', 'get', 'set', ]; // 'store' - requires chunkdata; 'createReadStream' not implemented
## Naming
Independently from the transport, the Transport library can resolve names if provided an appropriate callback.
See p_resolveNames(urls) and resolveNamesWith(cb)

View File

@ -25,7 +25,7 @@ to your package.json file in the dependencies section.
### Installation and usage in the Browser
* Install npm & node
* Clone this repo and cd to it.
* `npm bundle` will create dist/dweb_transports_bundle.js
* `npm run build` will create dist/dweb_transports_bundle.js
* Add `<SCRIPT type="text/javascript" src="dweb_transports_bundle.js"></SCRIPT>` to your `<HEAD>`
Then code like this should work.
@ -126,9 +126,17 @@ See [Dweb document index](./DOCUMENTINDEX.md) for a list of the repos that make
### Release Notes
* 0.1.33: Bug fixes; support for gatewayUrls (for dweb-mirror)
* 0.1.35: package update (note wont work with latest versions of yjs or uglify)
* 0.1.36: Made httptools accessable at Transports.httptools so it doesnt have to be separately 'require'd
* 0.1.42: Better behavior when cant see gateway
* 0.1.41: Remove createReadStream for browser (it was added for node in 0.1.40), add fetch(url,opts,cb)
* 0.1.40: Bug fix in httpfetch({count=0}),
* 0.1.40: Added support for "seed" and tested in IPFS
* 0.1.40: WOLK - moved to their production sys and master branch
* 0.1.39: WOLK - updated wolk.js module to fix bugs
* 0.1.38: httptools - adds retries
* 0.1.38: WOLK - added to the library
* 0.1.37: IPFS - dont stop it if we didnt start it (were stopping via API)
* 0.1.37: Start move to unpromisify pattern v5
* 0.1.37: IPFS - updated to (significant) v0.34.0 API changes
* 0.1.36: Made httptools accessable at Transports.httptools so it doesnt have to be separately 'require'd
* 0.1.35: package update (note wont work with latest versions of yjs or uglify)
* 0.1.33: Bug fixes; support for gatewayUrls (for dweb-mirror)

View File

@ -92,7 +92,11 @@ class Transport {
return this.status;
}
supports(url, func) {
connected() {
// True if connected (status==STATUS_CONNECTED==0) should not need subclassing
return ! this.status;
}
supports(url, func) { //TODO-API
/*
Determine if this transport supports a certain set of URLs and a func
@ -111,6 +115,13 @@ class Transport {
&& (!func || this.supportFunctions.includes(func)))
}
validFor(url, func) { //TODO-API
// By default a transport can handle a url and a func if its connected and supports that url/func
// This shouldnt need subclassing, an exception is HTTP which only applies "connected" against urls heading for the gateway
return this.connected() && this.supports(url, func);
}
p_rawstore(data, opts) {
/*
Store a blob of data onto the decentralised transport.

View File

@ -8,7 +8,7 @@ const stringify = require('canonical-json');
defaulthttpoptions = {
urlbase: 'https://dweb.me:443'
urlbase: 'https://dweb.me'
};
servercommands = { // What the server wants to see to return each of these
@ -29,9 +29,13 @@ class TransportHTTP extends Transport {
constructor(options) {
super(options); // These are now options.http
this.options = options;
this.urlbase = options.urlbase;
this.urlbase = options.urlbase; // e.g. https://dweb.me
this.supportURLs = ['contenthash', 'http','https'];
this.supportFunctions = ['fetch', 'store', 'add', 'list', 'reverse', 'newlisturls', "get", "set", "keys", "getall", "delete", "newtable", "newdatabase"]; //Does not support: listmonitor - reverse is disabled somewhere not sure if here or caller
if (typeof window === "undefined") {
// running in node, can support createReadStream, (browser can't - see createReadStream below)
this.supportFunctions.push("createReadStream");
}
// noinspection JSUnusedGlobalSymbols
this.supportFeatures = ['fetch.range'];
this.name = "HTTP"; // For console log etc
@ -82,13 +86,19 @@ class TransportHTTP extends Transport {
url = url + (parmstr ? "?"+parmstr : "");
return url;
}
validFor(url, func) {
// Overrides Transport.prototype.validFor because HTTP's connection test is only really for dweb.me
// in particular this allows urls like https://be-api.us.archive.org
return (this.connected() || (url.protocol.startsWith("http") && ! url.href.startsWith(this.urlbase))) && this.supports(url, func);
}
// noinspection JSCheckFunctionSignatures
async p_rawfetch(url, opts={}) {
/*
Fetch from underlying transport,
Fetch is used both for contenthash requests and table as when passed to SmartDict.p_fetch may not know what we have
url: Of resource - which is turned into the HTTP url in p_httpfetch
opts: {start, end} see p_GET for documentation
opts: {start, end, retries} see p_GET for documentation
throws: TransportError if fails
*/
//if (!(url && url.includes(':') ))
@ -164,6 +174,7 @@ class TransportHTTP extends Transport {
Node.js readable stream docs: https://nodejs.org/api/stream.html#stream_readable_streams
:param string url: URL of object being retrieved of form magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
:param boolean wanturl True if want the URL of the stream (for service workers)
:resolves to: f({start, end}) => stream (The readable stream.)
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
*/
@ -192,7 +203,8 @@ class TransportHTTP extends Transport {
:param opts: { start: byte to start from; end: optional end byte }
:returns stream: The readable stream - it is returned immediately, though won't be sending data until the http completes
*/
// This breaks in browsers ... as 's' doesn't have .pipe but has .pipeTo and .pipeThrough neither of which work with stream.PassThrough
// TODO See https://github.com/nodejs/readable-stream/issues/406 in case its fixed in which case enable createReadStream in constructor above.
debughttp("createreadstream %s %o", Url.parse(url).href, opts);
let through;
through = new stream.PassThrough();
@ -298,7 +310,7 @@ class TransportHTTP extends Transport {
}
*/
p_info() { return httptools.p_GET(`${this.urlbase}/info`); }
p_info() { return httptools.p_GET(`${this.urlbase}/info`, {retries: 5}); } // Try info, but dont wait more than approx 10secs
static async p_test(opts={}) {
{console.log("TransportHTTP.test")}

View File

@ -60,11 +60,15 @@ class TransportIPFS extends Transport {
constructor(options) {
super(options);
if (options.urlUrlstore) {
this.urlUrlstore = options.urlUrlstore;
delete options.urlUrlstore;
}
this.ipfs = undefined; // Undefined till start IPFS
this.options = options; // Dictionary of options
this.name = "IPFS"; // For console log etc
this.supportURLs = ['ipfs'];
this.supportFunctions = ['fetch', 'store', 'createReadStream']; // Does not support reverse
this.supportFunctions = ['fetch', 'store', 'seed', 'createReadStream']; // Does not support reverse
this.status = Transport.STATUS_LOADED;
}
@ -348,6 +352,36 @@ class TransportIPFS extends Transport {
return TransportIPFS.urlFrom(res);
}
seed({directoryPath=undefined, fileRelativePath=undefined, ipfsHash=undefined, urlToFile=undefined}, cb) {
/* Note always passed a cb by Transports.seed - no need to support Promise here
ipfsHash: IPFS hash if known (usually not known)
urlToFile: Where the IPFS server can get the file - must be live before this called as will fetch and hash
TODO support directoryPath/fileRelativePath, but to working around IPFS limitation in https://github.com/ipfs/go-ipfs/issues/4224 will need to check relative to IPFS home, and if not symlink it and add symlink
TODO maybe support adding raw data (using add)
Note neither js-ipfs-http-client nor js-ipfs appear to support urlstore yet, see https://github.com/ipfs/js-ipfs-http-client/issues/969
*/
// This is the URL that the IPFS server uses to get the file from the local mirrorHttp
if (!(this.urlUrlstore && urlToFile)) { // Not doing IPFS
debug("IPFS.seed support requires urlUrlstore and urlToFile"); // Report, though Transports.seed currently ignores this
cb(new Error("IPFS.seed support requires urlUrlstore and urlToFile")); // Report, though Transports.seed currently ignores this
} else {
// Building by hand becase of lack of support in js-ipfs-http-client
const url = `${this.urlUrlstore}?arg=${encodeURIComponent(urlToFile)}`;
// Have to be careful to avoid loops, the call to addIPFS should only be after file is retrieved and cached, and then addIPFS shouldnt be called if already cached
httptools.p_GET(url, {retries:0}, (err, res) => {
if (err) {
debug("IPFS.seed for %s failed in http: %s", urlToFile, err.message);
cb(err); // Note error currently ignored in Transports
} else {
debug("Added %s to IPFS key=", urlToFile, res.Key);
// Check for mismatch - this isn't an error, for example it could be an updated file, old IPFS hash will now fail, but is out of date and shouldnt be shared
if (ipfsHash && ipfsHash !== res.Key) { debug("ipfs hash doesnt match expected metadata has %s daemon returned %s", ipfsHash, res.Key); }
cb(null, res)
}
})
}
}
async p_f_createReadStream(url, {wanturl=false}={}) {
/*
Fetch bytes progressively, using a node.js readable stream, based on a url of the form:
@ -362,6 +396,7 @@ class TransportIPFS extends Transport {
:param string url: URL of object being retrieved of form:
magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
ipfs:/ipfs/Q123
:param boolean wanturl True if want the URL of the stream (for service workers)
:resolves to: f({start, end}) => stream (The readable stream.)
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
*/

View File

@ -253,6 +253,7 @@ class TransportWEBTORRENT extends Transport {
Node.js readable stream docs: https://nodejs.org/api/stream.html#stream_readable_streams
:param string url: URL of object being retrieved of form magnet:xyzabc/path/to/file (Where xyzabc is the typical magnet uri contents)
:param boolean wanturl True if want the URL of the stream (for service workers)
:resolves to: f({start, end}) => stream (The readable stream.)
:throws: TransportError if url invalid - note this happens immediately, not as a catch in the promise
*/

269
TransportWOLK.js Normal file
View File

@ -0,0 +1,269 @@
/*
This Transport layers uses Wolk NoSQL + Cloudstore.
*/
var WOLK
const Url = require('url');
if( typeof window === 'undefined' ) {
WOLK = require("wolkjs").FS;
} else {
WOLK = require("wolkjs").WOLK;
}
const stringify = require('canonical-json');
const debug = require('debug')('dweb-transports:wolk');
// Other Dweb modules
const errors = require('./Errors'); // Standard Dweb Errors
const Transport = require('./Transport.js'); // Base class for TransportXyz
const Transports = require('./Transports'); // Manage all Transports that are loaded
const utils = require('./utils'); // Utility functions
let defaultoptions = {
wolk_addr: "https://cloud.wolk.com",
};
class TransportWOLK extends Transport {
/* Wolk specific transport */
constructor(options) {
super(options);
this.options = options; // Dictionary of options
this.wolk = undefined;
this.name = "WOLK"; // For console log etc
//TODO: Change name to WOLK once we understand everything
this.supportURLs = ['wolk'];
this.supportFunctions = [ 'fetch', 'connection', 'get', 'set', ]; // 'store' - requires chunkdata; 'createReadStream' not implemented
this.status = Transport.STATUS_LOADED;
}
connection(url) {
debug("connection call")
var wolknode = new WOLK();
return wolknode
}
//stuff that happens b/f using ntwk bandwidth (config/connect/stuff)
static setup0(options) {
let combinedoptions = Transport.mergeoptions(defaultoptions, options.wolk);
debug("setup options=%o", combinedoptions);
let t = new TransportWOLK(combinedoptions);
t.wolk = new WOLK();
//var successinit = await Promise.all(t.wolk.init())
t.wolk.setProvider(t.options.wolk_addr);
Transports.addtransport(t);
return t;
}
//make the connection
async p_setup1(cb) {
await this.wolk.init()
.then( async () => { //TODO-WOLK check - I'm just not familiar with this construct - an async function inside a .then
if( this.wolk.ecdsaKey == undefined || this.wolk.ecdsaKey == null ) {
var wolkName = "user" + Math.floor((Math.random() * 1000) + 1);
debug("createAccount because ecdsaKey null")
return await this.wolk.createAccount(wolkName)
.then( hash => {
debug("Account Created: [" + wolkName + "] hash: " + hash + " KEY: " + this.wolk.ecdsaKey)
})
.catch( err => {
throw new Error("Error Creating Account: " + err);
})
}
})
.catch( (err) => {
throw new Error("Error Initializing Wolk: " + err);
});
try {
this.status = Transport.STATUS_STARTING; // Should display, but probably not refreshed in most case
if (cb) cb(this);
await this.p_status();
} catch(err) {
this.status = Transport.STATUS_FAILED;
}
if (cb) cb(this);
return this;
}
async p_status() {
/* Return an integer for the status of a transport see Transport */
return this.wolk.getLatestBlockNumber()
.then( (bn) => {
if (bn >= 0) {
debug("STATUS: connected? [1] = BN: %s", bn)
this.status = Transport.STATUS_CONNECTED;
} else {
debug("STATUS: connected? [0] = BN: %s", bn)
this.status = Transport.STATUS_FAILED;
}
return this.status;
})
.catch( (err) => { console.error("Error getting bn: " + err); })
}
// ===== DATA ======
async p_rawstore(chunk) {
/*
Store a blob of data onto the decentralised transport.
Returns a promise that resolves to the url of the data
:param string|Buffer data: Data to store - no assumptions made to size or content
:resolve string: url of data stored
*/
console.assert(chunk, "TransportWOLK.p_rawstore: requires chunkdata");
/* TODO:
const rawRes = this.wolk.setChunk(chunk);
if (rawRes.err) {
throw new errors.TransportError("Error encountered storing chunk: " + rawRes.err);
}
return "wolk://wolk/" + rawRes.h;
*/
}
parseWolkUrl(url) {
var url = Url.parse(url);
if(url.protocol != "wolk:") {
throw new errors.TransportError("WOLK Error encountered retrieving val: url (" + url.href + ") is not a valid WOLK url | protocol = " + url.protocol);
}
let wolkowner = url.host
var urlParts = url.path.split("/");
let wolkbucket = urlParts[1];
let wolkpath = url.path.substring(wolkbucket.length + 2);
var wolkurltype = "key"
if( wolkowner == "wolk" && wolkbucket == "chunk" ) {
wolkurltype = "chunk"
}
let wolkquery = url.query
return { owner: wolkowner, bucket: wolkbucket, path: wolkpath, urltype: wolkurltype, query: wolkquery }
}
async p_rawfetch(url) {
//TODO: use this.wolk.parseWolkUrl eventually
var wolkurl = this.parseWolkUrl(url)
/*
console.log("WOLK p_rawfetch url: " + JSON.stringify(wolkurl));
console.log("WOLK owner: " + wolkurl.owner);
console.log("WOLK bucket: " + wolkurl.bucket);
console.log("WOLK key: " + wolkurl.path);
console.log("WOLK query: " + wolkurl.query);
console.log("WOLK urltype: " + wolkurl.urltype);
*/
var responseData = ""
if( wolkurl.urltype == "key" ) {
debug("Checking Wolk NoSQL for: %s", url)
return this.wolk.getKey(wolkurl.owner, wolkurl.bucket, wolkurl.path, "latest")
.then(function(responseData) {
//TODO-WOLK: error checking
//debug("Response: %s", JSON.stringify(responseData)); //Commented as could be big
return responseData;
})
.catch( (err) => {
throw new Error("ERROR: p_rawfetch - " + err);
})
}
}
//=======KEY VALUE TABLES ========
async p_newdatabase(pubkey) {
}
async p_newtable(pubkey, table) {
}
async p_set(url, keyvalues, value) {
/*
Set key values
keyvalues: string (key) in which case value should be set there OR object in which case value is ignored
*/
var wolkurl = this.parseWolkUrl(url)
/*
console.log("WOLK p_set url: " + JSON.stringify(wolkurl));
console.log("WOLK owner: " + wolkurl.owner);
console.log("WOLK bucket: " + wolkurl.bucket);
console.log("WOLK key: " + wolkurl.path);
console.log("WOLK query: " + wolkurl.query);
console.log("WOLK urltype: " + wolkurl.urltype);
*/
if (typeof keyvalues === "string") {
return this.wolk.setKey(wolkurl.owner, wolkurl.bucket, keyvalues, stringify(value))
.then( (hash) => {
return hash;
})
.catch( (err) => {
throw new Error("TransportWOLK - Error setting key value pair: " + err)
});
} else {
// Store all key-value pairs without destroying any other key/value pairs previously set
//TODO: Why not support Arrays?
console.assert(!Array.isArray(keyvalues), "TransportWOLK - shouldnt be passsing an array as the keyvalues");
//TODO: better understand dictionary objects
/*
table.put(
Object.keys(keyvalues).reduce(
function(previous, key) {
previous[key] = stringify(keyvalues[key]);
return previous;
},
{}
)
)
*/
}
}
async p_get(url, keys) {
var wolkurl = this.parseWolkUrl(url)
debug("Getting url: %s", JSON.stringify(wolkurl));
/*
console.log("WOLK owner: " + wolkurl.owner);
console.log("WOLK bucket: " + wolkurl.bucket);
console.log("WOLK key: " + wolkurl.path);
console.log("WOLK query: " + wolkurl.query);
console.log("WOLK urltype: " + wolkurl.urltype);
*/
if (Array.isArray(keys)) {
throw new errors.ToBeImplementedError("p_get(url, [keys]) isn't supported - because of ambiguity better to explicitly loop on set of keys");
/*
return keys.reduce(function(previous, key) {
let val = table.get(key);
previous[key] = typeof val === "string" ? JSON.parse(val) : val; // Handle undefined
return previous;
}, {});
*/
} else {
return this.wolk.getKey(wolkurl.owner, wolkurl.bucket, keys, "latest")
.then( (value) => { return value; })
.catch( (err) => {
throw new errors.TransportError("Error encountered getting keyvalues: " + err);
})
}
}
async p_delete(url, keys) {
var wolkurl = this.parseWolkUrl(url)
if ( typeof keys === "string") {
return this.wolk.deleteKey(wolkurl.owner, wolkurl.bucket, keys)
.then( (res) => { return res; })
.catch( (err) => { throw new errors.TransportError("Error deleting key(s): " + err)})
} else {
keys.map( (key) => {
this.wolk.deleteKey(wolkurl.owner, wolkurl.bucket, key)
})
}
}
async p_keys(url) {
var wolkurl = this.parseWolkUrl(url)
return this.listCollection(wolkurl.owner, wolkurl.bucket, {})
}
async p_getall(url) {
//TODO: difference between this and p_keys
}
}
Transports._transportclasses["WOLK"] = TransportWOLK;
exports = module.exports = TransportWOLK;

View File

@ -2,8 +2,9 @@ const Url = require('url');
const errors = require('./Errors');
const utils = require('./utils');
//process.env.DEBUG = "dweb-transports"; //TODO-DEBUG set at top level
const debugtransports = require('debug')('dweb-transports');
const debug = require('debug')('dweb-transports');
const httptools = require('./httptools');
const each = require('async/each');
class Transports {
/*
@ -56,18 +57,18 @@ class Transports {
throws: CodingError if urls empty or [undefined...]
*/
if (typeof urls === "string") urls = [urls];
if (!((urls && urls[0]) || ["store", "newlisturls", "newdatabase", "newtable"].includes(func))) {
if (!((urls && urls[0]) || ["store", "newlisturls", "newdatabase", "newtable", "seed"].includes(func))) {
console.error("Transports.validFor called with invalid arguments: urls=", urls, "func=", func); // FOr debugging old calling patterns with [ undefined ]
return [];
}
if (!(urls && urls.length > 0)) {
return this._connected().filter((t) => (t.supports(undefined, func)))
if (!(urls && urls.length > 0)) { // No url supplied we are just checking which transports support this function on no url.
return this._transports.filter((t) => (t.validFor(undefined, func)))
.map((t) => [undefined, t]);
} else {
return [].concat(
...urls.map((url) => typeof url === 'string' ? Url.parse(url) : url) // parse URLs once
.map((url) =>
this._connected().filter((t) => (t.supports(url, func))) // [ t1, t2 ]
this._transports.filter((t) => (t.validFor(url, func))) // [ t1, t2 ]
.map((t) => [url, t]))); // [[ u, t1], [u, t2]]
}
}
@ -79,11 +80,17 @@ class Transports {
// SEE-OTHER-ADDTRANSPORT
static http() {
// Find an http transport if it exists, so for example YJS can use it.
// Find an http transport if it exists.
return Transports._connected().find((t) => t.name === "HTTP")
}
static wolk() {
// Find a Wolk transport if it exists.
return Transports._connected().find((t) => t.name === "WOLK")
}
static ipfs() {
// Find an ipfs transport if it exists, so for example YJS can use it.
// Find an ipfs transport if it exists, in particular, so YJS can use it.
return Transports._connected().find((t) => t.name === "IPFS")
}
@ -120,19 +127,19 @@ class Transports {
let errs = [];
let rr = await Promise.all(tt.map(async function(t) {
try {
debugtransports("Storing %d bytes to %s", data.length, t.name);
debug("Storing %d bytes to %s", data.length, t.name);
let url = await t.p_rawstore(data);
debugtransports("Storing %d bytes to %s succeeded: %s", data.length, t.name, url);
debug("Storing %d bytes to %s succeeded: %s", data.length, t.name, url);
return url; //url
} catch(err) {
debugtransports("Storing %d bytes to %s failed: %s", data.length, t.name, err.message);
debug("Storing %d bytes to %s failed: %s", data.length, t.name, err.message);
errs.push(err);
return undefined;
}
}));
rr = rr.filter((r) => !!r); // Trim any that had errors
if (!rr.length) {
debugtransports("Storing %d bytes failed on all transports", data.length);
debug("Storing %d bytes failed on all transports", data.length);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
}
return rr;
@ -146,40 +153,11 @@ class Transports {
*/
let tt = this.validFor(undefined, "store").map(([u, t]) => t); // Valid connected transports that support "store"
if (!tt.length) {
debugtransports("Storing %d bytes failed: no transports available", data.length);
debug("Storing %d bytes failed: no transports available", data.length);
throw new errors.TransportError('Transports.p_rawstore: Cant find transport for store');
}
return this._p_rawstore(tt, data);
}
static async p_rawlist(urls) {
urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
let tt = this.validFor(urls, "list"); // Valid connected transports that support "store"
if (!tt.length) {
throw new errors.TransportError('Transports.p_rawlist: Cant find transport to "list" urls:'+urls.join(','));
}
let errs = [];
let ttlines = await Promise.all(tt.map(async function([url, t]) {
try {
debugtransports("Listing %s via %s", url, t.name);
let res = await t.p_rawlist(url); // [sig]
debugtransports("Listing %s via %s retrieved %d items", url, t.name, res.length);
return res;
} catch(err) {
debugtransports("Listing %s via %s failed: %s", url, t.name, err.message);
errs.push(err);
return [];
}
})); // [[sig,sig],[sig,sig]]
if (errs.length >= tt.length) {
// All Transports failed (maybe only 1)
debugtransports("Listing %o failed on all transports", urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
}
let uniques = {}; // Used to filter duplicates
return [].concat(...ttlines)
.filter((x) => (!uniques[x.signature] && (uniques[x.signature] = true)));
}
static async p_rawfetch(urls, opts={}) {
/*
Fetch the data for a url, transports act on the data, typically storing it.
@ -205,28 +183,92 @@ class Transports {
let failedtransports = []; // Will accumulate any transports fail on before the success
for (const [url, t] of tt) {
try {
debugtransports("Fetching %s via %s", url.href, t.name);
debug("Fetching %s via %s", url.href, t.name);
let data = await t.p_rawfetch(url, opts); // throws errors if fails or timesout
debugtransports("Fetching %s via %s succeeded %d bytes", url.href, t.name, data.length);
debug("Fetching %s via %s succeeded %d bytes", url.href, t.name, data.length);
//TODO-MULTI-GATEWAY working here - it doesnt quite work yet as the "Add" on browser gets different url than on server
if (opts.relay && failedtransports.length) {
debugtransports("Fetching attempting relay of %d bytes from %s to %o", data.length, url.href, failedtransports.map(t=>t.name));
debug("Fetching attempting relay of %d bytes from %s to %o", data.length, url.href, failedtransports.map(t=>t.name));
this._p_rawstore(failedtransports, data)
.then(uu => debugtransports(`Fetching relayed %d bytes to %o`, data.length, uu)); // Happening async, not waiting and dont care if fails
.then(uu => debug(`Fetching relayed %d bytes to %o`, data.length, uu)); // Happening async, not waiting and dont care if fails
}
//END TODO-MULTI-GATEWAY
return data;
} catch (err) {
failedtransports.push(t);
errs.push(err);
debugtransports("Fetching %s via %s failed: %s", url.href, t.name, err.message);
debug("Fetching %s via %s failed: %s", url.href, t.name, err.message);
// Don't throw anything here, loop round for next, only throw if drop out bottom
//TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
}
}
debugtransports("Fetching %o failed on all transports", urls);
debug("Fetching %o failed on all transports", urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
}
static fetch(urls, opts={}, cb) { //TODO-API
if (typeof opts === "function") { cb = opts; opts={}; }
const prom = this.p_rawfetch(urls, opts);
if (cb) { prom.then((res)=>{ try { cb(null,res)} catch(err) { debug("Uncaught error in fetch %O",err)}}).catch((err) => cb(err)); } else { return prom; } // Unpromisify pattern v5
}
// Seeding =====
// Similar to storing.
static seed({directoryPath=undefined, fileRelativePath=undefined, ipfsHash=undefined, urlToFile=undefined}, cb) {
/*
TODO-API get thsi from the issue
ipfsHash: When passed as a parameter, its checked against whatever IPFS calculates.
Its reported, but not an error if it doesn't match. (the cases are complex, for example the file might have been updated).
urlFile: The URL where that file is available, this is to enable transports (e.g. IPFS) that just map an internal id to a URL.
directoryPath: Absolute path to the directory, for transports that think in terms of directories (e.g. WebTorrent)
this is the unit corresponding to a torrent, and should be where the torrent file will be found or should be built
fileRelativePath: Path (relative to directoryPath) to the file to be seeded.
*/
if (cb) { try { f.call(this, cb) } catch(err) { cb(err)}} else { return new Promise((resolve, reject) => { try { f.call(this, (err, res) => { if (err) {reject(err)} else {resolve(res)} })} catch(err) {reject(err)}})} // Promisify pattern v2
function f(cb1) {
let tt = this.validFor(undefined, "seed").map(([u, t]) => t); // Valid connected transports that support "seed"
if (!tt.length) {
debug("Seeding: no transports available");
cb1(null); // Its not (currently) an error to be unable to seed
} else {
const res = {};
each(tt, // [ Transport]
(t, cb2) => t.seed({directoryPath, fileRelativePath, ipfsHash, urlToFile},
(err, oneres) => { res[t.name] = err ? { err: err.message } : oneres; cb2(null)}), // Its not an error for t.seed to fail - errors should have been logged by transports
(unusederr) => cb1(null, res)); // Return result of any seeds that succeeded as e.g. { HTTP: {}, IPFS: {ipfsHash:} }
}
}
}
// List handling ===========================================
static async p_rawlist(urls) {
urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
let tt = this.validFor(urls, "list"); // Valid connected transports that support "store"
if (!tt.length) {
throw new errors.TransportError('Transports.p_rawlist: Cant find transport to "list" urls:'+urls.join(','));
}
let errs = [];
let ttlines = await Promise.all(tt.map(async function([url, t]) {
try {
debug("Listing %s via %s", url, t.name);
let res = await t.p_rawlist(url); // [sig]
debug("Listing %s via %s retrieved %d items", url, t.name, res.length);
return res;
} catch(err) {
debug("Listing %s via %s failed: %s", url, t.name, err.message);
errs.push(err);
return [];
}
})); // [[sig,sig],[sig,sig]]
if (errs.length >= tt.length) {
// All Transports failed (maybe only 1)
debug("Listing %o failed on all transports", urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
}
let uniques = {}; // Used to filter duplicates
return [].concat(...ttlines)
.filter((x) => (!uniques[x.signature] && (uniques[x.signature] = true)));
}
static async p_rawadd(urls, sig) {
/*
@ -241,24 +283,24 @@ class Transports {
urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
let tt = this.validFor(urls, "add"); // Valid connected transports that support "store"
if (!tt.length) {
debugtransports("Adding to %o failed: no transports available", urls);
debug("Adding to %o failed: no transports available", urls);
throw new errors.TransportError('Transports.p_rawstore: Cant find transport for urls:'+urls.join(','));
}
let errs = [];
await Promise.all(tt.map(async function([u, t]) {
try {
debugtransports("Adding to %s via %s", u, t.name);
debug("Adding to %s via %s", u, t.name);
await t.p_rawadd(u, sig); //undefined
debugtransports("Adding to %s via %s succeeded", u, t.name);
debug("Adding to %s via %s succeeded", u, t.name);
return undefined;
} catch(err) {
debugtransports("Adding to %s via %s failed: %s", u, t.name, err.message);
debug("Adding to %s via %s failed: %s", u, t.name, err.message);
errs.push(err);
return undefined;
}
}));
if (errs.length >= tt.length) {
debugtransports("Adding to %o failed on all transports", urls);
debug("Adding to %o failed on all transports", urls);
// All Transports failed (maybe only 1)
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
}
@ -274,7 +316,7 @@ class Transports {
this.validFor(urls, "listmonitor")
.map(([u, t]) => {
t.listmonitor(u, cb, opts);
debugtransports("Monitoring list %s via %s", u, t.name);
debug("Monitoring list %s via %s", u, t.name);
});
}
@ -293,48 +335,52 @@ class Transports {
static async p_f_createReadStream(urls, {wanturl=false, preferredTransports=[]}={}) { // Note options is options for selecting a stream, not the start/end in a createReadStream call
/*
urls: Url or [urls] of the stream
wanturl True if want the URL of the stream (for service workers)
returns: f(opts) => stream returning bytes from opts.start || start of file to opts.end-1 || end of file
*/
// Find all the transports that CAN support this request
let tt = this.validFor(urls, "createReadStream", {}); //[ [Url,t],[Url,t]] // Can pass options TODO-STREAM support options in validFor
if (!tt.length) {
debugtransports("Opening stream to %o failed: no transports available", urls);
debug("Opening stream from %o failed: no transports available", urls);
throw new errors.TransportError("Transports.p_createReadStream cant find any transport for urls: " + urls);
}
//With multiple transports, it should return when the first one returns something.
let errs = [];
// Until we have transport ordering, try randomly TODO Transport ordering
//Debugging: preferredTransports = [] // ["WEBTORRENT", "IPFS", "HTTP"];
// Select first from preferredTransports in the order presented, then the rest at random
tt.sort((a,b) =>
((preferredTransports.indexOf(a[1].name)+1) || 999+Math.random()) - ((preferredTransports.indexOf(b[1].name)+1) || 999+Math.random())
);
for (const [url, t] of tt) {
try {
debugtransports("Opening stream to %s via %s", url.href, t.name);
debug("Opening stream from %s via %s", url.href, t.name);
let res = await t.p_f_createReadStream(url, {wanturl} );
debugtransports("Opening stream to %s via %s succeeded", url.href, t.name);
debug("Opening stream from %s via %s succeeded", url.href, t.name);
return res;
} catch (err) {
errs.push(err);
debugtransports("Opening stream to %s via %s failed: %s", url.href, t.name, err.message);
debug("Opening stream from %s via %s failed: %s", url.href, t.name, err.message);
// Don't throw anything here, loop round for next, only throw if drop out bottom
//TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
}
}
debugtransports("Opening stream to %o failed on all transports", urls);
debug("Opening stream from %o failed on all transports", urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
}
static createReadStream(urls, opts, cb) { //TODO-API
static createReadStream(urls, opts, cb) {
/*
Different interface, more suitable when just want a stream, now.
urls: Url or [urls] of the stream
opts{start, end}: First and last byte wanted (default to 0...last)
opts{
start, end: First and last byte wanted (default to 0...last)
preferredTransports: preferred order to select stream transports (usually determined by application)
}
cb(err, stream): Called with open readable stream from the net.
Returns promise if no cb
*/
if (typeof opts === "function") { cb = opts; opts = {start: 0}; } // Allow skipping opts
DwebTransports.p_f_createReadStream(urls)
DwebTransports.p_f_createReadStream(urls, {preferredTransports: (opts.preferredTransports || [])})
.then(f => {
let s = f(opts);
if (cb) { cb(null, s); } else { return(s); }; // Callback or resolve stream
@ -363,24 +409,24 @@ class Transports {
let tt = this.validFor(urls, "get"); //[ [Url,t],[Url,t]]
let debug1 = Array.isArray(keys) ? `${keys.length} keys` : keys; // "1 keys" or "foo"
if (!tt.length) {
debugtransports("Getting %s from %o failed: no transports available", debug1, urls);
debug("Getting %s from %o failed: no transports available", debug1, urls);
throw new errors.TransportError("Transports.p_get cant find any transport to get keys from urls: " + urls);
}
//With multiple transports, it should return when the first one returns something.
let errs = [];
for (const [url, t] of tt) {
try {
debugtransports("Getting %s from %s via %s", debug1, url.href, t.name);
debug("Getting %s from %s via %s", debug1, url.href, t.name);
let res = await t.p_get(url, keys); //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
debugtransports("Getting %s from %s via %s succeeded length=%d", debug1, url.href, t.name, res.length);
debug("Getting %s from %s via %s succeeded length=%d", debug1, url.href, t.name, res.length);
return res;
} catch (err) {
errs.push(err);
debugtransports("Getting %s from %s via %s failed: %s", debug1, url.href, t.name, err.message);
debug("Getting %s from %s via %s failed: %s", debug1, url.href, t.name, err.message);
// Don't throw anything here, loop round for next, only throw if drop out bottom
}
}
debugtransports("Getting %s from %o failed on all transports", debug1, urls);
debug("Getting %s from %o failed on all transports", debug1, urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
}
static async p_set(urls, keyvalues, value) {
@ -393,24 +439,24 @@ class Transports {
let debug1 = typeof keyvalues === "object" ? `${keyvalues.length} keys` : keyvalues; // "1 keys" or "foo"
let tt = this.validFor(urls, "set"); //[ [Url,t],[Url,t]]
if (!tt.length) {
debugtransports("Setting %s on %o failed: no transports available", debug1, urls);
debug("Setting %s on %o failed: no transports available", debug1, urls);
throw new errors.TransportError("Transports.p_set cant find any transport for urls: " + urls);
}
let errs = [];
let success = false;
await Promise.all(tt.map(async function([url, t]) {
try {
debugtransports("Setting %s on %s via %s", debug1, url.href, t.name);
debug("Setting %s on %s via %s", debug1, url.href, t.name);
await t.p_set(url, keyvalues, value);
debugtransports("Setting %s on %s via %s succeeded", debug1, url.href, t.name);
debug("Setting %s on %s via %s succeeded", debug1, url.href, t.name);
success = true; // Any one success will return true
} catch(err) {
debugtransports("Setting %s on %s via %s failed: %s", debug1, url.href, t.name, err.message);
debug("Setting %s on %s via %s failed: %s", debug1, url.href, t.name, err.message);
errs.push(err);
}
}));
if (!success) {
debugtransports("Setting %s on %o failed on all transports", debug1, urls);
debug("Setting %s on %o failed on all transports", debug1, urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
}
}
@ -425,24 +471,24 @@ class Transports {
let debug1 = Array.isArray(keys) ? `${keys.length} keys` : keys; // "1 keys" or "foo"
let tt = this.validFor(urls, "set"); //[ [Url,t],[Url,t]]
if (!tt.length) {
debugtransports("Deleting %s on %o failed: no transports available", debug1, urls);
debug("Deleting %s on %o failed: no transports available", debug1, urls);
throw new errors.TransportError("Transports.p_set cant find any transport for urls: " + urls);
}
let errs = [];
let success = false;
await Promise.all(tt.map(async function([url, t]) {
try {
debugtransports("Deleting %s on %s via %s", debug1, url.href, t.name);
debug("Deleting %s on %s via %s", debug1, url.href, t.name);
await t.p_delete(url, keys);
debugtransports("Deleting %s on %s via %s succeeded", debug1, url.href, t.name);
debug("Deleting %s on %s via %s succeeded", debug1, url.href, t.name);
success = true; // Any one success will return true
} catch(err) {
debugtransports("Deleting %s on %s via %s failed: %s", debug1, url.href, t.name, err.message);
debug("Deleting %s on %s via %s failed: %s", debug1, url.href, t.name, err.message);
errs.push(err);
}
}));
if (!success) {
debugtransports("Deleting %s on %o failed on all transports", debug1, urls);
debug("Deleting %s on %o failed on all transports", debug1, urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); // New error with concatenated messages
}
}
@ -457,24 +503,24 @@ class Transports {
urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
let tt = this.validFor(urls, "keys"); //[ [Url,t],[Url,t]]
if (!tt.length) {
debugtransports("Getting all keys on %o failed: no transports available", urls);
debug("Getting all keys on %o failed: no transports available", urls);
throw new errors.TransportError("Transports.p_keys cant find any transport for urls: " + urls);
}
//With multiple transports, it should return when the first one returns something. TODO make it return the aggregate
let errs = [];
for (const [url, t] of tt) {
try {
debugtransports("Getting all keys on %s via %s", url.href, t.name);
debug("Getting all keys on %s via %s", url.href, t.name);
let res = await t.p_keys(url); //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
debugtransports("Getting all keys on %s via %s succeeded with %d keys", url.href, t.name, res.length);
debug("Getting all keys on %s via %s succeeded with %d keys", url.href, t.name, res.length);
return res;
} catch (err) {
errs.push(err);
debugtransports("Getting all keys on %s via %s failed: %s", url.href, t.name, err.message);
debug("Getting all keys on %s via %s failed: %s", url.href, t.name, err.message);
// Don't throw anything here, loop round for next, only throw if drop out bottom
}
}
debugtransports("Getting all keys on %o failed on all transports", urls);
debug("Getting all keys on %o failed on all transports", urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
}
@ -489,24 +535,24 @@ class Transports {
urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
let tt = this.validFor(urls, "getall"); //[ [Url,t],[Url,t]]
if (!tt.length) {
debugtransports("Getting all values on %o failed: no transports available", urls);
debug("Getting all values on %o failed: no transports available", urls);
throw new errors.TransportError("Transports.p_getall cant find any transport for urls: " + urls);
}
//With multiple transports, it should return when the first one returns something.
let errs = [];
for (const [url, t] of tt) {
try {
debugtransports("Getting all values on %s via %s", url.href, t.name);
debug("Getting all values on %s via %s", url.href, t.name);
let res = await t.p_getall(url); //TODO-MULTI-GATEWAY potentially copy from success to failed URLs.
debugtransports("Getting all values on %s via %s succeeded with %d values", url.href, t.name, res.length);
debug("Getting all values on %s via %s succeeded with %d values", url.href, t.name, res.length);
return res;
} catch (err) {
errs.push(err);
debugtransports("Getting all values on %s via %s failed: %s", url.href, t.name, err.message);
debug("Getting all values on %s via %s failed: %s", url.href, t.name, err.message);
// Don't throw anything here, loop round for next, only throw if drop out bottom
}
}
debugtransports("Getting all keys on %o failed on all transports", urls);
debug("Getting all keys on %o failed on all transports", urls);
throw new errors.TransportError(errs.map((err)=>err.message).join(', ')); //Throw err with combined messages if none succeed
}
@ -552,7 +598,7 @@ class Transports {
//Can't its async. urls = await this.p_resolveNames(urls); // If naming is loaded then convert to a name
this.validFor(urls, "monitor")
.map(([u, t]) => {
debugtransports("Monitoring table %s via %s", u, t.name);
debug("Monitoring table %s via %s", u, t.name);
t.monitor(u, cb, {current})
}
);
@ -583,10 +629,10 @@ class Transports {
return tabbrevs.map((tabbrev) => {
let transportclass = this._transportclasses[ (tabbrev === "LOCAL") ? "HTTP" : tabbrev ];
if (!transportclass) {
debugtransports("Connection to %s unavailable", tabbrev);
debug("Connection to %s unavailable", tabbrev);
return undefined;
} else {
debugtransports("Setting up connection to %s with options %o", tabbrev, options);
debug("Setting up connection to %s with options %o", tabbrev, options);
return transportclass.setup0(tabbrev === "LOCAL" ? localoptions : options);
}
}).filter(f => !!f); // Trim out any undefined
@ -597,7 +643,7 @@ class Transports {
const prom = Promise.all(this._transports
.filter((t) => (! this._optionspaused.includes(t.name)))
.map((t) => {
debugtransports("Connection stage 1 to %s", t.name);
debug("Connection stage 1 to %s", t.name);
return t.p_setup1(refreshstatus);
}))
if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
@ -609,7 +655,7 @@ class Transports {
const prom = Promise.all(this._transports
.filter((t) => (! this._optionspaused.includes(t.name)))
.map((t) => {
debugtransports("Connection stage 2 to %s", t.name);
debug("Connection stage 2 to %s", t.name);
return t.p_setup2(refreshstatus);
}));
if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
@ -619,7 +665,7 @@ class Transports {
const prom = Promise.all(this._connected()
.map((t) => {
debugtransports("Stopping %s", t.name);
debug("Stopping %s", t.name);
return t.p_stop(refreshstatus);
}));
if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)); } else { return prom; } // This should be a standard unpromisify pattern
@ -653,7 +699,7 @@ class Transports {
let tabbrevs = options.transports; // Array of transport abbreviations
this._optionspaused = (options.paused || []).map(n => n.toUpperCase()); // Array of transports paused - defaults to none, upper cased
if (!(tabbrevs && tabbrevs.length)) { tabbrevs = options.defaulttransports || [] }
if (! tabbrevs.length) { tabbrevs = ["HTTP", "YJS", "IPFS", "WEBTORRENT", "GUN"]; } // SEE-OTHER-ADDTRANSPORT
if (! tabbrevs.length) { tabbrevs = ["HTTP", "YJS", "IPFS", "WEBTORRENT", "GUN", "WOLK"]; } // SEE-OTHER-ADDTRANSPORT
tabbrevs = tabbrevs.map(n => n.toUpperCase());
let transports = this.setup0(tabbrevs, options);
["statuscb", "mirror"].forEach(k => { if (options[k]) this[k] = options[k];} )
@ -662,17 +708,17 @@ class Transports {
while (statuselement.lastChild) {statuselement.removeChild(statuselement.lastChild); } // Remove any exist status
statuselement.appendChild(
utils.createElement("UL", {}, transports.map(t => {
let el = utils.createElement("LI",
{onclick: "this.source.togglePaused(DwebTransports.refreshstatus);", source: t, name: t.name}, //TODO-SW figure out how t osend this back
t.name);
t.statuselement = el; // Save status element on transport
return el;
}
)));
let el = utils.createElement("LI",
{onclick: "this.source.togglePaused(DwebTransports.refreshstatus);", source: t, name: t.name}, //TODO-SW figure out how t osend this back
t.name);
t.statuselement = el; // Save status element on transport
return el;
}))
);
}
await this.p_setup1(this.refreshstatus);
await this.p_setup2(this.refreshstatus);
debugtransports("Connection completed to %o", this._connected().map(t=>t.name))
debug("Connection completed to %o", this._connected().map(t=>t.name))
} catch(err) {
console.error("ERROR in p_connect:",err.message);
throw(err);
@ -710,8 +756,9 @@ class Transports {
*/
if (typeof url !== "string") url = Url.parse(url).href;
// In patterns below http or https; and :/ or :// are treated the same
const gateways = ["dweb.me", "ipfs.io"]; // Kniwn gateways, may dynamically load this at some point
const protocols = ["ipfs","gun","magnet","yjs","arc", "contenthash", "http", "https"];
const gateways = ["dweb.me", "ipfs.io"]; // Known gateways, may dynamically load this at some point
// SEE-OTHER-ADDTRANSPORT
const protocols = ["ipfs","gun","magnet","yjs","wolk","arc", "contenthash", "http", "https"];
const protocolsWantingDomains = ["arc", "http", "https"];
const gatewaypatts = [ // Must be before patts because gateway names often start with a valid proto
/^http[s]?:[/]+([^/]+)[/](\w+)[/](.*)/i, // https://(gateway)/proto/(internal) + gateway in list (IPFS gateways. dweb.me)

File diff suppressed because one or more lines are too long

View File

@ -34,6 +34,7 @@ async function loopfetch(req, ms, count, what) {
*/
let lasterr;
let loopguard = (typeof window != "undefined") && window.loopguard; // Optional global parameter, will cancel any loops if changes
count = count || 1; // count of 0 actually means 1
while (count-- && (loopguard === ((typeof window != "undefined") && window.loopguard)) ) {
try {
return await fetch(req);
@ -41,7 +42,7 @@ async function loopfetch(req, ms, count, what) {
lasterr = err;
debug("Delaying %s by %d ms because %s", what, ms, err.message);
await new Promise(resolve => {setTimeout(() => { resolve(); },ms)})
ms = ms*(1+Math.random()); // Spread out delays incase all requesting same time
ms = Math.floor(ms*(1+Math.random())); // Spread out delays incase all requesting same time
}
}
console.warn("loopfetch of",what,"failed");
@ -53,7 +54,7 @@ async function loopfetch(req, ms, count, what) {
}
}
httptools.p_httpfetch = async function(httpurl, init, {wantstream=false}={}) { // Embrace and extend "fetch" to check result etc.
httptools.p_httpfetch = async function(httpurl, init, {wantstream=false, retries=undefined}={}) { // Embrace and extend "fetch" to check result etc.
/*
Fetch a url
@ -70,7 +71,7 @@ httptools.p_httpfetch = async function(httpurl, init, {wantstream=false}={}) { /
// Using window.fetch, because it doesn't appear to be in scope otherwise in the browser.
let req = new Request(httpurl, init);
//let response = await fetch(req);
let response = await loopfetch(req, 500, (init.method === "GET") ? ( init.count || 12) : 1, "fetching "+httpurl);
let response = await loopfetch(req, 500, retries, "fetching "+httpurl);
// fetch throws (on Chrome, untested on Firefox or Node) TypeError: Failed to fetch)
// Note response.body gets a stream and response.blob gets a blob and response.arrayBuffer gets a buffer.
if (response.ok) {
@ -104,12 +105,14 @@ httptools.p_GET = function(httpurl, opts={}, cb) { //TODO-API rearranged and add
opts {
start, end, // Range of bytes wanted - inclusive i.e. 0,1023 is 1024 bytes
wantstream, // Return a stream rather than data
retries=12, // How many times to retry
}
returns result via promise or cb(err, result)
*/
if (typeof opts === "function") { cb = opts; opts = {}; }
let headers = new Headers();
if (opts.start || opts.end) headers.append("range", `bytes=${opts.start || 0}-${(opts.end<Infinity) ? opts.end : ""}`);
const retries = typeof opts.retries === "undefined" ? 12 : opts.retries;
let init = { //https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch
method: 'GET',
headers: headers,
@ -118,20 +121,21 @@ httptools.p_GET = function(httpurl, opts={}, cb) { //TODO-API rearranged and add
redirect: 'follow', // Chrome defaults to manual
keepalive: true // Keep alive - mostly we'll be going back to same places a lot
};
const prom = httptools.p_httpfetch(httpurl, init, {wantstream: opts.wantstream}); // This s a real http url
const prom = httptools.p_httpfetch(httpurl, init, {retries, wantstream: opts.wantstream}); // This s a real http url
//if (cb) { prom.then((res)=>cb(null,res)).catch((err) => cb(err)); } else { return prom; } // Unpromisify pattern v3
//if (cb) { prom.catch((err) => cb(err)).then((res)=>cb(null,res)).catch((err) => debug("Uncaught error %O",err)); } else { return prom; } // Unpromisify pattern v4
if (cb) { prom.then((res)=>{ try { cb(null,res)} catch(err) { debug("Uncaught error %O",err)}}).catch((err) => cb(err)); } else { return prom; } // Unpromisify pattern v5
}
httptools.p_POST = function(httpurl, opts={}, cb) { //TODO-API rearranged and addded cb
/* Locate and return a block, based on its url
opts = { data, contenttype }
opts = { data, contenttype, retries }
returns result via promise or cb(err, result)
*/
// Throws TransportError if fails
//let headers = new window.Headers();
//headers.set('content-type',type); Doesn't work, it ignores it
if (typeof opts === "function") { cb = opts; opts = {}; }
const retries = typeof opts.retries === "undefined" ? 0 : opts.retries;
let init = {
//https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/fetch
//https://developer.mozilla.org/en-US/docs/Glossary/Forbidden_header_name for headers tat cant be set
@ -145,7 +149,7 @@ httptools.p_POST = function(httpurl, opts={}, cb) { //TODO-API rearranged and ad
keepalive: false // Keep alive - mostly we'll be going back to same places a lot
};
if (opts.contenttype) init.headers["Content-Type"] = opts.contenttype;
const prom = httptools.p_httpfetch(httpurl, init);
const prom = httptools.p_httpfetch(httpurl, init, {retries});
if (cb) { prom.then((res)=>cb(null,res)).catch((err) => cb(err)); } else { return prom; } // Unpromisify pattern v3
}

View File

@ -6,6 +6,7 @@ require("./TransportHTTP.js"); // Can access via window.DwebTransports._transp
require("./TransportIPFS.js");
require("./TransportYJS.js");
require("./TransportWEBTORRENT.js");
require("./TransportWOLK.js");
require("./TransportGUN.js");
if (typeof window !== "undefined") { window.DwebTransports = DwebTransports; }
exports = module.exports = DwebTransports;

View File

@ -11,16 +11,18 @@
"url": "https://github.com/internetarchive/dweb-transports/issues"
},
"dependencies": {
"canonical-json": "latest",
"canonical-json": "^0.0.4",
"cids": "^0.5.7",
"debug": "^4.1.1",
"gun": "^0.9.9999991",
"ipfs": "^0.34.4",
"ipfs-http-client": "^29.1.1",
"ipfs-unixfs": "^0.1.16",
"node-fetch": "^2.3.0",
"readable-stream": "^3.1.1",
"readable-stream": "^3.3.0",
"webpack": "^4.29.3",
"webtorrent": "git://github.com/jhiesey/webtorrent.git#rescan",
"wolkjs": "git://github.com/wolkdb/wolkjs.git#master",
"y-array": "^10.1.4",
"y-indexeddb": "^8.1.9",
"y-ipfs-connector": "^2.3.0",
@ -51,5 +53,5 @@
"test": "cd src; node ./test.js",
"help": "echo 'test (test it)'; echo 'build (creates dweb-transports-bundle)'"
},
"version": "0.1.37"
"version": "0.1.42"
}

View File

@ -86,7 +86,7 @@ p_test({transport: ["GUN"])
.then(() => test_transports);
*/
/*
// Intentionally testing this with no connection
const sampleMagnetURL = "magnet:?xt=urn:btih:465HQWPEN374LABVHUBUPBUX4WZU6HDS&tr=http%3A%2F%2Fbt1.archive.org%3A6969%2Fannounce&tr=http%3A%2F%2Fbt2.archive.org%3A6969%2Fannounce&tr=wss%3A%2F%2Fdweb.archive.org%3A6969&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&tr=wss%3A%2F%2Ftracker.fastcast.nz&ws=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Fdownload%2F&xs=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Ftorrent%2Ffav-mitra/fav-mitra_members.json";
const sampleMagnetURLMirrorresolve = "http://localhost:4244/magnet/?xt=urn:btih:465HQWPEN374LABVHUBUPBUX4WZU6HDS&tr=http%3A%2F%2Fbt1.archive.org%3A6969%2Fannounce&tr=http%3A%2F%2Fbt2.archive.org%3A6969%2Fannounce&tr=wss%3A%2F%2Fdweb.archive.org%3A6969&tr=wss%3A%2F%2Ftracker.btorrent.xyz&tr=wss%3A%2F%2Ftracker.openwebtorrent.com&tr=wss%3A%2F%2Ftracker.fastcast.nz&ws=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Fdownload%2F&xs=https%3A%2F%2Fdweb.me%2Farc%2Farchive.org%2Ftorrent%2Ffav-mitra/fav-mitra_members.json"
@ -107,3 +107,4 @@ tests.forEach(t => {
res = DwebTransports.gatewayUrl(url); console.assert( res === t.gw, "GatewayURL:", url, t.gw,"!==", res);
res = DwebTransports.p_resolveNames([url]).then(res => console.assert(res[0] === t.resolveM, "Resolve with Mirror", url, t.resolveM, "!==", res ))
})
*/

View File

@ -1,4 +1,6 @@
const UglifyJsPlugin = require('uglifyjs-webpack-plugin');
const webpack = require('webpack'); //to access built-in plugins
module.exports = {
entry: {
'dweb-transports': './index.js',
@ -20,9 +22,16 @@ module.exports = {
console: false
},
plugins: [
new webpack.EnvironmentPlugin({
WOLK_ENV: 'idb',
})
],
resolve: {
alias: {
zlib: 'browserify-zlib-next'
zlib: 'browserify-zlib-next',
zlib: 'zlib'
}
},
optimization: {