Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make various fixes. #17

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions src/decode/table-from-ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,16 @@ function contextGenerator(options, version, dictionaryMap) {
*/
function visit(type, ctx) {
const { typeId } = type;
const BatchType = batchType(type, ctx.options);
const { length, options, node, buffer, variadic, version } = ctx;
const BatchType = batchType(type, options);

if (typeId === Type.Null) {
// no field node, no buffers
return new BatchType({ length: ctx.length, nullCount: length });
return new BatchType({ length, nullCount: length, type });
}

// extract the next { length, nullCount } field node
const node = { ...ctx.node(), type };
const base = { ...node(), type };

switch (typeId) {
// validity and data value buffers
Expand All @@ -157,9 +158,9 @@ function visit(type, ctx) {
case Type.Interval:
case Type.FixedSizeBinary:
return new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(type.values)
...base,
validity: buffer(),
values: buffer(type.values)
});

// validity, offset, and value buffers
Expand All @@ -168,79 +169,79 @@ function visit(type, ctx) {
case Type.Binary:
case Type.LargeBinary:
return new BatchType({
...node,
validity: ctx.buffer(),
offsets: ctx.buffer(type.offsets),
values: ctx.buffer()
...base,
validity: buffer(),
offsets: buffer(type.offsets),
values: buffer()
});

// views with variadic buffers
case Type.BinaryView:
case Type.Utf8View:
return new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(), // views buffer
data: Array.from({ length: ctx.variadic() }, () => ctx.buffer()) // data buffers
...base,
validity: buffer(),
values: buffer(), // views buffer
data: Array.from({ length: variadic() }, () => buffer()) // data buffers
});

// validity, offset, and list child
case Type.List:
case Type.LargeList:
case Type.Map:
return new BatchType({
...node,
validity: ctx.buffer(),
offsets: ctx.buffer(type.offsets),
...base,
validity: buffer(),
offsets: buffer(type.offsets),
children: ctx.visit(type.children)
});

// validity, offset, size, and list child
case Type.ListView:
case Type.LargeListView:
return new BatchType({
...node,
validity: ctx.buffer(),
offsets: ctx.buffer(type.offsets),
sizes: ctx.buffer(type.offsets),
...base,
validity: buffer(),
offsets: buffer(type.offsets),
sizes: buffer(type.offsets),
children: ctx.visit(type.children)
});

// validity and children
case Type.FixedSizeList:
case Type.Struct:
return new BatchType({
...node,
validity: ctx.buffer(),
...base,
validity: buffer(),
children: ctx.visit(type.children)
});

// children only
case Type.RunEndEncoded:
return new BatchType({
...node,
...base,
children: ctx.visit(type.children)
});

// dictionary
case Type.Dictionary: {
const { id, indices } = type;
return new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(indices.values),
...base,
validity: buffer(),
values: buffer(indices.values),
}).setDictionary(ctx.dictionary(id));
}

// union
case Type.Union: {
if (ctx.version < Version.V5) {
ctx.buffer(); // skip unused null bitmap
if (version < Version.V5) {
buffer(); // skip unused null bitmap
}
return new BatchType({
...node,
typeIds: ctx.buffer(int8Array),
offsets: type.mode === UnionMode.Sparse ? null : ctx.buffer(type.offsets),
...base,
typeIds: buffer(int8Array),
offsets: type.mode === UnionMode.Sparse ? null : buffer(type.offsets),
children: ctx.visit(type.children)
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/encode/builder.js
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ export function prep(builder, size, additionalBytes) {
const alignSize = (~used + 1) & (size - 1);

// reallocate the buffer if needed
buf = grow(buf, used + alignSize + size);
buf = grow(buf, used + alignSize + size - 1, true);
space += buf.length - bufSize;

// add padding
Expand Down
10 changes: 8 additions & 2 deletions src/encode/encode-ipc.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import { encodeSchema } from './schema.js';
import { writeMessage } from './message.js';
import { MemorySink } from './sink.js';

const STREAM = 'stream';
const FILE = 'file';

/**
* Encode assembled data into Arrow IPC binary format.
* @param {any} data Assembled table data.
Expand All @@ -15,10 +18,13 @@ import { MemorySink } from './sink.js';
* @param {'stream' | 'file'} [options.format] Arrow stream or file format.
* @returns {import('./sink.js').Sink} The sink that was passed in.
*/
export function encodeIPC(data, { sink, format } = {}) {
export function encodeIPC(data, { sink, format = STREAM } = {}) {
if (format !== STREAM && format !== FILE) {
throw new Error(`Unrecognized Arrow IPC format: ${format}`);
}
const { schema, dictionaries = [], records = [], metadata } = data;
const builder = new Builder(sink || new MemorySink());
const file = format === 'file';
const file = format === FILE;
const dictBlocks = [];
const recordBlocks = [];

Expand Down
18 changes: 10 additions & 8 deletions src/util/arrays.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,26 +118,28 @@ export function align(array, length = array.length) {
* @template {import('../types.js').TypedArray} T
* @param {T} array The array.
* @param {number} newLength The new length.
* @param {number} [offset] The offset at which to copy the old array.
* @returns {T} The resized array.
*/
export function resize(array, newLength) {
export function resize(array, newLength, offset = 0) {
// @ts-ignore
const newArray = new array.constructor(newLength);
newArray.set(array, array.length);
newArray.set(array, offset);
return newArray;
}

/**
 * Grow a typed array to accommodate a minimum length. The array size is
 * doubled until it meets or exceeds the minimum length.
 * Grow a typed array to accommodate a minimum index. The array size is
 * doubled until it exceeds the minimum index.
* @template {import('../types.js').TypedArray} T
* @param {T} array The array.
* @param {number} minLength The minimum length.
* @param {number} index The minimum index.
* @param {boolean} [shift] Flag to shift copied bytes to back of array.
* @returns {T} The resized array.
*/
export function grow(array, minLength) {
while (array.length < minLength) {
array = resize(array, array.length << 1);
export function grow(array, index, shift) {
while (array.length <= index) {
array = resize(array, array.length << 1, shift ? array.length : 0);
}
return array;
}