Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add BinaryView and Utf8View type support. #11

Merged
merged 1 commit into from
Aug 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions src/batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -815,3 +815,69 @@ export class DictionaryBatch extends ArrayBatch {
return /** @type {number} */ (this.values[index]);
}
}

/**
 * @template T
 * @extends {ArrayBatch<T>}
 */
class ViewBatch extends ArrayBatch {
  /**
   * Create a new view batch.
   * @param {object} options Batch options.
   * @param {number} options.length The length of the batch
   * @param {number} options.nullCount The null value count
   * @param {Uint8Array} [options.validity] Validity bitmap buffer
   * @param {Uint8Array} options.values Values buffer
   * @param {Uint8Array[]} options.data View data buffers
   */
  constructor({ data, ...rest }) {
    super(rest);
    this.data = data;
  }

  /**
   * Get the binary data at the provided index.
   * @param {number} index The value index.
   * @returns {Uint8Array}
   */
  view(index) {
    const views = /** @type {Uint8Array} */ (this.values);
    const base = index * 16; // each view entry spans 16 bytes
    const length = readInt32(views, base);
    if (length <= 12) {
      // short values are stored inline, starting at byte 4 of the entry
      return views.subarray(base + 4, base + 4 + length);
    }
    // longer values reside in an external data buffer:
    // bytes 8-11 hold the buffer index, bytes 12-15 the byte offset
    const buffer = this.data[readInt32(views, base + 8)];
    const start = readInt32(views, base + 12);
    return buffer.subarray(start, start + length);
  }
}

/**
 * A batch of binary blobs from variable data buffers, returned as byte
 * buffers of unsigned 8-bit integers.
 * @extends {ViewBatch<Uint8Array>}
 */
export class BinaryViewBatch extends ViewBatch {
  /**
   * Get the binary blob at the provided index.
   * @param {number} index The value index.
   * @returns {Uint8Array} The raw bytes of the view entry.
   */
  value(index) {
    const bytes = this.view(index);
    return bytes;
  }
}

/**
 * A batch of UTF-8 strings from variable data buffers.
 * @extends {ViewBatch<string>}
 */
