feat: Add BinaryView and Utf8View type support. (#11)
jheer authored Aug 23, 2024
1 parent 4a1bc1c commit 1eeaa3f
Showing 14 changed files with 173 additions and 96 deletions.
66 changes: 66 additions & 0 deletions src/batch.js
@@ -815,3 +815,69 @@ export class DictionaryBatch extends ArrayBatch {
    return /** @type {number} */ (this.values[index]);
  }
}
+
+/**
+ * @template T
+ * @extends {ArrayBatch<T>}
+ */
+class ViewBatch extends ArrayBatch {
+  /**
+   * Create a new view batch.
+   * @param {object} options Batch options.
+   * @param {number} options.length The length of the batch
+   * @param {number} options.nullCount The null value count
+   * @param {Uint8Array} [options.validity] Validity bitmap buffer
+   * @param {Uint8Array} options.values Values buffer
+   * @param {Uint8Array[]} options.data View data buffers
+   */
+  constructor({ data, ...rest }) {
+    super(rest);
+    this.data = data;
+  }
+
+  /**
+   * Get the binary data at the provided index.
+   * @param {number} index The value index.
+   * @returns {Uint8Array}
+   */
+  view(index) {
+    const { values, data } = this;
+    const offset = index << 4; // each entry is 16 bytes
+    let start = offset + 4;
+    let buf = /** @type {Uint8Array} */ (values);
+    const length = readInt32(buf, offset);
+    if (length > 12) {
+      // longer strings are in a data buffer
+      start = readInt32(buf, offset + 12);
+      buf = data[readInt32(buf, offset + 8)];
+    }
+    return buf.subarray(start, start + length);
+  }
+}
+
+/**
+ * A batch of binary blobs from variable data buffers, returned as byte
+ * buffers of unsigned 8-bit integers.
+ * @extends {ViewBatch<Uint8Array>}
+ */
+export class BinaryViewBatch extends ViewBatch {
+  /**
+   * @param {number} index The value index.
+   */
+  value(index) {
+    return this.view(index);
+  }
+}
+
+/**
+ * A batch of UTF-8 strings from variable data buffers.
+ * @extends {ViewBatch<string>}
+ */
+export class Utf8ViewBatch extends ViewBatch {
+  /**
+   * @param {number} index The value index.
+   */
+  value(index) {
+    return decodeUtf8(this.view(index));
+  }
+}
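
For reference, the view() method above decodes the 16-byte view entries that the Arrow columnar format defines for BinaryView and Utf8View arrays. A sketch of the layout it assumes:

// Each view entry occupies 16 bytes:
//   bytes 0-3:   length (int32)
// If length <= 12, the value is stored inline:
//   bytes 4-15:  the value bytes themselves
// Otherwise the value lives in one of the variadic data buffers:
//   bytes 4-7:   4-byte value prefix (not needed by view())
//   bytes 8-11:  data buffer index (int32)
//   bytes 12-15: byte offset into that buffer (int32)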
4 changes: 0 additions & 4 deletions src/constants.js
@@ -254,8 +254,6 @@ export const Type = /** @type {const} */ ({
   *
   * Since it uses a variable number of data buffers, each Field with this type
   * must have a corresponding entry in `variadicBufferCounts`.
-   *
-   * Not currently supported by Flechette.
   */
  BinaryView: 23,
  /**
@@ -266,8 +264,6 @@
   *
   * Since it uses a variable number of data buffers, each Field with this type
   * must have a corresponding entry in `variadicBufferCounts`.
-   *
-   * Not currently supported by Flechette.
   */
  Utf8View: 24,
  /**
7 changes: 1 addition & 6 deletions src/decode/block.js
@@ -24,10 +24,5 @@ export function decodeBlock(buf, index) {
 * @returns An array of message blocks.
 */
export function decodeBlocks(buf, index) {
-  const { length, base } = readVector(buf, index);
-  const batches = [];
-  for (let i = 0; i < length; ++i) {
-    batches.push(decodeBlock(buf, base + i * 24));
-  }
-  return batches;
+  return readVector(buf, index, 24, decodeBlock);
}
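
Many hunks in this commit collapse hand-rolled loops like the one above into a generalized readVector helper. Its implementation lives in src/util.js and is not shown in this diff; a minimal sketch consistent with the call sites here, assuming flatbuffer-style vector encoding, might look like:

// Hypothetical sketch (the real helper is in src/util.js and may differ):
// read a flatbuffer vector at `index`, applying `extract` to each
// `stride`-byte element, and return the results as an array.
function readVector(buf, index, stride, extract) {
  if (!index) return []; // absent vector
  const base = index + readInt32(buf, index); // follow offset to vector start
  const length = readInt32(buf, base);        // element count prefix
  const values = [];
  for (let i = 0; i < length; ++i) {
    values.push(extract(buf, base + 4 + i * stride));
  }
  return values;
}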
12 changes: 5 additions & 7 deletions src/decode/data-type.js
@@ -15,6 +15,8 @@ export function decodeDataType(buf, index, typeId, children) {
    case Type.NONE:
    case Type.Null:
    case Type.Bool:
+    case Type.BinaryView:
+    case Type.Utf8View:
      return { typeId };
    case Type.Binary:
    case Type.Utf8:
@@ -30,7 +32,7 @@
      return { typeId, children: [children?.[0]], offsets: int64 };
    case Type.Struct:
    case Type.RunEndEncoded:
-      // @ts-ignore
+      // @ts-ignore - suppress children length warning for run-end encoded
      return { typeId, children };
    case Type.Int:
      return decodeInt(buf, index);
@@ -56,9 +58,7 @@
      return decodeMap(buf, index, children);
    case Type.Union:
      return decodeUnion(buf, index, children);
-
  }
-  // TODO: collect errors, skip failures?
  throw new Error(`Unrecognized type: "${keyFor(Type, typeId)}" (id ${typeId})`);
}

@@ -285,12 +285,10 @@
function decodeUnion(buf, index, children) {
  // 4: mode
  // 6: typeIds
  const get = table(buf, index);
-  const { length, base } = readVector(buf, get(6, readOffset));
  return {
    typeId: Type.Union,
-    mode: /** @type {typeof UnionMode[keyof UnionMode]} */
-      (get(4, readInt16, UnionMode.Sparse)),
-    typeIds: new int32(buf.buffer, buf.byteOffset + base, length),
+    mode: /** @type {typeof UnionMode[keyof UnionMode]} */ (get(4, readInt16, UnionMode.Sparse)),
+    typeIds: readVector(buf, get(6, readOffset), 4, readInt32),
    children: children ?? [],
    offsets: int32
  };
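
A design note on the union change: typeIds was previously exposed as a zero-copy Int32Array view over the message buffer, and is now materialized into a plain array by readVector. The typeIds vector is small (one entry per union child), so the copy is negligible; indexed reads behave the same either way:

// Both forms yield identical indexed reads for a vector of int32 values
// (names as in the diff above):
const zeroCopy = new int32(buf.buffer, buf.byteOffset + base, length);
const copied = readVector(buf, get(6, readOffset), 4, readInt32);
// zeroCopy[i] === copied[i] for 0 <= i < length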
19 changes: 8 additions & 11 deletions src/decode/metadata.js
@@ -7,15 +7,12 @@ import { readString, readVector, table } from '../util.js';
 * @returns {import('../types.js').Metadata | null} The custom metadata map
 */
export function decodeMetadata(buf, index) {
-  const { length, base } = readVector(buf, index);
-  const metadata = length > 0 ? new Map : null;
-  for (let i = 0; i < length; ++i) {
-    // 4: key (string)
-    // 6: val (string)
-    const get = table(buf, base + i * 4);
-    const key = get(4, readString);
-    const val = get(6, readString);
-    if (key || val) metadata.set(key, val);
-  }
-  return metadata?.size ? metadata : null;
+  const entries = readVector(buf, index, 4, (buf, pos) => {
+    const get = table(buf, pos);
+    return /** @type {[string, string]} */ ([
+      get(4, readString), // 4: key (string)
+      get(6, readString)  // 6: val (string)
+    ]);
+  });
+  return entries.length ? new Map(entries) : null;
}
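
One subtle behavioral difference in this rewrite: the old loop skipped entries whose key and value were both missing (the `if (key || val)` guard), while the new version retains every decoded pair. A hypothetical illustration, assuming readString yields null for an absent field:

// Hypothetical edge case: a KeyValue entry with neither field set.
// Old: `if (key || val)` drops it; metadata.size === 0; returns null.
// New: entries === [[null, null]]; returns Map(1) { null => null }.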
46 changes: 13 additions & 33 deletions src/decode/record-batch.js
@@ -13,46 +13,26 @@
export function decodeRecordBatch(buf, index, version) {
  // 6: nodes
  // 8: buffers
  // 10: compression (not supported)
-  // 12: variadicBuffers (for view types, not supported)
+  // 12: variadicBuffers (buffer counts for view-typed fields)
  const get = table(buf, index);
  if (get(10, readOffset, 0)) {
    throw new Error('Record batch compression not implemented');
  }

-  // field nodes
-  const nodes = [];
-  const nodeVector = get(6, readVector);
-  if (nodeVector) {
-    const { length, base } = nodeVector;
-    for (let i = 0; i < length; ++i) {
-      const pos = base + i * 16;
-      nodes.push({
-        length: readInt64AsNum(buf, pos),
-        nullCount: readInt64AsNum(buf, pos + 8)
-      });
-    }
-  }
-
-  // buffers
-  const buffers = [];
-  const bufferVector = get(8, readVector);
-  if (bufferVector) {
-    const { length, base } = bufferVector;
-    const adjust = version < Version.V4;
-    for (let i = 0; i < length; ++i) {
-      // If this Arrow buffer was written before version 4,
-      // advance 8 bytes to skip the now-removed page_id field
-      const pos = base + i * 16 + (adjust ? (8 * (i + 1)) : 0);
-      buffers.push({
-        offset: readInt64AsNum(buf, pos),
-        length: readInt64AsNum(buf, pos + 8)
-      });
-    }
-  }
+  // If an Arrow buffer was written before version 4,
+  // advance 8 bytes to skip the now-removed page_id field
+  const offset = version < Version.V4 ? 8 : 0;

  return {
    length: get(4, readInt64AsNum, 0),
-    nodes,
-    buffers
+    nodes: readVector(buf, get(6, readOffset), 16, (buf, pos) => ({
+      length: readInt64AsNum(buf, pos),
+      nullCount: readInt64AsNum(buf, pos + 8)
+    })),
+    buffers: readVector(buf, get(8, readOffset), 16 + offset, (buf, pos) => ({
+      offset: readInt64AsNum(buf, pos + offset),
+      length: readInt64AsNum(buf, pos + offset + 8)
+    })),
+    variadic: readVector(buf, get(12, readOffset), 8, readInt64AsNum)
  };
}
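
The new variadic array surfaces the message's variadicBufferCounts: one count per view-typed field, in schema order, telling the reader how many data buffers that column owns beyond its validity and views buffers. A hypothetical decoded result for a batch with a single Utf8View column whose long strings span two data buffers:

// Hypothetical decoded record batch for schema { name: Utf8View }:
const batch = {
  length: 1000,
  nodes: [{ length: 1000, nullCount: 0 }],
  buffers: [
    { offset: 0, length: 128 },       // validity bitmap
    { offset: 128, length: 16000 },   // 16-byte view entries
    { offset: 16128, length: 65536 }, // variadic data buffer 0
    { offset: 81664, length: 4096 }   // variadic data buffer 1
  ],
  variadic: [2] // the Utf8View column owns two data buffers
};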
27 changes: 10 additions & 17 deletions src/decode/schema.js
@@ -19,8 +19,7 @@ export function decodeSchema(buf, index, version) {
  const get = table(buf, index);
  return {
    version,
-    endianness: /** @type {import('../types.js').Endianness_} */
-      (get(4, readInt16, 0)),
+    endianness: /** @type {import('../types.js').Endianness_} */ (get(4, readInt16, 0)),
    fields: get(6, (buf, off) => decodeSchemaFields(buf, off, dictionaryTypes), []),
    metadata: get(8, decodeMetadata),
    dictionaryTypes
@@ -31,12 +30,9 @@
 * @returns {import('../types.js').Field[] | null}
 */
function decodeSchemaFields(buf, fieldsOffset, dictionaryTypes) {
-  const { length, base } = readVector(buf, fieldsOffset);
-  const fields = [];
-  for (let i = 0; i < length; ++i) {
-    fields.push(decodeField(buf, base + i * 4, dictionaryTypes));
-  }
-  return fields;
+  return readVector(buf, fieldsOffset, 4,
+    (buf, pos) => decodeField(buf, pos, dictionaryTypes)
+  );
}

/**
@@ -53,8 +49,8 @@ function decodeField(buf, index, dictionaryTypes) {
  const get = table(buf, index);
  const typeId = get(8, readUint8, Type.NONE);
  const typeOffset = get(10, readOffset, 0);
-  const dict = get(12, (buf, off) => decodeDictionary(buf, off));
-  const children = get(14, decodeFieldChildren);
+  const dict = get(12, decodeDictionary);
+  const children = get(14, (buf, off) => decodeFieldChildren(buf, off, dictionaryTypes));

  let type;
  if (dict) {
@@ -83,13 +79,10 @@
/**
 * @returns {import('../types.js').Field[] | null}
 */
-function decodeFieldChildren(buf, fieldOffset, dictionaries) {
-  const { length, base } = readVector(buf, fieldOffset);
-  const children = [];
-  for (let i = 0; i < length; ++i) {
-    const pos = base + i * 4;
-    children.push(decodeField(buf, pos, dictionaries));
-  }
+function decodeFieldChildren(buf, fieldOffset, dictionaryTypes) {
+  const children = readVector(buf, fieldOffset, 4,
+    (buf, pos) => decodeField(buf, pos, dictionaryTypes)
+  );
  return children.length ? children : null;
}

33 changes: 24 additions & 9 deletions src/table-from-ipc.js
@@ -1,6 +1,7 @@
import { int8 } from './array-types.js';
import {
  BinaryBatch,
+  BinaryViewBatch,
  BoolBatch,
  DateBatch,
  DateDayBatch,
@@ -32,7 +33,8 @@
  TimestampMillisecondBatch,
  TimestampNanosecondBatch,
  TimestampSecondBatch,
-  Utf8Batch
+  Utf8Batch,
+  Utf8ViewBatch
} from './batch.js';
import { columnBuilder } from './column.js';
import {
@@ -118,9 +120,10 @@ function contextGenerator(options, version, dictionaryMap) {

  // return a context generator
  return batch => {
-    const { length, nodes, buffers, body } = batch;
+    const { length, nodes, buffers, variadic, body } = batch;
    let nodeIndex = -1;
    let bufferIndex = -1;
+    let variadicIndex = -1;
    return {
      ...base,
      length,
@@ -131,6 +134,7 @@
          ? new ArrayType(body.buffer, body.byteOffset + offset, length / ArrayType.BYTES_PER_ELEMENT)
          : body.subarray(offset, offset + length)
      },
+      variadic: () => variadic[++variadicIndex],
      visitAll(list) { return list.map(x => visit(x.type, this)); }
    };
  };
@@ -165,6 +169,12 @@ function visit(type, ctx) {
    offsets: ctx.buffer(type.offsets),
    values: ctx.buffer()
  });
+  const view = (BatchType) => new BatchType({
+    ...node,
+    validity: ctx.buffer(),
+    values: ctx.buffer(), // views buffer
+    data: Array.from({ length: ctx.variadic() }, () => ctx.buffer()) // data buffers
+  });
  const list = (BatchType) => new BatchType({
    ...node,
    validity: ctx.buffer(),
@@ -220,6 +230,10 @@
    case Type.Binary: return offset(BinaryBatch);
    case Type.LargeBinary: return offset(LargeBinaryBatch);

+    // views with variadic buffers
+    case Type.BinaryView: return view(BinaryViewBatch);
+    case Type.Utf8View: return view(Utf8ViewBatch);
+
    // validity, offset, and list child
    case Type.List: return list(ListBatch);
    case Type.LargeList: return list(LargeListBatch);
@@ -258,16 +272,17 @@
        ctx.buffer(); // skip unused null bitmap
      }
      const isSparse = type.mode === UnionMode.Sparse;
-      const typeIds = ctx.buffer(int8);
-      const offsets = isSparse ? null : ctx.buffer(type.offsets);
-      const children = ctx.visitAll(type.children);
-      const map = type.typeIds.reduce((map, id, i) => ((map[id] = i), map), {});
-      const options = { ...node, map, typeIds, offsets, children };
-      return isSparse ? new SparseUnionBatch(options) : new DenseUnionBatch(options);
+      return new (isSparse ? SparseUnionBatch : DenseUnionBatch)({
+        ...node,
+        map: type.typeIds.reduce((map, id, i) => ((map[id] = i), map), {}),
+        typeIds: ctx.buffer(int8),
+        offsets: isSparse ? null : ctx.buffer(type.offsets),
+        children: ctx.visitAll(type.children)
+      });
    }

    // unsupported type
    default:
-      throw new Error(`Unsupported type: ${typeId}, (${keyFor(Type, typeId)})`);
+      throw new Error(`Unsupported type: ${typeId} ${keyFor(Type, typeId)}`);
  }
}
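
With these pieces in place, view columns decode like any other column type. A usage sketch (import path and method names per this project's public API, assumed here rather than shown in the diff):

// Hypothetical usage: read a table whose IPC payload contains
// Utf8View or BinaryView columns.
import { tableFromIPC } from '@uwdata/flechette';

const table = tableFromIPC(ipcBytes); // Uint8Array of Arrow IPC data
const names = table.getChild('name').toArray(); // Utf8View -> string values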