Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions examples/websocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ export default function () {
console.log('connected');
socket.send(Date.now());

// Send a regular ping
socket.ping();
console.log("Sent a ping without application data");

// Send a ping with application data
socket.ping("application-data");
console.log("Sent a ping with application data");

socket.setInterval(function timeout() {
socket.ping();
console.log("Pinging every 1sec (setInterval test)");
Expand Down
41 changes: 38 additions & 3 deletions internal/js/modules/k6/experimental/websockets/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -678,24 +679,58 @@ func isArrayBufferView(rt *sobek.Runtime, v sobek.Value) (bool, error) {
}

// Ping sends a ping message over the websocket.
func (w *webSocket) ping() {
// It can optionally include application data as per RFC 6455 section 5.5.2.
func (w *webSocket) ping(args ...sobek.Value) {
w.assertStateOpen()

pingID := strconv.Itoa(w.sendPings.counter)

var data []byte
if len(args) > 0 && !sobek.IsUndefined(args[0]) && !sobek.IsNull(args[0]) {
if args[0].String() != "" {
// Use a special delimiter "|" to separate ping ID from application data
data = []byte(pingID + "|" + args[0].String())
} else {
data = []byte(pingID)
}
} else {
data = []byte(pingID)
}

w.writeQueueCh <- message{
mtype: websocket.PingMessage,
data: []byte(pingID),
data: data,
t: time.Now(),
}

w.sendPings.timestamps[pingID] = time.Now()
w.sendPings.counter++
}

func (w *webSocket) trackPong(pingID string) {
func (w *webSocket) trackPong(pongData string) {
pongTimestamp := time.Now()

var pingID string
if parts := strings.SplitN(pongData, "|", 2); len(parts) > 0 {
pingID = parts[0]
} else {
for i, c := range pongData {
if c < '0' || c > '9' {
// Found non-numeric character, extract the ID
pingID = pongData[:i]
break
}
if i == len(pongData)-1 {
pingID = pongData
}
}
}

if pingID == "" {
w.vu.State().Logger.Warnf("received pong with invalid ping ID format")
return
}

pingTimestamp, ok := w.sendPings.timestamps[pingID]
if !ok {
// We received a pong for a ping we didn't send; ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1546,3 +1546,59 @@ func TestReadyStateSwitch(t *testing.T) {
logs := hook.Drain()
require.Len(t, logs, 0)
}

func TestSessionPingWithApplicationData(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
sr := tb.Replacer.Replace

ts := newTestState(t)

_, err := ts.runtime.RunOnEventLoop(sr(`
var ws = new WebSocket("WSBIN_URL/ws-echo")
var applicationData = "hello-ping-data";
ws.onopen = () => {
ws.ping(applicationData)
}

ws.onpong = () => {
call("from onpong")
ws.close()
}
ws.onerror = (e) => { throw JSON.stringify(e) }
`))

require.NoError(t, err)

samplesBuf := metrics.GetBufferedSamples(ts.samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), http.StatusSwitchingProtocols, "")
assert.Equal(t, []string{"from onpong"}, ts.callRecorder.Recorded())
}

func TestSessionPingWithNumericApplicationData(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
sr := tb.Replacer.Replace

ts := newTestState(t)

_, err := ts.runtime.RunOnEventLoop(sr(`
var ws = new WebSocket("WSBIN_URL/ws-echo")
var applicationData = "123456-numeric-ping-data";
ws.onopen = () => {
ws.ping(applicationData)
}

ws.onpong = () => {
call("from onpong")
ws.close()
}
ws.onerror = (e) => { throw JSON.stringify(e) }
`))

require.NoError(t, err)

samplesBuf := metrics.GetBufferedSamples(ts.samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), http.StatusSwitchingProtocols, "")
assert.Equal(t, []string{"from onpong"}, ts.callRecorder.Recorded())
}
37 changes: 34 additions & 3 deletions internal/js/modules/k6/ws/ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,10 +376,22 @@ func (s *Socket) SendBinary(message sobek.Value) {
}

// Ping sends a ping message over the websocket.
func (s *Socket) Ping() {
// It can optionally include application data as per RFC 6455 section 5.5.2.
func (s *Socket) Ping(args ...sobek.Value) {
deadline := time.Now().Add(writeWait)
pingID := strconv.Itoa(s.pingSendCounter)
data := []byte(pingID)

var data []byte
if len(args) > 0 && !sobek.IsUndefined(args[0]) && !sobek.IsNull(args[0]) {
if args[0].String() != "" {
// Add the pingID with a delimiter before the application data
data = []byte(pingID + "|" + args[0].String())
} else {
data = []byte(pingID)
}
} else {
data = []byte(pingID)
}

err := s.conn.WriteControl(websocket.PingMessage, data, deadline)
if err != nil {
Expand All @@ -391,9 +403,28 @@ func (s *Socket) Ping() {
s.pingSendCounter++
}

func (s *Socket) trackPong(pingID string) {
func (s *Socket) trackPong(pongData string) {
pongTimestamp := time.Now()

var pingID string
if parts := strings.SplitN(pongData, "|", 2); len(parts) > 0 {
pingID = parts[0]
} else {
for i, c := range pongData {
if c < '0' || c > '9' {
pingID = pongData[:i]
break
}
if i == len(pongData)-1 {
pingID = pongData
}
}
}

if pingID == "" {
return
}

if _, ok := s.pingSendTimestamps[pingID]; !ok {
// We received a pong for a ping we didn't send; ignore
// (this shouldn't happen with a compliant server)
Expand Down
60 changes: 60 additions & 0 deletions internal/js/modules/k6/ws/ws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1226,3 +1226,63 @@ func TestWSConnectDisableThrowErrorOption(t *testing.T) {
entries := logHook.Drain()
assert.Empty(t, entries)
}

func TestSessionPingWithApplicationData(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
sr := tb.Replacer.Replace

test := newTestState(t)
_, err := test.VU.Runtime().RunString(sr(`
var pongReceived = false;
var applicationData = "hello-ping-data";
var res = ws.connect("WSBIN_URL/ws-echo", function(socket){
socket.on("open", function(data) {
// Send ping with application data
socket.ping(applicationData);
});
socket.on("pong", function() {
pongReceived = true;
socket.close();
});
socket.setTimeout(function (){socket.close();}, 3000);
});
if (!pongReceived) {
throw new Error ("sent ping with application data but didn't get pong back");
}
`))
require.NoError(t, err)
samplesBuf := metrics.GetBufferedSamples(test.samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)
}

func TestSessionPingWithNumericApplicationData(t *testing.T) {
t.Parallel()
tb := httpmultibin.NewHTTPMultiBin(t)
sr := tb.Replacer.Replace

test := newTestState(t)
_, err := test.VU.Runtime().RunString(sr(`
var pongReceived = false;
var applicationData = "123456-numeric-ping-data";
var res = ws.connect("WSBIN_URL/ws-echo", function(socket){
socket.on("open", function(data) {
// Send ping with application data that starts with numbers
socket.ping(applicationData);
});
socket.on("pong", function() {
pongReceived = true;
socket.close();
});
socket.setTimeout(function (){socket.close();}, 3000);
});
if (!pongReceived) {
throw new Error ("sent ping with numeric application data but didn't get pong back");
}
`))
require.NoError(t, err)
samplesBuf := metrics.GetBufferedSamples(test.samples)
assertSessionMetricsEmitted(t, samplesBuf, "", sr("WSBIN_URL/ws-echo"), statusProtocolSwitch, "")
assertMetricEmittedCount(t, metrics.WSPingName, samplesBuf, sr("WSBIN_URL/ws-echo"), 1)
}
Loading