
WebSocket API Spec

Om Duggineni edited this page May 3, 2024 · 37 revisions

The main goals of the socket API are (in this order):

  • Minimize time spent programming the application
  • Minimize latency to the end user (changes made should propagate as fast as possible)
  • Minimize the load on the server side

The main ways that we can achieve these goals are:

  • Reuse connections where possible to avoid the latency associated with setting up new connections, as long as the size of the data being sent is not large
  • Use language or database level mechanisms to track listeners when possible instead of making our own
  • Stick with data formats everyone on the team can be expected to know about

Potential (but minor) disadvantages of the below approach:

  • Our OpenAPI docs will not allow making requests through the websocket handler directly as they do for other regular handlers, mostly because OpenAPI doesn’t really support any non-janky forms of full-duplex async communication. There aren’t many ways around this. That said, the OpenAPI spec will mention a websocket endpoint with some version of this document embedded or linked, so very little is lost.
    • We can also still use Pydantic on the server side and Dart on the client side to create data models.

Data Structures

  • It is important that we use the secure websocket protocol (wss://) as opposed to ws://.
  • All data sent across the socket must be a valid JSON object. It should ideally be possible to display this data directly without making more requests. If we're really squeezing performance out of the protocol, we can use the binary format MessagePack (https://msgpack.org/), but that's likely a bit overkill.
  • Most messages sent through the socket have the following fields (and maybe others):
    • type (string ENUM): the type of message being sent. Message types not understood by the client/server are to be ignored.
    • requestId (integer): disambiguates different listeners running simultaneously. Omitted from the initialization messages (“init” and “initResponse”). Chosen by the client, and must be unique for each request on the connection.
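Because every message is a JSON object keyed by type, either side can route frames with a small dispatch table. The Python sketch below decodes one frame, enforces the "must be a JSON object" rule, and silently ignores types it does not understand. The `dispatch` function and the handler table are illustrative names, not part of the spec.

```python
import json
from typing import Any, Callable, Dict, Optional

# A handler receives the decoded message and returns a reply (or None).
Handler = Callable[[Dict[str, Any]], Optional[Dict[str, Any]]]


def dispatch(raw: str, handlers: Dict[str, Handler]) -> Optional[Dict[str, Any]]:
    """Decode one frame and route it by its "type" field.

    Returns the handler's reply, or None if the message type is not understood.
    """
    message = json.loads(raw)  # raises ValueError (JSONDecodeError) on invalid JSON
    if not isinstance(message, dict):
        # Every message on the socket must be a JSON object.
        raise ValueError("message must be a JSON object")
    msg_type = message.get("type")
    handler = handlers.get(msg_type) if isinstance(msg_type, str) else None
    if handler is None:
        # Per the spec, message types we do not understand are ignored.
        return None
    return handler(message)
```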

Initialization

The client starts by setting up a websocket using the secure websocket protocol (wss://).

The client then sends an “init” message of the following format:

  • type: “init”
  • version: 1
  • authToken: “<a JWT token>”
  • huntId: “<the MongoDB ObjectID of the hunt, as a string>”
  • protocolExtensions: any subset of ['partialSync', 'getData'], in any order
Details
  • We can add further things to this as we go
  • Pydantic model:
from pydantic import BaseModel
from typing import List, Literal
class InitMessage(BaseModel):
    type: Literal["init"]
    version: int
    authToken: str
    huntId: str
    protocolExtensions: List[str]

The server responds with one of:

  • success

    • type (string ENUM): “initResponse”
    • success (bool): true
    • protocolExtensions (List<string>): <list of protocol extensions the server supports>
  • failure

    • type (string ENUM): “initResponse”
    • success (bool): false
    • errorMessage (string ENUM): “<error message>”
Details
  • Possible values of errorMessage:
    • "INVALID_REQUEST": the client sent something malformed
    • "INVALID_TOKEN": the client's auth token is invalid
    • "UPGRADE_API_VERSION": the version that the client requested is too old to be supported
    • "INVALID_API_VERSION": the version that the client requested never existed
  • Pydantic models:
from pydantic import BaseModel
from typing import List, Literal, Union
from enum import Enum

class InitResponseBaseMessage(BaseModel):
    type: Literal["initResponse"]
    success: bool

class InitResponseSuccessMessage(InitResponseBaseMessage):
    success: Literal[True]
    protocolExtensions: List[str]

class ErrorMessage(str, Enum):
    INVALID_REQUEST = "INVALID_REQUEST"
    INVALID_TOKEN = "INVALID_TOKEN"
    UPGRADE_API_VERSION = "UPGRADE_API_VERSION"
    INVALID_API_VERSION = "INVALID_API_VERSION"

class InitResponseFailureMessage(InitResponseBaseMessage):
    success: Literal[False]
    errorMessage: ErrorMessage

InitResponseMessage = Union[InitResponseSuccessMessage, InitResponseFailureMessage]

After a failure response, the server terminates the connection. The messages defined below may only be sent after the server sends a success response.
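A minimal server-side sketch of the handshake decision follows, using only the standard library so it runs without Pydantic. `handle_init`, `verify_token`, and the three constants are hypothetical; the order in which the checks run is a choice, not something the spec mandates. Following the spec literally, the success response lists every extension the server supports.

```python
from typing import Any, Callable, Dict, Tuple

CURRENT_VERSION = 1              # assumption: newest protocol version that exists
MIN_SUPPORTED_VERSION = 1        # assumption: oldest version the server still accepts
SERVER_EXTENSIONS = ["getData"]  # assumption: extensions this server implements


def handle_init(message: Any,
                verify_token: Callable[[str], bool]) -> Tuple[Dict[str, Any], bool]:
    """Return (initResponse message, whether to keep the connection open)."""

    def fail(code: str) -> Tuple[Dict[str, Any], bool]:
        # After a failure response the server closes the connection.
        return {"type": "initResponse", "success": False, "errorMessage": code}, False

    if not isinstance(message, dict):
        return fail("INVALID_REQUEST")
    version = message.get("version")
    extensions = message.get("protocolExtensions")
    well_formed = (
        message.get("type") == "init"
        and isinstance(version, int) and not isinstance(version, bool)
        and isinstance(message.get("authToken"), str)
        and isinstance(message.get("huntId"), str)
        and isinstance(extensions, list)
        and all(isinstance(ext, str) for ext in extensions)
    )
    if not well_formed:
        return fail("INVALID_REQUEST")
    if version < 1 or version > CURRENT_VERSION:
        return fail("INVALID_API_VERSION")      # this version never existed
    if version < MIN_SUPPORTED_VERSION:
        return fail("UPGRADE_API_VERSION")      # existed, but is no longer supported
    if not verify_token(message["authToken"]):  # hypothetical JWT check
        return fail("INVALID_TOKEN")
    return {"type": "initResponse", "success": True,
            "protocolExtensions": list(SERVER_EXTENSIONS)}, True
```

A client would then presumably use only the extensions that appear both in its own request and in the server's list.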

Making a request / registering a listener

The client can register a listener by sending the following message:

  • type: “addListener”
  • requestId: see above
  • listenTo: one of the below
    • {type: "upcomingHunts"}: listen for upcoming hunts
    • {type: "huntTeams", include: ['members', 'memberProfiles', 'questionsSolvedData']}: listen for teams currently in the hunt
    • {type: "huntTeam", team: "<object id>", include: ['members', 'memberProfiles']}: listen for a single team in the hunt. Also return additional info based on the contents of include.
    • {type: "huntQuestionStatus", team: "<object id>", include: ['question', 'hintsRequested']}: listen for the current question status, optionally including the question itself and the hints that the team has requested (only the hints that the team has explicitly requested)
    • {type: "teamJoinRequests", team: "<object id>"}: listen to the current set of join requests for a team. Returns the profile of each player on the list of join requests as well as an ID for each join request. This API must validate first that the user that originally authenticated under this socket does in fact have access to view the join requests for the team mentioned (i.e. is the team leader). This is a good first candidate for partialSync if it gets implemented, but I don't think it needs to use partialSync.
    • {type: "leaderboard", include: ['score', 'members', 'questionsSolved']}: listen to updates in the list of teams on the leaderboard. Adds additional info based on the contents of include.
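Server-side, the listenTo shapes above can be checked against a small table before any database work happens; a request that fails the check would presumably get an INVALID_REQUEST error. This is a sketch: treating a missing include as an empty list, and rejecting include on types that list no include values, are assumptions, and the names are illustrative.

```python
from typing import Any, Dict, FrozenSet, Optional

# "include" values each listenTo type accepts, transcribed from the list above.
# None marks types that take no "include" field at all.
ALLOWED_INCLUDES: Dict[str, Optional[FrozenSet[str]]] = {
    "upcomingHunts": None,
    "huntTeams": frozenset({"members", "memberProfiles", "questionsSolvedData"}),
    "huntTeam": frozenset({"members", "memberProfiles"}),
    "huntQuestionStatus": frozenset({"question", "hintsRequested"}),
    "teamJoinRequests": None,
    "leaderboard": frozenset({"score", "members", "questionsSolved"}),
}
# listenTo types that must name a team by its ObjectID string.
NEEDS_TEAM = frozenset({"huntTeam", "huntQuestionStatus", "teamJoinRequests"})


def is_valid_listen_to(listen_to: Any) -> bool:
    """Shape check only; authorization (e.g. team leader) is a separate step."""
    if not isinstance(listen_to, dict):
        return False
    kind = listen_to.get("type")
    if not isinstance(kind, str) or kind not in ALLOWED_INCLUDES:
        return False
    if kind in NEEDS_TEAM and not isinstance(listen_to.get("team"), str):
        return False
    include = listen_to.get("include", [])  # assumption: omitted include means []
    if not isinstance(include, list):
        return False
    allowed = ALLOWED_INCLUDES[kind]
    if allowed is None:
        return not include
    return all(isinstance(item, str) and item in allowed for item in include)
```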

If the server rejects the client's request, it can respond with an error:

  • type: “addListenerError”
  • requestId: the same integer that the client sent with the initial request
  • errorMessage (string ENUM): one of the values listed under Details below

Otherwise, the server responds by immediately sending an "update/sync" message, and sends further update/sync messages whenever the data is updated in the database.

Details
  • Possible values of errorMessage:

    • "UNAUTHORIZED_ACCESS": the client does not have access to view this resource (return this error if the client tries to view join requests without being the team leader)
    • "INVALID_REQUEST": the request was invalid and did not conform to the above API spec
    • "REPEATED_REQUEST_ID": the requestId was not unique. Whether the server actually chooses to send an error message in the case of REPEATED_REQUEST_ID is an implementation detail.
  • Pydantic models:

from pydantic import BaseModel
from typing import Literal, Union
from enum import Enum

class AddListenerErrorMessage(str, Enum):
    INVALID_REQUEST = "INVALID_REQUEST"
    UNAUTHORIZED_ACCESS = "UNAUTHORIZED_ACCESS"
    REPEATED_REQUEST_ID = "REPEATED_REQUEST_ID"

class AddListenerError(BaseModel):
    type: Literal["addListenerError"]
    requestId: int
    errorMessage: AddListenerErrorMessage
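One possible order of checks for an addListener request is sketched below. `leader_of` is a hypothetical lookup (for example, backed by the teams collection) that returns the leader's user id for a team, and `active_ids` is the set of requestIds already in use on this connection. Checking REPEATED_REQUEST_ID before authorization is a choice, not a requirement.

```python
from typing import Any, Callable, Dict, Optional, Set


def add_listener_error(request_id: Any, listen_to: Any, active_ids: Set[int],
                       user_id: str,
                       leader_of: Callable[[str], Optional[str]]) -> Optional[Dict[str, Any]]:
    """Return an addListenerError message, or None if the request is acceptable."""

    def error(code: str) -> Dict[str, Any]:
        return {"type": "addListenerError", "requestId": request_id, "errorMessage": code}

    if (not isinstance(request_id, int) or isinstance(request_id, bool)
            or not isinstance(listen_to, dict)):
        return error("INVALID_REQUEST")
    if request_id in active_ids:
        return error("REPEATED_REQUEST_ID")
    if listen_to.get("type") == "teamJoinRequests":
        team = listen_to.get("team")
        if not isinstance(team, str):
            return error("INVALID_REQUEST")
        if leader_of(team) != user_id:
            # Only the team leader may view the team's join requests.
            return error("UNAUTHORIZED_ACCESS")
    return None
```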

[optional extension] Making a request without registering a listener

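The client can make a one-off request, without listening to future updates, by sending a "getData" message instead of "addListener". Otherwise, the syntax is the same. In response to a getData, the server sends exactly one "update/sync" message. Clients should only send getData if the server listed it in the protocolExtensions of its initResponse.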

This is optional because we could technically implement the same functionality with regular HTTP requests. One argument for it is that we only have to maintain one schema instead of two, with code reuse on the client side and some performance improvements as a bonus. I can see arguments for the other side, though.

  • type: “getData”
  • requestId: see above
  • listenTo: one of the below (the same values that addListener accepts)
    • {type: "upcomingHunts"}: fetch upcoming hunts
    • {type: "huntTeams", include: ['members', 'memberProfiles', 'questionsSolvedData']}: fetch teams currently in the hunt
    • {type: "huntTeam", team: "<object id>", include: ['members', 'memberProfiles']}: fetch a single team in the hunt. Also return additional info based on the contents of include.
    • {type: "huntQuestionStatus", team: "<object id>", include: ['question', 'hintsRequested']}: fetch the current question status, optionally including the question itself and the hints that the team has requested (only the hints that the team has explicitly requested)
    • {type: "teamJoinRequests", team: "<object id>"}: fetch the current set of join requests for a team. Returns the profile of each player on the list of join requests as well as an ID for each join request. This API must first validate that the user that originally authenticated under this socket does in fact have access to view the join requests for the team mentioned (i.e. is the team leader).
    • {type: "leaderboard", include: ['score', 'members', 'questionsSolved']}: fetch the list of teams on the leaderboard. Adds additional info based on the contents of include.

If the server rejects the client's request, it can respond with the same addListenerError message as above (for the streaming version). No schema changes are required.
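Because a getData request gets exactly one update/sync (or one addListenerError), a client can treat it like an ordinary awaitable call by matching the reply's requestId. The real client is in Dart, where a Completer would play the role of the future; this Python sketch only illustrates the bookkeeping, and `GetDataClient`, `send`, and `on_message` are hypothetical names. It also shows one simple way to keep requestIds unique: a per-connection counter.

```python
import asyncio
import itertools
from typing import Any, Awaitable, Callable, Dict


class GetDataClient:
    """Hypothetical helper that turns getData requests into awaitable results."""

    def __init__(self, send: Callable[[Dict[str, Any]], Awaitable[None]]) -> None:
        self._send = send                # writes one JSON message to the socket
        self._ids = itertools.count(1)   # per-connection counter keeps requestIds unique
        self._pending: Dict[int, "asyncio.Future[Any]"] = {}

    async def get_data(self, listen_to: Dict[str, Any]) -> Any:
        request_id = next(self._ids)
        future = asyncio.get_running_loop().create_future()
        self._pending[request_id] = future
        try:
            await self._send({"type": "getData", "requestId": request_id,
                              "listenTo": listen_to})
        except BaseException:
            self._pending.pop(request_id, None)  # never sent, so no reply will come
            raise
        return await future  # resolved by on_message when the single reply arrives

    def on_message(self, message: Dict[str, Any]) -> None:
        """Call this for every message the server sends."""
        kind = message.get("type")
        if kind not in ("update/sync", "addListenerError"):
            return
        future = self._pending.pop(message.get("requestId"), None)
        if future is None or future.done():
            return  # belongs to a streaming listener, not a pending getData
        if kind == "update/sync":
            future.set_result(message.get("data"))
        else:
            future.set_exception(RuntimeError(message.get("errorMessage")))
```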

Updates

The server can send the full data of an update to a client by sending an update/sync message.

  • type: “update/sync”
  • requestId: whatever the client sent
  • data: <any>
Details
  • We can borrow the models for this from the schema for the Hunts database. Some data may need to be removed or rearranged.

[optional extension] Partial Sync

If the server has already sent an update/sync for a request, it can update the synced data on the client side directly without resending the full dataset. I doubt this will ever be implemented on the server or client side: it has some performance benefits, but at the cost of dev time. I'm personally too lazy to implement it but wanted to spec it out. update/partialSync messages may only be sent if the client includes partialSync in its protocolExtensions list.

  • type: “update/partialSync”
  • requestId: whatever the client sent
  • update: <a list of objects>
    • {type: "insert", index: 5, value: {type: "joinRequest", memberId: 1237841243, memberName: "John Doe", memberProfile: <member profile>}}
    • {type: "update", index: 5, value: {type: "joinRequest", memberId: 1237841243, memberName: "John Doe", memberProfile: <member profile>}}
    • {type: "delete", index: 5, value: {type: "joinRequest", memberId: 1237841243, memberName: "John Doe", memberProfile: <member profile>}}
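On the client, applying an update/partialSync message amounts to a loop over its operations. This sketch assumes that each index refers to the list as left by the previous operation in the same message, and that a delete's value is informational only; neither is pinned down above. Raising on anything unexpected leaves the client free to fall back to a full resync (for example by deregistering and re-registering the listener), which is one option rather than specified behavior.

```python
from typing import Any, Dict, List


def apply_partial_sync(current: List[Any], update: List[Dict[str, Any]]) -> List[Any]:
    """Apply an update/partialSync "update" list to the last synced data.

    Operations are applied in order; each index refers to the list as left by
    the previous operation (an assumption). The input list is not modified.
    """
    result = list(current)
    for op in update:
        kind, index = op.get("type"), op.get("index")
        if not isinstance(index, int) or isinstance(index, bool):
            raise ValueError(f"bad index in partialSync operation: {op!r}")
        if kind == "insert":
            if not 0 <= index <= len(result):
                raise IndexError(f"insert index {index} out of range")
            result.insert(index, op["value"])
        elif kind in ("update", "delete"):
            if not 0 <= index < len(result):
                raise IndexError(f"{kind} index {index} out of range")
            if kind == "update":
                result[index] = op["value"]
            else:
                del result[index]  # the delete's "value" is informational only
        else:
            raise ValueError(f"unknown partialSync operation: {kind!r}")
    return result
```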

Deregistering a listener

A client can cancel any particular listener by sending a "deregisterListener" message like this (no additional data sent):

  • type: “deregisterListener”
  • requestId: whatever the client originally sent for this request
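With one asyncio task per listener, deregistering can be as small as cancelling that task. The registry below is meant to live inside a single connection's handler: it stores tasks rather than WebSocket objects and is not global, which fits the advice under Server-Side. `ListenerRegistry` and its methods are hypothetical names, and silently ignoring a deregisterListener for an unknown requestId is an assumption, since the spec defines no error for that case.

```python
import asyncio
from typing import Awaitable, Callable, Dict


class ListenerRegistry:
    """requestId -> task streaming that listener's updates, for ONE connection."""

    def __init__(self) -> None:
        self._tasks: Dict[int, "asyncio.Future[None]"] = {}

    def add(self, request_id: int, run: Callable[[], Awaitable[None]]) -> bool:
        """Start a listener; returns False if the requestId is already in use."""
        if request_id in self._tasks:
            return False  # caller may reply with REPEATED_REQUEST_ID
        self._tasks[request_id] = asyncio.ensure_future(run())
        return True

    def deregister(self, request_id: int) -> None:
        """Handle deregisterListener; unknown requestIds are ignored (assumption)."""
        task = self._tasks.pop(request_id, None)
        if task is not None:
            task.cancel()  # no further update/sync messages for this requestId

    def close(self) -> None:
        """Cancel everything when the socket disconnects; nothing outlives the session."""
        for request_id in list(self._tasks):
            self.deregister(request_id)
```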

Disconnecting

  • Normally when the client disconnects, the server receives a disconnect event that it can handle.
  • Clients could disconnect due to internet loss, in which case the server does not receive a close frame.
    • To mitigate this, the server should send a WebSocket ping every [implementation detail, no more than 5 ideally] seconds, which the client's WebSocket implementation answers automatically with a “pong”. If the client does not answer a ping with a pong within [implementation detail, no more than 5 ideally] seconds, the server can consider the WebSocket connection “dead” and terminate it.
  • The team leader disconnecting (either by explicitly closing the WebSocket connection or by losing the connection and having it time out) could result in the server reassigning the team leader within the database to someone else. The server may want to wait for a short period [implementation detail, no more than 60 seconds ideally] to check whether the team leader reconnects via a separate WebSocket.
  • The server does not need to retain any data from a client session after the client disconnects. The client will re-register any listeners it had before disconnecting.
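The ping/pong heartbeat may not need application code at all: ASGI servers can handle it (uvicorn, for instance, exposes `--ws-ping-interval` and `--ws-ping-timeout`). The team leader grace period, however, is application logic. The sketch below waits for a reconnect signal and reassigns only on timeout. `reconnected` and `reassign` are hypothetical; an in-process `asyncio.Event` only works with a single server process, so a multi-worker deployment would need a shared signal such as a database flag.

```python
import asyncio
from typing import Awaitable, Callable


async def reassign_leader_after_grace(reconnected: asyncio.Event,
                                      reassign: Callable[[], Awaitable[None]],
                                      grace_seconds: float = 60.0) -> bool:
    """Give a disconnected team leader time to come back before reassigning.

    `reconnected` is a hypothetical event set when the leader opens a new
    WebSocket; `reassign` is a hypothetical coroutine that updates the team
    leader in the database. Returns True if leadership was reassigned.
    """
    try:
        await asyncio.wait_for(reconnected.wait(), timeout=grace_seconds)
    except asyncio.TimeoutError:
        await reassign()  # leader did not return within the grace period
        return True
    return False  # leader reconnected in time; keep them as leader
```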

Implementation Notes

Dart

Dart has a feature called Streams, and Flutter provides a StreamBuilder widget that consumes them; together they will make using the API in the UI very easy once the API itself is built. It looks something like this (very simple example):

// clock_stream.dart

import 'dart:async';

class ClockStream {
  final bool _running = true;

  Stream<String> get clockStream async* {
    while (_running) {
      await Future<void>.delayed(const Duration(seconds: 1));
      DateTime now = DateTime.now();
      yield "${now.hour} : ${now.minute} : ${now.second}";
    }
  }
}


// main.dart

import 'package:flutter/material.dart';
import 'clock_stream.dart'; // Import the ClockStream class

void main() {
  runApp(const MyApp());
}

class MyApp extends StatelessWidget {
  const MyApp({Key? key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      debugShowCheckedModeBanner: false,
      title: 'Real-time Clock App',
      theme: ThemeData(
        primarySwatch: Colors.blue,
      ),
      home: const HomePage(),
    );
  }
}

class HomePage extends StatelessWidget {
  const HomePage({Key? key}) : super(key: key);

  @override
  Widget build(BuildContext context) {
    final clockStream = ClockStream().clockStream; // Initialize the stream

    return Scaffold(
      appBar: AppBar(
        title: const Text('Real-time Clock App'),
      ),
      body: Center(
        child: StreamBuilder<String>(
          stream: clockStream,
          builder: (context, AsyncSnapshot<String> snapshot) {
            if (snapshot.connectionState == ConnectionState.waiting) {
              return const CircularProgressIndicator();
            }
            return Text(
              snapshot.data ?? '',
              style: const TextStyle(fontSize: 24),
            );
          },
        ),
      ),
    );
  }
}

Note how all that is necessary is to wrap the existing UI in a StreamBuilder widget and write a function to convert from the data object to a Flutter widget tree.

Server-Side

  • We can use MongoDB change streams to stream changes from the database. This replaces older solutions for doing so, such as "tailing the oplog." See https://www.mongodb.com/docs/manual/changeStreams/.

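    • Whether data from the database is "re-fetched" on every change (rather than applying each change event) is an implementation detail. Re-fetching seems like a reasonable way to save dev time, however.
  • MongoDB does not impose a hard limit on the number of change streams on a MongoDB database, and they are relatively lightweight, so performance is unlikely to be a problem for us here. Note that each open change stream holds a connection from the driver's connection pool while it waits for events, so the pool size is worth watching if the number of concurrent listeners grows.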

  • Implement WebSockets as shown in https://fastapi.tiangolo.com/advanced/websockets/#await-for-messages-and-send-messages

  • You will likely not need to keep track of WebSocket connections explicitly. Use Python's async/await features to implicitly keep track of WebSockets. If you find yourself passing WebSocket objects around or storing them in some global data structure (instead of using async/await), your solution to the problem is unlikely to be the simplest one.

  • See https://fastapi.tiangolo.com/advanced/testing-websockets/ for information on testing WebSockets
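A per-listener loop using the re-fetch approach might look like the sketch below. `changes` stands in for a change stream; with Motor, for instance, the change stream returned by `collection.watch(pipeline)` can be consumed with `async for`. The function names and the in-memory fake are assumptions for illustration. One caveat with any re-fetch design: changes that land between the initial fetch and the moment the stream is opened would be missed, so it is probably safer to open the change stream before the first fetch.

```python
import asyncio
from typing import Any, AsyncIterable, Awaitable, Callable, Dict, List


async def stream_listener(request_id: int,
                          fetch: Callable[[], Awaitable[Any]],
                          changes: AsyncIterable[Any],
                          send: Callable[[Dict[str, Any]], Awaitable[None]]) -> None:
    """Send an initial update/sync, then a fresh full snapshot after each change."""

    async def sync() -> None:
        data = await fetch()
        await send({"type": "update/sync", "requestId": request_id, "data": data})

    await sync()
    async for _change in changes:
        # "Re-fetch" strategy: ignore the event's details and resend everything.
        await sync()


async def demo() -> List[Dict[str, Any]]:
    """Run the listener against an in-memory stand-in for a change stream."""
    sent: List[Dict[str, Any]] = []
    fetches = 0

    async def fetch() -> List[str]:
        nonlocal fetches
        fetches += 1
        return [f"snapshot {fetches}"]

    async def send(message: Dict[str, Any]) -> None:
        sent.append(message)

    async def fake_changes():
        for event in ({"operationType": "insert"}, {"operationType": "update"}):
            yield event

    await stream_listener(2, fetch, fake_changes(), send)
    return sent


if __name__ == "__main__":
    for message in asyncio.run(demo()):
        print(message)
```

In a real handler, this coroutine would run as one task per addListener, so cancelling that task is all a deregisterListener needs to do.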

Sample Protocol Exchange

client -> server

{
  "type": "init",
  "version": 1,
  "authToken": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c",
  "huntId": "627e16d11dbc57e08645d88c",
  "protocolExtensions": []
}

server -> client

{
  "type": "initResponse",
  "success": true,
  "protocolExtensions": []
}

client -> server

{
  "type": "addListener",
  "requestId": 1,
  "listenTo": {
    "type": "teamJoinRequests",
    "team": "<not my team's ObjectID>"
  }
}

server -> client

{
  "type": "addListenerError",
  "requestId": 1,
  "errorMessage": "UNAUTHORIZED_ACCESS"
}

client -> server

{
  "type": "addListener",
  "requestId": 2,
  "listenTo": {
    "type": "huntTeams",
    "include": [
      "members",
      "memberProfiles",
      "questionsSolvedData"
    ]
  }
}

server -> client

{
  "type": "update/sync",
  "requestId": 2,
  "data": [
    {
      "id": "627e16d11dbc57e08645d88d",
      "name": "Team 1",
      "members": <name and member info>,
      "memberProfiles": <profiles of each of the members>,
      "questionsSolvedData": <borrow this verbatim from the database docs>
    }
  ]
}

[the teams update on the backend]

server -> client

{
  "type": "update/sync",
  "requestId": 2,
  "data": [
    {
      "id": "627e16d11dbc57e08645d88d",
      "name": "Team 1",
      "members": <name and member info>,
      "memberProfiles": <profiles of each of the members>,
      "questionsSolvedData": <borrow this verbatim from the database docs>
    },
    {
      "id": "627e16d11dbc57e08645d88e",
      "name": "Team 2",
      "members": <name and member info>,
      "memberProfiles": <profiles of each of the members>,
      "questionsSolvedData": <borrow this verbatim from the database docs>
    }
  ]
}

client -> server

{
  "type": "addListener",
  "requestId": 3,
  "listenTo": {
    "type": "teamJoinRequests",
    "team": "<my team's ObjectID>"
  }
}

server -> client

{
  "type": "update/sync",
  "requestId": 3,
  "data": [
    {
      "id": "627e16d11dbc57e08645d88d",
      "profile": <borrow this from the database docs>
    }
  ]
}

[a new person requests to join the team]

server -> client

{
  "type": "update/sync",
  "requestId": 3,
  "data": [
    {
      "id": "627e16d11dbc57e08645d88d",
      "profile": <borrow this from the database docs>
    },
    {
      "id": "627e16d11dbc57e08645d88f",
      "profile": <borrow this from the database docs>
    }
  ]
}

[the client no longer needs updates about the teams in the hunt]

client -> server

{
  "type": "deregisterListener",
  "requestId": 2
}

[a new team joins the hunt; nothing is sent because listener 2 was deregistered]