Skip to content

Commit

Permalink
Changes to support big data actions (#112)
Browse files Browse the repository at this point in the history
* Added `exec_generator` to yield the row instead of collecting it in memory

* - added execute_gennerator into client api

* - added client.execute_generator generator which consumes connection.execute_generator without client.useConnection

* - merged connection.execute_generator into connection.execute behind an `iterator` flag
- removed client.execute_generator

* - removed note from readme.md

* - added documentation for `execute()` method.

* Add test

* Don't allow using `iterator` with Client.execute()

* Fix fmt

Co-authored-by: Petru Szemereczki <[email protected]>
Co-authored-by: lideming <[email protected]>
  • Loading branch information
3 people authored May 3, 2021
1 parent e101b49 commit 6dcab80
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 14 deletions.
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,37 @@ const queryWithParams = await client.query(
console.log(users, queryWithParams);
```

### execute

There are two ways to execute an SQL statement.

First and default one will return you an `rows` key containing an array of rows:

```ts
const { rows: users } = await client.execute(`select * from users`);
console.log(users);
```

The second one will return you an `iterator` key containing an
`[Symbol.asyncIterator]` property:

```ts
await client.useConnection(async (conn) => {
// note the third parameter of execute() method.
const { iterator: users } = await conn.execute(
`select * from users`,
/* params: */ [],
/* iterator: */ false,
);
for await (const user of users) {
console.log(user);
}
});
```

The second method is recommended only for SELECT queries that might contain many
results (e.g. 100k rows).

### transaction

```ts
Expand Down
4 changes: 2 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ export class Client {
}

/**
* excute query sql
* execute query sql
* @param sql query sql string
* @param params query params
*/
Expand All @@ -88,7 +88,7 @@ export class Client {
}

/**
* excute sql
* execute sql
* @param sql sql string
* @param params query params
*/
Expand Down
61 changes: 49 additions & 12 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { byteFormat, delay } from "../deps.ts";
import { ClientConfig } from "./client.ts";
import {
ConnnectionError,
Expand Down Expand Up @@ -31,13 +30,14 @@ export enum ConnectionState {
}

/**
* Result for excute sql
* Result for execute sql
*/
export type ExecuteResult = {
affectedRows?: number;
lastInsertId?: number;
fields?: FieldInfo[];
rows?: any[];
iterator?: any;
};

/** Connection for mysql */
Expand Down Expand Up @@ -245,11 +245,16 @@ export class Connection {
}

/**
* excute sql
* execute sql
* @param sql sql string
* @param params query params
* @param iterator whether to return an ExecuteIteratorResult or ExecuteResult
*/
async execute(sql: string, params?: any[]): Promise<ExecuteResult> {
async execute(
sql: string,
params?: any[],
iterator = false,
): Promise<ExecuteResult> {
if (this.state != ConnectionState.CONNECTED) {
if (this.state == ConnectionState.CLOSED) {
throw new ConnnectionError("Connection is closed");
Expand Down Expand Up @@ -289,19 +294,51 @@ export class Connection {
}
}

while (true) {
receive = await this.nextPacket();
if (receive.type === PacketType.EOF_Packet) {
break;
} else {
const row = parseRow(receive.body, fields);
rows.push(row);
if (!iterator) {
while (true) {
receive = await this.nextPacket();
if (receive.type === PacketType.EOF_Packet) {
break;
} else {
const row = parseRow(receive.body, fields);
rows.push(row);
}
}
return { rows, fields };
}
return { rows, fields };

return {
fields,
iterator: this.buildIterator(fields),
};
} catch (error) {
this.close();
throw error;
}
}

private buildIterator(fields: FieldInfo[]): any {
const next = async () => {
const receive = await this.nextPacket();

if (receive.type === PacketType.EOF_Packet) {
return { done: true };
}

const value = parseRow(receive.body, fields);

return {
done: false,
value,
};
};

return {
[Symbol.asyncIterator]: () => {
return {
next,
};
},
};
}
}
19 changes: 19 additions & 0 deletions test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,25 @@ testWithClient(async function testLargeQueryAndResponse(client) {
);
});

testWithClient(async function testExecuteIterator(client) {
await client.useConnection(async (conn) => {
await conn.execute(`DROP TABLE IF EXISTS numbers`);
await conn.execute(`CREATE TABLE numbers (num INT NOT NULL)`);
await conn.execute(
`INSERT INTO numbers (num) VALUES ${
new Array(64).fill(0).map((v, idx) => `(${idx})`).join(",")
}`,
);
const r = await conn.execute(`SELECT num FROM numbers`, [], true);
let count = 0;
for await (const row of r.iterator) {
assertEquals(row.num, count);
count++;
}
assertEquals(count, 64);
});
});

registerTests();

Deno.test("configLogger()", async () => {
Expand Down

0 comments on commit 6dcab80

Please sign in to comment.