Skip to content

Commit

Permalink
refactor(api/internal/conduit.js): improve conduit lifecycles and rec…
Browse files Browse the repository at this point in the history
…onnect logic
  • Loading branch information
jwerle committed Aug 17, 2024
1 parent b702a5f commit 54ea7e3
Showing 1 changed file with 57 additions and 63 deletions.
120 changes: 57 additions & 63 deletions api/internal/conduit.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,40 @@ export const DEFAULT_MAX_RECONNECT_TIMEOUT = 256

/**
* A pool of known `Conduit` instances.
* @type {Set<WeakRef<Conduit>>}
* @type {Set<Conduit>}
*/
export const pool = new Set()

// reconnect when application resumes
hooks.onApplicationResume(() => {
isApplicationPaused = false
const refs = Array.from(pool)
for (const ref of refs) {
for (const conduit of pool) {
// @ts-ignore
const conduit = /** @type {WeakRef<Conduit>} */ (ref).deref()
if (conduit?.shouldReconnect) {
// @ts-ignore
conduit.reconnect()
} else {
pool.delete(ref)
pool.delete(conduit)
}
}
})

hooks.onApplicationPause(() => {
isApplicationPaused = true
const refs = Array.from(pool)
for (const ref of refs) {
// @ts-ignore
const conduit = /** @type {WeakRef<Conduit>} */ (ref).deref()
for (const conduit of pool) {
if (conduit) {
// @ts-ignore
conduit.isConnecting = false
// @ts-ignore
conduit.isActive = false
if (conduit.socket) {
conduit.socket.close()
// @ts-ignore
if (conduit.socket && conduit.socket.readyState == WebSocket.OPEN) {
// @ts-ignore
conduit.socket?.close()
}

// @ts-ignore
conduit.socket = null
}
}
})
Expand Down Expand Up @@ -157,12 +160,6 @@ export class Conduit extends EventTarget {
*/
id = null

/**
* @private
* @type {number}
*/
#loop = 0

/**
* @private
* @type {function(MessageEvent)}
Expand All @@ -186,11 +183,6 @@ export class Conduit extends EventTarget {
*/
#onopen = null

/**
* @type {WeakRef<Conduit>}
*/
#ref = null

/**
* Creates an instance of Conduit.
*
Expand All @@ -206,23 +198,8 @@ export class Conduit extends EventTarget {
this.port = this.constructor.port
this.connect()

const reconnectState = {
// TODO(@jwerle): eventually consume from 'options' when it exists
retries: DEFALUT_MAX_RECONNECT_RETRIES
}

this.#loop = setInterval(async () => {
if (!this.isActive && !this.isConnecting && this.shouldReconnect) {
await this.reconnect({
retries: --reconnectState.retries
})
} else {
reconnectState.retries = DEFALUT_MAX_RECONNECT_RETRIES
}
}, 256)

this.#ref = new WeakRef(this)
pool.add(this.#ref)
pool.add(this)
gc.ref(this)
}

/**
Expand Down Expand Up @@ -351,6 +328,7 @@ export class Conduit extends EventTarget {
this.socket = new WebSocket(this.url)
this.socket.binaryType = 'arraybuffer'
this.socket.onerror = (e) => {
this.socket = null
this.isActive = false
this.isConnecting = false
this.dispatchEvent(new ErrorEvent('error', e))
Expand All @@ -366,10 +344,11 @@ export class Conduit extends EventTarget {
}

this.socket.onclose = (e) => {
this.isActive = false
this.socket = null
this.isConnecting = false
this.isActive = false
this.dispatchEvent(new CloseEvent('close', e))
if (this.shouldReconnect) {
if (this.shouldReconnect && !isApplicationPaused) {
this.reconnect()
}
}
Expand Down Expand Up @@ -410,16 +389,22 @@ export class Conduit extends EventTarget {
const retries = options?.retries ?? DEFALUT_MAX_RECONNECT_RETRIES
const timeout = options?.timeout ?? DEFAULT_MAX_RECONNECT_TIMEOUT

return await this.connect((err) => {
if (err) {
this.isActive = false
if (retries > 0) {
setTimeout(() => this.reconnect({
retries: retries - 1,
timeout
}), timeout)
}
}
return await new Promise((resolve, reject) => {
queueMicrotask(() => {
const promise = this.connect((err) => {
if (err) {
this.isActive = false
if (retries > 0) {
setTimeout(() => this.reconnect({
retries: retries - 1,
timeout
}), timeout)
}
}
})

return promise.then(resolve, reject)
})
})
}

Expand Down Expand Up @@ -544,11 +529,23 @@ export class Conduit extends EventTarget {
* Sends a message with the specified options and payload over the
* WebSocket connection.
* @param {object} options - The options to send.
* @param {Uint8Array} payload - The payload to send.
* @param {Uint8Array=} [payload] - The payload to send.
* @return {boolean}
*/
send (options, payload) {
if (this.isActive) {
send (options, payload = null) {
if (isApplicationPaused || !this.isActive) {
return false
}

if (!payload) {
payload = new Uint8Array(0)
}

if (
this.socket !== null &&
this.socket instanceof WebSocket &&
this.socket.readyState === WebSocket.OPEN
) {
this.socket.send(this.encodeMessage(options, payload))
return true
}
Expand All @@ -562,17 +559,12 @@ export class Conduit extends EventTarget {
close () {
this.shouldReconnect = false

if (this.#loop) {
clearInterval(this.#loop)
this.#loop = 0
}

if (this.socket) {
this.socket.close()
this.socket = null
}

pool.delete(this.#ref)
pool.delete(this)
}

/**
Expand All @@ -582,9 +574,11 @@ export class Conduit extends EventTarget {
*/
[gc.finalizer] () {
return {
args: [this.#ref],
handle (ref) {
pool.delete(ref)
args: [this.socket],
handle (socket) {
if (socket?.readyState === WebSocket.OPEN) {
socket.close()
}
}
}
}
Expand Down

0 comments on commit 54ea7e3

Please sign in to comment.