diff --git a/src/decode/table-from-ipc.js b/src/decode/table-from-ipc.js index bcecdc8..eda3ecb 100644 --- a/src/decode/table-from-ipc.js +++ b/src/decode/table-from-ipc.js @@ -134,15 +134,16 @@ function contextGenerator(options, version, dictionaryMap) { */ function visit(type, ctx) { const { typeId } = type; - const BatchType = batchType(type, ctx.options); + const { length, options, node, buffer, variadic, version } = ctx; + const BatchType = batchType(type, options); if (typeId === Type.Null) { // no field node, no buffers - return new BatchType({ length: ctx.length, nullCount: length }); + return new BatchType({ length, nullCount: length, type }); } // extract the next { length, nullCount } field node - const node = { ...ctx.node(), type }; + const base = { ...node(), type }; switch (typeId) { // validity and data value buffers @@ -157,9 +158,9 @@ function visit(type, ctx) { case Type.Interval: case Type.FixedSizeBinary: return new BatchType({ - ...node, - validity: ctx.buffer(), - values: ctx.buffer(type.values) + ...base, + validity: buffer(), + values: buffer(type.values) }); // validity, offset, and value buffers @@ -168,20 +169,20 @@ function visit(type, ctx) { case Type.Binary: case Type.LargeBinary: return new BatchType({ - ...node, - validity: ctx.buffer(), - offsets: ctx.buffer(type.offsets), - values: ctx.buffer() + ...base, + validity: buffer(), + offsets: buffer(type.offsets), + values: buffer() }); // views with variadic buffers case Type.BinaryView: case Type.Utf8View: return new BatchType({ - ...node, - validity: ctx.buffer(), - values: ctx.buffer(), // views buffer - data: Array.from({ length: ctx.variadic() }, () => ctx.buffer()) // data buffers + ...base, + validity: buffer(), + values: buffer(), // views buffer + data: Array.from({ length: variadic() }, () => buffer()) // data buffers }); // validity, offset, and list child @@ -189,9 +190,9 @@ function visit(type, ctx) { case Type.LargeList: case Type.Map: return new BatchType({ - ...node, - validity: ctx.buffer(), - offsets: ctx.buffer(type.offsets), + ...base, + validity: buffer(), + offsets: buffer(type.offsets), children: ctx.visit(type.children) }); @@ -199,10 +200,10 @@ function visit(type, ctx) { case Type.ListView: case Type.LargeListView: return new BatchType({ - ...node, - validity: ctx.buffer(), - offsets: ctx.buffer(type.offsets), - sizes: ctx.buffer(type.offsets), + ...base, + validity: buffer(), + offsets: buffer(type.offsets), + sizes: buffer(type.offsets), children: ctx.visit(type.children) }); @@ -210,15 +211,15 @@ function visit(type, ctx) { case Type.FixedSizeList: case Type.Struct: return new BatchType({ - ...node, - validity: ctx.buffer(), + ...base, + validity: buffer(), children: ctx.visit(type.children) }); // children only case Type.RunEndEncoded: return new BatchType({ - ...node, + ...base, children: ctx.visit(type.children) }); @@ -226,21 +227,21 @@ function visit(type, ctx) { case Type.Dictionary: { const { id, indices } = type; return new BatchType({ - ...node, - validity: ctx.buffer(), - values: ctx.buffer(indices.values), + ...base, + validity: buffer(), + values: buffer(indices.values), }).setDictionary(ctx.dictionary(id)); } // union case Type.Union: { - if (ctx.version < Version.V5) { - ctx.buffer(); // skip unused null bitmap + if (version < Version.V5) { + buffer(); // skip unused null bitmap } return new BatchType({ - ...node, - typeIds: ctx.buffer(int8Array), - offsets: type.mode === UnionMode.Sparse ? null : ctx.buffer(type.offsets), + ...base, + typeIds: buffer(int8Array), + offsets: type.mode === UnionMode.Sparse ? null : buffer(type.offsets), children: ctx.visit(type.children) }); } diff --git a/src/encode/builder.js b/src/encode/builder.js index 1343180..05ffb58 100644 --- a/src/encode/builder.js +++ b/src/encode/builder.js @@ -282,7 +282,7 @@ export function prep(builder, size, additionalBytes) { const alignSize = (~used + 1) & (size - 1); // reallocate the buffer if needed - buf = grow(buf, used + alignSize + size); + buf = grow(buf, used + alignSize + size - 1, true); space += buf.length - bufSize; // add padding diff --git a/src/encode/encode-ipc.js b/src/encode/encode-ipc.js index 177d1e4..f2da170 100644 --- a/src/encode/encode-ipc.js +++ b/src/encode/encode-ipc.js @@ -7,6 +7,9 @@ import { encodeSchema } from './schema.js'; import { writeMessage } from './message.js'; import { MemorySink } from './sink.js'; +const STREAM = 'stream'; +const FILE = 'file'; + /** * Encode assembled data into Arrow IPC binary format. * @param {any} data Assembled table data. @@ -15,10 +18,13 @@ import { MemorySink } from './sink.js'; * @param {'stream' | 'file'} [options.format] Arrow stream or file format. * @returns {import('./sink.js').Sink} The sink that was passed in. */ -export function encodeIPC(data, { sink, format } = {}) { +export function encodeIPC(data, { sink, format = STREAM } = {}) { + if (format !== STREAM && format !== FILE) { + throw new Error(`Unrecognized Arrow IPC format: ${format}`); + } const { schema, dictionaries = [], records = [], metadata } = data; const builder = new Builder(sink || new MemorySink()); - const file = format === 'file'; + const file = format === FILE; const dictBlocks = []; const recordBlocks = []; diff --git a/src/util/arrays.js b/src/util/arrays.js index fab0bbb..c28d49f 100644 --- a/src/util/arrays.js +++ b/src/util/arrays.js @@ -118,26 +118,28 @@ export function align(array, length = array.length) { * @template {import('../types.js').TypedArray} T * @param {T} array The array. * @param {number} newLength The new length. + * @param {number} [offset] The offset at which to copy the old array. * @returns {T} The resized array. */ -export function resize(array, newLength) { +export function resize(array, newLength, offset = 0) { // @ts-ignore const newArray = new array.constructor(newLength); - newArray.set(array, array.length); + newArray.set(array, offset); return newArray; } /** - * Grow a typed array to accommdate a minimum length. The array size is - * doubled until it meets or exceeds the minimum length. + * Grow a typed array to accommdate a minimum index. The array size is + * doubled until it exceeds the minimum index. * @template {import('../types.js').TypedArray} T * @param {T} array The array. - * @param {number} minLength The minimum length. + * @param {number} index The minimum index. + * @param {boolean} [shift] Flag to shift copied bytes to back of array. * @returns {T} The resized array. */ -export function grow(array, minLength) { - while (array.length < minLength) { - array = resize(array, array.length << 1); +export function grow(array, index, shift) { + while (array.length <= index) { + array = resize(array, array.length << 1, shift ? array.length : 0); } return array; }