Skip to content

Commit f768a13

Browse files
authored
Implement serialization header (#120)
* Serialization header concept * Implement header and marker readers * Bump dev version
1 parent 7c43f3d commit f768a13

File tree

9 files changed

+139
-37
lines changed

9 files changed

+139
-37
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Type: Package
22
Package: nanonext
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 1.5.2.9005
4+
Version: 1.5.2.9006
55
Authors@R: c(
66
person("Charlie", "Gao", , "[email protected]", role = c("aut", "cre"),
77
comment = c(ORCID = "0000-0002-0750-061X")),

NAMESPACE

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@ export("%~>%")
4444
export("opt<-")
4545
export(.advance)
4646
export(.context)
47+
export(.header)
4748
export(.interrupt)
4849
export(.keep)
4950
export(.mark)
51+
export(.read_header)
52+
export(.read_marker)
5053
export(.unresolved)
5154
export(call_aio)
5255
export(call_aio_)

R/utils.R

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -297,29 +297,65 @@ status_code <- function(x) .Call(rnng_status_code, x)
297297
serial_config <- function(class, sfunc, ufunc, vec = FALSE)
298298
.Call(rnng_serial_config, class, sfunc, ufunc)
299299

300-
#' Set Serialization Marker
300+
#' Advances the RNG State
301301
#'
302302
#' Internal package function.
303303
#'
304-
#' @param x logical value.
304+
#' @return NULL.
305305
#'
306-
#' @return The logical value `x` supplied.
306+
#' @keywords internal
307+
#' @export
308+
#'
309+
.advance <- function() .Call(rnng_advance_rng_state)
310+
311+
#' Serialization Headers and Markers
312+
#'
313+
#' Internal package functions.
314+
#'
315+
#' @param value integer value.
316+
#'
317+
#' @return For `.header()`: the integer `value` supplied.
307318
#'
308319
#' @keywords internal
309320
#' @export
310321
#'
311-
.mark <- function(x = TRUE) .Call(rnng_set_marker, x)
322+
.header <- function(value = 0L) .Call(rnng_header_set, value)
312323

313-
#' Advances the RNG State
324+
#' Read Serialization Header
314325
#'
315-
#' Internal package function.
326+
#' @param x raw vector.
316327
#'
317-
#' @return NULL.
328+
#' @return For `.read_header()`: integer value.
318329
#'
319330
#' @keywords internal
331+
#' @rdname dot-header
320332
#' @export
321333
#'
322-
.advance <- function() .Call(rnng_advance_rng_state)
334+
.read_header <- function(x) .Call(rnng_header_read, x)
335+
336+
#' Set Serialization Marker
337+
#'
338+
#' @param bool logical value.
339+
#'
340+
#' @return For `.mark()`: the logical `bool` supplied.
341+
#'
342+
#' @keywords internal
343+
#' @rdname dot-header
344+
#' @export
345+
#'
346+
.mark <- function(bool = TRUE) .Call(rnng_marker_set, bool)
347+
348+
#' Read Serialization Marker
349+
#'
350+
#' @param x raw vector.
351+
#'
352+
#' @return For `.read_marker()`: logical value `TRUE` or `FALSE`.
353+
#'
354+
#' @keywords internal
355+
#' @rdname dot-header
356+
#' @export
357+
#'
358+
.read_marker <- function(x) .Call(rnng_marker_read, x)
323359

324360
#' Interrupt Switch
325361
#'

man/dot-header.Rd

Lines changed: 37 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/dot-mark.Rd

Lines changed: 0 additions & 18 deletions
This file was deleted.

src/core.c

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
// internals -------------------------------------------------------------------
66

7-
static uint8_t special_bit = 0;
7+
static int special_marker = 0;
8+
static int special_header = 0;
89
static nano_serial_bundle nano_bundle;
910
static SEXP nano_eval_res;
1011

@@ -315,9 +316,11 @@ void nano_serialize(nano_buf *buf, SEXP object, SEXP hook) {
315316
NANO_ALLOC(buf, NANONEXT_INIT_BUFSIZE);
316317
struct R_outpstream_st output_stream;
317318

318-
if (special_bit) {
319+
if (special_header || special_marker) {
319320
buf->buf[0] = 0x7;
320-
buf->buf[3] = special_bit;
321+
buf->buf[3] = (uint8_t) special_marker;
322+
if (special_header)
323+
memcpy(buf->buf + 4, &special_header, sizeof(int));
321324
buf->cur += 8;
322325
}
323326

@@ -621,13 +624,39 @@ int nano_matchargs(const SEXP mode) {
621624

622625
// specials --------------------------------------------------------------------
623626

624-
SEXP rnng_set_marker(SEXP x) {
627+
SEXP rnng_marker_set(SEXP x) {
625628

626-
special_bit = (uint8_t) NANO_INTEGER(x);
629+
special_marker = NANO_INTEGER(x);
627630
return x;
628631

629632
}
630633

634+
SEXP rnng_marker_read(SEXP x) {
635+
636+
unsigned char *buf = (unsigned char *) NANO_DATAPTR(x);
637+
638+
return Rf_ScalarLogical(TYPEOF(x) == RAWSXP && XLENGTH(x) > 12 && buf[0] == 0x7 && buf[3] == 0x1);
639+
640+
}
641+
642+
SEXP rnng_header_set(SEXP x) {
643+
644+
special_header = NANO_INTEGER(x);
645+
return x;
646+
647+
}
648+
649+
SEXP rnng_header_read(SEXP x) {
650+
651+
unsigned char *buf = (unsigned char *) NANO_DATAPTR(x);
652+
int res = 0;
653+
if (TYPEOF(x) == RAWSXP && XLENGTH(x) > 12 && buf[0] == 0x7) {
654+
memcpy(&res, buf + 4, sizeof(int));
655+
}
656+
return Rf_ScalarInteger(res);
657+
658+
}
659+
631660
SEXP rnng_eval_safe(SEXP arg) {
632661

633662
return R_ToplevelExec(nano_eval_safe, arg) ? nano_eval_res : Rf_allocVector(RAWSXP, 1);

src/init.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,12 +129,16 @@ static const R_CallMethodDef callMethods[] = {
129129
{"rnng_eval_safe", (DL_FUNC) &rnng_eval_safe, 1},
130130
{"rnng_fini", (DL_FUNC) &rnng_fini, 0},
131131
{"rnng_get_opt", (DL_FUNC) &rnng_get_opt, 2},
132+
{"rnng_header_read", (DL_FUNC) &rnng_header_read, 1},
133+
{"rnng_header_set", (DL_FUNC) &rnng_header_set, 1},
132134
{"rnng_interrupt_switch", (DL_FUNC) &rnng_interrupt_switch, 1},
133135
{"rnng_is_error_value", (DL_FUNC) &rnng_is_error_value, 1},
134136
{"rnng_is_nul_byte", (DL_FUNC) &rnng_is_nul_byte, 1},
135137
{"rnng_listen", (DL_FUNC) &rnng_listen, 5},
136138
{"rnng_listener_close", (DL_FUNC) &rnng_listener_close, 1},
137139
{"rnng_listener_start", (DL_FUNC) &rnng_listener_start, 1},
140+
{"rnng_marker_read", (DL_FUNC) &rnng_marker_read, 1},
141+
{"rnng_marker_set", (DL_FUNC) &rnng_marker_set, 1},
138142
{"rnng_messenger", (DL_FUNC) &rnng_messenger, 1},
139143
{"rnng_monitor_create", (DL_FUNC) &rnng_monitor_create, 2},
140144
{"rnng_monitor_read", (DL_FUNC) &rnng_monitor_read, 1},
@@ -153,7 +157,6 @@ static const R_CallMethodDef callMethods[] = {
153157
{"rnng_send", (DL_FUNC) &rnng_send, 5},
154158
{"rnng_send_aio", (DL_FUNC) &rnng_send_aio, 6},
155159
{"rnng_serial_config", (DL_FUNC) &rnng_serial_config, 3},
156-
{"rnng_set_marker", (DL_FUNC) &rnng_set_marker, 1},
157160
{"rnng_set_opt", (DL_FUNC) &rnng_set_opt, 3},
158161
{"rnng_set_promise_context", (DL_FUNC) &rnng_set_promise_context, 2},
159162
{"rnng_signal_thread_create", (DL_FUNC) &rnng_signal_thread_create, 2},

src/nanonext.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,12 +306,16 @@ SEXP rnng_dialer_start(SEXP, SEXP);
306306
SEXP rnng_eval_safe(SEXP);
307307
SEXP rnng_fini(void);
308308
SEXP rnng_get_opt(SEXP, SEXP);
309+
SEXP rnng_header_read(SEXP);
310+
SEXP rnng_header_set(SEXP);
309311
SEXP rnng_interrupt_switch(SEXP);
310312
SEXP rnng_is_error_value(SEXP);
311313
SEXP rnng_is_nul_byte(SEXP);
312314
SEXP rnng_listen(SEXP, SEXP, SEXP, SEXP, SEXP);
313315
SEXP rnng_listener_close(SEXP);
314316
SEXP rnng_listener_start(SEXP);
317+
SEXP rnng_marker_read(SEXP);
318+
SEXP rnng_marker_set(SEXP);
315319
SEXP rnng_messenger(SEXP);
316320
SEXP rnng_messenger_thread_create(SEXP);
317321
SEXP rnng_monitor_create(SEXP, SEXP);
@@ -331,7 +335,6 @@ SEXP rnng_request(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
331335
SEXP rnng_send(SEXP, SEXP, SEXP, SEXP, SEXP);
332336
SEXP rnng_send_aio(SEXP, SEXP, SEXP, SEXP, SEXP, SEXP);
333337
SEXP rnng_serial_config(SEXP, SEXP, SEXP);
334-
SEXP rnng_set_marker(SEXP);
335338
SEXP rnng_set_opt(SEXP, SEXP, SEXP);
336339
SEXP rnng_set_promise_context(SEXP, SEXP);
337340
SEXP rnng_signal_thread_create(SEXP, SEXP);

tests/tests.R

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -206,12 +206,19 @@ test_error(req$opt("false", list()), "type")
206206

207207
test_class("nanoContext", ctx <- context(rep))
208208
test_print(ctx)
209+
test_equal(.header(12345L), 12345L)
209210
test_true(.mark())
210211
test_class("sendAio", csaio <- req$send_aio(data.frame(), mode = "seria", timeout = 500))
211-
test_true(!.mark(FALSE))
212212
test_zero(call_aio_(csaio)$result)
213-
test_class("recvAio", craio <- recv_aio(ctx, timeout = 500))
214-
test_type("list", collect_aio(craio))
213+
test_class("recvAio", craio <- recv_aio(ctx, mode = 8L, timeout = 500))
214+
test_type("raw", res <- collect_aio(craio))
215+
test_true(.read_marker(res))
216+
test_true(!.read_marker("not"))
217+
test_equal(.read_header(res), 12345L)
218+
test_equal(.read_header("not"), 0L)
219+
test_type("list", unserialize(res[9:length(res)]))
220+
test_equal(.header(0L), 0L)
221+
test_true(!.mark(FALSE))
215222
test_zero(req$send("context test", mode ="raw", block = 500))
216223
test_equal(recv(ctx, mode = "string", block = 500), "context test")
217224
test_type("integer", req$send(data.frame(), mode = "seri", block = 500))
@@ -220,6 +227,7 @@ test_type("logical", .unresolved(msg))
220227
test_type("logical", unresolved(msg))
221228
test_class("data.frame", call_aio(msg)$data)
222229
test_true(!unresolved(msg))
230+
test_equal(.header(2025250L), 2025250L)
223231
test_zero(req$send(c(TRUE, FALSE, TRUE), mode = 2L, block = 500))
224232
test_class("recvAio", msg <- recv_aio(ctx, mode = 6L, timeout = 500))
225233
test_type("logical", msg[])
@@ -245,6 +253,7 @@ test_class("recvAio", rek <- request(req$context, c(1+3i, 4+2i), send_mode = "se
245253
test_zero(reply(ctx, execute = identity, recv_mode = 1L, send_mode = 1L, timeout = 500))
246254
test_type("complex", call_aio(rek)[["data"]])
247255
test_type("integer", rek[["aio"]])
256+
test_equal(.header(0L), 0L)
248257

249258
test_type("list", cfg <- serial_config(class = c("invalid", "custom"), sfunc = list(identity, function(x) raw(1L)), ufunc = list(identity, as.integer)))
250259
opt(req$socket, "serial") <- cfg

0 commit comments

Comments
 (0)