diff --git a/examples/custom-ipfs-repo/package.json b/examples/custom-ipfs-repo/package.json index 49fa3f558d..909d26dc5c 100644 --- a/examples/custom-ipfs-repo/package.json +++ b/examples/custom-ipfs-repo/package.json @@ -12,7 +12,7 @@ "dependencies": { "datastore-fs": "^0.9.1", "ipfs": "^0.41.0", - "ipfs-repo": "^0.30.1", + "ipfs-repo": "github:ipfs/js-ipfs-repo#store-pins-in-datastore", "it-all": "^1.0.1" }, "devDependencies": { diff --git a/packages/interface-ipfs-core/SPEC/PIN.md b/packages/interface-ipfs-core/SPEC/PIN.md index 0958ce80b7..1de696b088 100644 --- a/packages/interface-ipfs-core/SPEC/PIN.md +++ b/packages/interface-ipfs-core/SPEC/PIN.md @@ -8,11 +8,11 @@ > Adds an IPFS object to the pinset and also stores it to the IPFS repo. pinset is the set of hashes currently pinned (not gc'able). -##### `ipfs.pin.add(hash, [options])` +##### `ipfs.pin.add(source, [options])` Where: -- `hash` is an IPFS multihash. +- `source` is a [CID], an array of CIDs or an (async) iterable that yields CIDs - `options` is an object that can contain the following keys - `recursive` (`boolean`) - Recursively pin the object linked. Type: bool. Default: `true` - `timeout` (`number`|`string`) - Throw an error if the request does not complete within the specified milliseconds timeout. If `timeout` is a string, the value is parsed as a [human readable duration](https://www.npmjs.com/package/parse-duration). There is no timeout by default. @@ -21,9 +21,9 @@ Where: | Type | Description | | -------- | -------- | -| `Promise<{ cid: CID }>` | An array of objects that represent the files that were pinned | +| `AsyncIterable` | An async iterable that yields objects containing the CIDs that were pinned | -an array of objects is returned, each of the form: +Each yielded object has the form: ```JavaScript { @@ -77,10 +77,10 @@ A great source of [examples][] can be found in the tests for this API. > Remove a hash from the pinset -##### `ipfs.pin.rm(hash, [options])` +##### `ipfs.pin.rm(source, [options])` Where: -- `hash` is a multihash. +- `source` is a [CID], an array of CIDs or an (async) iterable that yields CIDs - `options` is an object that can contain the following keys - 'recursive' - Recursively unpin the object linked. Type: bool. Default: `true` @@ -88,15 +88,16 @@ Where: | Type | Description | | -------- | -------- | -| `Promise<{ cid: CID }>` | An array of unpinned objects | +| `AsyncIterable<{ cid: CID }>` | An async iterable that yields objects containing the CIDs that were unpinned | **Example:** ```JavaScript -const pinset = await ipfs.pin.rm('QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u') -console.log(pinset) +for await (const unpinned of ipfs.pin.rm(new CID('QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u'))) { + console.log(unpinned) +} // prints the hashes that were unpinned -// [ { cid: CID('QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u') } ] +// { cid: CID('QmWATWQ7fVPP2EFGu71UkfnqhYXDYH566qy47CnJDgvs8u') } ``` A great source of [examples][] can be found in the tests for this API. diff --git a/packages/interface-ipfs-core/package.json b/packages/interface-ipfs-core/package.json index eb111e5028..78ce77471d 100644 --- a/packages/interface-ipfs-core/package.json +++ b/packages/interface-ipfs-core/package.json @@ -45,6 +45,7 @@ "is-ipfs": "^0.6.1", "it-all": "^1.0.1", "it-concat": "^1.0.0", + "it-drain": "^1.0.0", "it-last": "^1.0.1", "it-pushable": "^1.3.1", "multiaddr": "^7.2.1", diff --git a/packages/interface-ipfs-core/src/block/rm.js b/packages/interface-ipfs-core/src/block/rm.js index 37cd17d98f..ee43bdf203 100644 --- a/packages/interface-ipfs-core/src/block/rm.js +++ b/packages/interface-ipfs-core/src/block/rm.js @@ -4,6 +4,8 @@ const { getDescribe, getIt, expect } = require('../utils/mocha') const hat = require('hat') const all = require('it-all') +const last = require('it-last') +const drain = require('it-drain') /** @typedef { import("ipfsd-ctl/src/factory") } Factory */ /** @@ -142,13 +144,12 @@ module.exports = (common, options) => { format: 'raw', hashAlg: 'sha2-256' }) - await ipfs.pin.add(cid.toString()) + await drain(ipfs.pin.add(cid)) - const result = await all(ipfs.block.rm(cid)) + const result = await last(ipfs.block.rm(cid)) - expect(result).to.be.an('array').and.to.have.lengthOf(1) - expect(result[0]).to.have.property('error') - expect(result[0].error.message).to.include('pinned') + expect(result).to.have.property('error').that.is.an('Error') + .with.property('message').that.includes('pinned') }) }) } diff --git a/packages/interface-ipfs-core/src/pin/add.js b/packages/interface-ipfs-core/src/pin/add.js index 5e4cb2f3db..6c2e3559a4 100644 --- a/packages/interface-ipfs-core/src/pin/add.js +++ b/packages/interface-ipfs-core/src/pin/add.js @@ -28,8 +28,8 @@ module.exports = (common, options) => { after(() => common.clean()) it('should add a pin', async () => { - const pinset = await ipfs.pin.add(fixtures.files[0].cid, { recursive: false }) - expect(pinset.map(p => p.cid.toString())).to.include(fixtures.files[0].cid) + const pinset = await all(ipfs.pin.add(fixtures.files[0].cid, { recursive: false })) + expect(pinset.map(p => p.cid)).to.deep.include(fixtures.files[0].cid) }) }) } diff --git a/packages/interface-ipfs-core/src/pin/index.js b/packages/interface-ipfs-core/src/pin/index.js index b3723d07c5..ad97a96df0 100644 --- a/packages/interface-ipfs-core/src/pin/index.js +++ b/packages/interface-ipfs-core/src/pin/index.js @@ -4,7 +4,8 @@ const { createSuite } = require('../utils/suite') const tests = { ls: require('./ls'), rm: require('./rm'), - add: require('./add') + add: require('./add'), + core: require('./pins') } module.exports = createSuite(tests) diff --git a/packages/interface-ipfs-core/src/pin/ls.js b/packages/interface-ipfs-core/src/pin/ls.js index ffdfe7986b..29d21f2e9a 100644 --- a/packages/interface-ipfs-core/src/pin/ls.js +++ b/packages/interface-ipfs-core/src/pin/ls.js @@ -4,6 +4,7 @@ const { fixtures } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') const all = require('it-all') +const drain = require('it-drain') /** @typedef { import("ipfsd-ctl/src/factory") } Factory */ /** @@ -23,22 +24,21 @@ module.exports = (common, options) => { ipfs = (await common.spawn()).api // two files wrapped in directories, only root CID pinned recursively const dir = fixtures.directory.files.map((file) => ({ path: file.path, content: file.data })) - await all(ipfs.add(dir, { pin: false, cidVersion: 0 })) - await ipfs.pin.add(fixtures.directory.cid, { recursive: true }) + await drain(ipfs.add(dir, { pin: false, cidVersion: 0 })) + await drain(ipfs.pin.add(fixtures.directory.cid, { recursive: true })) // a file (CID pinned recursively) - await all(ipfs.add(fixtures.files[0].data, { pin: false, cidVersion: 0 })) - await ipfs.pin.add(fixtures.files[0].cid, { recursive: true }) + await drain(ipfs.add(fixtures.files[0].data, { pin: false, cidVersion: 0 })) + await drain(ipfs.pin.add(fixtures.files[0].cid, { recursive: true })) // a single CID (pinned directly) - await all(ipfs.add(fixtures.files[1].data, { pin: false, cidVersion: 0 })) - await ipfs.pin.add(fixtures.files[1].cid, { recursive: false }) + await drain(ipfs.add(fixtures.files[1].data, { pin: false, cidVersion: 0 })) + await drain(ipfs.pin.add(fixtures.files[1].cid, { recursive: false })) }) after(() => common.clean()) // 1st, because ipfs.add pins automatically it('should list all recursive pins', async () => { - const pinset = (await all(ipfs.pin.ls({ type: 'recursive' }))) - .map(p => ({ ...p, cid: p.cid.toString() })) + const pinset = await all(ipfs.pin.ls({ type: 'recursive' })) expect(pinset).to.deep.include({ type: 'recursive', @@ -51,8 +51,7 @@ module.exports = (common, options) => { }) it('should list all indirect pins', async () => { - const pinset = (await all(ipfs.pin.ls({ type: 'indirect' }))) - .map(p => ({ ...p, cid: p.cid.toString() })) + const pinset = await all(ipfs.pin.ls({ type: 'indirect' })) expect(pinset).to.not.deep.include({ type: 'recursive', @@ -77,8 +76,7 @@ module.exports = (common, options) => { }) it('should list all types of pins', async () => { - const pinset = (await all(ipfs.pin.ls())) - .map(p => ({ ...p, cid: p.cid.toString() })) + const pinset = await all(ipfs.pin.ls()) expect(pinset).to.not.be.empty() // check the three "roots" @@ -107,15 +105,19 @@ module.exports = (common, options) => { it('should list all direct pins', async () => { const pinset = await all(ipfs.pin.ls({ type: 'direct' })) expect(pinset).to.have.lengthOf(1) - expect(pinset[0].type).to.equal('direct') - expect(pinset[0].cid.toString()).to.equal(fixtures.files[1].cid) + expect(pinset).to.deep.include({ + type: 'direct', + cid: fixtures.files[1].cid + }) }) it('should list pins for a specific hash', async () => { const pinset = await all(ipfs.pin.ls(fixtures.files[0].cid)) expect(pinset).to.have.lengthOf(1) - expect(pinset[0].type).to.equal('recursive') - expect(pinset[0].cid.toString()).to.equal(fixtures.files[0].cid) + expect(pinset).to.deep.include({ + type: 'recursive', + cid: fixtures.files[0].cid + }) }) it('should throw an error on missing direct pins for existing path', () => { @@ -136,22 +138,26 @@ module.exports = (common, options) => { it('should list indirect pins for a specific path', async () => { const pinset = await all(ipfs.pin.ls(`/ipfs/${fixtures.directory.cid}/files/ipfs.txt`, { type: 'indirect' })) expect(pinset).to.have.lengthOf(1) - expect(pinset[0].type).to.equal(`indirect through ${fixtures.directory.cid}`) - expect(pinset[0].cid.toString()).to.equal(fixtures.directory.files[1].cid) + expect(pinset).to.deep.include({ + type: `indirect through ${fixtures.directory.cid}`, + cid: fixtures.directory.files[1].cid + }) }) it('should list recursive pins for a specific hash', async () => { const pinset = await all(ipfs.pin.ls(fixtures.files[0].cid, { type: 'recursive' })) expect(pinset).to.have.lengthOf(1) - expect(pinset[0].type).to.equal('recursive') - expect(pinset[0].cid.toString()).to.equal(fixtures.files[0].cid) + expect(pinset).to.deep.include({ + type: 'recursive', + cid: fixtures.files[0].cid + }) }) it('should list pins for multiple CIDs', async () => { const pinset = await all(ipfs.pin.ls([fixtures.files[0].cid, fixtures.files[1].cid])) - const cids = pinset.map(p => p.cid.toString()) - expect(cids).to.include(fixtures.files[0].cid) - expect(cids).to.include(fixtures.files[1].cid) + const cids = pinset.map(p => p.cid) + expect(cids).to.deep.include(fixtures.files[0].cid) + expect(cids).to.deep.include(fixtures.files[1].cid) }) }) } diff --git a/packages/interface-ipfs-core/src/pin/pins.js b/packages/interface-ipfs-core/src/pin/pins.js new file mode 100644 index 0000000000..a171d0b91c --- /dev/null +++ b/packages/interface-ipfs-core/src/pin/pins.js @@ -0,0 +1,460 @@ +/* eslint-env mocha */ +'use strict' + +const { getDescribe, getIt, expect } = require('../utils/mocha') +const { + DAGNode +} = require('ipld-dag-pb') +const all = require('it-all') +const last = require('it-last') +const drain = require('it-drain') +const CID = require('cids') + +// fixture structure: +// planets/ +// solar-system.md +// mercury/ +// wiki.md +/* const pins = { + root: new CID('QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys'), + solarWiki: new CID('QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG'), + mercuryDir: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q'), + mercuryWiki: new CID('QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi') +} */ +let pins +const pinTypes = { + direct: 'direct', + recursive: 'recursive', + indirect: 'indirect', + all: 'all' +} + +/** @typedef { import("ipfsd-ctl/src/factory") } Factory */ +/** + * @param {Factory} common + * @param {Object} options + */ +module.exports = (common, options) => { + const describe = getDescribe(options) + const it = getIt(options) + + describe('pin', function () { + this.timeout(50 * 1000) + + const fixtures = [{ + path: 'planets/mercury/wiki.md', + content: 'solar system content' + }, { + path: 'planets/solar-system.md', + content: 'wiki content' + }] + + let ipfs + + async function isPinnedWithType (path, type) { + try { + for await (const _ of ipfs.pin.ls(path, { type })) { // eslint-disable-line no-unused-vars + return true + } + return false + } catch (err) { + return false + } + } + + async function expectPinned (cid, type = pinTypes.all, pinned = true) { + if (typeof type === 'boolean') { + pinned = type + type = pinTypes.all + } + + const result = await isPinnedWithType(cid, type) + expect(result).to.eql(pinned) + } + + async function clearPins () { + for await (const { cid } of ipfs.pin.ls({ type: pinTypes.recursive })) { + await drain(ipfs.pin.rm(cid)) + } + + for await (const { cid } of ipfs.pin.ls({ type: pinTypes.direct })) { + await drain(ipfs.pin.rm(cid)) + } + } + + before(async () => { + ipfs = (await common.spawn()).api + const added = (await all(ipfs.add(fixtures))).reduce((acc, curr) => { + acc[curr.path] = curr.cid + + return acc + }, {}) + + pins = { + root: added.planets, + solarWiki: added['planets/solar-system.md'], + mercuryDir: added['planets/mercury'], + mercuryWiki: added['planets/mercury/wiki.md'] + } + }) + + after(() => common.clean()) + + describe('pinned status', function () { + beforeEach(async () => { + await clearPins() + await drain(ipfs.pin.add(pins.root)) + }) + + it('should be pinned when added', async () => { + await drain(ipfs.pin.add(pins.solarWiki)) + return expectPinned(pins.solarWiki) + }) + + it('should not be pinned when not in datastore', () => { + const falseHash = `${`${pins.root}`.slice(0, -2)}ss` + return expectPinned(falseHash, false) + }) + + it('should not be pinned when in datastore but not added', async () => { + await drain(ipfs.pin.rm(pins.root)) + return expectPinned(pins.root, false) + }) + + it('should be pinned recursively when added', () => { + return expectPinned(pins.root, pinTypes.recursive) + }) + + it('should be pinned indirectly', () => { + return expectPinned(pins.mercuryWiki, pinTypes.indirect) + }) + + it('should be pinned directly', async () => { + await drain(ipfs.pin.add(pins.mercuryDir, { recursive: false })) + return expectPinned(pins.mercuryDir, pinTypes.direct) + }) + + it('should not be pinned when not in datastore or added', async () => { + await clearPins() + return expectPinned(pins.mercuryDir, pinTypes.direct, false) + }) + }) + + describe('add', function () { + beforeEach(function () { + return clearPins() + }) + + it('should add recursively', async () => { + await drain(ipfs.pin.add(pins.root)) + await expectPinned(pins.root, pinTypes.recursive) + + const pinChecks = Object.values(pins).map(hash => expectPinned(hash)) + return Promise.all(pinChecks) + }) + + it('should add directly', async () => { + await drain(ipfs.pin.add(pins.root, { recursive: false })) + await Promise.all([ + expectPinned(pins.root, pinTypes.direct), + expectPinned(pins.solarWiki, false) + ]) + }) + + it('should recursively pin parent of direct pin', async () => { + await drain(ipfs.pin.add(pins.solarWiki, { recursive: false })) + await drain(ipfs.pin.add(pins.root)) + await Promise.all([ + // solarWiki is pinned both directly and indirectly o.O + expectPinned(pins.solarWiki, pinTypes.direct), + expectPinned(pins.solarWiki, pinTypes.indirect) + ]) + }) + + it('should fail to directly pin a recursive pin', async () => { + await drain(ipfs.pin.add(pins.root)) + return expect(last(ipfs.pin.add(pins.root, { recursive: false }))) + .to.eventually.be.rejected() + .with(/already pinned recursively/) + }) + + it('should fail to pin a hash not in datastore', function () { + this.timeout(5 * 1000) + const falseHash = `${`${pins.root}`.slice(0, -2)}ss` + return expect(last(ipfs.pin.add(falseHash, { timeout: '2s' }))) + .to.eventually.be.rejected() + // TODO: http api TimeoutErrors do not have this property + // .with.a.property('code').that.equals('ERR_TIMEOUT') + }) + + // TODO block rm breaks subsequent tests + // it.skip('needs all children in datastore to pin recursively', () => { + // return ipfs.block.rm(pins.mercuryWiki) + // .then(() => expectTimeout(pin.add(pins.root), 4000)) + // }) + }) + + describe('ls', function () { + before(async () => { + await clearPins() + await drain(ipfs.pin.add(pins.root)) + await drain(ipfs.pin.add(pins.mercuryDir, { recursive: false })) + }) + + it('should list pins of a particular CID', async () => { + const out = await all(ipfs.pin.ls(pins.mercuryDir)) + expect(out[0].cid).to.deep.equal(pins.mercuryDir) + expect(out[0].type).to.eql(pinTypes.direct) + }) + + it('should list indirect pins that supersede direct pins', async () => { + const ls = await all(ipfs.pin.ls()) + + const pinType = ls.find(out => out.cid.equals(pins.mercuryDir)).type + expect(pinType).to.eql(pinTypes.indirect) + }) + + it('should list all pins', async () => { + const out = await all(ipfs.pin.ls()) + + expect(out).to.deep.include.members([ + { + type: 'recursive', + cid: new CID(pins.root) + }, + { + type: 'indirect', + cid: new CID(pins.solarWiki) + }, + { + type: 'indirect', + cid: new CID(pins.mercuryDir) + }, + { + type: 'indirect', + cid: new CID(pins.mercuryWiki) + } + ]) + }) + + it('should list all direct pins', async () => { + const out = await all(ipfs.pin.ls({ type: 'direct' })) + + expect(out).to.deep.include.members([ + { + type: 'direct', + cid: new CID(pins.mercuryDir) + } + ]) + }) + + it('should list all recursive pins', async () => { + const out = await all(ipfs.pin.ls({ type: 'recursive' })) + + expect(out).to.deep.include.members([ + { + type: 'recursive', + cid: new CID(pins.root) + } + ]) + }) + + it('should list all indirect pins', async () => { + const out = await all(ipfs.pin.ls({ type: 'indirect' })) + + expect(out).to.deep.include.members([ + { + type: 'indirect', + cid: new CID(pins.solarWiki) + }, + { + type: 'indirect', + cid: new CID(pins.mercuryDir) + }, + { + type: 'indirect', + cid: new CID(pins.mercuryWiki) + } + ]) + }) + + it('should list direct pins for CID', async () => { + const out = await all(ipfs.pin.ls(pins.mercuryDir, { type: 'direct' })) + + expect(out).to.have.deep.members([ + { + type: 'direct', + cid: new CID(pins.mercuryDir) + } + ]) + }) + + it('should list direct pins for path', async () => { + const out = await all(ipfs.pin.ls(`/ipfs/${pins.root}/mercury/`, { type: 'direct' })) + + expect(out).to.have.deep.members([ + { + type: 'direct', + cid: new CID(pins.mercuryDir) + } + ]) + }) + + it('should list direct pins for path (no match)', () => { + return expect(all(ipfs.pin.ls(`/ipfs/${pins.root}/mercury/wiki.md`, { type: 'direct' }))) + .to.eventually.be.rejected() + }) + + it('should list direct pins for CID (no match)', () => { + return expect(all(ipfs.pin.ls(pins.root, { type: 'direct' }))) + .to.eventually.be.rejected() + }) + + it('should list recursive pins for CID', async () => { + const out = await all(ipfs.pin.ls(pins.root, { type: 'recursive' })) + + expect(out).to.have.deep.members([ + { + type: 'recursive', + cid: new CID(pins.root) + } + ]) + }) + + it('should list recursive pins for CID (no match)', () => { + return expect(all(ipfs.pin.ls(pins.mercuryDir, { type: 'recursive' }))) + .to.eventually.be.rejected() + }) + + it('should list indirect pins for CID', async () => { + const out = await all(ipfs.pin.ls(pins.solarWiki, { type: 'indirect' })) + + expect(out).to.have.deep.members([ + { + type: `indirect through ${pins.root}`, + cid: new CID(pins.solarWiki) + } + ]) + }) + + it('should list indirect pins for CID (no match)', () => { + return expect(all(ipfs.pin.ls(pins.root, { type: 'indirect' }))) + .to.eventually.be.rejected() + }) + }) + + describe('rm', function () { + beforeEach(async () => { + await clearPins() + await drain(ipfs.pin.add(pins.root)) + }) + + it('should remove a recursive pin', async () => { + await drain(ipfs.pin.rm(pins.root)) + await Promise.all([ + expectPinned(pins.root, false), + expectPinned(pins.mercuryWiki, false) + ]) + }) + + it('should remove a direct pin', async () => { + await clearPins() + await drain(ipfs.pin.add(pins.mercuryDir, { recursive: false })) + await drain(ipfs.pin.rm(pins.mercuryDir)) + await expectPinned(pins.mercuryDir, false) + }) + + it('should fail to remove an indirect pin', async () => { + await expect(last(ipfs.pin.rm(pins.solarWiki))) + .to.eventually.be.rejected() + .with(/is pinned indirectly under/) + await expectPinned(pins.solarWiki) + }) + + it('should fail when an item is not pinned', async () => { + await drain(ipfs.pin.rm(pins.root)) + await expect(last(ipfs.pin.rm(pins.root))) + .to.eventually.be.rejected() + .with(/is not pinned/) + }) + }) + + describe('non-dag-pb nodes', function () { + it('should pin dag-cbor', async () => { + const cid = await ipfs.dag.put({}, { + format: 'dag-cbor', + hashAlg: 'sha2-256' + }) + + await drain(ipfs.pin.add(cid)) + + const pins = await all(ipfs.pin.ls()) + + expect(pins).to.deep.include({ + type: 'recursive', + cid + }) + }) + + it('should pin raw', async () => { + const cid = await ipfs.dag.put(Buffer.alloc(0), { + format: 'raw', + hashAlg: 'sha2-256' + }) + + await drain(ipfs.pin.add(cid)) + + const pins = await all(ipfs.pin.ls()) + + expect(pins).to.deep.include({ + type: 'recursive', + cid + }) + }) + + it('should pin dag-cbor with dag-pb child', async () => { + const child = await ipfs.dag.put(new DAGNode(Buffer.from(`${Math.random()}`)), { + format: 'dag-pb', + hashAlg: 'sha2-256' + }) + const parent = await ipfs.dag.put({ + child + }, { + format: 'dag-cbor', + hashAlg: 'sha2-256' + }) + + await drain(ipfs.pin.add(parent, { + recursive: true + })) + + const pins = await all(ipfs.pin.ls()) + + expect(pins).to.deep.include({ + cid: parent, + type: 'recursive' + }) + expect(pins).to.deep.include({ + cid: child, + type: 'indirect' + }) + }) + }) + + describe('ls', () => { + it('should throw error for invalid non-string pin type option', () => { + return expect(all(ipfs.pin.ls({ type: 6 }))) + .to.eventually.be.rejected() + // TODO: go-ipfs does not return error codes + // .with.property('code').that.equals('ERR_INVALID_PIN_TYPE') + }) + + it('should throw error for invalid string pin type option', () => { + return expect(all(ipfs.pin.ls({ type: '__proto__' }))) + .to.eventually.be.rejected() + // TODO: go-ipfs does not return error codes + // .with.property('code').that.equals('ERR_INVALID_PIN_TYPE') + }) + }) + }) +} diff --git a/packages/interface-ipfs-core/src/pin/rm.js b/packages/interface-ipfs-core/src/pin/rm.js index 7565381687..91bc802ea5 100644 --- a/packages/interface-ipfs-core/src/pin/rm.js +++ b/packages/interface-ipfs-core/src/pin/rm.js @@ -4,6 +4,7 @@ const { fixtures } = require('./utils') const { getDescribe, getIt, expect } = require('../utils/mocha') const all = require('it-all') +const drain = require('it-drain') /** @typedef { import("ipfsd-ctl/src/factory") } Factory */ /** @@ -20,31 +21,31 @@ module.exports = (common, options) => { let ipfs before(async () => { ipfs = (await common.spawn()).api - await all(ipfs.add(fixtures.files[0].data, { pin: false })) - await ipfs.pin.add(fixtures.files[0].cid, { recursive: true }) - await all(ipfs.add(fixtures.files[1].data, { pin: false })) - await ipfs.pin.add(fixtures.files[1].cid, { recursive: false }) + await drain(ipfs.add(fixtures.files[0].data, { pin: false })) + await drain(ipfs.pin.add(fixtures.files[0].cid, { recursive: true })) + await drain(ipfs.add(fixtures.files[1].data, { pin: false })) + await drain(ipfs.pin.add(fixtures.files[1].cid, { recursive: false })) }) after(() => common.clean()) it('should remove a recursive pin', async () => { - const removedPinset = await ipfs.pin.rm(fixtures.files[0].cid, { recursive: true }) - expect(removedPinset.map(p => p.cid.toString())).to.deep.equal([fixtures.files[0].cid]) + const removedPinset = await all(ipfs.pin.rm(fixtures.files[0].cid, { recursive: true })) + expect(removedPinset.map(p => p.cid)).to.deep.equal([fixtures.files[0].cid]) const pinset = await all(ipfs.pin.ls({ type: 'recursive' })) - expect(pinset.map(p => ({ ...p, cid: p.cid.toString() }))).to.not.deep.include({ + expect(pinset).to.not.deep.include({ cid: fixtures.files[0].cid, type: 'recursive' }) }) it('should remove a direct pin', async () => { - const removedPinset = await ipfs.pin.rm(fixtures.files[1].cid, { recursive: false }) - expect(removedPinset.map(p => p.cid.toString())).to.deep.equal([fixtures.files[1].cid]) + const removedPinset = await all(ipfs.pin.rm(fixtures.files[1].cid, { recursive: false })) + expect(removedPinset.map(p => p.cid)).to.deep.equal([fixtures.files[1].cid]) const pinset = await all(ipfs.pin.ls({ type: 'direct' })) - expect(pinset.map(p => p.cid.toString())).to.not.include(fixtures.files[1].cid) + expect(pinset.map(p => p.cid)).to.not.deep.include(fixtures.files[1].cid) }) }) } diff --git a/packages/interface-ipfs-core/src/pin/utils.js b/packages/interface-ipfs-core/src/pin/utils.js index 3ee67f0d53..34b33eee6b 100644 --- a/packages/interface-ipfs-core/src/pin/utils.js +++ b/packages/interface-ipfs-core/src/pin/utils.js @@ -1,26 +1,27 @@ 'use strict' const loadFixture = require('aegir/fixtures') +const CID = require('cids') exports.fixtures = Object.freeze({ // NOTE: files under 'directory' need to be different than standalone ones in 'files' directory: Object.freeze({ - cid: 'QmY8KdYQSYKFU5hM7F5ioZ5yYSgV5VZ1kDEdqfRL3rFgcd', + cid: new CID('QmY8KdYQSYKFU5hM7F5ioZ5yYSgV5VZ1kDEdqfRL3rFgcd'), files: Object.freeze([Object.freeze({ path: 'test-folder/ipfs-add.js', data: loadFixture('test/fixtures/test-folder/ipfs-add.js', 'interface-ipfs-core'), - cid: 'QmbKtKBrmeRHjNCwR4zAfCJdMVu6dgmwk9M9AE9pUM9RgG' + cid: new CID('QmbKtKBrmeRHjNCwR4zAfCJdMVu6dgmwk9M9AE9pUM9RgG') }), Object.freeze({ path: 'test-folder/files/ipfs.txt', data: loadFixture('test/fixtures/test-folder/files/ipfs.txt', 'interface-ipfs-core'), - cid: 'QmdFyxZXsFiP4csgfM5uPu99AvFiKH62CSPDw5TP92nr7w' + cid: new CID('QmdFyxZXsFiP4csgfM5uPu99AvFiKH62CSPDw5TP92nr7w') })]) }), files: Object.freeze([Object.freeze({ data: loadFixture('test/fixtures/testfile.txt', 'interface-ipfs-core'), - cid: 'Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP' + cid: new CID('Qma4hjFTnCasJ8PVp3mZbZK5g2vGDT4LByLJ7m8ciyRFZP') }), Object.freeze({ data: loadFixture('test/fixtures/test-folder/files/hello.txt', 'interface-ipfs-core'), - cid: 'QmY9cxiHqTFoWamkQVkpmmqzBrY3hCBEL2XNu3NtX74Fuu' + cid: new CID('QmY9cxiHqTFoWamkQVkpmmqzBrY3hCBEL2XNu3NtX74Fuu') })]) }) diff --git a/packages/interface-ipfs-core/src/repo/gc.js b/packages/interface-ipfs-core/src/repo/gc.js index da4966250a..dcf55652ab 100644 --- a/packages/interface-ipfs-core/src/repo/gc.js +++ b/packages/interface-ipfs-core/src/repo/gc.js @@ -4,6 +4,7 @@ const { getDescribe, getIt, expect } = require('../utils/mocha') const { DAGNode } = require('ipld-dag-pb') const all = require('it-all') +const drain = require('it-drain') /** @typedef { import("ipfsd-ctl/src/factory") } Factory */ /** @@ -29,8 +30,8 @@ module.exports = (common, options) => { const pinset = await all(ipfs.pin.ls()) expect(pinset.map(obj => obj.cid.toString())).includes(res[0].cid.toString()) - await ipfs.pin.rm(res[0].cid) - await all(ipfs.repo.gc()) + await drain(ipfs.pin.rm(res[0].cid)) + await drain(ipfs.repo.gc()) const finalPinset = await all(ipfs.pin.ls()) expect(finalPinset.map(obj => obj.cid.toString())).not.includes(res[0].cid.toString()) @@ -53,7 +54,7 @@ module.exports = (common, options) => { expect(refsAfterAdd.map(r => r.ref)).includes(cid.toString()) // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // Get the list of local blocks after GC, should still contain the hash, // because the file is still pinned @@ -61,7 +62,7 @@ module.exports = (common, options) => { expect(refsAfterGc.map(r => r.ref)).includes(cid.toString()) // Unpin the data - await ipfs.pin.rm(cid) + await drain(ipfs.pin.rm(cid)) // Run garbage collection await all(ipfs.repo.gc()) @@ -88,7 +89,7 @@ module.exports = (common, options) => { expect(refsAfterAdd.map(r => r.ref)).includes(hash) // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // Get the list of local blocks after GC, should still contain the hash, // because the file is in MFS @@ -99,7 +100,7 @@ module.exports = (common, options) => { await ipfs.files.rm('/test') // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // The list of local blocks should no longer contain the hash const refsAfterUnpinAndGc = await all(ipfs.refs.local()) @@ -131,7 +132,7 @@ module.exports = (common, options) => { expect(hashesAfterAdd).includes(dataCid.toString()) // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // Get the list of local blocks after GC, should still contain the hash, // because the file is pinned and in MFS @@ -143,7 +144,7 @@ module.exports = (common, options) => { await ipfs.files.rm('/test') // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // Get the list of local blocks after GC, should still contain the hash, // because the file is still pinned @@ -153,10 +154,10 @@ module.exports = (common, options) => { expect(hashesAfterRmAndGc).includes(dataCid.toString()) // Unpin the data - await ipfs.pin.rm(dataCid) + await drain(ipfs.pin.rm(dataCid)) // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // The list of local blocks should no longer contain the hashes const refsAfterUnpinAndGc = await all(ipfs.refs.local()) @@ -174,7 +175,7 @@ module.exports = (common, options) => { const dataCid = addRes[0].cid // Unpin the data - await ipfs.pin.rm(dataCid) + await drain(ipfs.pin.rm(dataCid)) // Create a link to the data from an object const obj = await new DAGNode(Buffer.from('fruit'), [{ @@ -198,14 +199,14 @@ module.exports = (common, options) => { expect(hashesAfterAdd).includes(dataCid.toString()) // Recursively pin the object - await ipfs.pin.add(objCid, { recursive: true }) + await drain(ipfs.pin.add(objCid, { recursive: true })) // The data should now be indirectly pinned const pins = await all(ipfs.pin.ls()) expect(pins.find(p => p.cid.toString() === dataCid.toString()).type).to.eql('indirect') // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // Get the list of local blocks after GC, should still contain the data // hash, because the data is still (indirectly) pinned @@ -213,10 +214,10 @@ module.exports = (common, options) => { expect(refsAfterGc.map(r => r.ref)).includes(dataCid.toString()) // Recursively unpin the object - await ipfs.pin.rm(objCid.toString()) + await drain(ipfs.pin.rm(objCid.toString())) // Run garbage collection - await all(ipfs.repo.gc()) + await drain(ipfs.repo.gc()) // The list of local blocks should no longer contain the hashes const refsAfterUnpinAndGc = await all(ipfs.refs.local()) diff --git a/packages/ipfs-http-client/src/pin/add.js b/packages/ipfs-http-client/src/pin/add.js index 41b119fe87..b42cbb03fd 100644 --- a/packages/ipfs-http-client/src/pin/add.js +++ b/packages/ipfs-http-client/src/pin/add.js @@ -4,7 +4,7 @@ const CID = require('cids') const configure = require('../lib/configure') module.exports = configure(({ ky }) => { - return async (paths, options) => { + return async function * (paths, options) { paths = Array.isArray(paths) ? paths : [paths] options = options || {} @@ -19,6 +19,6 @@ module.exports = configure(({ ky }) => { searchParams }).json() - return (res.Pins || []).map(cid => ({ cid: new CID(cid) })) + yield * (res.Pins || []).map(cid => ({ cid: new CID(cid) })) } }) diff --git a/packages/ipfs-http-client/src/pin/rm.js b/packages/ipfs-http-client/src/pin/rm.js index 83fbca93c4..0a2d46a22b 100644 --- a/packages/ipfs-http-client/src/pin/rm.js +++ b/packages/ipfs-http-client/src/pin/rm.js @@ -4,11 +4,12 @@ const CID = require('cids') const configure = require('../lib/configure') module.exports = configure(({ ky }) => { - return async (path, options) => { + return async function * (paths, options) { + paths = Array.isArray(paths) ? paths : [paths] options = options || {} const searchParams = new URLSearchParams(options.searchParams) - searchParams.set('arg', `${path}`) + paths.forEach(path => searchParams.append('arg', `${path}`)) if (options.recursive != null) searchParams.set('recursive', options.recursive) const res = await ky.post('pin/rm', { @@ -18,6 +19,6 @@ module.exports = configure(({ ky }) => { searchParams }).json() - return (res.Pins || []).map(cid => ({ cid: new CID(cid) })) + yield * (res.Pins || []).map(cid => ({ cid: new CID(cid) })) } }) diff --git a/packages/ipfs-mfs/package.json b/packages/ipfs-mfs/package.json index 75023d1ee7..5d96d455a0 100644 --- a/packages/ipfs-mfs/package.json +++ b/packages/ipfs-mfs/package.json @@ -48,7 +48,7 @@ "form-data": "^3.0.0", "ipfs-block": "^0.8.1", "ipfs-block-service": "^0.16.0", - "ipfs-repo": "^0.30.1", + "ipfs-repo": "github:ipfs/js-ipfs-repo#store-pins-in-datastore", "ipld": "^0.25.0", "memdown": "^5.1.0", "nyc": "^15.0.0", @@ -73,8 +73,8 @@ "ipld-dag-pb": "^0.18.2", "it-all": "^1.0.1", "it-last": "^1.0.1", - "it-to-stream": "^0.1.1", "it-pipe": "^1.1.0", + "it-to-stream": "^0.1.1", "joi-browser": "^13.4.0", "mortice": "^2.0.0", "multicodec": "^1.0.0", diff --git a/packages/ipfs/package.json b/packages/ipfs/package.json index 84e5f1e69a..333633a58b 100644 --- a/packages/ipfs/package.json +++ b/packages/ipfs/package.json @@ -76,6 +76,7 @@ "bl": "^4.0.0", "bs58": "^4.0.1", "byteman": "^1.3.5", + "cbor": "^5.0.1", "cid-tool": "^0.4.0", "cids": "^0.7.3", "class-is": "^1.1.0", @@ -99,7 +100,7 @@ "ipfs-http-response": "^0.5.0", "ipfs-mfs": "^1.0.0", "ipfs-multipart": "^0.3.0", - "ipfs-repo": "^0.30.1", + "ipfs-repo": "github:ipfs/js-ipfs-repo#store-pins-in-datastore", "ipfs-unixfs": "^1.0.0", "ipfs-unixfs-exporter": "^1.0.1", "ipfs-unixfs-importer": "^1.0.1", @@ -117,6 +118,7 @@ "is-ipfs": "^0.6.1", "it-all": "^1.0.1", "it-concat": "^1.0.0", + "it-drain": "^1.0.0", "it-glob": "0.0.7", "it-last": "^1.0.1", "it-pipe": "^1.1.0", diff --git a/packages/ipfs/src/cli/commands/pin/add.js b/packages/ipfs/src/cli/commands/pin/add.js index b4e07130dd..a1273e9c33 100644 --- a/packages/ipfs/src/cli/commands/pin/add.js +++ b/packages/ipfs/src/cli/commands/pin/add.js @@ -25,9 +25,9 @@ module.exports = { async handler ({ ctx, ipfsPath, recursive, cidBase }) { const { ipfs, print } = ctx const type = recursive ? 'recursive' : 'direct' - const results = await ipfs.pin.add(ipfsPath, { recursive }) - results.forEach((res) => { + + for await (const res of ipfs.pin.add(ipfsPath, { recursive })) { print(`pinned ${cidToString(res.cid, { base: cidBase })} ${type}ly`) - }) + } } } diff --git a/packages/ipfs/src/cli/commands/pin/rm.js b/packages/ipfs/src/cli/commands/pin/rm.js index bcf2c438f8..7ad2a4b640 100644 --- a/packages/ipfs/src/cli/commands/pin/rm.js +++ b/packages/ipfs/src/cli/commands/pin/rm.js @@ -24,9 +24,9 @@ module.exports = { async handler ({ ctx, ipfsPath, recursive, cidBase }) { const { ipfs, print } = ctx - const results = await ipfs.pin.rm(ipfsPath, { recursive }) - results.forEach((res) => { + + for await (const res of ipfs.pin.rm(ipfsPath, { recursive })) { print(`unpinned ${cidToString(res.cid, { base: cidBase })}`) - }) + } } } diff --git a/packages/ipfs/src/core/components/add/index.js b/packages/ipfs/src/core/components/add/index.js index 40e69a7fe0..1c5aab9e3e 100644 --- a/packages/ipfs/src/core/components/add/index.js +++ b/packages/ipfs/src/core/components/add/index.js @@ -4,6 +4,7 @@ const importer = require('ipfs-unixfs-importer') const normaliseAddInput = require('ipfs-utils/src/files/normalise-input') const { parseChunkerString } = require('./utils') const pipe = require('it-pipe') +const drain = require('it-drain') module.exports = ({ ipld, gcLock, preload, pin, options: constructorOptions }) => { const isShardingEnabled = constructorOptions.EXPERIMENTAL && constructorOptions.EXPERIMENTAL.sharding @@ -111,10 +112,10 @@ function pinFile (pin, opts) { if (shouldPin) { // Note: addAsyncIterator() has already taken a GC lock, so tell // pin.add() not to take a (second) GC lock - await pin.add(file.cid, { + await drain(pin.add(file.cid, { preload: false, lock: false - }) + })) } yield file diff --git a/packages/ipfs/src/core/components/dag/put.js b/packages/ipfs/src/core/components/dag/put.js index 301c87ba8c..e1e6955b0d 100644 --- a/packages/ipfs/src/core/components/dag/put.js +++ b/packages/ipfs/src/core/components/dag/put.js @@ -2,6 +2,7 @@ const multicodec = require('multicodec') const nameToCodec = name => multicodec[name.toUpperCase().replace(/-/g, '_')] +const drain = require('it-drain') module.exports = ({ ipld, pin, gcLock, preload }) => { return async function put (dagNode, options) { @@ -51,9 +52,9 @@ module.exports = ({ ipld, pin, gcLock, preload }) => { }) if (options.pin) { - await pin.add(cid, { + await drain(pin.add(cid, { lock: false - }) + })) } if (options.preload !== false) { diff --git a/packages/ipfs/src/core/components/init.js b/packages/ipfs/src/core/components/init.js index 7e2f0ce94d..2fda6c3490 100644 --- a/packages/ipfs/src/core/components/init.js +++ b/packages/ipfs/src/core/components/init.js @@ -110,7 +110,6 @@ module.exports = ({ } const pinManager = new PinManager(repo, dag) - await pinManager.load() const pin = { add: Components.pin.add({ pinManager, gcLock, dag }), diff --git a/packages/ipfs/src/core/components/pin/add.js b/packages/ipfs/src/core/components/pin/add.js index 8f4a35c223..8376e57b5f 100644 --- a/packages/ipfs/src/core/components/pin/add.js +++ b/packages/ipfs/src/core/components/pin/add.js @@ -2,57 +2,33 @@ 'use strict' const { resolvePath, withTimeoutOption } = require('../../utils') +const PinManager = require('./pin-manager') +const { PinTypes } = PinManager module.exports = ({ pinManager, gcLock, dag }) => { - return withTimeoutOption(async function add (paths, options) { + return withTimeoutOption(async function * add (paths, options) { options = options || {} const recursive = options.recursive !== false const cids = await resolvePath(dag, paths, { signal: options.signal }) - const pinAdd = async () => { - const results = [] - + const pinAdd = async function * () { // verify that each hash can be pinned for (const cid of cids) { - const key = cid.toBaseEncodedString() - - if (recursive) { - if (pinManager.recursivePins.has(key)) { - // it's already pinned recursively - results.push(cid) + const { reason } = await pinManager.isPinnedWithType(cid, [PinTypes.recursive, PinTypes.direct]) - continue - } - - // entire graph of nested links should be pinned, - // so make sure we have all the objects - await pinManager.fetchCompleteDag(key, { preload: options.preload, signal: options.signal }) + if (reason === 'recursive' && !recursive) { + // only disallow trying to override recursive pins + throw new Error(`${cid} already pinned recursively`) + } - // found all objects, we can add the pin - results.push(cid) + if (recursive) { + await pinManager.pinRecursively(cid) } else { - if (pinManager.recursivePins.has(key)) { - // recursive supersedes direct, can't have both - throw new Error(`${key} already pinned recursively`) - } - - if (!pinManager.directPins.has(key)) { - // make sure we have the object - await dag.get(cid, { preload: options.preload }) - } - - results.push(cid) + await pinManager.pinDirectly(cid) } - } - // update the pin sets in memory - const pinset = recursive ? pinManager.recursivePins : pinManager.directPins - results.forEach(cid => pinset.add(cid.toString())) - - // persist updated pin sets to datastore - await pinManager.flushPins() - - return results.map(cid => ({ cid })) + yield { cid } + } } // When adding a file, we take a lock that gets released after pinning @@ -60,13 +36,14 @@ module.exports = ({ pinManager, gcLock, dag }) => { const lock = Boolean(options.lock) if (!lock) { - return pinAdd() + yield * pinAdd() + return } const release = await gcLock.readLock() try { - await pinAdd() + yield * pinAdd() } finally { release() } diff --git a/packages/ipfs/src/core/components/pin/ls.js b/packages/ipfs/src/core/components/pin/ls.js index 253384c5fb..bfdff150e7 100644 --- a/packages/ipfs/src/core/components/pin/ls.js +++ b/packages/ipfs/src/core/components/pin/ls.js @@ -1,13 +1,22 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const { parallelMap } = require('streaming-iterables') -const CID = require('cids') const { resolvePath } = require('../../utils') const PinManager = require('./pin-manager') const { PinTypes } = PinManager -const PIN_LS_CONCURRENCY = 8 +function toPin (type, cid, name) { + const output = { + type, + cid + } + + if (name) { + output.name = name + } + + return output +} module.exports = ({ pinManager, dag }) => { return async function * ls (paths, options) { @@ -25,67 +34,70 @@ module.exports = ({ pinManager, dag }) => { if (typeof options.type === 'string') { type = options.type.toLowerCase() } - const err = PinManager.checkPinType(type) - if (err) { - throw err - } + + PinManager.checkPinType(type) + } else { + options.type = PinTypes.all } if (paths) { paths = Array.isArray(paths) ? paths : [paths] // check the pinned state of specific hashes - const cids = await resolvePath(dag, paths) + const cids = await resolvePath(dag, paths, { signal: options.signal }) + let noMatch = true - yield * parallelMap(PIN_LS_CONCURRENCY, async cid => { - const { reason, pinned } = await pinManager.isPinnedWithType(cid, type) + for (const cid of cids) { + const { reason, pinned, parent } = await pinManager.isPinnedWithType(cid, type) if (!pinned) { - throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`) + throw new Error(`path '${paths}' is not pinned`) } - if (reason === PinTypes.direct || reason === PinTypes.recursive) { - return { cid, type: reason } + switch (reason) { + case PinTypes.direct: + case PinTypes.recursive: + noMatch = false + yield { + type: reason, + cid + } + break + default: + noMatch = false + yield { + type: `${PinTypes.indirect} through ${parent}`, + cid + } } + } - return { cid, type: `${PinTypes.indirect} through ${reason}` } - }, cids) + if (noMatch) { + throw new Error('No match found') + } return } - // show all pinned items of type - let pins = [] - - if (type === PinTypes.direct || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.directPins).map(cid => ({ - type: PinTypes.direct, - cid: new CID(cid) - })) - ) - } - if (type === PinTypes.recursive || type === PinTypes.all) { - pins = pins.concat( - Array.from(pinManager.recursivePins).map(cid => ({ - type: PinTypes.recursive, - cid: new CID(cid) - })) - ) + for await (const { cid, name } of pinManager.recursiveKeys()) { + yield toPin(PinTypes.recursive, cid, name) + } } if (type === PinTypes.indirect || type === PinTypes.all) { - const indirects = await pinManager.getIndirectKeys(options) - - pins = pins - // if something is pinned both directly and indirectly, - // report the indirect entry - .filter(({ cid }) => !indirects.includes(cid.toString()) || !pinManager.directPins.has(cid.toString())) - .concat(indirects.map(cid => ({ type: PinTypes.indirect, cid: new CID(cid) }))) + for await (const cid of pinManager.indirectKeys(options)) { + yield { + type: PinTypes.indirect, + cid + } + } } - // FIXME: https://github.com/ipfs/js-ipfs/issues/2244 - yield * pins + if (type === PinTypes.direct || type === PinTypes.all) { + for await (const { cid, name } of pinManager.directKeys()) { + yield toPin(PinTypes.direct, cid, name) + } + } } } diff --git a/packages/ipfs/src/core/components/pin/pin-manager.js b/packages/ipfs/src/core/components/pin/pin-manager.js index 8cd8c1c261..24ea7c480d 100644 --- a/packages/ipfs/src/core/components/pin/pin-manager.js +++ b/packages/ipfs/src/core/components/pin/pin-manager.js @@ -1,31 +1,29 @@ /* eslint max-nested-callbacks: ["error", 8] */ 'use strict' -const { DAGNode, DAGLink } = require('ipld-dag-pb') const CID = require('cids') -const { default: Queue } = require('p-queue') -const { Key } = require('interface-datastore') const errCode = require('err-code') -const multicodec = require('multicodec') const dagCborLinks = require('dag-cbor-links') const debug = require('debug') -const { cidToString } = require('../../../utils/cid') - -const createPinSet = require('./pin-set') - -const { Errors } = require('interface-datastore') -const ERR_NOT_FOUND = Errors.notFoundError().code +// const parallelBatch = require('it-parallel-batch') +const first = require('it-first') +const all = require('it-all') +const cbor = require('cbor') // arbitrary limit to the number of concurrent dag operations -const WALK_DAG_CONCURRENCY_LIMIT = 300 -const IS_PINNED_WITH_TYPE_CONCURRENCY_LIMIT = 300 -const PIN_DS_KEY = new Key('/local/pins') +// const WALK_DAG_CONCURRENCY_LIMIT = 300 +// const IS_PINNED_WITH_TYPE_CONCURRENCY_LIMIT = 300 +// const PIN_DS_KEY = new Key('/local/pins') function invalidPinTypeErr (type) { const errMsg = `Invalid type '${type}', must be one of {direct, indirect, recursive, all}` return errCode(new Error(errMsg), 'ERR_INVALID_PIN_TYPE') } +function toKey (cid) { + return '/' + cid.multihash.toString('base64') +} + const PinTypes = { direct: 'direct', recursive: 'recursive', @@ -38,245 +36,201 @@ class PinManager { this.repo = repo this.dag = dag this.log = debug('ipfs:pin') - this.pinset = createPinSet(dag) this.directPins = new Set() this.recursivePins = new Set() } - async _walkDag ({ cid, preload = false, onCid = () => {} }) { - if (!CID.isCID(cid)) { - cid = new CID(cid) + async * _walkDag (cid, { preload = false }) { + const { value: node } = await this.dag.get(cid, { preload }) + + if (cid.codec === 'dag-pb') { + for (const link of node.Links) { + yield link.Hash + yield * this._walkDag(link.Hash, { preload }) + } + } else if (cid.codec === 'dag-cbor') { + for (const [_, childCid] of dagCborLinks(node)) { // eslint-disable-line no-unused-vars + yield childCid + yield * this._walkDag(childCid, { preload }) + } } + } - const walk = (cid) => { - return async () => { - const { value: node } = await this.dag.get(cid, { preload }) + async pinDirectly (cid, options = {}) { + await this.dag.get(cid, options) - onCid(cid) + return this.repo.pins.put(toKey(cid), cbor.encode({ + cid: cid.buffer, + type: PinTypes.direct, + name: options.name + })) + } - if (cid.codec === 'dag-pb') { - queue.addAll( - node.Links.map(link => walk(link.Hash)) - ) - } else if (cid.codec === 'dag-cbor') { - for (const [_, childCid] of dagCborLinks(node)) { // eslint-disable-line no-unused-vars - queue.add(walk(childCid)) - } - } + async unpin (cid) { + if (typeof cid === 'string' || cid instanceof String) { + // find pin with passed name + const result = await first(this.repo.pins.query({ + filters: [entry => { + const pin = cbor.decode(entry.value) + + return pin.name === cid + }], + limit: 1 + })) + + if (!result) { + // no pin with this name + return } - } - const queue = new Queue({ - concurrency: WALK_DAG_CONCURRENCY_LIMIT - }) - queue.add(walk(cid)) + cid = new CID(cbor.decode(result.value).cid) + } - await queue.onIdle() + return this.repo.pins.delete(toKey(cid)) } - directKeys () { - return Array.from(this.directPins, key => new CID(key).buffer) - } + async pinRecursively (cid, options = {}) { + await this.fetchCompleteDag(cid, options) - recursiveKeys () { - return Array.from(this.recursivePins, key => new CID(key).buffer) + await this.repo.pins.put(toKey(cid), cbor.encode({ + cid: cid.buffer, + type: PinTypes.recursive, + name: options.name + })) } - async getIndirectKeys ({ preload }) { - const indirectKeys = new Set() + async * directKeys () { + for await (const entry of this.repo.pins.query({ + filters: [(entry) => { + const pin = cbor.decode(entry.value) - for (const multihash of this.recursiveKeys()) { - await this._walkDag({ - cid: new CID(multihash), - preload: preload || false, - onCid: (cid) => { - cid = cid.toString() + return pin.type === PinTypes.direct + }] + })) { + const pin = cbor.decode(entry.value) - // recursive pins pre-empt indirect pins - if (!this.recursivePins.has(cid)) { - indirectKeys.add(cid) - } - } - }) + yield { + cid: new CID(pin.cid), + name: pin.name + } } - - return Array.from(indirectKeys) } - // Encode and write pin key sets to the datastore: - // a DAGLink for each of the recursive and direct pinsets - // a DAGNode holding those as DAGLinks, a kind of root pin - async flushPins () { - const [ - dLink, - rLink - ] = await Promise.all([ - // create a DAGLink to the node with direct pins - this.pinset.storeSet(this.directKeys()) - .then((result) => { - return new DAGLink(PinTypes.direct, result.node.size, result.cid) - }), - // create a DAGLink to the node with recursive pins - this.pinset.storeSet(this.recursiveKeys()) - .then((result) => { - return new DAGLink(PinTypes.recursive, result.node.size, result.cid) - }), - // the pin-set nodes link to a special 'empty' node, so make sure it exists - this.dag.put(new DAGNode(Buffer.alloc(0)), { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - ]) - - // create a root node with DAGLinks to the direct and recursive DAGs - const rootNode = new DAGNode(Buffer.alloc(0), [dLink, rLink]) - const rootCid = await this.dag.put(rootNode, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - - // save root to datastore under a consistent key - await this.repo.datastore.put(PIN_DS_KEY, rootCid.buffer) - - this.log(`Flushed pins with root: ${rootCid}`) - } + async * recursiveKeys () { + for await (const entry of this.repo.pins.query({ + filters: [(entry) => { + const pin = cbor.decode(entry.value) - async load () { - const has = await this.repo.datastore.has(PIN_DS_KEY) + return pin.type === PinTypes.recursive + }] + })) { + const pin = cbor.decode(entry.value) - if (!has) { - return + yield { + cid: new CID(pin.cid), + name: pin.name + } } + } - const mh = await this.repo.datastore.get(PIN_DS_KEY) - const pinRoot = await this.dag.get(new CID(mh), '', { preload: false }) - - const [ - rKeys, dKeys - ] = await Promise.all([ - this.pinset.loadSet(pinRoot.value, PinTypes.recursive), - this.pinset.loadSet(pinRoot.value, PinTypes.direct) - ]) - - this.directPins = new Set(dKeys.map(k => cidToString(k))) - this.recursivePins = new Set(rKeys.map(k => cidToString(k))) + async * indirectKeys ({ preload }) { + for await (const { cid } of this.recursiveKeys()) { + for await (const childCid of this._walkDag(cid, { preload })) { + // recursive pins override indirect pins + const types = [ + PinTypes.recursive + ] - this.log('Loaded pins from the datastore') - } + const result = await this.isPinnedWithType(childCid, types) - async isPinnedWithType (multihash, type) { - const key = cidToString(multihash) - const { recursive, direct, all } = PinTypes + if (result.pinned) { + continue + } - // recursive - if ((type === recursive || type === all) && this.recursivePins.has(key)) { - return { - key, - pinned: true, - reason: recursive + yield childCid } } + } - if (type === recursive) { - return { - key, - pinned: false - } + async isPinnedWithType (cid, types) { + if (!Array.isArray(types)) { + types = [types] } - // direct - if ((type === direct || type === all) && this.directPins.has(key)) { - return { - key, - pinned: true, - reason: direct - } - } + const all = types.includes(PinTypes.all) + const direct = types.includes(PinTypes.direct) + const recursive = types.includes(PinTypes.recursive) + const indirect = types.includes(PinTypes.indirect) - if (type === direct) { - return { - key, - pinned: false - } - } + if (recursive || direct || all) { + const result = await first(this.repo.pins.query({ + prefix: toKey(cid), + filters: [entry => { + const pin = cbor.decode(entry.value) - // indirect (default) - // check each recursive key to see if multihash is under it - // arbitrary limit, enables handling 1000s of pins. - const queue = new Queue({ - concurrency: IS_PINNED_WITH_TYPE_CONCURRENCY_LIMIT - }) - let cid - - queue.addAll( - this.recursiveKeys() - .map(childKey => { - childKey = new CID(childKey) - - return async () => { - const has = await this.pinset.hasDescendant(childKey, key) - - if (has) { - cid = childKey - queue.clear() - } + if (all) { + return true } - }) - ) - await queue.onIdle() + return types.includes(pin.type) + }], + limit: 1 + })) - return { - key, - pinned: Boolean(cid), - reason: cid - } - } + if (result) { + const pin = cbor.decode(result.value) - // Gets CIDs of blocks used internally by the pinner - async getInternalBlocks () { - let mh + return { + cid, + pinned: true, + reason: pin.type + } + } + } - try { - mh = await this.repo.datastore.get(PIN_DS_KEY) - } catch (err) { - if (err.code === ERR_NOT_FOUND) { - this.log('No pinned blocks') + const self = this - return [] + async function * findChild (key, source) { + for await (const { cid: parentCid } of source) { + for await (const childCid of self._walkDag(parentCid, { preload: false })) { + if (childCid.equals(key)) { + yield parentCid + return + } + } } - - throw new Error(`Could not get pin sets root from datastore: ${err.message}`) } - const cid = new CID(mh) - const obj = await this.dag.get(cid, '', { preload: false }) + if (all || indirect) { + // indirect (default) + // check each recursive key to see if multihash is under it + + const parentCid = await first(findChild(cid, this.recursiveKeys())) - // The pinner stores an object that has two links to pin sets: - // 1. The directly pinned CIDs - // 2. The recursively pinned CIDs - // If large enough, these pin sets may have links to buckets to hold - // the pins - const cids = await this.pinset.getInternalCids(obj.value) + if (parentCid) { + return { + cid, + pinned: true, + reason: PinTypes.indirect, + parent: parentCid + } + } + } - return cids.concat(cid) + return { + cid, + pinned: false + } } async fetchCompleteDag (cid, options) { - await this._walkDag({ - cid, - preload: options.preload - }) + await all(this._walkDag(cid, { preload: options.preload })) } - // Returns an error if the pin type is invalid + // Throws an error if the pin type is invalid static checkPinType (type) { if (typeof type !== 'string' || !Object.keys(PinTypes).includes(type)) { - return invalidPinTypeErr(type) + throw invalidPinTypeErr(type) } } } diff --git a/packages/ipfs/src/core/components/pin/pin-set.js b/packages/ipfs/src/core/components/pin/pin-set.js deleted file mode 100644 index 552fc23313..0000000000 --- a/packages/ipfs/src/core/components/pin/pin-set.js +++ /dev/null @@ -1,317 +0,0 @@ -'use strict' - -const multihashes = require('multihashes') -const CID = require('cids') -const protobuf = require('protons') -const fnv1a = require('fnv1a') -const varint = require('varint') -const { DAGNode, DAGLink } = require('ipld-dag-pb') -const multicodec = require('multicodec') -const { default: Queue } = require('p-queue') -const dagCborLinks = require('dag-cbor-links') -const log = require('debug')('ipfs:pin:pin-set') -const pbSchema = require('./pin.proto') - -const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' -const emptyKey = multihashes.fromB58String(emptyKeyHash) -const defaultFanout = 256 -const maxItems = 8192 -const pb = protobuf(pbSchema) - -const HAS_DESCENDANT_CONCURRENCY = 100 - -function toB58String (hash) { - return new CID(hash).toBaseEncodedString() -} - -function readHeader (rootNode) { - // rootNode.data should be a buffer of the format: - // < varint(headerLength) | header | itemData... > - const rootData = rootNode.Data - const hdrLength = varint.decode(rootData) - const vBytes = varint.decode.bytes - - if (vBytes <= 0) { - throw new Error('Invalid Set header length') - } - - if (vBytes + hdrLength > rootData.length) { - throw new Error('Impossibly large set header length') - } - - const hdrSlice = rootData.slice(vBytes, hdrLength + vBytes) - const header = pb.Set.decode(hdrSlice) - - if (header.version !== 1) { - throw new Error(`Unsupported Set version: ${header.version}`) - } - - if (header.fanout > rootNode.Links.length) { - throw new Error('Impossibly large fanout') - } - - return { - header: header, - data: rootData.slice(hdrLength + vBytes) - } -} - -function hash (seed, key) { - const buf = Buffer.alloc(4) - buf.writeUInt32LE(seed, 0) - const data = Buffer.concat([ - buf, Buffer.from(toB58String(key)) - ]) - return fnv1a(data.toString('binary')) -} - -function * cborCids (node) { - for (const [_, cid] of dagCborLinks(node)) { // eslint-disable-line no-unused-vars - yield cid - } -} - -exports = module.exports = function (dag) { - const pinSet = { - // should this be part of `object` API? - hasDescendant: async (parentCid, childhash) => { - if (parentCid.codec !== 'dag-pb' && parentCid.codec !== 'dag-cbor') { - return false - } - - const { value: root } = await dag.get(parentCid, { preload: false }) - const queue = new Queue({ - concurrency: HAS_DESCENDANT_CONCURRENCY - }) - - if (CID.isCID(childhash) || Buffer.isBuffer(childhash)) { - childhash = toB58String(childhash) - } - - let found = false - const seen = {} - - function searchChild (linkCid) { - return async () => { - if (found) { - return - } - - try { - const { value: childNode } = await dag.get(linkCid, { preload: false }) - - searchChildren(linkCid, childNode) - } catch (err) { - log(err) - } - } - } - - function searchChildren (cid, node) { - let links = [] - - if (cid.codec === 'dag-pb') { - links = node.Links - } else if (cid.codec === 'dag-cbor') { - links = cborCids(node) - } - - for (const link of links) { - const linkCid = cid.codec === 'dag-pb' ? link.Hash : link[1] - const bs58Link = toB58String(linkCid) - - if (bs58Link === childhash) { - queue.clear() - found = true - - return - } - - if (seen[bs58Link]) { - continue - } - - seen[bs58Link] = true - - if (linkCid.codec !== 'dag-pb' && linkCid.codec !== 'dag-cbor') { - continue - } - - queue.add(searchChild(linkCid)) - } - } - - searchChildren(parentCid, root) - - await queue.onIdle() - - return found - }, - - storeSet: async (keys) => { - const pins = keys.map(key => { - if (typeof key === 'string' || Buffer.isBuffer(key)) { - key = new CID(key) - } - - return { - key: key, - data: null - } - }) - - const rootNode = await pinSet.storeItems(pins) - const cid = await dag.put(rootNode, { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - }) - - return { - node: rootNode, - cid - } - }, - - storeItems: async (items) => { // eslint-disable-line require-await - return storePins(items, 0) - - async function storePins (pins, depth) { - const pbHeader = pb.Set.encode({ - version: 1, - fanout: defaultFanout, - seed: depth - }) - const headerBuf = Buffer.concat([ - Buffer.from(varint.encode(pbHeader.length)), pbHeader - ]) - const fanoutLinks = [] - - for (let i = 0; i < defaultFanout; i++) { - fanoutLinks.push(new DAGLink('', 1, emptyKey)) - } - - if (pins.length <= maxItems) { - const nodes = pins - .map(item => { - return ({ - link: new DAGLink('', 1, item.key), - data: item.data || Buffer.alloc(0) - }) - }) - // sorting makes any ordering of `pins` produce the same DAGNode - .sort((a, b) => Buffer.compare(a.link.Hash.buffer, b.link.Hash.buffer)) - - const rootLinks = fanoutLinks.concat(nodes.map(item => item.link)) - const rootData = Buffer.concat( - [headerBuf].concat(nodes.map(item => item.data)) - ) - - return new DAGNode(rootData, rootLinks) - } else { - // If the array of pins is > maxItems, we: - // - distribute the pins among `defaultFanout` bins - // - create a DAGNode for each bin - // - add each pin as a DAGLink to that bin - // - create a root DAGNode - // - add each bin as a DAGLink - // - send that root DAGNode via callback - // (using go-ipfs' "wasteful but simple" approach for consistency) - // https://github.com/ipfs/go-ipfs/blob/master/pin/set.go#L57 - - const bins = pins.reduce((bins, pin) => { - const n = hash(depth, pin.key) % defaultFanout - bins[n] = n in bins ? bins[n].concat([pin]) : [pin] - return bins - }, []) - - let idx = 0 - for (const bin of bins) { - const child = await storePins(bin, depth + 1) - - await storeChild(child, idx) - - idx++ - } - - return new DAGNode(headerBuf, fanoutLinks) - } - - async function storeChild (child, binIdx) { - const opts = { - version: 0, - format: multicodec.DAG_PB, - hashAlg: multicodec.SHA2_256, - preload: false - } - - const cid = await dag.put(child, opts) - - fanoutLinks[binIdx] = new DAGLink('', child.size, cid) - } - } - }, - - loadSet: async (rootNode, name) => { - const link = rootNode.Links.find(l => l.Name === name) - - if (!link) { - throw new Error('No link found with name ' + name) - } - - const res = await dag.get(link.Hash, '', { preload: false }) - const keys = [] - const stepPin = link => keys.push(link.Hash) - - await pinSet.walkItems(res.value, { stepPin }) - - return keys - }, - - walkItems: async (node, { stepPin = () => {}, stepBin = () => {} }) => { - const pbh = readHeader(node) - let idx = 0 - - for (const link of node.Links) { - if (idx < pbh.header.fanout) { - // the first pbh.header.fanout links are fanout bins - // if a fanout bin is not 'empty', dig into and walk its DAGLinks - const linkHash = link.Hash.buffer - - if (!emptyKey.equals(linkHash)) { - stepBin(link, idx, pbh.data) - - // walk the links of this fanout bin - const res = await dag.get(linkHash, '', { preload: false }) - - await pinSet.walkItems(res.value, { stepPin, stepBin }) - } - } else { - // otherwise, the link is a pin - stepPin(link, idx, pbh.data) - } - - idx++ - } - }, - - getInternalCids: async (rootNode) => { - // "Empty block" used by the pinner - const cids = [new CID(emptyKey)] - const stepBin = link => cids.push(link.Hash) - - for (const topLevelLink of rootNode.Links) { - cids.push(topLevelLink.Hash) - - const res = await dag.get(topLevelLink.Hash, '', { preload: false }) - - await pinSet.walkItems(res.value, { stepBin }) - } - - return cids - } - } - - return pinSet -} diff --git a/packages/ipfs/src/core/components/pin/pin.proto.js b/packages/ipfs/src/core/components/pin/pin.proto.js deleted file mode 100644 index 8e94fd8f52..0000000000 --- a/packages/ipfs/src/core/components/pin/pin.proto.js +++ /dev/null @@ -1,19 +0,0 @@ -'use strict' - -/** - * Protobuf interface - * from go-ipfs/pin/internal/pb/header.proto - */ -module.exports = ` - syntax = "proto2"; - - package ipfs.pin; - - option go_package = "pb"; - - message Set { - optional uint32 version = 1; - optional uint32 fanout = 2; - optional fixed32 seed = 3; - } -` diff --git a/packages/ipfs/src/core/components/pin/rm.js b/packages/ipfs/src/core/components/pin/rm.js index 5082c7eca2..3a159ad667 100644 --- a/packages/ipfs/src/core/components/pin/rm.js +++ b/packages/ipfs/src/core/components/pin/rm.js @@ -1,62 +1,51 @@ 'use strict' -const errCode = require('err-code') -const multibase = require('multibase') -const { parallelMap, collect } = require('streaming-iterables') -const pipe = require('it-pipe') const { resolvePath } = require('../../utils') const { PinTypes } = require('./pin-manager') -const PIN_RM_CONCURRENCY = 8 - module.exports = ({ pinManager, gcLock, dag }) => { - return async function rm (paths, options) { + return async function * rm (paths, options) { options = options || {} - const recursive = options.recursive !== false - - if (options.cidBase && !multibase.names.includes(options.cidBase)) { - throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE') - } + const recursive = options.recursive == null ? true : options.recursive + const cids = await resolvePath(dag, paths, { signal: options.signal }) - const cids = await resolvePath(dag, paths) const release = await gcLock.readLock() try { // verify that each hash can be unpinned - const results = await pipe( - cids, - parallelMap(PIN_RM_CONCURRENCY, async cid => { - const { pinned, reason } = await pinManager.isPinnedWithType(cid, PinTypes.all) + for (const cid of cids) { + const { pinned, reason } = await pinManager.isPinnedWithType(cid, PinTypes.all) - if (!pinned) { - throw new Error(`${cid} is not pinned`) - } - if (reason !== PinTypes.recursive && reason !== PinTypes.direct) { - throw new Error(`${cid} is pinned indirectly under ${reason}`) - } - if (reason === PinTypes.recursive && !recursive) { - throw new Error(`${cid} is pinned recursively`) - } + if (!pinned) { + throw new Error(`${cid} is not pinned`) + } - return cid - }), - collect - ) + switch (reason) { + case (PinTypes.recursive): + if (!recursive) { + throw new Error(`${cid} is pinned recursively`) + } - // update the pin sets in memory - results.forEach(cid => { - if (recursive && pinManager.recursivePins.has(cid.toString())) { - pinManager.recursivePins.delete(cid.toString()) - } else { - pinManager.directPins.delete(cid.toString()) - } - }) + await pinManager.unpin(cid) + + yield { + cid + } + + break + case (PinTypes.direct): + await pinManager.unpin(cid) - // persist updated pin sets to datastore - await pinManager.flushPins() + yield { + cid + } - return results.map(cid => ({ cid })) + break + default: + throw new Error(`${cid} is pinned indirectly under ${reason}`) + } + } } finally { release() } diff --git a/packages/ipfs/src/core/components/repo/gc.js b/packages/ipfs/src/core/components/repo/gc.js index 3f19789f37..3e6dd1b23c 100644 --- a/packages/ipfs/src/core/components/repo/gc.js +++ b/packages/ipfs/src/core/components/repo/gc.js @@ -13,7 +13,7 @@ const { parallelMerge, transform, map } = require('streaming-iterables') const BLOCK_RM_CONCURRENCY = 256 // Perform mark and sweep garbage collection -module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { +module.exports = ({ gcLock, pin, refs, repo }) => { return async function * gc () { const start = Date.now() log('Creating set of marked blocks') @@ -22,7 +22,7 @@ module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { try { // Mark all blocks that are being used - const markedSet = await createMarkedSet({ pin, pinManager, refs, repo }) + const markedSet = await createMarkedSet({ pin, refs, repo }) // Get all blocks keys from the blockstore const blockKeys = repo.blocks.query({ keysOnly: true }) @@ -37,14 +37,9 @@ module.exports = ({ gcLock, pin, pinManager, refs, repo }) => { } // Get Set of CIDs of blocks to keep -async function createMarkedSet ({ pin, pinManager, refs, repo }) { +async function createMarkedSet ({ pin, refs, repo }) { const pinsSource = map(({ cid }) => cid, pin.ls()) - const pinInternalsSource = (async function * () { - const cids = await pinManager.getInternalBlocks() - yield * cids - })() - const mfsSource = (async function * () { let mh try { @@ -66,7 +61,7 @@ async function createMarkedSet ({ pin, pinManager, refs, repo }) { })() const output = new Set() - for await (const cid of parallelMerge(pinsSource, pinInternalsSource, mfsSource)) { + for await (const cid of parallelMerge(pinsSource, mfsSource)) { output.add(cidToString(cid, { base: 'base32' })) } return output diff --git a/packages/ipfs/src/core/components/start.js b/packages/ipfs/src/core/components/start.js index ec771cb09b..891cdd9ec9 100644 --- a/packages/ipfs/src/core/components/start.js +++ b/packages/ipfs/src/core/components/start.js @@ -229,7 +229,7 @@ function createApi ({ pubsub, refs, repo: { - gc: Components.repo.gc({ gcLock, pin, pinManager, refs, repo }), + gc: Components.repo.gc({ gcLock, pin, refs, repo }), stat: Components.repo.stat({ repo }), version: Components.repo.version({ repo }) }, diff --git a/packages/ipfs/src/http/api/resources/dag.js b/packages/ipfs/src/http/api/resources/dag.js index 8b6ff198ce..505c0ec8c1 100644 --- a/packages/ipfs/src/http/api/resources/dag.js +++ b/packages/ipfs/src/http/api/resources/dag.js @@ -14,6 +14,7 @@ const { const all = require('it-all') const log = debug('ipfs:http-api:dag') log.error = debug('ipfs:http-api:dag:error') +const drain = require('it-drain') const IpldFormats = { get [multicodec.RAW] () { @@ -252,7 +253,7 @@ exports.put = { } if (request.query.pin) { - await ipfs.pin.add(cid) + await drain(ipfs.pin.add(cid)) } return h.response({ diff --git a/packages/ipfs/src/http/api/resources/pin.js b/packages/ipfs/src/http/api/resources/pin.js index c853f9bdda..4d2e860307 100644 --- a/packages/ipfs/src/http/api/resources/pin.js +++ b/packages/ipfs/src/http/api/resources/pin.js @@ -9,6 +9,7 @@ const pipe = require('it-pipe') const ndjson = require('iterable-ndjson') const { cidToString } = require('../../../utils/cid') const streamResponse = require('../../utils/stream-response') +const all = require('it-all') function parseArgs (request, h) { let { arg } = request.query @@ -93,7 +94,7 @@ exports.add = { let result try { - result = await ipfs.pin.add(path, { recursive }) + result = await all(ipfs.pin.add(path, { recursive })) } catch (err) { if (err.message.includes('already pinned recursively')) { throw Boom.boomify(err, { statusCode: 400 }) @@ -122,7 +123,7 @@ exports.rm = { let result try { - result = await ipfs.pin.rm(path, { recursive }) + result = await all(ipfs.pin.rm(path, { recursive })) } catch (err) { throw Boom.boomify(err, { message: 'Failed to remove pin' }) } diff --git a/packages/ipfs/test/cli/pin.js b/packages/ipfs/test/cli/pin.js index b65e75a8bf..b667221799 100644 --- a/packages/ipfs/test/cli/pin.js +++ b/packages/ipfs/test/cli/pin.js @@ -33,7 +33,7 @@ describe('pin', () => { describe('rm', function () { it('recursively (default)', async () => { - ipfs.pin.rm.withArgs([pins.root], { recursive: true }).resolves([{ + ipfs.pin.rm.withArgs([pins.root], { recursive: true }).returns([{ cid: new CID(pins.root) }]) @@ -42,7 +42,7 @@ describe('pin', () => { }) it('non recursively', async () => { - ipfs.pin.rm.withArgs([pins.root], { recursive: false }).resolves([{ + ipfs.pin.rm.withArgs([pins.root], { recursive: false }).returns([{ cid: new CID(pins.root) }]) @@ -51,7 +51,7 @@ describe('pin', () => { }) it('non recursively (short option)', async () => { - ipfs.pin.rm.withArgs([pins.root], { recursive: false }).resolves([{ + ipfs.pin.rm.withArgs([pins.root], { recursive: false }).returns([{ cid: new CID(pins.root) }]) @@ -60,7 +60,7 @@ describe('pin', () => { }) it('should rm and print CIDs encoded in specified base', async () => { - ipfs.pin.rm.withArgs([pins.root], { recursive: true }).resolves([{ + ipfs.pin.rm.withArgs([pins.root], { recursive: true }).returns([{ cid: new CID(pins.root) }]) @@ -72,7 +72,7 @@ describe('pin', () => { describe('add', function () { it('recursively (default)', async () => { - ipfs.pin.add.withArgs([pins.root], { recursive: true }).resolves([{ + ipfs.pin.add.withArgs([pins.root], { recursive: true }).returns([{ cid: new CID(pins.root) }]) @@ -81,7 +81,7 @@ describe('pin', () => { }) it('non recursively', async () => { - ipfs.pin.add.withArgs([pins.root], { recursive: false }).resolves([{ + ipfs.pin.add.withArgs([pins.root], { recursive: false }).returns([{ cid: new CID(pins.root) }]) @@ -90,7 +90,7 @@ describe('pin', () => { }) it('non recursively (short option)', async () => { - ipfs.pin.add.withArgs([pins.root], { recursive: false }).resolves([{ + ipfs.pin.add.withArgs([pins.root], { recursive: false }).returns([{ cid: new CID(pins.root) }]) @@ -99,7 +99,7 @@ describe('pin', () => { }) it('should rm and print CIDs encoded in specified base', async () => { - ipfs.pin.add.withArgs([pins.root], { recursive: true }).resolves([{ + ipfs.pin.add.withArgs([pins.root], { recursive: true }).returns([{ cid: new CID(pins.root) }]) diff --git a/packages/ipfs/test/core/gc.spec.js b/packages/ipfs/test/core/gc.spec.js index 90b0ebf982..11b7266c38 100644 --- a/packages/ipfs/test/core/gc.spec.js +++ b/packages/ipfs/test/core/gc.spec.js @@ -4,6 +4,7 @@ const { expect } = require('interface-ipfs-core/src/utils/mocha') const last = require('it-last') +const drain = require('it-drain') const factory = require('../utils/factory') const pEvent = require('p-event') @@ -199,7 +200,7 @@ describe.skip('gc', function () { // Pin first block // Note: pin add will take a read lock const pinLockRequested = pEvent(lockEmitter, 'readLock request') - const pin1 = ipfs.pin.add(cid1) + const pin1 = last(ipfs.pin.add(cid1)) // Once pin lock has been requested, start GC await pinLockRequested @@ -209,7 +210,7 @@ describe.skip('gc', function () { // TODO: Adding pin for removed block never returns, which means the lock // never gets released - // const pin2 = ipfs.pin.add(cid2) + // const pin2 = last(ipfs.pin.add(cid2)) // Confirm second second block has been removed const localRefs = (await ipfs.refs.local()).map(r => r.ref) @@ -230,13 +231,13 @@ describe.skip('gc', function () { const cid2 = (await ipfs.block.put(Buffer.from('block to pin rm 2'), null)).cid // Pin blocks - await ipfs.pin.add(cid1) - await ipfs.pin.add(cid2) + await drain(ipfs.pin.add(cid1)) + await drain(ipfs.pin.add(cid2)) // Unpin first block // Note: pin rm will take a read lock const pinLockRequested = pEvent(lockEmitter, 'readLock request') - const pinRm1 = ipfs.pin.rm(cid1) + const pinRm1 = last(ipfs.pin.rm(cid1)) // Once pin lock has been requested, start GC await pinLockRequested @@ -246,7 +247,7 @@ describe.skip('gc', function () { // Once GC has started, start second pin rm await gcStarted - const pinRm2 = ipfs.pin.rm(cid2) + const pinRm2 = last(ipfs.pin.rm(cid2)) const deleted = (await gc).map(i => i.cid.toString()) await pinRm1 diff --git a/packages/ipfs/test/core/node.js b/packages/ipfs/test/core/node.js index f7aa9bde93..bb95a6d65e 100644 --- a/packages/ipfs/test/core/node.js +++ b/packages/ipfs/test/core/node.js @@ -1,6 +1,4 @@ 'use strict' require('./name-pubsub') -require('./pin') -require('./pin-set') require('./utils') diff --git a/packages/ipfs/test/core/pin-set.js b/packages/ipfs/test/core/pin-set.js deleted file mode 100644 index bec62bb962..0000000000 --- a/packages/ipfs/test/core/pin-set.js +++ /dev/null @@ -1,188 +0,0 @@ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const { util, DAGNode } = require('ipld-dag-pb') -const CID = require('cids') -const map = require('p-map') -const IPFS = require('../../src/core') -const createPinSet = require('../../src/core/components/pin/pin-set') -const createTempRepo = require('../utils/create-repo-nodejs') - -const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n' -const defaultFanout = 256 -const maxItems = 8192 - -/** - * Creates @param num DAGNodes, limited to 500 at a time to save memory - * @param {[type]} num the number of nodes to create - * @return {Promise>} - */ -function createNodes (num) { - return map(Array.from(Array(num)), (_, i) => createNode(String(i)), { concurrency: 500 }) -} - -async function createNode (data, links = []) { - const node = new DAGNode(data, links) - const cid = await util.cid(util.serialize(node), { cidVersion: 0 }) - return { node, cid } -} - -describe('pinSet', function () { - let ipfs - let pinSet - let repo - - before(async function () { - this.timeout(80 * 1000) - repo = createTempRepo() - ipfs = await IPFS.create({ - silent: true, - repo, - config: { - Bootstrap: [], - Discovery: { - MDNS: { - Enabled: false - } - } - }, - preload: { enabled: false } - }) - pinSet = createPinSet(ipfs.dag) - }) - - after(function () { - this.timeout(80 * 1000) - return ipfs.stop() - }) - - after(() => repo.teardown()) - - describe('storeItems', function () { - it('generates a root node with links and hash', async function () { - const expectedRootHash = 'QmcLiSTjcjoVC2iuGbk6A2PVcWV3WvjZT4jxfNis1vjyrR' - - const result = await createNode('data') - const nodeHash = result.cid.toBaseEncodedString() - const rootNode = await pinSet.storeSet([nodeHash]) - - expect(rootNode.cid.toBaseEncodedString()).to.eql(expectedRootHash) - expect(rootNode.node.Links).to.have.length(defaultFanout + 1) - - const lastLink = rootNode.node.Links[rootNode.node.Links.length - 1] - const mhash = lastLink.Hash.toBaseEncodedString() - expect(mhash).to.eql(nodeHash) - }) - }) - - describe('handles large sets', function () { - it('handles storing items > maxItems', async function () { - this.timeout(90 * 1000) - const expectedHash = 'QmbvhSy83QWfgLXDpYjDmLWBFfGc8utoqjcXHyj3gYuasT' - const count = maxItems + 1 - const nodes = await createNodes(count) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - expect(result.node.size).to.eql(3184696) - expect(result.node.Links).to.have.length(defaultFanout) - expect(result.cid.toBaseEncodedString()).to.eql(expectedHash) - - const loaded = await pinSet.loadSet(result.node, '') - expect(loaded).to.have.length(30) - - const hashes = loaded.map(l => new CID(l).toBaseEncodedString()) - - // just check the first node, assume all are children if successful - const has = await pinSet.hasDescendant(result.cid, hashes[0]) - expect(has).to.eql(true) - }) - - // This test is largely taken from go-ipfs/pin/set_test.go - // It fails after reaching maximum call stack depth but I don't believe it's - // infinite. We need to reference go's pinSet impl to make sure - // our sharding behaves correctly, or perhaps this test is misguided - // - // FIXME: Update: AS 2020-01-14 this test currently is failing with: - // - // TypeError: Cannot read property 'length' of undefined - // at storePins (src/core/components/pin/pin-set.js:195:18) - // at storePins (src/core/components/pin/pin-set.js:231:33) - // at storePins (src/core/components/pin/pin-set.js:231:33) - // at Object.storeItems (src/core/components/pin/pin-set.js:178:14) - // at Object.storeSet (src/core/components/pin/pin-set.js:163:37) - // at Context. (test/core/pin-set.js:116:39) - // at processTicksAndRejections (internal/process/task_queues.js:94:5) - it.skip('stress test: stores items > (maxItems * defaultFanout) + 1', async function () { - this.timeout(180 * 1000) - - // this value triggers the creation of a recursive shard. - // If the recursive sharding is done improperly, this will result in - // an infinite recursion and crash (OOM) - const limit = (defaultFanout * maxItems) + 1 - - const nodes = await createNodes(limit) - const rootNodes0 = await pinSet.storeSet(nodes.slice(0, -1).map(n => n.cid)) - const rootNodes1 = await pinSet.storeSet(nodes.map(n => n.cid)) - - expect(rootNodes0.length - rootNodes1.length).to.eql(2) - }) - }) - - describe('walkItems', function () { - it('fails if node doesn\'t have a pin-set protobuf header', async function () { - const { node } = await createNode('datum') - await expect(pinSet.walkItems(node, {})) - .to.eventually.be.rejected() - }) - - it('visits all links of a root node', async function () { - this.timeout(90 * 1000) - - const seenPins = [] - const stepPin = (link, idx, data) => seenPins.push({ link, idx, data }) - const seenBins = [] - const stepBin = (link, idx, data) => seenBins.push({ link, idx, data }) - - const nodes = await createNodes(maxItems + 1) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - await pinSet.walkItems(result.node, { stepPin, stepBin }) - expect(seenPins).to.have.length(maxItems + 1) - expect(seenBins).to.have.length(defaultFanout) - }) - - it('visits all non-fanout links of a root node', async () => { - const seen = [] - const stepPin = (link, idx, data) => seen.push({ link, idx, data }) - - const nodes = await createNodes(defaultFanout) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - await pinSet.walkItems(result.node, { stepPin }) - - expect(seen).to.have.length(defaultFanout) - expect(seen[0].idx).to.eql(defaultFanout) - - seen.forEach(item => { - expect(item.data).to.eql(Buffer.alloc(0)) - expect(item.link).to.exist() - }) - }) - }) - - describe('getInternalCids', function () { - it('gets all links and empty key CID', async () => { - const nodes = await createNodes(defaultFanout) - const result = await pinSet.storeSet(nodes.map(n => n.cid)) - - const rootNode = new DAGNode('pins', [{ Hash: result.cid }]) - const cids = await pinSet.getInternalCids(rootNode) - - expect(cids.length).to.eql(2) - const cidStrs = cids.map(c => c.toString()) - expect(cidStrs).includes(emptyKeyHash) - expect(cidStrs).includes(result.cid.toString()) - }) - }) -}) diff --git a/packages/ipfs/test/core/pin.js b/packages/ipfs/test/core/pin.js deleted file mode 100644 index 98da97b55c..0000000000 --- a/packages/ipfs/test/core/pin.js +++ /dev/null @@ -1,437 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const fs = require('fs') -const { - DAGNode -} = require('ipld-dag-pb') -const all = require('it-all') -const CID = require('cids') -const IPFS = require('../../src/core') -const createTempRepo = require('../utils/create-repo-nodejs') - -// fixture structure: -// planets/ -// solar-system.md -// mercury/ -// wiki.md -const pins = { - root: 'QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys', - solarWiki: 'QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG', - mercuryDir: 'QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q', - mercuryWiki: 'QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi' -} -const pinTypes = { - direct: 'direct', - recursive: 'recursive', - indirect: 'indirect', - all: 'all' -} - -describe('pin', function () { - const fixtures = [ - 'test/fixtures/planets/mercury/wiki.md', - 'test/fixtures/planets/solar-system.md' - ].map(path => ({ - path, - content: fs.readFileSync(path) - })) - - let ipfs - let pin - let repo - - async function isPinnedWithType (path, type) { - try { - for await (const _ of pin.ls(path, { type })) { // eslint-disable-line no-unused-vars - return true - } - return false - } catch (err) { - return false - } - } - - async function expectPinned (cid, type = pinTypes.all, pinned = true) { - if (typeof type === 'boolean') { - pinned = type - type = pinTypes.all - } - - const result = await isPinnedWithType(cid, type) - expect(result).to.eql(pinned) - } - - async function clearPins () { - for await (const { cid } of pin.ls({ type: pinTypes.recursive })) { - await pin.rm(cid) - } - - for await (const { cid } of pin.ls({ type: pinTypes.direct })) { - await pin.rm(cid) - } - } - - before(async function () { - this.timeout(20 * 1000) - repo = createTempRepo() - ipfs = await IPFS.create({ - silent: true, - repo, - config: { Bootstrap: [] }, - preload: { enabled: false } - }) - - pin = ipfs.pin - await all(ipfs.add(fixtures)) - }) - - after(function () { - this.timeout(60 * 1000) - return ipfs.stop() - }) - - after(() => repo.teardown()) - - describe('pinned status', function () { - beforeEach(async () => { - await clearPins() - await pin.add(pins.root) - }) - - it('should be pinned when added', async () => { - await pin.add(pins.solarWiki) - return expectPinned(pins.solarWiki) - }) - - it('should not be pinned when not in datastore', () => { - const falseHash = `${pins.root.slice(0, -2)}ss` - return expectPinned(falseHash, false) - }) - - it('should not be pinned when in datastore but not added', async () => { - await pin.rm(pins.root) - return expectPinned(pins.root, false) - }) - - it('should be pinned recursively when added', () => { - return expectPinned(pins.root, pinTypes.recursive) - }) - - it('should be pinned indirectly', () => { - return expectPinned(pins.mercuryWiki, pinTypes.indirect) - }) - - it('should be pinned directly', async () => { - await pin.add(pins.mercuryDir, { recursive: false }) - return expectPinned(pins.mercuryDir, pinTypes.direct) - }) - - it('should not be pinned when not in datastore or added', async () => { - await clearPins() - return expectPinned(pins.mercuryDir, pinTypes.direct, false) - }) - }) - - describe('add', function () { - beforeEach(function () { - return clearPins() - }) - - it('should add recursively', async () => { - await pin.add(pins.root) - await expectPinned(pins.root, pinTypes.recursive) - - const pinChecks = Object.values(pins).map(hash => expectPinned(hash)) - return Promise.all(pinChecks) - }) - - it('should add directly', async () => { - await pin.add(pins.root, { recursive: false }) - await Promise.all([ - expectPinned(pins.root, pinTypes.direct), - expectPinned(pins.solarWiki, false) - ]) - }) - - it('should recursively pin parent of direct pin', async () => { - await pin.add(pins.solarWiki, { recursive: false }) - await pin.add(pins.root) - await Promise.all([ - // solarWiki is pinned both directly and indirectly o.O - expectPinned(pins.solarWiki, pinTypes.direct), - expectPinned(pins.solarWiki, pinTypes.indirect) - ]) - }) - - it('should fail to directly pin a recursive pin', async () => { - await pin.add(pins.root) - return expect(pin.add(pins.root, { recursive: false })) - .to.eventually.be.rejected() - .with(/already pinned recursively/) - }) - - it('should fail to pin a hash not in datastore', function () { - this.timeout(5 * 1000) - const falseHash = `${pins.root.slice(0, -2)}ss` - return expect(pin.add(falseHash, { timeout: '2s' })) - .to.eventually.be.rejected() - .with.a.property('code').that.equals('ERR_TIMEOUT') - }) - - // TODO block rm breaks subsequent tests - // it.skip('needs all children in datastore to pin recursively', () => { - // return ipfs.block.rm(pins.mercuryWiki) - // .then(() => expectTimeout(pin.add(pins.root), 4000)) - // }) - }) - - describe('ls', function () { - before(async () => { - await clearPins() - await Promise.all([ - pin.add(pins.root), - pin.add(pins.mercuryDir, { recursive: false }) - ]) - }) - - it('should list pins of a particular CID', async () => { - const out = await all(pin.ls(pins.mercuryDir)) - expect(out[0].cid.toString()).to.eql(pins.mercuryDir) - }) - - it('should list indirect pins that supersede direct pins', async () => { - const ls = await all(pin.ls()) - const pinType = ls.find(out => out.cid.toString() === pins.mercuryDir).type - expect(pinType).to.eql(pinTypes.indirect) - }) - - it('should list all pins', async () => { - const out = await all(pin.ls()) - - expect(out).to.deep.include.members([ - { - type: 'recursive', - cid: new CID('QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys') - }, - { - type: 'indirect', - cid: new CID('QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG') - }, - { - type: 'indirect', - cid: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q') - }, - { - type: 'indirect', - cid: new CID('QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi') - } - ]) - }) - - it('should list all direct pins', async () => { - const out = await all(pin.ls({ type: 'direct' })) - - expect(out).to.deep.include.members([ - { - type: 'direct', - cid: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q') - } - ]) - }) - - it('should list all recursive pins', async () => { - const out = await all(pin.ls({ type: 'recursive' })) - - expect(out).to.deep.include.members([ - { - type: 'recursive', - cid: new CID('QmTAMavb995EHErSrKo7mB8dYkpaSJxu6ys1a6XJyB2sys') - } - ]) - }) - - it('should list all indirect pins', async () => { - const out = await all(pin.ls({ type: 'indirect' })) - - expect(out).to.deep.include.members([ - { - type: 'indirect', - cid: new CID('QmTMbkDfvHwq3Aup6Nxqn3KKw9YnoKzcZvuArAfQ9GF3QG') - }, - { - type: 'indirect', - cid: new CID('QmbJCNKXJqVK8CzbjpNFz2YekHwh3CSHpBA86uqYg3sJ8q') - }, - { - type: 'indirect', - cid: new CID('QmVgSHAdMxFAuMP2JiMAYkB8pCWP1tcB9djqvq8GKAFiHi') - } - ]) - }) - - it('should list direct pins for CID', async () => { - const out = await all(pin.ls(pins.mercuryDir, { type: 'direct' })) - - expect(out).to.have.deep.members([ - { - type: 'direct', - cid: new CID(pins.mercuryDir) - } - ]) - }) - - it('should list direct pins for path', async () => { - const out = await all(pin.ls(`/ipfs/${pins.root}/mercury/`, { type: 'direct' })) - - expect(out).to.have.deep.members([ - { - type: 'direct', - cid: new CID(pins.mercuryDir) - } - ]) - }) - - it('should list direct pins for path (no match)', () => { - return expect(all(pin.ls(`/ipfs/${pins.root}/mercury/wiki.md`, { type: 'direct' }))) - .to.eventually.be.rejected() - }) - - it('should list direct pins for CID (no match)', () => { - return expect(all(pin.ls(pins.root, { type: 'direct' }))) - .to.eventually.be.rejected() - }) - - it('should list recursive pins for CID', async () => { - const out = await all(pin.ls(pins.root, { type: 'recursive' })) - - expect(out).to.have.deep.members([ - { - type: 'recursive', - cid: new CID(pins.root) - } - ]) - }) - - it('should list recursive pins for CID (no match)', () => { - return expect(all(pin.ls(pins.mercuryDir, { type: 'recursive' }))) - .to.eventually.be.rejected() - }) - - it('should list indirect pins for CID', async () => { - const out = await all(pin.ls(pins.solarWiki, { type: 'indirect' })) - - expect(out).to.have.deep.members([ - { - type: `indirect through ${pins.root}`, - cid: new CID(pins.solarWiki) - } - ]) - }) - - it('should list indirect pins for CID (no match)', () => { - return expect(all(pin.ls(pins.root, { type: 'indirect' }))) - .to.eventually.be.rejected() - }) - }) - - describe('rm', function () { - beforeEach(async () => { - await clearPins() - await pin.add(pins.root) - }) - - it('should remove a recursive pin', async () => { - await pin.rm(pins.root) - await Promise.all([ - expectPinned(pins.root, false), - expectPinned(pins.mercuryWiki, false) - ]) - }) - - it('should remove a direct pin', async () => { - await clearPins() - await pin.add(pins.mercuryDir, { recursive: false }) - await pin.rm(pins.mercuryDir) - await expectPinned(pins.mercuryDir, false) - }) - - it('should fail to remove an indirect pin', async () => { - await expect(pin.rm(pins.solarWiki)) - .to.eventually.be.rejected() - .with(/is pinned indirectly under/) - await expectPinned(pins.solarWiki) - }) - - it('should fail when an item is not pinned', async () => { - await pin.rm(pins.root) - await expect(pin.rm(pins.root)) - .to.eventually.be.rejected() - .with(/is not pinned/) - }) - }) - - describe('non-dag-pb nodes', function () { - it('should pin dag-cbor', async () => { - const cid = await ipfs.dag.put({}, { - format: 'dag-cbor', - hashAlg: 'sha2-256' - }) - - await pin.add(cid) - - const pins = await all(pin.ls()) - - expect(pins).to.deep.include({ - type: 'recursive', - cid - }) - }) - - it('should pin raw', async () => { - const cid = await ipfs.dag.put(Buffer.alloc(0), { - format: 'raw', - hashAlg: 'sha2-256' - }) - - await pin.add(cid) - - const pins = await all(pin.ls()) - - expect(pins).to.deep.include({ - type: 'recursive', - cid - }) - }) - - it('should pin dag-cbor with dag-pb child', async () => { - const child = await ipfs.dag.put(new DAGNode(Buffer.alloc(0)), { - format: 'dag-pb', - hashAlg: 'sha2-256' - }) - const parent = await ipfs.dag.put({ - child - }, { - format: 'dag-cbor', - hashAlg: 'sha2-256' - }) - - await pin.add(parent, { - recursive: true - }) - - const pins = await all(pin.ls()) - - expect(pins).to.deep.include({ - cid: parent, - type: 'recursive' - }) - expect(pins).to.deep.include({ - cid: child, - type: 'indirect' - }) - }) - }) -}) diff --git a/packages/ipfs/test/core/pin.spec.js b/packages/ipfs/test/core/pin.spec.js deleted file mode 100644 index f1e2ba1ab8..0000000000 --- a/packages/ipfs/test/core/pin.spec.js +++ /dev/null @@ -1,34 +0,0 @@ -/* eslint max-nested-callbacks: ["error", 8] */ -/* eslint-env mocha */ -'use strict' - -const { expect } = require('interface-ipfs-core/src/utils/mocha') -const all = require('it-all') -const factory = require('../utils/factory') - -describe('pin', function () { - this.timeout(10 * 1000) - const df = factory() - let ipfsd, ipfs - - before(async () => { - ipfsd = await df.spawn() - ipfs = ipfsd.api - }) - - after(() => df.clean()) - - describe('ls', () => { - it('should throw error for invalid non-string pin type option', () => { - return expect(all(ipfs.pin.ls({ type: 6 }))) - .to.eventually.be.rejected() - .with.property('code').that.equals('ERR_INVALID_PIN_TYPE') - }) - - it('should throw error for invalid string pin type option', () => { - return expect(all(ipfs.pin.ls({ type: '__proto__' }))) - .to.eventually.be.rejected() - .with.property('code').that.equals('ERR_INVALID_PIN_TYPE') - }) - }) -}) diff --git a/packages/ipfs/test/http-api/inject/pin.js b/packages/ipfs/test/http-api/inject/pin.js index f4a77ab1d6..bb1967e023 100644 --- a/packages/ipfs/test/http-api/inject/pin.js +++ b/packages/ipfs/test/http-api/inject/pin.js @@ -52,7 +52,14 @@ module.exports = (http) => { }) it('unpins recursive pins', async () => { - const res = await api.inject({ + let res = await api.inject({ + method: 'POST', + url: `/api/v0/pin/add?arg=${pins.root1}` + }) + + expect(res.statusCode).to.equal(200) + + res = await api.inject({ method: 'POST', url: `/api/v0/pin/rm?arg=${pins.root1}` })