diff --git a/__tests__/integration/schema_e2e.test.ts b/__tests__/integration/schema_e2e.test.ts new file mode 100644 index 00000000..31a9a997 --- /dev/null +++ b/__tests__/integration/schema_e2e.test.ts @@ -0,0 +1,215 @@ +import type { ClickHouseClient } from '../../src' +import { createTableWithSchema, createTestClient, guid } from '../utils' +import * as ch from '../../src/schema' +import { And, Eq, Or } from '../../src/schema' + +describe('schema e2e test', () => { + let client: ClickHouseClient + let tableName: string + + beforeEach(async () => { + client = await createTestClient() + tableName = `schema_e2e_test_${guid()}` + }) + afterEach(async () => { + await client.close() + }) + + const shape = { + id: ch.UUID, + name: ch.String, + sku: ch.Array(ch.UInt8), + active: ch.Bool, + } + let table: ch.Table + type Value = ch.Infer + + const value1: Value = { + id: '8dbb28f7-4da0-4e49-af71-e830aee422eb', + name: 'foo', + sku: [1, 2], + active: true, + } + const value2: Value = { + id: '314f5ac4-fe93-4c39-b26c-0cb079be0767', + name: 'bar', + sku: [3, 4], + active: false, + } + + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['id'] + ) + }) + + it('should insert and select data using arrays', async () => { + await table.insert({ + values: [value1, value2], + }) + const result = await (await table.select()).json() + expect(result).toEqual([value1, value2]) + }) + + it('should insert and select data using streams', async () => { + const values = new ch.InsertStream() + values.add(value1) + values.add(value2) + setTimeout(() => values.complete(), 100) + + await table.insert({ + values, + }) + + const result: Value[] = [] + const { asyncGenerator } = await table.select() + + for await (const value of asyncGenerator()) { + result.push(value) + } + + expect(result).toEqual([value1, value2]) + }) + + // FIXME: find a way to disallow default values + it.skip('should not swallow generic insert errors using arrays', async () => { + await expect( + table.insert({ + values: [{ foobar: 'qaz' } as any], + }) + ).rejects.toEqual( + expect.objectContaining({ + error: 'asdfsdaf', + }) + ) + }) + + // FIXME: find a way to disallow default values + it.skip('should not swallow generic insert errors using streams', async () => { + const values = new ch.InsertStream() + values.add(value1) + values.add({ foobar: 'qaz' } as any) + setTimeout(() => values.complete(), 100) + + await table.insert({ + values, + }) + const result = await (await table.select()).json() + expect(result).toEqual([value1, value2]) + }) + + it('should not swallow generic select errors', async () => { + await expect( + table.select({ + order_by: [['non_existing_column' as any, 'ASC']], + }) + ).rejects.toMatchObject({ + message: expect.stringContaining('Missing columns'), + }) + }) + + it('should use order by / where statements', async () => { + const value3: Value = { + id: '7640bde3-cdc5-4d63-a47e-66c6a16629df', + name: 'qaz', + sku: [6, 7], + active: true, + } + await table.insert({ + values: [value1, value2, value3], + }) + + expect( + await table + .select({ + where: Eq('name', 'bar'), + }) + .then((r) => r.json()) + ).toEqual([value2]) + + expect( + await table + .select({ + where: Or(Eq('name', 'foo'), Eq('name', 'qaz')), + order_by: [['name', 'DESC']], + }) + .then((r) => r.json()) + ).toEqual([value3, value1]) + + expect( + await table + .select({ + where: And(Eq('active', true), Eq('name', 'foo')), + }) + .then((r) => r.json()) + ).toEqual([value1]) + + expect( + await table + .select({ + where: Eq('sku', [3, 4]), + }) + .then((r) => r.json()) + ).toEqual([value2]) + + expect( + await table + .select({ + where: And(Eq('active', true), Eq('name', 'quuux')), + }) + .then((r) => r.json()) + ).toEqual([]) + + expect( + await table + .select({ + order_by: [ + ['active', 'DESC'], + ['name', 'DESC'], + ], + }) + .then((r) => r.json()) + ).toEqual([value3, value1, value2]) + + expect( + await table + .select({ + order_by: [ + ['active', 'DESC'], + ['name', 'ASC'], + ], + }) + .then((r) => r.json()) + ).toEqual([value1, value3, value2]) + }) + + it('should be able to select only specific columns', async () => { + await table.insert({ + values: [value1, value2], + }) + + expect( + await table + .select({ + columns: ['id'], + order_by: [['name', 'ASC']], + }) + .then((r) => r.json()) + ).toEqual([{ id: value2.id }, { id: value1.id }]) + + expect( + await table + .select({ + columns: ['id', 'active'], + order_by: [['name', 'ASC']], + }) + .then((r) => r.json()) + ).toEqual([ + { id: value2.id, active: value2.active }, + { id: value1.id, active: value1.active }, + ]) + }) +}) diff --git a/__tests__/integration/schema_types.test.ts b/__tests__/integration/schema_types.test.ts new file mode 100644 index 00000000..afedc80b --- /dev/null +++ b/__tests__/integration/schema_types.test.ts @@ -0,0 +1,394 @@ +import type { ClickHouseClient } from '../../src' +import { createTableWithSchema, createTestClient, guid } from '../utils' + +import * as ch from '../../src/schema' + +describe('schema types', () => { + let client: ClickHouseClient + let tableName: string + + beforeEach(async () => { + client = await createTestClient() + tableName = `schema_test_${guid()}` + }) + afterEach(async () => { + await client.close() + }) + + describe('(U)Int', () => { + const shape = { + i1: ch.Int8, + i2: ch.Int16, + i3: ch.Int32, + i4: ch.Int64, + i5: ch.Int128, + i6: ch.Int256, + u1: ch.UInt8, + u2: ch.UInt16, + u3: ch.UInt32, + u4: ch.UInt64, + u5: ch.UInt128, + u6: ch.UInt256, + } + const value: ch.Infer = { + i1: 127, + i2: 32767, + i3: 2147483647, + i4: '9223372036854775807', + i5: '170141183460469231731687303715884105727', + i6: '57896044618658097711785492504343953926634992332820282019728792003956564819967', + u1: 255, + u2: 65535, + u3: 4294967295, + u4: '18446744073709551615', + u5: '340282366920938463463374607431768211455', + u6: '115792089237316195423570985008687907853269984665640564039457584007913129639935', + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['i1'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) + + describe('Float', () => { + const shape = { + f1: ch.Float32, + f2: ch.Float64, + } + // TODO: figure out better values for this test + const value: ch.Infer = { + f1: 1.2345, + f2: 2.2345, + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['f1'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) + + describe('String', () => { + const shape = { + s1: ch.String, + s2: ch.FixedString(255), + } + const value: ch.Infer = { + s1: 'foo', + s2: 'bar', + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['s1'] + ) + }) + + it('should insert and select it back', async () => { + await table.insert({ + values: [value], + }) + const result = await (await table.select()).json() + expect(result).toEqual([ + { + s1: value.s1, + s2: value.s2.padEnd(255, '\x00'), + }, + ]) + expect(result[0].s2.length).toEqual(255) + }) + }) + + describe('IP', () => { + const shape = { + ip1: ch.IPv4, + ip2: ch.IPv6, + } + const value: ch.Infer = { + ip1: '127.0.0.116', + ip2: '2001:db8:85a3::8a2e:370:7334', + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['ip1'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) + + describe('Array', () => { + const shape = { + arr1: ch.Array(ch.UInt32), + arr2: ch.Array(ch.String), + arr3: ch.Array(ch.Array(ch.Array(ch.Int32))), + arr4: ch.Array(ch.Nullable(ch.String)), + } + // TODO: better values for this test + const value: ch.Infer = { + arr1: [1, 2], + arr2: ['foo', 'bar'], + arr3: [[[12345]]], + arr4: ['qux', null, 'qaz'], + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['arr2'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) + + describe('Map', () => { + const shape = { + m1: ch.Map(ch.String, ch.String), + m2: ch.Map(ch.Int32, ch.Map(ch.Date, ch.Array(ch.Int32))), + } + const value: ch.Infer = { + m1: { foo: 'bar' }, + m2: { + 42: { + '2022-04-25': [1, 2, 3], + }, + }, + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['m1'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) + + describe('Nullable', () => { + const shape = { + id: ch.Int32, // nullable order by is prohibited + n1: ch.Nullable(ch.String), + n2: ch.Nullable(ch.Date), + } + const value1: ch.Infer = { + id: 1, + n1: 'foo', + n2: null, + } + const value2: ch.Infer = { + id: 2, + n1: null, + n2: '2022-04-30', + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['id'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value1, value2) + }) + }) + + describe('Enum', () => { + enum MyEnum { + Foo = 'Foo', + Bar = 'Bar', + Qaz = 'Qaz', + Qux = 'Qux', + } + + const shape = { + id: ch.Int32, // to preserve the order of values + e: ch.Enum(MyEnum), + } + const values: ch.Infer[] = [ + { id: 1, e: MyEnum.Bar }, + { id: 2, e: MyEnum.Qux }, + { id: 3, e: MyEnum.Foo }, + { id: 4, e: MyEnum.Qaz }, + ] + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['id'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, ...values) + }) + + it('should fail in case of an invalid value', async () => { + await expect( + table.insert({ + values: [{ id: 4, e: 'NonExistingValue' as MyEnum }], + }) + ).rejects.toMatchObject( + expect.objectContaining({ + message: expect.stringContaining( + `Unknown element 'NonExistingValue' for enum` + ), + }) + ) + }) + }) + + describe('Date(Time)', () => { + const shape = { + d1: ch.Date, + d2: ch.Date32, + dt1: ch.DateTime(), + dt2: ch.DateTime64(3), + dt3: ch.DateTime64(6), + dt4: ch.DateTime64(9), + } + const value: ch.Infer = { + d1: '2149-06-06', + d2: '2178-04-16', + dt1: '2106-02-07 06:28:15', + dt2: '2106-02-07 06:28:15.123', + dt3: '2106-02-07 06:28:15.123456', + dt4: '2106-02-07 06:28:15.123456789', + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['d1'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) + + // FIXME: uncomment and extend the test + // once Decimal is re-implemented properly + + // describe('Decimal', () => { + // const shape = { + // d1: ch.Decimal({ + // precision: 9, + // scale: 2, + // }), // Decimal32 + // d2: ch.Decimal({ + // precision: 18, + // scale: 3, + // }), // Decimal64 + // } + // const value: ch.Infer = { + // d1: 1234567.89, + // d2: 123456789123456.789, + // } + // + // let table: ch.Table + // beforeEach(async () => { + // table = await createTableWithSchema( + // client, + // new ch.Schema(shape), + // tableName, + // ['d1'] + // ) + // }) + // + // it('should insert and select it back', async () => { + // await assertInsertAndSelect(table, value) + // }) + // }) + + describe('LowCardinality', () => { + const shape = { + lc1: ch.LowCardinality(ch.String), + } + const value: ch.Infer = { + lc1: 'foobar', + } + + let table: ch.Table + beforeEach(async () => { + table = await createTableWithSchema( + client, + new ch.Schema(shape), + tableName, + ['lc1'] + ) + }) + + it('should insert and select it back', async () => { + await assertInsertAndSelect(table, value) + }) + }) +}) + +async function assertInsertAndSelect( + table: ch.Table, + ...value: ch.Infer[] +) { + await table.insert({ + values: value, + }) + const result = await ( + await table.select({ + clickhouse_settings: { + s3_create_new_file_on_insert: 1, + }, + }) + ).json() + expect(result).toEqual(value) +} diff --git a/__tests__/unit/query_formatter.ts b/__tests__/unit/query_formatter.ts new file mode 100644 index 00000000..7d1aba53 --- /dev/null +++ b/__tests__/unit/query_formatter.ts @@ -0,0 +1,56 @@ +import * as ch from '../../src/schema' +import { QueryFormatter } from '../../src/schema/query_formatter' + +describe('QueryFormatter', () => { + it('should render a simple CREATE TABLE statement', async () => { + const schema = new ch.Schema({ + foo: ch.String, + bar: ch.UInt8, + }) + const tableOptions = { + name: 'my_table', + schema, + } + expect( + QueryFormatter.createTable(tableOptions, { + engine: ch.MergeTree(), + order_by: ['foo'], + }) + ).toEqual( + 'CREATE TABLE my_table (foo String, bar UInt8) ENGINE MergeTree() ORDER BY (foo)' + ) + }) + + it('should render a complex CREATE TABLE statement', async () => { + const schema = new ch.Schema({ + foo: ch.String, + bar: ch.UInt8, + }) + const tableOptions = { + name: 'my_table', + schema, + } + expect( + QueryFormatter.createTable(tableOptions, { + engine: ch.MergeTree(), + if_not_exists: true, + on_cluster: '{cluster}', + order_by: ['foo', 'bar'], + partition_by: ['foo'], + primary_key: ['bar'], + settings: { + merge_max_block_size: 16384, + enable_mixed_granularity_parts: 1, + }, + }) + ).toEqual( + `CREATE TABLE IF NOT EXISTS my_table ON CLUSTER '{cluster}' ` + + '(foo String, bar UInt8) ' + + 'ENGINE MergeTree() ' + + 'ORDER BY (foo, bar) ' + + 'PARTITION BY (foo) ' + + 'PRIMARY KEY (bar) ' + + 'SETTINGS merge_max_block_size = 16384, enable_mixed_granularity_parts = 1' + ) + }) +}) diff --git a/__tests__/unit/schema_select_result.test.ts b/__tests__/unit/schema_select_result.test.ts new file mode 100644 index 00000000..2b173f0d --- /dev/null +++ b/__tests__/unit/schema_select_result.test.ts @@ -0,0 +1,50 @@ +import type { ClickHouseClient } from '../../src' +import { Rows } from '../../src' +import * as ch from '../../src/schema' +import { QueryFormatter } from '../../src/schema/query_formatter' +import { Readable } from 'stream' + +describe('schema select result', () => { + const client: ClickHouseClient = { + command: () => { + // stub + }, + } as any + const schema = new ch.Schema({ + id: ch.UInt32, + name: ch.String, + }) + const table = new ch.Table(client, { + name: 'data_table', + schema, + }) + + beforeEach(() => { + jest + .spyOn(QueryFormatter, 'select') + .mockReturnValueOnce('SELECT * FROM data_table') + jest + .spyOn(client, 'command') + .mockResolvedValueOnce( + new Rows( + Readable.from(['{"valid":"json"}\n', 'invalid_json}\n']), + 'JSONEachRow' + ) + ) + }) + + it('should not swallow error during select stream consumption', async () => { + const { asyncGenerator } = await table.select() + + expect((await asyncGenerator().next()).value).toEqual({ valid: 'json' }) + await expect(asyncGenerator().next()).rejects.toMatchObject({ + message: expect.stringContaining('Unexpected token'), + }) + }) + + it('should not swallow error while converting stream to json', async () => { + await expect(table.select().then((r) => r.json())).rejects.toMatchObject({ + message: expect.stringContaining('Unexpected token'), + }) + }) +}) diff --git a/__tests__/utils/client.ts b/__tests__/utils/client.ts index f822f86a..a43f1557 100644 --- a/__tests__/utils/client.ts +++ b/__tests__/utils/client.ts @@ -13,7 +13,9 @@ export function createTestClient( const env = getClickHouseTestEnvironment() const database = process.env[TestDatabaseEnvKey] console.log( - `Using ${env} test environment to create a Client instance for database ${database}` + `Using ${env} test environment to create a Client instance for database ${ + database || 'default' + }` ) const clickHouseSettings: ClickHouseSettings = {} if (env === TestEnv.LocalCluster || env === TestEnv.Cloud) { diff --git a/__tests__/utils/in_memory_logger.ts b/__tests__/utils/in_memory_logger.ts deleted file mode 100644 index f6c3f5c4..00000000 --- a/__tests__/utils/in_memory_logger.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { type Logger } from '../../src' - -export class InMemoryLogger implements Logger { - private readonly messages: string[] = [] - - constructor(readonly enabled: boolean) {} - - debug(message: string) { - this.messages.push(message) - } - - info(message: string) { - this.messages.push(message) - } - - warning(message: string) { - this.messages.push(message) - } - - error(message: string): void { - this.messages.push(message) - } - - getAll() { - return this.messages - } -} diff --git a/__tests__/utils/index.ts b/__tests__/utils/index.ts index e895a030..fc46efc8 100644 --- a/__tests__/utils/index.ts +++ b/__tests__/utils/index.ts @@ -1,4 +1,3 @@ -export { InMemoryLogger } from './in_memory_logger' export { TestLogger } from './test_logger' export { createTestClient, @@ -10,3 +9,4 @@ export { guid } from './guid' export { getClickHouseTestEnvironment } from './test_env' export { TestEnv } from './test_env' export { retryOnFailure } from './retry' +export { createTableWithSchema } from './schema' diff --git a/__tests__/utils/schema.ts b/__tests__/utils/schema.ts new file mode 100644 index 00000000..68030f44 --- /dev/null +++ b/__tests__/utils/schema.ts @@ -0,0 +1,49 @@ +import { getClickHouseTestEnvironment, TestEnv } from './test_env' +import * as ch from '../../src/schema' +import type { ClickHouseClient } from '../../src' +import type { NonEmptyArray } from '../../src/schema' + +export async function createTableWithSchema( + client: ClickHouseClient, + schema: ch.Schema, + tableName: string, + orderBy: NonEmptyArray +) { + const table = new ch.Table(client, { + name: tableName, + schema, + }) + const env = getClickHouseTestEnvironment() + switch (env) { + case TestEnv.Cloud: + await table.create({ + engine: ch.MergeTree(), + order_by: orderBy, + clickhouse_settings: { + wait_end_of_query: 1, + }, + }) + break + case TestEnv.LocalCluster: + await table.create({ + engine: ch.ReplicatedMergeTree({ + zoo_path: '/clickhouse/{cluster}/tables/{database}/{table}/{shard}', + replica_name: '{replica}', + }), + on_cluster: '{cluster}', + order_by: orderBy, + clickhouse_settings: { + wait_end_of_query: 1, + }, + }) + break + case TestEnv.LocalSingleNode: + await table.create({ + engine: ch.MergeTree(), + order_by: orderBy, + }) + break + } + console.log(`Created table ${tableName}`) + return table +} diff --git a/examples/schema/simple_schema.ts b/examples/schema/simple_schema.ts new file mode 100644 index 00000000..345392c2 --- /dev/null +++ b/examples/schema/simple_schema.ts @@ -0,0 +1,60 @@ +import * as ch from '../../src/schema' +import type { Infer } from '../../src/schema' +import { InsertStream } from '../../src/schema' +import { createClient } from '../../src' + +void (async () => { + const client = createClient() + + enum UserRole { + User = 'User', + Admin = 'Admin', + } + const userSchema = new ch.Schema({ + id: ch.UInt64, + name: ch.String, + externalIds: ch.Array(ch.UInt32), + settings: ch.Map(ch.String, ch.String), + role: ch.Enum(UserRole), + registeredAt: ch.DateTime64(3, 'Europe/Amsterdam'), + }) + + type Data = Infer + + const usersTable = new ch.Table(client, { + name: 'users', + schema: userSchema, + }) + + await usersTable.create({ + engine: ch.MergeTree(), + order_by: ['id'], + }) + + const insertStream = new InsertStream() + insertStream.add({ + // NB: (U)Int64/128/256 are represented as strings + // since their max value > Number.MAX_SAFE_INTEGER + id: '42', + name: 'foo', + externalIds: [1, 2], + settings: { foo: 'bar' }, + role: UserRole.Admin, + registeredAt: '2021-04-30 08:05:37.123', + }) + insertStream.complete() + await usersTable.insert({ + values: insertStream, + clickhouse_settings: { + insert_quorum: '2', + }, + }) + + const { asyncGenerator } = await usersTable.select({ + columns: ['id', 'name', 'registeredAt'], // or omit to select * + order_by: [['name', 'DESC']], + }) + for await (const value of asyncGenerator()) { + console.log(value.id) + } +})() diff --git a/src/client.ts b/src/client.ts index 183c4f98..e53de8ca 100644 --- a/src/client.ts +++ b/src/client.ts @@ -53,6 +53,7 @@ export interface CommandParams extends BaseParams { export interface InsertParams extends BaseParams { table: string values: ReadonlyArray | Stream.Readable + format?: DataFormat } function validateConfig(config: NormalizedConfig): void { @@ -156,11 +157,12 @@ export class ClickHouseClient { async insert(params: InsertParams): Promise { validateInsertValues(params.values) - const query = `INSERT into ${params.table.trim()} FORMAT JSONCompactEachRow` + const format = params.format || 'JSONCompactEachRow' + const query = `INSERT into ${params.table.trim()} FORMAT ${format}` await this.connection.insert({ query, - values: encodeValues(params.values, 'JSONCompactEachRow'), + values: encodeValues(params.values, format), ...this.getBaseParams(params), }) } @@ -225,7 +227,7 @@ function encodeValues( if (isStream(values)) { return Stream.pipeline( values, - mapStream(function (value: any) { + mapStream((value: unknown) => { return encode(value, format) }), function pipelineCb(err) { diff --git a/src/data_formatter/formatter.ts b/src/data_formatter/formatter.ts index 539f13a9..fdaaff16 100644 --- a/src/data_formatter/formatter.ts +++ b/src/data_formatter/formatter.ts @@ -80,7 +80,7 @@ export function decode(text: string, format: DataFormat): any { * @returns string */ export function encode(value: any, format: DataFormat): string { - if (format === 'JSONCompactEachRow') { + if (format === 'JSONCompactEachRow' || format === 'JSONEachRow') { return JSON.stringify(value) + '\n' } throw new Error(`The client does not support encoding in [${format}] format.`) diff --git a/src/index.ts b/src/index.ts index 96b100ae..f6c2701b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -14,7 +14,7 @@ export { type InsertParams, } from './client' -export type { Rows, Row } from './result' +export { Rows, Row } from './result' export type { Connection } from './connection' export type { DataFormat } from './data_formatter' export type { ClickHouseError } from './error' diff --git a/src/schema/common.ts b/src/schema/common.ts new file mode 100644 index 00000000..43a0724d --- /dev/null +++ b/src/schema/common.ts @@ -0,0 +1,14 @@ +import type { Type } from './types' + +// TODO: TTL +// TODO: Materialized columns +// TODO: alias +export type Shape = { + [key: string]: Type +} + +export type Infer = { + [Field in keyof S]: S[Field]['underlying'] +} + +export type NonEmptyArray = [T, ...T[]] diff --git a/src/schema/compact.ts b/src/schema/compact.ts new file mode 100644 index 00000000..76e70fe8 --- /dev/null +++ b/src/schema/compact.ts @@ -0,0 +1,21 @@ +import type { Infer, Shape } from './common' + +export function compactJson(shape: S, value: Infer) { + const compacted = [] + for (const key in shape) { + if (key in value) { + compacted.push(value[key]) + } + } + return compacted +} + +export function decompactJson(shape: S, [row]: [unknown[]]) { + const obj: Record = {} + let i = 0 + for (const key in shape) { + obj[key] = row[i] + i++ + } + return obj as Infer +} diff --git a/src/schema/engines.ts b/src/schema/engines.ts new file mode 100644 index 00000000..bb2c1640 --- /dev/null +++ b/src/schema/engines.ts @@ -0,0 +1,106 @@ +// See https://clickhouse.com/docs/en/engines/table-engines/ + +// TODO Log family +export type TableEngine = MergeTreeFamily + +type MergeTreeFamily = + | ReturnType + | ReturnType + | ReturnType + | ReturnType + | ReturnType + | ReturnType + | ReturnType + | ReturnType + +// https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree#settings +// TODO: refined types? +// TODO: storage_policy +export interface MergeTreeSettings { + index_granularity?: number + index_granularity_bytes?: number + min_index_granularity_bytes?: number + enable_mixed_granularity_parts?: 0 | 1 + use_minimalistic_part_header_in_zookeeper?: 0 | 1 + min_merge_bytes_to_use_direct_io?: number + merge_with_ttl_timeout?: number + merge_with_recompression_ttl_timeout?: number + try_fetch_recompressed_part_timeout?: number + write_final_mark?: number + merge_max_block_size?: number + min_bytes_for_wide_part?: number + min_rows_for_wide_part?: number + max_parts_in_total?: number + max_compress_block_size?: number + min_compress_block_size?: number + max_partitions_to_read?: number +} +export const MergeTree = () => ({ + toString: () => `MergeTree()`, + type: 'MergeTree', +}) + +// https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication/#replicatedmergetree-parameters +// TODO: figure out the complete usage of "other_parameters" +export interface ReplicatedMergeTreeParameters { + zoo_path: string + replica_name: string + ver?: string +} +export const ReplicatedMergeTree = ({ + zoo_path, + replica_name, + ver, +}: ReplicatedMergeTreeParameters) => ({ + toString: () => { + const _ver = ver ? `, ${ver}` : '' + return `ReplicatedMergeTree('${zoo_path}', '${replica_name}'${_ver})` + }, + type: 'ReplicatedMergeTree', +}) + +export const ReplacingMergeTree = (ver?: string) => ({ + toString: () => { + const _ver = ver ? `, ${ver}` : '' + return `ReplacingMergeTree(${_ver})` + }, + type: 'ReplacingMergeTree', +}) + +export const SummingMergeTree = (columns?: string[]) => ({ + toString: () => { + return `SummingMergeTree(${(columns || []).join(', ')})` + }, + type: 'SummingMergeTree', +}) + +export const AggregatingMergeTree = () => ({ + toString: () => { + return `AggregatingMergeTree()` + }, + type: 'AggregatingMergeTree', +}) + +export const CollapsingMergeTree = (sign: string) => ({ + toString: () => { + return `CollapsingMergeTree(${sign})` + }, + type: 'CollapsingMergeTree', +}) + +export const VersionedCollapsingMergeTree = ( + sign: string, + version: string +) => ({ + toString: () => { + return `VersionedCollapsingMergeTree(${sign}, ${version})` + }, + type: 'VersionedCollapsingMergeTree', +}) + +export const GraphiteMergeTree = (config_section: string) => ({ + toString: () => { + return `CollapsingMergeTree(${config_section})` + }, + type: 'GraphiteMergeTree', +}) diff --git a/src/schema/index.ts b/src/schema/index.ts new file mode 100644 index 00000000..be17b845 --- /dev/null +++ b/src/schema/index.ts @@ -0,0 +1,7 @@ +export * from './schema' +export * from './types' +export * from './table' +export * from './engines' +export * from './common' +export * from './stream' +export * from './where' diff --git a/src/schema/query_formatter.ts b/src/schema/query_formatter.ts new file mode 100644 index 00000000..68961fd2 --- /dev/null +++ b/src/schema/query_formatter.ts @@ -0,0 +1,70 @@ +import type { Shape } from './common' +import type { CreateTableOptions, TableOptions } from './index' +import type { WhereExpr } from './where' +import type { NonEmptyArray } from './common' + +export const QueryFormatter = { + // See https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/#table_engine-mergetree-creating-a-table + createTable: ( + tableOptions: TableOptions, + { + engine: _engine, + if_not_exists, + on_cluster, + order_by, + partition_by, + primary_key, + settings: _settings, + }: CreateTableOptions + ) => { + const ifNotExist = if_not_exists ? ' IF NOT EXISTS' : '' + const tableName = getTableName(tableOptions) + const onCluster = on_cluster ? ` ON CLUSTER '${on_cluster}'` : '' + const columns = ` (${tableOptions.schema.toString()})` + const engine = ` ENGINE ${_engine}` + const orderBy = order_by ? ` ORDER BY (${order_by.join(', ')})` : '' + const partitionBy = partition_by + ? ` PARTITION BY (${partition_by.join(', ')})` + : '' + const primaryKey = primary_key + ? ` PRIMARY KEY (${primary_key.join(', ')})` + : '' + const settings = + _engine.type === 'MergeTree' && + _settings && + Object.keys(_settings).length > 0 + ? ` SETTINGS ${Object.entries(_settings) + .map(([k, v]) => `${k} = ${v}`) + .join(', ')}` + : '' + return ( + `CREATE TABLE${ifNotExist} ${tableName}${onCluster}${columns}${engine}` + + `${orderBy}${partitionBy}${primaryKey}${settings}` + ) + }, + + // https://clickhouse.com/docs/en/sql-reference/statements/select/ + select: ( + tableOptions: TableOptions, + whereExpr?: WhereExpr, + columns?: NonEmptyArray, + orderBy?: NonEmptyArray<[keyof S, 'ASC' | 'DESC']> + ) => { + const tableName = getTableName(tableOptions) + const where = whereExpr ? ` WHERE ${whereExpr.toString()}` : '' + const cols = columns ? columns.join(', ') : '*' + const order = orderBy + ? ` ORDER BY ${orderBy + .map(([column, order]) => `${column.toString()} ${order}`) + .join(', ')}` + : '' + return `SELECT ${cols} FROM ${tableName}${where}${order}` + }, +} + +export function getTableName({ + database, + name, +}: TableOptions) { + return database !== undefined ? `${database}.${name}` : name +} diff --git a/src/schema/result.ts b/src/schema/result.ts new file mode 100644 index 00000000..d9344a93 --- /dev/null +++ b/src/schema/result.ts @@ -0,0 +1,6 @@ +export interface SelectResult { + data: T[] + statistics: { bytes_read: number; elapsed: number; rows_read: number } + rows: number + meta: { name: string; type: string }[] +} diff --git a/src/schema/schema.ts b/src/schema/schema.ts new file mode 100644 index 00000000..da3d44ce --- /dev/null +++ b/src/schema/schema.ts @@ -0,0 +1,11 @@ +import type { Shape } from './common' + +export class Schema { + constructor(public readonly shape: S) {} + + toString(delimiter?: string): string { + return Object.entries(this.shape) + .map(([column, type]) => `${column} ${type.toString()}`) + .join(delimiter ?? ', ') + } +} diff --git a/src/schema/stream.ts b/src/schema/stream.ts new file mode 100644 index 00000000..46e54ee2 --- /dev/null +++ b/src/schema/stream.ts @@ -0,0 +1,23 @@ +import Stream from 'stream' + +export interface SelectResult { + asyncGenerator(): AsyncGenerator + json(): Promise +} + +export class InsertStream extends Stream.Readable { + constructor() { + super({ + objectMode: true, + read() { + // Avoid [ERR_METHOD_NOT_IMPLEMENTED]: The _read() method is not implemented + }, + }) + } + add(data: T) { + this.push(data) + } + complete(): void { + this.push(null) + } +} diff --git a/src/schema/table.ts b/src/schema/table.ts new file mode 100644 index 00000000..ddd6567a --- /dev/null +++ b/src/schema/table.ts @@ -0,0 +1,113 @@ +import type { MergeTreeSettings, TableEngine } from './engines' +import type { Schema } from './schema' +import type { Infer, Shape } from './common' +import { getTableName, QueryFormatter } from './query_formatter' +import type { ClickHouseClient } from '../client' +import type { Row } from '../result' +import type { WhereExpr } from './where' +import type { InsertStream, SelectResult } from './stream' +import type { ClickHouseSettings } from '../settings' +import type { NonEmptyArray } from './common' + +// TODO: non-empty schema constraint +// TODO support more formats (especially JSONCompactEachRow) +export interface TableOptions { + name: string + schema: Schema + database?: string +} + +export interface CreateTableOptions { + engine: TableEngine + order_by: NonEmptyArray // TODO: functions support + if_not_exists?: boolean + on_cluster?: string + partition_by?: NonEmptyArray // TODO: functions support + primary_key?: NonEmptyArray // TODO: functions support + settings?: MergeTreeSettings // TODO: more settings and type constraints + clickhouse_settings?: ClickHouseSettings + // TODO: settings now moved to engines; decide whether we need it here + // TODO: index + // TODO: projections + // TODO: TTL +} + +export interface SelectOptions { + columns?: NonEmptyArray + where?: WhereExpr + order_by?: NonEmptyArray<[keyof S, 'ASC' | 'DESC']> + clickhouse_settings?: ClickHouseSettings + abort_signal?: AbortSignal +} + +export interface InsertOptions { + values: Infer[] | InsertStream> + clickhouse_settings?: ClickHouseSettings + abort_signal?: AbortSignal +} + +export class Table { + constructor( + private readonly client: ClickHouseClient, + private readonly options: TableOptions + ) {} + + // TODO: better types + async create(options: CreateTableOptions): Promise { + const query = QueryFormatter.createTable(this.options, options) + return (await this.client.command({ query })).text() + } + + insert({ + abort_signal, + clickhouse_settings, + values, + }: InsertOptions): Promise { + return this.client.insert({ + clickhouse_settings, + abort_signal, + table: getTableName(this.options), + format: 'JSONEachRow', + values, + }) + } + + async select({ + abort_signal, + clickhouse_settings, + columns, + order_by, + where, + }: SelectOptions = {}): Promise>> { + const query = QueryFormatter.select(this.options, where, columns, order_by) + const rows = await this.client.command({ + query, + clickhouse_settings, + abort_signal, + format: 'JSONEachRow', + }) + + const stream = rows.asStream() + async function* asyncGenerator() { + for await (const row of stream) { + const value = (row as Row).json() as unknown[] + yield value[0] as Infer // FIXME why we have an array here? + } + } + + return { + asyncGenerator, + json: async () => { + const result = [] + for await (const value of asyncGenerator()) { + if (Array.isArray(value)) { + result.push(...value) + } else { + result.push(value) + } + } + return result + }, + } + } +} diff --git a/src/schema/types.ts b/src/schema/types.ts new file mode 100644 index 00000000..842c440b --- /dev/null +++ b/src/schema/types.ts @@ -0,0 +1,494 @@ +/* eslint-disable @typescript-eslint/ban-types */ + +/* +TODO: + JSON (experimental) + AggregateFunction + SimpleAggregateFunction + Nested + Special Data Types + Geo (experimental) + Multi-word Types + Better Date(Time) parsing/handling, including timezones + Tuple + - Named tuple + Decimal (without precision loss) + - see https://github.com/ClickHouse/ClickHouse/issues/21875 + - currently disabled due to precision loss when using JS numbers in runtime +*/ + +type Int = UInt8 | UInt16 | UInt32 | UInt64 | UInt128 | UInt256 +type UInt = Int8 | Int16 | Int32 | Int64 | Int128 | Int256 +type Float = Float32 | Float64 +export type Type = + | Int + | UInt + | Float + | Bool + | String + | FixedString + | Array + | Nullable + | Map + // | Decimal + | UUID + | Enum + | LowCardinality + | Date + | Date32 + | DateTime + | DateTime64 + | IPv4 + | IPv6 + +export interface UInt8 { + underlying: number + type: 'UInt8' +} +export const UInt8 = { + type: 'UInt8', + toString(): string { + return 'UInt8' + }, +} as UInt8 +export interface UInt16 { + type: 'UInt16' + underlying: number +} +export const UInt16 = { + type: 'UInt16', + toString(): string { + return 'UInt16' + }, +} as UInt16 +export interface UInt32 { + type: 'UInt32' + underlying: number +} +export const UInt32 = { + type: 'UInt32', + toString(): string { + return 'UInt32' + }, +} as UInt32 +export interface UInt64 { + underlying: string + type: 'UInt64' +} +/** + * Uses string as the inferred type, since its max value + * is greater than Number.MAX_SAFE_INTEGER + * + * Max UInt64: 18446744073709551615 + * Number.MAX_SAFE_INTEGER: 9007199254740991 + * + * It can be cast to number + * by disabling `output_format_json_quote_64bit_integers` CH setting + */ +export const UInt64 = { + type: 'UInt64', + toString(): string { + return 'UInt64' + }, +} as UInt64 +export interface UInt128 { + type: 'UInt128' + underlying: string +} +/** + * Uses string as the inferred type, since its max value + * is greater than Number.MAX_SAFE_INTEGER + */ +export const UInt128 = { + type: 'UInt128', + toString(): string { + return 'UInt128' + }, +} as UInt128 +export interface UInt256 { + type: 'UInt256' + underlying: string +} +/** + * Uses string as the inferred type, since its max value + * is greater than Number.MAX_SAFE_INTEGER + */ +export const UInt256 = { + type: 'UInt256', + toString(): string { + return 'UInt256' + }, +} as UInt256 + +export interface Int8 { + underlying: number + type: 'Int8' +} +export const Int8 = { + type: 'Int8', + toString(): string { + return 'Int8' + }, +} as Int8 +export interface Int16 { + type: 'Int16' + underlying: number +} +export const Int16 = { + type: 'Int16', + toString(): string { + return 'Int16' + }, +} as Int16 +export interface Int32 { + type: 'Int32' + underlying: number +} +export const Int32 = { + type: 'Int32', + toString(): string { + return 'Int32' + }, +} as Int32 + +export interface Int64 { + underlying: string + type: 'Int64' +} +/** + * Uses string as the inferred type, since its max value + * is greater than Number.MAX_SAFE_INTEGER + * + * Max Int64: 9223372036854775807 + * Number.MAX_SAFE_INTEGER: 9007199254740991 + * + * It could be cast to number + * by disabling `output_format_json_quote_64bit_integers` CH setting + */ +export const Int64 = { + type: 'Int64', + toString(): string { + return 'Int64' + }, +} as Int64 +export interface Int128 { + type: 'Int128' + underlying: string +} +/** + * Uses string as the inferred type, since its max value + * is greater than Number.MAX_SAFE_INTEGER + */ +export const Int128 = { + type: 'Int128', + toString(): string { + return 'Int128' + }, +} as Int128 +export interface Int256 { + type: 'Int256' + underlying: string +} +/** + * Uses string as the inferred type, since its max value + * is greater than Number.MAX_SAFE_INTEGER + */ +export const Int256 = { + type: 'Int256', + toString(): string { + return 'Int256' + }, +} as Int256 + +export interface Float32 { + type: 'Float32' + underlying: number +} +export const Float32 = { + type: 'Float32', + toString(): string { + return 'Float32' + }, +} as Float32 +export interface Float64 { + type: 'Float64' + underlying: number +} +export const Float64 = { + type: 'Float64', + toString(): string { + return 'Float64' + }, +} as Float64 + +export interface Decimal { + type: 'Decimal' + underlying: number +} +export const Decimal = ({ + precision, + scale, +}: { + precision: number + scale: number +}) => + ({ + type: 'Decimal', + toString(): string { + if (scale < 0) { + throw new Error( + `Invalid Decimal scale. Valid range: [ 0 : P ], got ${scale}` + ) + } + if (precision > 0 && precision < 10) { + return `Decimal32(${scale})` + } + if (precision > 10 && precision < 19) { + return `Decimal64(${scale})` + } + if (precision > 19 && precision < 39) { + return `Decimal128(${scale})` + } + if (precision > 19 && precision < 39) { + return `Decimal128(${scale})` + } + if (precision > 39 && precision < 77) { + return `Decimal256(${scale})` + } + throw Error( + `Unsupported Decimal precision. Valid range: [ 1 : 18 ], got ${precision}` + ) + }, + } as Decimal) + +export interface Bool { + type: 'Bool' + underlying: boolean +} +export const Bool = { + type: 'Bool', + toString(): string { + return 'Bool' + }, +} as Bool + +export interface String { + type: 'String' + underlying: string +} +export const String = { + type: 'String', + toString(): string { + return 'String' + }, +} as String + +export interface FixedString { + type: 'FixedString' + underlying: string +} +export const FixedString = (bytes: number) => + ({ + type: 'FixedString', + toString(): string { + return `FixedString(${bytes})` + }, + } as FixedString) + +export interface UUID { + type: 'UUID' + underlying: string +} +export const UUID = { + type: 'UUID', + toString(): string { + return 'UUID' + }, +} as UUID + +type StandardEnum = { + [id: string]: T | string + [n: number]: string +} + +export interface Enum> { + type: 'Enum' + underlying: keyof T +} +// https://github.com/microsoft/TypeScript/issues/30611#issuecomment-479087883 +// Currently limited to only string enums +export function Enum>(enumVariable: T) { + return { + type: 'Enum', + toString(): string { + return `Enum(${Object.keys(enumVariable) + .map((k) => `'${k}'`) + .join(', ')})` + }, + } as Enum +} + +type LowCardinalityDataType = + | String + | FixedString + | UInt + | Int + | Float + | Date + | DateTime +export interface LowCardinality { + type: 'LowCardinality' + underlying: T['underlying'] +} +export const LowCardinality = (type: T) => + ({ + type: 'LowCardinality', + toString(): string { + return `LowCardinality(${type})` + }, + } as LowCardinality) + +export interface Array { + type: 'Array' + underlying: globalThis.Array +} +export const Array = (inner: T) => + ({ + type: 'Array', + toString(): string { + return `Array(${inner.toString()})` + }, + } as Array) + +type NullableType = + | Int + | UInt + | Float + | Bool + | String + | FixedString + | UUID + | Decimal + | Enum + | Date + | DateTime + | Date32 + | IPv4 + | IPv6 +export interface Nullable { + type: 'Nullable' + underlying: T['underlying'] | null +} +export const Nullable = (inner: T) => + ({ + type: 'Nullable', + toString(): string { + return `Nullable(${inner.toString()})` + }, + } as Nullable) + +type MapKey = + | String + | Int + | UInt + | FixedString + | UUID + | Enum + | Date + | DateTime + | Date32 +export interface Map { + type: 'Map' + underlying: Record +} +export const Map = (k: K, v: V) => + ({ + type: 'Map', + toString(): string { + return `Map(${k.toString()}, ${v.toString()})` + }, + } as Map) + +export interface Date { + type: 'Date' + underlying: string // '1970-01-01' to '2149-06-06' +} +export const Date = { + type: 'Date', + toString(): string { + return 'Date' + }, +} as Date + +export interface Date32 { + type: 'Date32' + underlying: string // '1900-01-01' to '2299-12-31' +} +export const Date32 = { + type: 'Date32', + toString(): string { + return 'Date32' + }, +} as Date32 + +export interface DateTime { + type: 'DateTime' + underlying: string // '1970-01-01 00:00:00' to '2106-02-07 06:28:15' +} +export const DateTime = (timezone?: string) => + ({ + type: 'DateTime', + toString(): string { + const tz = timezone ? ` (${timezone})` : '' + return `DateTime${tz}` + }, + } as DateTime) + +export interface DateTime64 { + type: 'DateTime64' + underlying: string // '1900-01-01 00:00:00' to '2299-12-31 23:59:59.99999999' +} +export const DateTime64 = (precision: number, timezone?: string) => + ({ + type: 'DateTime64', + toString(): string { + const tz = timezone ? `, ${timezone}` : '' + return `DateTime64(${precision}${tz})` + }, + } as DateTime64) + +export interface IPv4 { + type: 'IPv4' + underlying: string // 255.255.255.255 +} +export const IPv4 = { + type: 'IPv4', + toString(): string { + return 'IPv4' + }, +} as IPv4 + +export interface IPv6 { + type: 'IPv6' + underlying: string // 2001:db8:85a3::8a2e:370:7334 +} +export const IPv6 = { + type: 'IPv6', + toString(): string { + return 'IPv6' + }, +} as IPv6 + +// TODO: Tuple is disabled for now. Figure out type derivation in this case + +// export interface Tuple = { +// type: 'Tuple' +// // underlying: globalThis.Array +// } +// export const Tuple = (...inner: T[]) => +// ({ +// type: 'Tuple', +// toString(): string { +// return `Tuple(${inner.join(', ')})` +// }, +// } as Tuple) diff --git a/src/schema/where.ts b/src/schema/where.ts new file mode 100644 index 00000000..f0345885 --- /dev/null +++ b/src/schema/where.ts @@ -0,0 +1,52 @@ +import type { NonEmptyArray, Shape } from './common' + +// eslint-disable-next-line @typescript-eslint/no-unused-vars +export interface WhereExpr { + toString(): string + type: 'And' | 'Or' | 'Eq' | 'Le' | 'Lte' | 'Gt' | 'Gte' +} + +export function Eq( + field: F, + value: S[F]['underlying'] +): WhereExpr { + return { + toString(): string { + return `(${String(field)} == ${formatValue(value)})` + }, + type: 'Eq', + } +} +export function And( + ...expr: NonEmptyArray> +): WhereExpr { + return { + toString(): string { + return `(${expr.join(' AND ')})` + }, + type: 'And', + } +} +export function Or( + ...expr: NonEmptyArray> +): WhereExpr { + return { + toString(): string { + return `(${expr.join(' OR ')})` + }, + type: 'Or', + } +} + +function formatValue(value: any): string { + if (value === null || value === undefined) { + return 'NULL' + } + if (typeof value === 'string') { + return `'${value}'` + } + if (globalThis.Array.isArray(value)) { + return `[${value.join(', ')}]` + } + return value.toString() +} diff --git a/tsconfig.eslint.json b/tsconfig.eslint.json index e237199e..a3443695 100644 --- a/tsconfig.eslint.json +++ b/tsconfig.eslint.json @@ -1,6 +1,6 @@ { "extends": "./tsconfig.json", - "include": ["./src/**/*.ts", "__tests__/**/*.ts"], + "include": ["./src/**/*.ts", "__tests__/**/*.ts", "examples/**/*.ts"], "compilerOptions": { "noUnusedLocals": false, "noUnusedParameters": false