@@ -5,7 +5,8 @@ const ShardingStore = core.ShardingDatastore
5
5
const Block = require ( 'ipld-block' )
6
6
const { cidToKey, keyToCid } = require ( './blockstore-utils' )
7
7
const map = require ( 'it-map' )
8
- const pipe = require ( 'it-pipe' )
8
+ const drain = require ( 'it-drain' )
9
+ const pushable = require ( 'it-pushable' )
9
10
10
11
module . exports = async ( filestore , options ) => {
11
12
const store = await maybeWithSharding ( filestore , options )
@@ -23,47 +24,39 @@ function maybeWithSharding (filestore, options) {
23
24
function createBaseStore ( store ) {
24
25
return {
25
26
/**
26
- * Query the store.
27
+ * Query the store
27
28
*
28
29
* @param {Object } query
29
30
* @param {Object } options
30
- * @returns {AsyncIterator<Block> }
31
+ * @returns {AsyncIterator<Block|CID > }
31
32
*/
32
- async * query ( query , options ) { // eslint-disable-line require-await
33
- yield * store . query ( query , options )
33
+ async * query ( query , options ) {
34
+ for await ( const { key, value } of store . query ( query , options ) ) {
35
+ if ( query . keysOnly ) {
36
+ yield keyToCid ( key )
37
+ continue
38
+ }
39
+
40
+ yield new Block ( value , keyToCid ( key ) )
41
+ }
34
42
} ,
43
+
35
44
/**
36
- * Get a single block by CID.
45
+ * Get a single block by CID
37
46
*
38
47
* @param {CID } cid
39
48
* @param {Object } options
40
49
* @returns {Promise<Block> }
41
50
*/
42
51
async get ( cid , options ) {
43
52
const key = cidToKey ( cid )
44
- let blockData
45
- try {
46
- blockData = await store . get ( key , options )
47
- return new Block ( blockData , cid )
48
- } catch ( err ) {
49
- if ( err . code === 'ERR_NOT_FOUND' ) {
50
- const otherCid = cidToOtherVersion ( cid )
51
-
52
- if ( ! otherCid ) {
53
- throw err
54
- }
55
-
56
- const otherKey = cidToKey ( otherCid )
57
- const blockData = await store . get ( otherKey , options )
58
- await store . put ( key , blockData )
59
- return new Block ( blockData , cid )
60
- }
53
+ const blockData = await store . get ( key , options )
61
54
62
- throw err
63
- }
55
+ return new Block ( blockData , cid )
64
56
} ,
57
+
65
58
/**
66
- * Like get, but for more.
59
+ * Like get, but for more
67
60
*
68
61
* @param {AsyncIterator<CID> } cids
69
62
* @param {Object } options
@@ -74,8 +67,9 @@ function createBaseStore (store) {
74
67
yield this . get ( cid , options )
75
68
}
76
69
} ,
70
+
77
71
/**
78
- * Write a single block to the store.
72
+ * Write a single block to the store
79
73
*
80
74
* @param {Block } block
81
75
* @param {Object } options
@@ -86,59 +80,75 @@ function createBaseStore (store) {
86
80
throw new Error ( 'invalid block' )
87
81
}
88
82
89
- const exists = await this . has ( block . cid )
83
+ const key = cidToKey ( block . cid )
84
+ const exists = await store . has ( key , options )
90
85
91
- if ( exists ) {
92
- return this . get ( block . cid , options )
86
+ if ( ! exists ) {
87
+ await store . put ( key , block . data , options )
93
88
}
94
89
95
- await store . put ( cidToKey ( block . cid ) , block . data , options )
96
-
97
90
return block
98
91
} ,
99
92
100
93
/**
101
- * Like put, but for more.
94
+ * Like put, but for more
102
95
*
103
96
* @param {AsyncIterable<Block>|Iterable<Block> } blocks
104
97
* @param {Object } options
105
98
* @returns {AsyncIterable<Block> }
106
99
*/
107
100
async * putMany ( blocks , options ) { // eslint-disable-line require-await
108
- yield * pipe (
109
- blocks ,
110
- ( source ) => {
111
- // turn them into a key/value pair
112
- return map ( source , ( block ) => {
113
- return { key : cidToKey ( block . cid ) , value : block . data }
114
- } )
115
- } ,
116
- ( source ) => {
117
- // put them into the datastore
118
- return store . putMany ( source , options )
119
- } ,
120
- ( source ) => {
121
- // map the returned key/value back into a block
122
- return map ( source , ( { key, value } ) => {
123
- return new Block ( value , keyToCid ( key ) )
124
- } )
101
+ // we cannot simply chain to `store.putMany` because we convert a CID into
102
+ // a key based on the multihash only, so we lose the version & codec and
103
+ // cannot give the user back the CID they used to create the block, so yield
104
+ // to `store.putMany` but return the actual block the user passed in.
105
+ //
106
+ // nb. we want to use `store.putMany` here so bitswap can control batching
107
+ // up block HAVEs to send to the network - if we use multiple `store.put`s
108
+ // it will not be able to guess we are about to `store.put` more blocks
109
+ const output = pushable ( )
110
+
111
+ // process.nextTick runs on the microtask queue, setImmediate runs on the next
112
+ // event loop iteration so is slower. Use process.nextTick if it is available.
113
+ const runner = process && process . nextTick ? process . nextTick : setImmediate
114
+
115
+ runner ( async ( ) => {
116
+ try {
117
+ await drain ( store . putMany ( async function * ( ) {
118
+ for await ( const block of blocks ) {
119
+ const key = cidToKey ( block . cid )
120
+ const exists = await store . has ( key , options )
121
+
122
+ if ( ! exists ) {
123
+ yield { key, value : block . data }
124
+ }
125
+
126
+ // there is an assumption here that after the yield has completed
127
+ // the underlying datastore has finished writing the block
128
+ output . push ( block )
129
+ }
130
+ } ( ) ) )
131
+
132
+ output . end ( )
133
+ } catch ( err ) {
134
+ output . end ( err )
125
135
}
126
- )
136
+ } )
137
+
138
+ yield * output
127
139
} ,
140
+
128
141
/**
129
- * Does the store contain block with this cid ?
142
+ * Does the store contain block with this CID ?
130
143
*
131
144
* @param {CID } cid
132
145
* @param {Object } options
133
146
* @returns {Promise<bool> }
134
147
*/
135
- async has ( cid , options ) {
136
- const exists = await store . has ( cidToKey ( cid ) , options )
137
- if ( exists ) return exists
138
- const otherCid = cidToOtherVersion ( cid )
139
- if ( ! otherCid ) return false
140
- return store . has ( cidToKey ( otherCid ) , options )
148
+ async has ( cid , options ) { // eslint-disable-line require-await
149
+ return store . has ( cidToKey ( cid ) , options )
141
150
} ,
151
+
142
152
/**
143
153
* Delete a block from the store
144
154
*
@@ -149,6 +159,7 @@ function createBaseStore (store) {
149
159
async delete ( cid , options ) { // eslint-disable-line require-await
150
160
return store . delete ( cidToKey ( cid ) , options )
151
161
} ,
162
+
152
163
/**
153
164
* Delete a block from the store
154
165
*
@@ -157,12 +168,9 @@ function createBaseStore (store) {
157
168
* @returns {Promise<void> }
158
169
*/
159
170
async * deleteMany ( cids , options ) { // eslint-disable-line require-await
160
- yield * store . deleteMany ( ( async function * ( ) {
161
- for await ( const cid of cids ) {
162
- yield cidToKey ( cid )
163
- }
164
- } ( ) ) , options )
171
+ yield * store . deleteMany ( map ( cids , cid => cidToKey ( cid ) ) , options )
165
172
} ,
173
+
166
174
/**
167
175
* Close the store
168
176
*
@@ -173,11 +181,3 @@ function createBaseStore (store) {
173
181
}
174
182
}
175
183
}
176
-
177
- function cidToOtherVersion ( cid ) {
178
- try {
179
- return cid . version === 0 ? cid . toV1 ( ) : cid . toV0 ( )
180
- } catch ( err ) {
181
- return null
182
- }
183
- }
0 commit comments