Skip to content

Commit

Permalink
Make various fixes. (#17)
Browse files Browse the repository at this point in the history
* fix: Properly initialize null batch.

* fix: Raise error on invalid format.

* fix: Fix array resizing logic.
  • Loading branch information
jheer authored Sep 12, 2024
1 parent d870178 commit bd28ef4
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 43 deletions.
65 changes: 33 additions & 32 deletions src/decode/table-from-ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,16 @@ function contextGenerator(options, version, dictionaryMap) {
*/
function visit(type, ctx) {
const { typeId } = type;
const BatchType = batchType(type, ctx.options);
const { length, options, node, buffer, variadic, version } = ctx;
const BatchType = batchType(type, options);

if (typeId === Type.Null) {
// no field node, no buffers
return new BatchType({ length: ctx.length, nullCount: length });
return new BatchType({ length, nullCount: length, type });
}

// extract the next { length, nullCount } field node
const node = { ...ctx.node(), type };
const base = { ...node(), type };

switch (typeId) {
// validity and data value buffers
Expand All @@ -157,9 +158,9 @@ function visit(type, ctx) {
case Type.Interval:
case Type.FixedSizeBinary:
return new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(type.values)
...base,
validity: buffer(),
values: buffer(type.values)
});

// validity, offset, and value buffers
Expand All @@ -168,79 +169,79 @@ function visit(type, ctx) {
case Type.Binary:
case Type.LargeBinary:
return new BatchType({
...node,
validity: ctx.buffer(),
offsets: ctx.buffer(type.offsets),
values: ctx.buffer()
...base,
validity: buffer(),
offsets: buffer(type.offsets),
values: buffer()
});

// views with variadic buffers
case Type.BinaryView:
case Type.Utf8View:
return new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(), // views buffer
data: Array.from({ length: ctx.variadic() }, () => ctx.buffer()) // data buffers
...base,
validity: buffer(),
values: buffer(), // views buffer
data: Array.from({ length: variadic() }, () => buffer()) // data buffers
});

// validity, offset, and list child
case Type.List:
case Type.LargeList:
case Type.Map:
return new BatchType({
...node,
validity: ctx.buffer(),
offsets: ctx.buffer(type.offsets),
...base,
validity: buffer(),
offsets: buffer(type.offsets),
children: ctx.visit(type.children)
});

// validity, offset, size, and list child
case Type.ListView:
case Type.LargeListView:
return new BatchType({
...node,
validity: ctx.buffer(),
offsets: ctx.buffer(type.offsets),
sizes: ctx.buffer(type.offsets),
...base,
validity: buffer(),
offsets: buffer(type.offsets),
sizes: buffer(type.offsets),
children: ctx.visit(type.children)
});

// validity and children
case Type.FixedSizeList:
case Type.Struct:
return new BatchType({
...node,
validity: ctx.buffer(),
...base,
validity: buffer(),
children: ctx.visit(type.children)
});

// children only
case Type.RunEndEncoded:
return new BatchType({
...node,
...base,
children: ctx.visit(type.children)
});

// dictionary
case Type.Dictionary: {
const { id, indices } = type;
return new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(indices.values),
...base,
validity: buffer(),
values: buffer(indices.values),
}).setDictionary(ctx.dictionary(id));
}

// union
case Type.Union: {
if (ctx.version < Version.V5) {
ctx.buffer(); // skip unused null bitmap
if (version < Version.V5) {
buffer(); // skip unused null bitmap
}
return new BatchType({
...node,
typeIds: ctx.buffer(int8Array),
offsets: type.mode === UnionMode.Sparse ? null : ctx.buffer(type.offsets),
...base,
typeIds: buffer(int8Array),
offsets: type.mode === UnionMode.Sparse ? null : buffer(type.offsets),
children: ctx.visit(type.children)
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/encode/builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ export function prep(builder, size, additionalBytes) {
const alignSize = (~used + 1) & (size - 1);

// reallocate the buffer if needed
buf = grow(buf, used + alignSize + size);
buf = grow(buf, used + alignSize + size - 1, true);
space += buf.length - bufSize;

// add padding
Expand Down
10 changes: 8 additions & 2 deletions src/encode/encode-ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { encodeSchema } from './schema.js';
import { writeMessage } from './message.js';
import { MemorySink } from './sink.js';

const STREAM = 'stream';
const FILE = 'file';

/**
* Encode assembled data into Arrow IPC binary format.
* @param {any} data Assembled table data.
Expand All @@ -15,10 +18,13 @@ import { MemorySink } from './sink.js';
* @param {'stream' | 'file'} [options.format] Arrow stream or file format.
* @returns {import('./sink.js').Sink} The sink that was passed in.
*/
export function encodeIPC(data, { sink, format } = {}) {
export function encodeIPC(data, { sink, format = STREAM } = {}) {
if (format !== STREAM && format !== FILE) {
throw new Error(`Unrecognized Arrow IPC format: ${format}`);
}
const { schema, dictionaries = [], records = [], metadata } = data;
const builder = new Builder(sink || new MemorySink());
const file = format === 'file';
const file = format === FILE;
const dictBlocks = [];
const recordBlocks = [];

Expand Down
18 changes: 10 additions & 8 deletions src/util/arrays.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,28 @@ export function align(array, length = array.length) {
* @template {import('../types.js').TypedArray} T
* @param {T} array The array.
* @param {number} newLength The new length.
* @param {number} [offset] The offset at which to copy the old array.
* @returns {T} The resized array.
*/
export function resize(array, newLength) {
export function resize(array, newLength, offset = 0) {
// @ts-ignore
const newArray = new array.constructor(newLength);
newArray.set(array, array.length);
newArray.set(array, offset);
return newArray;
}

/**
* Grow a typed array to accommdate a minimum length. The array size is
* doubled until it meets or exceeds the minimum length.
* Grow a typed array to accommdate a minimum index. The array size is
* doubled until it exceeds the minimum index.
* @template {import('../types.js').TypedArray} T
* @param {T} array The array.
* @param {number} minLength The minimum length.
* @param {number} index The minimum index.
* @param {boolean} [shift] Flag to shift copied bytes to back of array.
* @returns {T} The resized array.
*/
export function grow(array, minLength) {
while (array.length < minLength) {
array = resize(array, array.length << 1);
export function grow(array, index, shift) {
while (array.length <= index) {
array = resize(array, array.length << 1, shift ? array.length : 0);
}
return array;
}

0 comments on commit bd28ef4

Please sign in to comment.