Skip to content
This repository has been archived by the owner on Mar 23, 2023. It is now read-only.

fix: fix query with ops #9

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .aegir.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module.exports = {
bundlesize: { maxSize: '12.1kB' }
bundlesize: { maxSize: '13kB' }
}
4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
"dependencies": {
"buffer": "^5.5.0",
"idb": "^5.0.2",
"interface-datastore": "^1.0.2"
"interface-datastore": "ipfs/interface-datastore#test/add-tests-for-mutating-datastore-during-query"
},
"devDependencies": {
"aegir": "^22.0.0",
"aegir": "^23.0.0",
"chai": "^4.2.0",
"datastore-core": "^1.1.0",
"datastore-level": "^1.1.0",
Expand Down
80 changes: 49 additions & 31 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ const str2ab = (str) => {
return buf
}

const queryIt = async function * (q, store, location) {
const queryIt = async function * (q, instance) {
const { db, location } = instance
const range = q.prefix ? self.IDBKeyRange.bound(str2ab(q.prefix), str2ab(q.prefix + '\xFF'), false, true) : undefined
let cursor = await store.transaction(location).store.openCursor(range)
const tx = db.transaction(location, 'readwrite')
const store = tx.objectStore(location)
let cursor = await store.openCursor(range)
let limit = 0

instance.tx = tx

if (cursor && q.offset && q.offset > 0) {
cursor = await cursor.advance(q.offset)
}
Expand All @@ -68,26 +73,41 @@ const queryIt = async function * (q, store, location) {
}
cursor = await cursor.continue()
}
instance.tx = null
}

class IdbDatastore extends Adapter {
constructor (location, options = {}) {
super()

this.store = null
this.db = null
this.options = options
this.location = options.prefix + location
this.location = (options.prefix || '') + location
this.version = options.version || 1
/** @type {IDBTransaction} */
this.tx = null
}

getStore (mode) {
if (this.db === null) {
throw new Error('Datastore needs to be opened.')
}

if (this.tx) {
return this.tx.objectStore(this.location)
}

return this.db.transaction(this.location, mode).objectStore(this.location)
}

async open () {
if (this.store !== null) {
if (this.db !== null) {
return
}

const location = this.location
try {
this.store = await openDB(this.location, this.version, {
this.db = await openDB(this.location, this.version, {
upgrade (db) {
db.createObjectStore(location)
}
Expand All @@ -98,23 +118,17 @@ class IdbDatastore extends Adapter {
}

async put (key, val) {
if (this.store === null) {
throw new Error('Datastore needs to be opened.')
}
try {
await this.store.put(this.location, val, key.toBuffer())
await this.getStore('readwrite').put(val, key.toBuffer())
} catch (err) {
throw Errors.dbWriteFailedError(err)
}
}

async get (key) {
if (this.store === null) {
throw new Error('Datastore needs to be opened.')
}
let value
try {
value = await this.store.get(this.location, key.toBuffer())
value = await this.getStore().get(key.toBuffer())
} catch (err) {
throw Errors.dbWriteFailedError(err)
}
Expand All @@ -126,25 +140,29 @@ class IdbDatastore extends Adapter {
return typedarrayToBuffer(value)
}

/**
* Check if a key exists in the datastore
*
* @param {Key} key
* @returns {boolean}
*/
async has (key) {
if (this.store === null) {
throw new Error('Datastore needs to be opened.')
}
let value
try {
await this.get(key)
value = await this.getStore().getKey(key.toBuffer())
} catch (err) {
if (err.code === 'ERR_NOT_FOUND') return false
throw err
throw Errors.dbWriteFailedError(err)
}

if (!value) {
return false
}
return true
}

async delete (key) {
if (this.store === null) {
throw new Error('Datastore needs to be opened.')
}
try {
await this.store.delete(this.location, key.toBuffer())
await this.getStore('readwrite').delete(key.toBuffer())
} catch (err) {
throw Errors.dbDeleteFailedError(err)
}
Expand All @@ -162,10 +180,10 @@ class IdbDatastore extends Adapter {
dels.push(key.toBuffer())
},
commit: async () => {
if (this.store === null) {
if (this.db === null) {
throw new Error('Datastore needs to be opened.')
}
const tx = this.store.transaction(this.location, 'readwrite')
const tx = this.db.transaction(this.location, 'readwrite')
const store = tx.store
await Promise.all(puts.map(p => store.put(p[1], p[0])))
await Promise.all(dels.map(p => store.delete(p)))
Expand All @@ -175,10 +193,10 @@ class IdbDatastore extends Adapter {
}

query (q) {
if (this.store === null) {
if (this.db === null) {
throw new Error('Datastore needs to be opened.')
}
let it = queryIt(q, this.store, this.location)
let it = queryIt(q, this)

if (Array.isArray(q.filters)) {
it = q.filters.reduce((it, f) => filter(it, f), it)
Expand All @@ -192,11 +210,11 @@ class IdbDatastore extends Adapter {
}

close () {
if (this.store === null) {
if (this.db === null) {
throw new Error('Datastore needs to be opened.')
}
this.store.close()
this.store = null
this.db.close()
this.db = null
}

destroy () {
Expand Down
71 changes: 70 additions & 1 deletion test/index.spec.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
/* eslint-env mocha */
'use strict'

const { expect } = require('aegir/utils/chai')
const { MountDatastore } = require('datastore-core')
const { Key } = require('interface-datastore')
const { isNode } = require('ipfs-utils/src/env')
const IDBStore = require('../src')

describe('LevelDatastore', function () {
describe('IndexedDB Datastore', function () {
if (isNode) {
return
}

describe('interface-datastore (idb)', () => {
const store = new IDBStore('hello')
require('interface-datastore/src/tests')({
Expand Down Expand Up @@ -51,4 +53,71 @@ describe('LevelDatastore', function () {
}
})
})

describe('concurrency', () => {
let store

before(async () => {
store = new IDBStore('hello')
await store.open()
})

it('should not explode under unreasonable load', function (done) {
this.timeout(10000)

const updater = setInterval(async () => {
try {
const key = new Key('/a-' + Date.now())

await store.put(key, Buffer.from([0, 1, 2, 3]))
await store.has(key)
await store.get(key)
} catch (err) {
clearInterval(updater)
clearInterval(mutatorQuery)
clearInterval(readOnlyQuery)
done(err)
}
}, 0)

const mutatorQuery = setInterval(async () => {
try {
for await (const { key } of store.query({})) {
await store.get(key)

const otherKey = new Key('/b-' + Date.now())
const otherValue = Buffer.from([0, 1, 2, 3])
await store.put(otherKey, otherValue)
const res = await store.get(otherKey)
expect(res).to.deep.equal(otherValue)
}
} catch (err) {
clearInterval(updater)
clearInterval(mutatorQuery)
clearInterval(readOnlyQuery)
done(err)
}
}, 0)

const readOnlyQuery = setInterval(async () => {
try {
for await (const { key } of store.query({})) {
await store.has(key)
}
} catch (err) {
clearInterval(updater)
clearInterval(mutatorQuery)
clearInterval(readOnlyQuery)
done(err)
}
}, 0)

setTimeout(() => {
clearInterval(updater)
clearInterval(mutatorQuery)
clearInterval(readOnlyQuery)
done()
}, 5000)
})
})
})