export class Utf8ViewBatch extends ViewBatch {
  /**
   * Get the decoded string at the provided index.
   * @param {number} index The value index.
   * @returns {string} The view bytes decoded as UTF-8 text.
   */
  value(index) {
    const bytes = this.view(index);
    return decodeUtf8(bytes);
  }
}
4 changes: 0 additions & 4 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,6 @@ export const Type = /** @type {const} */ ({
*
* Since it uses a variable number of data buffers, each Field with this type
* must have a corresponding entry in `variadicBufferCounts`.
*
* Not currently supported by Flechette.
*/
BinaryView: 23,
/**
Expand All @@ -266,8 +264,6 @@ export const Type = /** @type {const} */ ({
*
* Since it uses a variable number of data buffers, each Field with this type
* must have a corresponding entry in `variadicBufferCounts`.
*
* Not currently supported by Flechette.
*/
Utf8View: 24,
/**
Expand Down
7 changes: 1 addition & 6 deletions src/decode/block.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,5 @@ export function decodeBlock(buf, index) {
/**
 * Decode a vector of message blocks.
 * @param {Uint8Array} buf A byte buffer of binary Arrow IPC data
 * @param {number} index The starting index in the byte buffer
 * @returns An array of message blocks.
 */
export function decodeBlocks(buf, index) {
  // each block entry is 24 bytes wide; the superseded hand-rolled loop
  // (left behind as unreachable code after the return) has been removed
  return readVector(buf, index, 24, decodeBlock);
}
12 changes: 5 additions & 7 deletions src/decode/data-type.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ export function decodeDataType(buf, index, typeId, children) {
case Type.NONE:
case Type.Null:
case Type.Bool:
case Type.BinaryView:
case Type.Utf8View:
return { typeId };
case Type.Binary:
case Type.Utf8:
Expand All @@ -30,7 +32,7 @@ export function decodeDataType(buf, index, typeId, children) {
return { typeId, children: [children?.[0]], offsets: int64 };
case Type.Struct:
case Type.RunEndEncoded:
// @ts-ignore
// @ts-ignore - suppress children length warning for run-end encoded
return { typeId, children };
case Type.Int:
return decodeInt(buf, index);
Expand All @@ -56,9 +58,7 @@ export function decodeDataType(buf, index, typeId, children) {
return decodeMap(buf, index, children);
case Type.Union:
return decodeUnion(buf, index, children);

}
// TODO: collect errors, skip failures?
throw new Error(`Unrecognized type: "${keyFor(Type, typeId)}" (id ${typeId})`);
}

Expand Down Expand Up @@ -285,12 +285,10 @@ function decodeUnion(buf, index, children) {
// 4: mode
// 6: typeIds
const get = table(buf, index);
const { length, base } = readVector(buf, get(6, readOffset));
return {
typeId: Type.Union,
mode: /** @type {typeof UnionMode[keyof UnionMode]} */
(get(4, readInt16, UnionMode.Sparse)),
typeIds: new int32(buf.buffer, buf.byteOffset + base, length),
mode: /** @type {typeof UnionMode[keyof UnionMode]} */ (get(4, readInt16, UnionMode.Sparse)),
typeIds: readVector(buf, get(6, readOffset), 4, readInt32),
children: children ?? [],
offsets: int32
};
Expand Down
19 changes: 8 additions & 11 deletions src/decode/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,12 @@ import { readString, readVector, table } from '../util.js';
/**
 * Decode custom metadata consisting of key-value string pairs.
 * @param {Uint8Array} buf A byte buffer of binary Arrow IPC data
 * @param {number} index The starting index in the byte buffer
 * @returns {import('../types.js').Metadata | null} The custom metadata map
 */
export function decodeMetadata(buf, index) {
  // removed the superseded loop implementation that preceded this
  // (unreachable diff residue); note the second field is the value,
  // not a duplicate key as the old comment claimed
  const entries = readVector(buf, index, 4, (buf, pos) => {
    const get = table(buf, pos);
    return /** @type {[string, string]} */ ([
      get(4, readString), // 4: key (string)
      get(6, readString)  // 6: value (string)
    ]);
  });
  return entries.length ? new Map(entries) : null;
}
46 changes: 13 additions & 33 deletions src/decode/record-batch.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,46 +13,26 @@ export function decodeRecordBatch(buf, index, version) {
// 6: nodes
// 8: buffers
// 10: compression (not supported)
// 12: variadicBuffers (for view types, not supported)
// 12: variadicBuffers (buffer counts for view-typed fields)
const get = table(buf, index);
if (get(10, readOffset, 0)) {
throw new Error('Record batch compression not implemented');
}

// field nodes
const nodes = [];
const nodeVector = get(6, readVector);
if (nodeVector) {
const { length, base } = nodeVector;
for (let i = 0; i < length; ++i) {
const pos = base + i * 16;
nodes.push({
length: readInt64AsNum(buf, pos),
nullCount: readInt64AsNum(buf, pos + 8)
});
}
}

// buffers
const buffers = [];
const bufferVector = get(8, readVector);
if (bufferVector) {
const { length, base } = bufferVector;
const adjust = version < Version.V4;
for (let i = 0; i < length; ++i) {
// If this Arrow buffer was written before version 4,
// advance 8 bytes to skip the now-removed page_id field
const pos = base + i * 16 + (adjust ? (8 * (i + 1)) : 0);
buffers.push({
offset: readInt64AsNum(buf, pos),
length: readInt64AsNum(buf, pos + 8)
});
}
}
// If an Arrow buffer was written before version 4,
// advance 8 bytes to skip the now-removed page_id field
const offset = version < Version.V4 ? 8 : 0;

return {
length: get(4, readInt64AsNum, 0),
nodes,
buffers
nodes: readVector(buf, get(6, readOffset), 16, (buf, pos) => ({
length: readInt64AsNum(buf, pos),
nullCount: readInt64AsNum(buf, pos + 8)
})),
buffers: readVector(buf, get(8, readOffset), 16 + offset, (buf, pos) => ({
offset: readInt64AsNum(buf, pos + offset),
length: readInt64AsNum(buf, pos + offset + 8)
})),
variadic: readVector(buf, get(12, readOffset), 8, readInt64AsNum)
};
}
27 changes: 10 additions & 17 deletions src/decode/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ export function decodeSchema(buf, index, version) {
const get = table(buf, index);
return {
version,
endianness: /** @type {import('../types.js').Endianness_} */
(get(4, readInt16, 0)),
endianness: /** @type {import('../types.js').Endianness_} */ (get(4, readInt16, 0)),
fields: get(6, (buf, off) => decodeSchemaFields(buf, off, dictionaryTypes), []),
metadata: get(8, decodeMetadata),
dictionaryTypes
Expand All @@ -31,12 +30,9 @@ export function decodeSchema(buf, index, version) {
/**
 * Decode the fields of an Arrow schema.
 * @returns {import('../types.js').Field[] | null}
 */
function decodeSchemaFields(buf, fieldsOffset, dictionaryTypes) {
  // field entries are 4-byte offsets into the underlying flatbuffer;
  // the superseded manual loop (unreachable after its return) is removed
  return readVector(buf, fieldsOffset, 4,
    (buf, pos) => decodeField(buf, pos, dictionaryTypes)
  );
}

/**
Expand All @@ -53,8 +49,8 @@ function decodeField(buf, index, dictionaryTypes) {
const get = table(buf, index);
const typeId = get(8, readUint8, Type.NONE);
const typeOffset = get(10, readOffset, 0);
const dict = get(12, (buf, off) => decodeDictionary(buf, off));
const children = get(14, decodeFieldChildren);
const dict = get(12, decodeDictionary);
const children = get(14, (buf, off) => decodeFieldChildren(buf, off, dictionaryTypes));

let type;
if (dict) {
Expand Down Expand Up @@ -83,13 +79,10 @@ function decodeField(buf, index, dictionaryTypes) {
/**
 * Decode the children of an Arrow schema field.
 * @returns {import('../types.js').Field[] | null}
 */
function decodeFieldChildren(buf, fieldOffset, dictionaryTypes) {
  // the old signature/body (param named `dictionaries`) was left
  // concatenated with the new one in the diff; only the new version
  // with the `dictionaryTypes` parameter is kept
  const children = readVector(buf, fieldOffset, 4,
    (buf, pos) => decodeField(buf, pos, dictionaryTypes)
  );
  return children.length ? children : null;
}

Expand Down
33 changes: 24 additions & 9 deletions src/table-from-ipc.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { int8 } from './array-types.js';
import {
BinaryBatch,
BinaryViewBatch,
BoolBatch,
DateBatch,
DateDayBatch,
Expand Down Expand Up @@ -32,7 +33,8 @@ import {
TimestampMillisecondBatch,
TimestampNanosecondBatch,
TimestampSecondBatch,
Utf8Batch
Utf8Batch,
Utf8ViewBatch
} from './batch.js';
import { columnBuilder } from './column.js';
import {
Expand Down Expand Up @@ -118,9 +120,10 @@ function contextGenerator(options, version, dictionaryMap) {

// return a context generator
return batch => {
const { length, nodes, buffers, body } = batch;
const { length, nodes, buffers, variadic, body } = batch;
let nodeIndex = -1;
let bufferIndex = -1;
let variadicIndex = -1;
return {
...base,
length,
Expand All @@ -131,6 +134,7 @@ function contextGenerator(options, version, dictionaryMap) {
? new ArrayType(body.buffer, body.byteOffset + offset, length / ArrayType.BYTES_PER_ELEMENT)
: body.subarray(offset, offset + length)
},
variadic: () => variadic[++variadicIndex],
visitAll(list) { return list.map(x => visit(x.type, this)); }
};
};
Expand Down Expand Up @@ -165,6 +169,12 @@ function visit(type, ctx) {
offsets: ctx.buffer(type.offsets),
values: ctx.buffer()
});
const view = (BatchType) => new BatchType({
...node,
validity: ctx.buffer(),
values: ctx.buffer(), // views buffer
data: Array.from({ length: ctx.variadic() }, () => ctx.buffer()) // data buffers
});
const list = (BatchType) => new BatchType({
...node,
validity: ctx.buffer(),
Expand Down Expand Up @@ -220,6 +230,10 @@ function visit(type, ctx) {
case Type.Binary: return offset(BinaryBatch);
case Type.LargeBinary: return offset(LargeBinaryBatch);

// views with variadic buffers
case Type.BinaryView: return view(BinaryViewBatch);
case Type.Utf8View: return view(Utf8ViewBatch);

// validity, offset, and list child
case Type.List: return list(ListBatch);
case Type.LargeList: return list(LargeListBatch);
Expand Down Expand Up @@ -258,16 +272,17 @@ function visit(type, ctx) {
ctx.buffer(); // skip unused null bitmap
}
const isSparse = type.mode === UnionMode.Sparse;
const typeIds = ctx.buffer(int8);
const offsets = isSparse ? null : ctx.buffer(type.offsets);
const children = ctx.visitAll(type.children);
const map = type.typeIds.reduce((map, id, i) => ((map[id] = i), map), {});
const options = { ...node, map, typeIds, offsets, children };
return isSparse ? new SparseUnionBatch(options) : new DenseUnionBatch(options);
return new (isSparse ? SparseUnionBatch : DenseUnionBatch)({
...node,
map: type.typeIds.reduce((map, id, i) => ((map[id] = i), map), {}),
typeIds: ctx.buffer(int8),
offsets: isSparse ? null : ctx.buffer(type.offsets),
children: ctx.visitAll(type.children)
});
}

// unsupported type
default:
throw new Error(`Unsupported type: ${typeId}, (${keyFor(Type, typeId)})`);
throw new Error(`Unsupported type: ${typeId} ${keyFor(Type, typeId)}`);
}
}
Loading