Skip to content

Commit 9cedf16

Browse files
committed
std.Io: vectored sendmsg/recvmsg
1 parent 4187d0e commit 9cedf16

File tree

4 files changed

+70
-71
lines changed

4 files changed

+70
-71
lines changed

lib/std/Io.zig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -700,7 +700,7 @@ pub const VTable = struct {
700700
netListenUnix: *const fn (?*anyopaque, *const net.UnixAddress, net.UnixAddress.ListenOptions) net.UnixAddress.ListenError!net.Socket.Handle,
701701
netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
702702
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
703-
netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
703+
netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
704704
/// Returns 0 on end of stream.
705705
netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
706706
netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,

lib/std/Io/Kqueue.zig

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1404,12 +1404,12 @@ fn netSendOne(
14041404
flags: u32,
14051405
) net.Socket.SendError!void {
14061406
var addr: Io.Threaded.PosixAddress = undefined;
1407-
var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
1407+
const iovecs: []posix.iovec_const = @ptrCast(@constCast(message.data));
14081408
const msg: posix.msghdr_const = .{
1409-
.name = &addr.any,
1410-
.namelen = Io.Threaded.addressToPosix(message.address, &addr),
1411-
.iov = (&iovec)[0..1],
1412-
.iovlen = 1,
1409+
.name = if (message.address) |a| &addr.any else null,
1410+
.namelen = if (message.address) |a| Io.Threaded.addressToPosix(a, &addr) else 0,
1411+
.iov = iovecs.ptr,
1412+
.iovlen = @intCast(iovecs.len),
14131413
// OS returns EINVAL if this pointer is invalid even if controllen is zero.
14141414
.control = if (message.control.len == 0) null else @constCast(message.control.ptr),
14151415
.controllen = @intCast(message.control.len),
@@ -1420,7 +1420,7 @@ fn netSendOne(
14201420
const rc = posix.system.sendmsg(handle, &msg, flags);
14211421
switch (posix.errno(rc)) {
14221422
.SUCCESS => {
1423-
message.data_len = @intCast(rc);
1423+
message.bytes_sent = @intCast(rc);
14241424
return;
14251425
},
14261426
.INTR => continue,
@@ -1455,15 +1455,13 @@ fn netReceive(
14551455
userdata: ?*anyopaque,
14561456
handle: net.Socket.Handle,
14571457
message_buffer: []net.IncomingMessage,
1458-
data_buffer: []u8,
14591458
flags: net.ReceiveFlags,
14601459
timeout: Io.Timeout,
14611460
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
14621461
const k: *Kqueue = @ptrCast(@alignCast(userdata));
14631462
_ = k;
14641463
_ = handle;
14651464
_ = message_buffer;
1466-
_ = data_buffer;
14671465
_ = flags;
14681466
_ = timeout;
14691467
@panic("TODO");

lib/std/Io/Threaded.zig

Lines changed: 37 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -4085,12 +4085,12 @@ fn netSendOne(
40854085
flags: u32,
40864086
) net.Socket.SendError!void {
40874087
var addr: PosixAddress = undefined;
4088-
var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
4088+
const iovecs: []const posix.iovec_const = @ptrCast(message.data);
40894089
const msg: posix.msghdr_const = .{
4090-
.name = &addr.any,
4091-
.namelen = addressToPosix(message.address, &addr),
4092-
.iov = (&iovec)[0..1],
4093-
.iovlen = 1,
4090+
.name = if (message.address) &addr.any else null,
4091+
.namelen = if (message.address) |a| addressToPosix(a, &addr) else 0,
4092+
.iov = iovecs.ptr,
4093+
.iovlen = @intCast(iovecs.len),
40944094
// OS returns EINVAL if this pointer is invalid even if controllen is zero.
40954095
.control = if (message.control.len == 0) null else @constCast(message.control.ptr),
40964096
.controllen = @intCast(message.control.len),
@@ -4127,13 +4127,13 @@ fn netSendOne(
41274127
else => |err| return windows.unexpectedWSAError(err),
41284128
}
41294129
} else {
4130-
message.data_len = @intCast(rc);
4130+
message.bytes_sent = @intCast(rc);
41314131
return;
41324132
}
41334133
}
41344134
switch (posix.errno(rc)) {
41354135
.SUCCESS => {
4136-
message.data_len = @intCast(rc);
4136+
message.bytes_sent = @intCast(rc);
41374137
return;
41384138
},
41394139
.INTR => continue,
@@ -4171,21 +4171,19 @@ fn netSendMany(
41714171
) net.Socket.SendError!usize {
41724172
var msg_buffer: [64]std.os.linux.mmsghdr = undefined;
41734173
var addr_buffer: [msg_buffer.len]PosixAddress = undefined;
4174-
var iovecs_buffer: [msg_buffer.len]posix.iovec = undefined;
41754174
const min_len: usize = @min(messages.len, msg_buffer.len);
41764175
const clamped_messages = messages[0..min_len];
41774176
const clamped_msgs = (&msg_buffer)[0..min_len];
41784177
const clamped_addrs = (&addr_buffer)[0..min_len];
4179-
const clamped_iovecs = (&iovecs_buffer)[0..min_len];
41804178

4181-
for (clamped_messages, clamped_msgs, clamped_addrs, clamped_iovecs) |*message, *msg, *addr, *iovec| {
4182-
iovec.* = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
4179+
for (clamped_messages, clamped_msgs, clamped_addrs) |*message, *msg, *addr| {
4180+
const iovecs: []posix.iovec = @ptrCast(@constCast(message.data));
41834181
msg.* = .{
41844182
.hdr = .{
4185-
.name = &addr.any,
4186-
.namelen = addressToPosix(message.address, addr),
4187-
.iov = iovec[0..1],
4188-
.iovlen = 1,
4183+
.name = if (message.address == null) null else &addr.any,
4184+
.namelen = if (message.address) |a| addressToPosix(a, addr) else 0,
4185+
.iov = iovecs.ptr,
4186+
.iovlen = @intCast(iovecs.len),
41894187
.control = @constCast(message.control.ptr),
41904188
.controllen = message.control.len,
41914189
.flags = 0,
@@ -4201,7 +4199,7 @@ fn netSendMany(
42014199
.SUCCESS => {
42024200
const n: usize = @intCast(rc);
42034201
for (clamped_messages[0..n], clamped_msgs[0..n]) |*message, *msg| {
4204-
message.data_len = msg.len;
4202+
message.bytes_sent = msg.len;
42054203
}
42064204
return n;
42074205
},
@@ -4236,7 +4234,6 @@ fn netReceivePosix(
42364234
userdata: ?*anyopaque,
42374235
handle: net.Socket.Handle,
42384236
message_buffer: []net.IncomingMessage,
4239-
data_buffer: []u8,
42404237
flags: net.ReceiveFlags,
42414238
timeout: Io.Timeout,
42424239
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
@@ -4246,10 +4243,6 @@ fn netReceivePosix(
42464243

42474244
// recvmmsg is useless, here's why:
42484245
// * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
4249-
// * it wants iovecs for each message but we have a better API: one data
4250-
// buffer to handle all the messages. The better API cannot be lowered to
4251-
// the split vectors though because reducing the buffer size might make
4252-
// some messages unreceivable.
42534246

42544247
// So the strategy instead is to use non-blocking recvmsg calls, calling
42554248
// poll() with timeout if the first one returns EAGAIN.
@@ -4267,7 +4260,6 @@ fn netReceivePosix(
42674260
},
42684261
};
42694262
var message_i: usize = 0;
4270-
var data_i: usize = 0;
42714263

42724264
const deadline = timeout.toDeadline(t_io) catch |err| return .{ err, message_i };
42734265

@@ -4276,14 +4268,15 @@ fn netReceivePosix(
42764268

42774269
if (message_buffer.len - message_i == 0) return .{ null, message_i };
42784270
const message = &message_buffer[message_i];
4279-
const remaining_data_buffer = data_buffer[data_i..];
42804271
var storage: PosixAddress = undefined;
4281-
var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
4272+
4273+
const iovecs: []posix.iovec = @ptrCast(message.data);
4274+
42824275
var msg: posix.msghdr = .{
42834276
.name = &storage.any,
42844277
.namelen = @sizeOf(PosixAddress),
4285-
.iov = (&iov)[0..1],
4286-
.iovlen = 1,
4278+
.iov = iovecs.ptr,
4279+
.iovlen = @intCast(iovecs.len),
42874280
.control = message.control.ptr,
42884281
.controllen = @intCast(message.control.len),
42894282
.flags = undefined,
@@ -4292,11 +4285,9 @@ fn netReceivePosix(
42924285
const recv_rc = posix.system.recvmsg(handle, &msg, posix_flags);
42934286
switch (posix.errno(recv_rc)) {
42944287
.SUCCESS => {
4295-
const data = remaining_data_buffer[0..@intCast(recv_rc)];
4296-
data_i += data.len;
42974288
message.* = .{
42984289
.from = addressFromPosix(&storage),
4299-
.data = data,
4290+
.data = message.data,
43004291
.control = if (msg.control) |ptr| @as([*]u8, @ptrCast(ptr))[0..msg.controllen] else message.control,
43014292
.flags = .{
43024293
.eor = (msg.flags & posix.MSG.EOR) != 0,
@@ -4305,6 +4296,7 @@ fn netReceivePosix(
43054296
.oob = (msg.flags & posix.MSG.OOB) != 0,
43064297
.errqueue = if (@hasDecl(posix.MSG, "ERRQUEUE")) (msg.flags & posix.MSG.ERRQUEUE) != 0 else false,
43074298
},
4299+
.bytes_received = @intCast(recv_rc),
43084300
};
43094301
message_i += 1;
43104302
continue;
@@ -4366,7 +4358,6 @@ fn netReceiveWindows(
43664358
userdata: ?*anyopaque,
43674359
handle: net.Socket.Handle,
43684360
message_buffer: []net.IncomingMessage,
4369-
data_buffer: []u8,
43704361
flags: net.ReceiveFlags,
43714362
timeout: Io.Timeout,
43724363
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
@@ -4375,7 +4366,6 @@ fn netReceiveWindows(
43754366
_ = t;
43764367
_ = handle;
43774368
_ = message_buffer;
4378-
_ = data_buffer;
43794369
_ = flags;
43804370
_ = timeout;
43814371
@panic("TODO implement netReceiveWindows");
@@ -4385,14 +4375,12 @@ fn netReceiveUnavailable(
43854375
userdata: ?*anyopaque,
43864376
handle: net.Socket.Handle,
43874377
message_buffer: []net.IncomingMessage,
4388-
data_buffer: []u8,
43894378
flags: net.ReceiveFlags,
43904379
timeout: Io.Timeout,
43914380
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
43924381
_ = userdata;
43934382
_ = handle;
43944383
_ = message_buffer;
4395-
_ = data_buffer;
43964384
_ = flags;
43974385
_ = timeout;
43984386
return .{ error.NetworkDown, 0 };
@@ -5387,8 +5375,7 @@ fn lookupDns(
53875375
for (mapped_nameservers) |*ns| {
53885376
message_buffer[message_i] = .{
53895377
.address = ns,
5390-
.data_ptr = query.ptr,
5391-
.data_len = query.len,
5378+
.data = &[_][]const u8{query},
53925379
};
53935380
message_i += 1;
53945381
}
@@ -5402,11 +5389,22 @@ fn lookupDns(
54025389
} };
54035390

54045391
while (true) {
5405-
var message_buffer: [max_messages]Io.net.IncomingMessage = @splat(.init);
5392+
var message_buffer: [max_messages]Io.net.IncomingMessage = undefined;
54065393
const buf = answer_buffer[answer_buffer_i..];
5407-
const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, buf, .{}, timeout);
5394+
5395+
{ // Partition buffer among messages - each gets equal share
5396+
const buf_per_message = @max(1, buf.len / max_messages);
5397+
var data_buffers: [max_messages][1][]u8 = undefined;
5398+
var it = std.mem.window(u8, buf, buf_per_message, buf_per_message);
5399+
for (&message_buffer, &data_buffers) |*msg, *data_buf| {
5400+
data_buf[0] = @constCast(it.next() orelse &[_]u8{});
5401+
msg.* = .init(data_buf);
5402+
}
5403+
}
5404+
5405+
const recv_err, const recv_n = socket.receiveManyTimeout(t_io, &message_buffer, .{}, timeout);
54085406
for (message_buffer[0..recv_n]) |*received_message| {
5409-
const reply = received_message.data;
5407+
const reply = received_message.data[0][0..received_message.bytes_received];
54105408
// Ignore non-identifiable packets.
54115409
if (reply.len < 4) continue;
54125410

@@ -5438,8 +5436,7 @@ fn lookupDns(
54385436
2 => {
54395437
var retry_message: Io.net.OutgoingMessage = .{
54405438
.address = ns,
5441-
.data_ptr = query.ptr,
5442-
.data_len = query.len,
5439+
.data = &.{query},
54435440
};
54445441
_ = netSendPosix(t, socket.handle, (&retry_message)[0..1], .{});
54455442
continue;

lib/std/Io/net.zig

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -901,21 +901,25 @@ pub const ReceiveFlags = packed struct(u8) {
901901
pub const IncomingMessage = struct {
902902
/// Populated by receive functions.
903903
from: IpAddress,
904-
/// Populated by receive functions, points into the caller-supplied buffer.
905-
data: []u8,
904+
/// Buffers to receive into. Can be a single buffer or multiple for vectored I/O.
905+
data: [][]u8,
906906
/// Supplied by caller before calling receive functions; mutated by receive
907907
/// functions.
908908
control: []u8,
909909
/// Populated by receive functions.
910910
flags: Flags,
911+
/// Populated by receive functions with total bytes received across all buffers.
912+
bytes_received: usize = undefined,
911913

912914
/// Useful for initializing before calling `receiveManyTimeout`.
913-
pub const init: IncomingMessage = .{
914-
.from = undefined,
915-
.data = undefined,
916-
.control = &.{},
917-
.flags = undefined,
918-
};
915+
pub fn init(data: [][]u8) IncomingMessage {
916+
return .{
917+
.from = undefined,
918+
.data = data,
919+
.control = &.{},
920+
.flags = undefined,
921+
};
922+
}
919923

920924
pub const Flags = packed struct(u8) {
921925
/// indicates end-of-record; the data returned completed a record
@@ -937,12 +941,13 @@ pub const IncomingMessage = struct {
937941
};
938942

939943
pub const OutgoingMessage = struct {
940-
address: *const IpAddress,
941-
data_ptr: [*]const u8,
942-
/// Initialized with how many bytes of `data_ptr` to send. After sending
943-
/// succeeds, replaced with how many bytes were actually sent.
944-
data_len: usize,
944+
/// For connectionless sockets (UDP). For connected sockets (TCP, Unix), pass null.
945+
address: ?*const IpAddress,
946+
/// Buffers to send. Can be a single buffer or multiple for scatter-gather I/O.
947+
data: []const []const u8,
945948
control: []const u8 = &.{},
949+
/// Populated by send functions with total bytes sent across all buffers.
950+
bytes_sent: usize = undefined,
946951
};
947952

948953
pub const SendFlags = packed struct(u8) {
@@ -1083,10 +1088,10 @@ pub const Socket = struct {
10831088

10841089
/// Transfers `data` to `dest`, connectionless, in one packet.
10851090
pub fn send(s: *const Socket, io: Io, dest: *const IpAddress, data: []const u8) SendError!void {
1086-
var message: OutgoingMessage = .{ .address = dest, .data_ptr = data.ptr, .data_len = data.len };
1091+
var message: OutgoingMessage = .{ .address = dest, .data = &.{data} };
10871092
const err, const n = io.vtable.netSend(io.userdata, s.handle, (&message)[0..1], .{});
10881093
if (n != 1) return err.?;
1089-
if (message.data_len != data.len) return error.MessageOversize;
1094+
if (message.bytes_sent != data.len) return error.MessageOversize;
10901095
}
10911096

10921097
pub fn sendMany(s: *const Socket, io: Io, messages: []OutgoingMessage, flags: SendFlags) SendError!void {
@@ -1118,8 +1123,8 @@ pub const Socket = struct {
11181123
/// See also:
11191124
/// * `receiveTimeout`
11201125
pub fn receive(s: *const Socket, io: Io, buffer: []u8) ReceiveError!IncomingMessage {
1121-
var message: IncomingMessage = .init;
1122-
const maybe_err, const count = io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, .none);
1126+
var message: IncomingMessage = .init(&.{buffer});
1127+
const maybe_err, const count = io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], .{}, .none);
11231128
if (maybe_err) |err| switch (err) {
11241129
// No timeout is passed to `netReceieve`, so it must not return timeout related errors.
11251130
error.Timeout, error.UnsupportedClock => unreachable,
@@ -1144,8 +1149,8 @@ pub const Socket = struct {
11441149
buffer: []u8,
11451150
timeout: Io.Timeout,
11461151
) ReceiveTimeoutError!IncomingMessage {
1147-
var message: IncomingMessage = .init;
1148-
const maybe_err, const count = io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, timeout);
1152+
var message: IncomingMessage = .init(&.{buffer});
1153+
const maybe_err, const count = io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], .{}, timeout);
11491154
if (maybe_err) |err| return err;
11501155
assert(1 == count);
11511156
return message;
@@ -1163,14 +1168,13 @@ pub const Socket = struct {
11631168
pub fn receiveManyTimeout(
11641169
s: *const Socket,
11651170
io: Io,
1166-
/// Function assumes each element has initialized `control` field.
1171+
/// Function assumes each element has initialized `control` and `data` fields.
11671172
/// Initializing with `IncomingMessage.init` may be helpful.
11681173
message_buffer: []IncomingMessage,
1169-
data_buffer: []u8,
11701174
flags: ReceiveFlags,
11711175
timeout: Io.Timeout,
11721176
) struct { ?ReceiveTimeoutError, usize } {
1173-
return io.vtable.netReceive(io.userdata, s.handle, message_buffer, data_buffer, flags, timeout);
1177+
return io.vtable.netReceive(io.userdata, s.handle, message_buffer, flags, timeout);
11741178
}
11751179
};
11761180

0 commit comments

Comments
 (0)