Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the D1 binding to use D1DatabaseSession for all actions #2624

Merged
merged 7 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cloudflare/internal/compatibility-flags.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,4 @@ export const specCompliantResponseRedirect: boolean;
export const workerdExperimental: boolean;
export const durableObjectGetExisting: boolean;
export const vectorizeQueryMetadataOptional: boolean;
export const enableD1WithSessionsAPI: boolean;
287 changes: 232 additions & 55 deletions src/cloudflare/internal/d1-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// Licensed under the Apache 2.0 license found in the LICENSE file or at:
// https://opensource.org/licenses/Apache-2.0

import flags from 'workerd:compatibility-flags';

interface Fetcher {
fetch: typeof fetch;
}
Expand Down Expand Up @@ -52,38 +54,113 @@ type SQLError = {

type ResultsFormat = 'ARRAY_OF_OBJECTS' | 'ROWS_AND_COLUMNS' | 'NONE';

type D1SessionCommitTokenOrConstraint = string;
type D1SessionCommitToken = string;
// Indicates that the first query should go to the primary, and the rest queries
// using the same D1DatabaseSession will go to any replica that is consistent with
// the commit token maintained by the session (returned by the first query).
const D1_SESSION_CONSTRAINT_FIRST_PRIMARY = 'first-primary';
// Indicates that the first query can go anywhere (primary or replica), and the rest queries
// using the same D1DatabaseSession will go to any replica that is consistent with
// the commit token maintained by the session (returned by the first query).
const D1_SESSION_CONSTRAINT_FIRST_UNCONSTRAINED = 'first-unconstrained';

// Parsed by the D1 eyeball worker.
const D1_SESSION_COMMIT_TOKEN_HTTP_HEADER = 'x-cf-d1-session-commit-token';

class D1Database {
private readonly fetcher: Fetcher;
private readonly alwaysPrimarySession: D1DatabaseSessionAlwaysPrimary;
protected readonly fetcher: Fetcher;

public constructor(fetcher: Fetcher) {
this.fetcher = fetcher;
this.alwaysPrimarySession = new D1DatabaseSessionAlwaysPrimary(
this.fetcher
);
}

public prepare(query: string): D1PreparedStatement {
return new D1PreparedStatement(this, query);
return new D1PreparedStatement(this.alwaysPrimarySession, query);
}

public async batch<T = unknown>(
statements: D1PreparedStatement[]
): Promise<D1Result<T>[]> {
return this.alwaysPrimarySession.batch(statements);
}

public async exec(query: string): Promise<D1ExecResult> {
return this.alwaysPrimarySession.exec(query);
}

// DEPRECATED, TO BE REMOVED WITH NEXT BREAKING CHANGE
public async dump(): Promise<ArrayBuffer> {
const response = await this.fetcher.fetch('http://d1/dump', {
method: 'POST',
headers: {
'content-type': 'application/json',
},
});
if (response.status !== 200) {
try {
const err = (await response.json()) as SQLError;
throw new Error(`D1_DUMP_ERROR: ${err.error}`, {
cause: new Error(err.error),
});
} catch {
throw new Error(`D1_DUMP_ERROR: Status + ${response.status}`, {
cause: new Error(`Status ${response.status}`),
});
}
return this.alwaysPrimarySession.dump();
}
}

class D1DatabaseWithSessionAPI extends D1Database {
public constructor(fetcher: Fetcher) {
super(fetcher);
}

public withSession(
constraintOrToken: D1SessionCommitTokenOrConstraint | null | undefined
): D1DatabaseSession {
constraintOrToken = constraintOrToken?.trim();
if (
constraintOrToken === null ||
constraintOrToken === undefined ||
constraintOrToken === ''
) {
constraintOrToken = D1_SESSION_CONSTRAINT_FIRST_UNCONSTRAINED;
}
return await response.arrayBuffer();
return new D1DatabaseSession(this.fetcher, constraintOrToken);
}
}

class D1DatabaseSession {
protected fetcher: Fetcher;
protected commitTokenOrConstraint: D1SessionCommitTokenOrConstraint;

public constructor(
fetcher: Fetcher,
commitTokenOrConstraint: D1SessionCommitTokenOrConstraint
) {
this.fetcher = fetcher;
this.commitTokenOrConstraint = commitTokenOrConstraint;

if (!this.commitTokenOrConstraint) {
throw new Error('D1_SESSION_ERROR: invalid commit token or constraint');
}
}

// Update the commit token IFF the given newCommitToken is more recent.
// The commit token held in the session should always be the latest value we
// have observed in the responses to our API. There can be cases where we have concurrent
// queries running within the same session, and therefore here we ensure we only
// retain the latest commit token received.
// @returns the final commit token after the update.
protected _updateCommitToken(
newCommitToken: D1SessionCommitToken
): D1SessionCommitToken | null {
newCommitToken = newCommitToken.trim();
if (!newCommitToken) {
// We should not be receiving invalid commit tokens, but just be defensive.
return this.getCommitToken();
}
const currentCommitToken = this.getCommitToken();
if (
currentCommitToken === null ||
currentCommitToken.localeCompare(newCommitToken) < 0
) {
this.commitTokenOrConstraint = newCommitToken;
}
return this.getCommitToken();
}

public prepare(sql: string): D1PreparedStatement {
return new D1PreparedStatement(this, sql);
}

public async batch<T = unknown>(
Expand All @@ -98,34 +175,52 @@ class D1Database {
return exec.map(toArrayOfObjects);
}

public async exec(query: string): Promise<D1ExecResult> {
const lines = query.trim().split('\n');
const _exec = await this._send('/execute', lines, [], 'NONE');
const exec = Array.isArray(_exec) ? _exec : [_exec];
const error = exec
.map((r) => {
return r.error ? 1 : 0;
})
.indexOf(1);
if (error !== -1) {
throw new Error(
`D1_EXEC_ERROR: Error in line ${error + 1}: ${lines[error]}: ${
exec[error]?.error
}`,
{
cause: new Error(
`Error in line ${error + 1}: ${lines[error]}: ${exec[error]?.error}`
),
}
);
// Returns the latest commit token we received from all responses processed so far.
// It does not return constraints that might have be passed during the session creation.
public getCommitToken(): D1SessionCommitToken | null {
switch (this.commitTokenOrConstraint) {
// First to any replica, and then anywhere that satisfies the commit token.
case D1_SESSION_CONSTRAINT_FIRST_UNCONSTRAINED:
// First to primary, and then anywhere that satisfies the commit token.
case D1_SESSION_CONSTRAINT_FIRST_PRIMARY:
return null;
default:
return this.commitTokenOrConstraint;
}
}

// fetch will append the commit token header to all outgoing fetch calls.
// The response headers are parsed automatically, extracting the commit token
// from the response headers and updating it through `_updateCommitToken(token)`.
protected async _wrappedFetch(
input: RequestInfo | URL,
init?: RequestInit
): Promise<Response> {
// We append the commit token to all fetch queries.
const h = new Headers(init?.headers);

// We send either a constraint, or a commit token, and the eyeball worker will figure out
// what to do based on the value. This simulates the same flow as the REST API would behave too.
if (this.commitTokenOrConstraint) {
h.set(D1_SESSION_COMMIT_TOKEN_HTTP_HEADER, this.commitTokenOrConstraint);
}

if (!init) {
init = { headers: h };
} else {
return {
count: exec.length,
duration: exec.reduce((p, c) => {
return p + (c.meta['duration'] as number);
}, 0),
};
init.headers = h;
}
return this.fetcher.fetch(input, init).then((resp) => {
const newCommitToken = resp.headers.get(
D1_SESSION_COMMIT_TOKEN_HTTP_HEADER
);
// TODO(soon): Add validation of the received commit token, in case we sent a commit token,
// otherwise sessions functionality could be inconsistent.
if (newCommitToken) {
this._updateCommitToken(newCommitToken);
}
return resp;
});
}

public async _sendOrThrow<T = unknown>(
Expand Down Expand Up @@ -165,7 +260,7 @@ class D1Database {

const url = new URL(endpoint, 'http://d1');
url.searchParams.set('resultsFormat', resultsFormat);
const response = await this.fetcher.fetch(url.href, {
const response = await this._wrappedFetch(url.href, {
method: 'POST',
headers: {
'content-type': 'application/json',
Expand Down Expand Up @@ -196,17 +291,96 @@ class D1Database {
}
}

class D1DatabaseSessionAlwaysPrimary extends D1DatabaseSession {
public constructor(fetcher: Fetcher) {
// Will always go to primary, since we won't be ever updating this constraint.
super(fetcher, D1_SESSION_CONSTRAINT_FIRST_PRIMARY);
}

// We ignore commit tokens for this special type of session,
// since all queries are sent to the primary.
public override _updateCommitToken(
_newCommitToken: D1SessionCommitToken
): D1SessionCommitToken | null {
return null;
}

// There is not commit token returned every by this special type of session,
// since all queries are sent to the primary.
public override getCommitToken(): D1SessionCommitToken | null {
return null;
}

//////////////////////////////////////////////////////////////////////////////////////////////
// These are only used by the D1Database which is our existing API pre-Sessions API.
// For backwards compatibility they always go to the primary database.
//

public async exec(query: string): Promise<D1ExecResult> {
const lines = query.trim().split('\n');
const _exec = await this._send('/execute', lines, [], 'NONE');
const exec = Array.isArray(_exec) ? _exec : [_exec];
const error = exec
.map((r) => {
return r.error ? 1 : 0;
})
.indexOf(1);
if (error !== -1) {
throw new Error(
`D1_EXEC_ERROR: Error in line ${error + 1}: ${lines[error]}: ${
exec[error]?.error
}`,
{
cause: new Error(
`Error in line ${error + 1}: ${lines[error]}: ${exec[error]?.error}`
),
}
);
} else {
return {
count: exec.length,
duration: exec.reduce((p, c) => {
return p + (c.meta['duration'] as number);
}, 0),
};
}
}

// DEPRECATED, TO BE REMOVED WITH NEXT BREAKING CHANGE
public async dump(): Promise<ArrayBuffer> {
const response = await this._wrappedFetch('http://d1/dump', {
method: 'POST',
headers: {
'content-type': 'application/json',
},
});
if (response.status !== 200) {
try {
const err = (await response.json()) as SQLError;
throw new Error(`D1_DUMP_ERROR: ${err.error}`, {
cause: new Error(err.error),
});
} catch {
throw new Error(`D1_DUMP_ERROR: Status + ${response.status}`, {
cause: new Error(`Status ${response.status}`),
});
}
}
return await response.arrayBuffer();
}
}

class D1PreparedStatement {
private readonly database: D1Database;
private readonly dbSession: D1DatabaseSession;
public readonly statement: string;
public readonly params: unknown[];

public constructor(
database: D1Database,
dbSession: D1DatabaseSession,
statement: string,
values?: unknown[]
) {
this.database = database;
this.dbSession = dbSession;
this.statement = statement;
this.params = values || [];
}
Expand Down Expand Up @@ -249,7 +423,7 @@ class D1PreparedStatement {
);
});
return new D1PreparedStatement(
this.database,
this.dbSession,
this.statement,
transformedValues
);
Expand All @@ -261,7 +435,7 @@ class D1PreparedStatement {
colName?: string
): Promise<Record<string, T> | T | null> {
const info = firstIfArray(
await this.database._sendOrThrow<Record<string, T>>(
await this.dbSession._sendOrThrow<Record<string, T>>(
'/query',
this.statement,
this.params,
Expand All @@ -288,7 +462,7 @@ class D1PreparedStatement {

public async run<T = Record<string, unknown>>(): Promise<D1Response> {
return firstIfArray(
await this.database._sendOrThrow<T>(
await this.dbSession._sendOrThrow<T>(
'/execute',
this.statement,
this.params,
Expand All @@ -300,7 +474,7 @@ class D1PreparedStatement {
public async all<T = Record<string, unknown>>(): Promise<D1Result<T[]>> {
return toArrayOfObjects(
firstIfArray(
await this.database._sendOrThrow<T[]>(
await this.dbSession._sendOrThrow<T[]>(
'/query',
this.statement,
this.params,
Expand All @@ -312,7 +486,7 @@ class D1PreparedStatement {

public async raw<T = unknown[]>(options?: D1RawOptions): Promise<T[]> {
const s = firstIfArray(
await this.database._sendOrThrow<Record<string, unknown>>(
await this.dbSession._sendOrThrow<Record<string, unknown>>(
'/query',
this.statement,
this.params,
Expand Down Expand Up @@ -400,5 +574,8 @@ async function toJson<T = unknown>(response: Response): Promise<T> {
}

export default function makeBinding(env: { fetcher: Fetcher }): D1Database {
if (flags.enableD1WithSessionsAPI) {
return new D1DatabaseWithSessionAPI(env.fetcher);
}
return new D1Database(env.fetcher);
}
Loading
Loading