diff --git a/scripts/RemoteTests.json b/scripts/RemoteTests.json index 586dc25a7f..485ad86221 100644 --- a/scripts/RemoteTests.json +++ b/scripts/RemoteTests.json @@ -93,6 +93,26 @@ "Formats": ["{0} kbps"], "RegressionThreshold": "-60.0" }, + { + "TestName": "TcpThroughputUp", + "Local" : { + "Platform": "linux", + "Tls": ["openssl", "openssl3"], + "Arch": ["x64", "arm"], + "Exe": "secnetperf", + "Arguments": "-exec:maxtput -test:Throughput -target:$RemoteAddress -uni:1 -timed:1 -upload:12000 -tcp:1" + }, + "Variables": [ + ], + "SkipKernel": true, + "AllowLoopback": true, + "Iterations": 5, + "RemoteReadyMatcher": "Started!", + "ResultsMatcher": ".*@ (.*) kbps.*", + "FailureDefault": "Result: 0 bytes @ 0 kbps (0.0 ms).", + "Formats": ["{0} kbps"], + "RegressionThreshold": "-60.0" + }, { "TestName": "ThroughputDown", "Local": { @@ -165,6 +185,26 @@ "Formats": ["{0} kbps"], "RegressionThreshold": "-60.0" }, + { + "TestName": "TcpThroughputDown", + "Local" : { + "Platform": "linux", + "Tls": ["openssl", "openssl3"], + "Arch": ["x64", "arm"], + "Exe": "secnetperf", + "Arguments": "-exec:maxtput -test:Throughput -target:$RemoteAddress -uni:1 -timed:1 -download:12000 -tcp:1" + }, + "Variables": [ + ], + "SkipKernel": true, + "AllowLoopback": true, + "Iterations": 5, + "RemoteReadyMatcher": "Started!", + "ResultsMatcher": ".*@ (.*) kbps.*", + "FailureDefault": "Result: 0 bytes @ 0 kbps (0.0 ms).", + "Formats": ["{0} kbps"], + "RegressionThreshold": "-60.0" + }, { "TestName": "RPS", "Local": { diff --git a/scripts/TcpTests.json b/scripts/TcpTests.json index 886fe4a1c2..527c4ce7d0 100644 --- a/scripts/TcpTests.json +++ b/scripts/TcpTests.json @@ -22,6 +22,25 @@ "ResultsMatcher": ".*@ (.*) kbps.*", "Formats": ["{0} kbps"], "RegressionThreshold": "-8.0" + }, + { + "TestName": "TcpThroughputUp", + "Local" : { + "Platform": "linux", + "Tls": ["openssl", "openssl3"], + "Arch": ["x64", "arm"], + "Exe": "secnetperf", + "Arguments": "-test:Throughput -target:$RemoteAddress -bind:$LocalAddress:4434 -ip:4 -uni:1 -upload:2000000000 -tcp:1" + }, + "Variables": [ + ], + "AllowLoopback": true, + "Iterations": 5, + "RemoteReadyMatcher": "Started!", + "ResultsMatcher": ".*@ (.*) kbps.*", + "FailureDefault": "Result: 0 bytes @ 0 kbps (0.0 ms).", + "Formats": ["{0} kbps"], + "RegressionThreshold": "-60.0" } ] } diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 0127285857..eacd550484 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -339,6 +339,8 @@ CxPlatDataPathCalculateFeatureSupport( Datapath->RecvBlockSize = Datapath->RecvBlockBufferOffset + CXPLAT_SMALL_IO_BUFFER_SIZE; } + + Datapath->Features |= CXPLAT_DATAPATH_FEATURE_TCP; } void @@ -375,6 +377,14 @@ DataPathInitialize( return QUIC_STATUS_INVALID_PARAMETER; } } + if (TcpCallbacks != NULL) { + if (TcpCallbacks->Accept == NULL || + TcpCallbacks->Connect == NULL || + TcpCallbacks->Receive == NULL || + TcpCallbacks->SendComplete == NULL) { + return QUIC_STATUS_INVALID_PARAMETER; + } + } if (!CxPlatWorkersLazyStart(Config)) { return QUIC_STATUS_OUT_OF_MEMORY; @@ -401,6 +411,10 @@ DataPathInitialize( if (UdpCallbacks) { Datapath->UdpHandlers = *UdpCallbacks; } + if (TcpCallbacks) { + Datapath->TcpHandlers = *TcpCallbacks; + } + Datapath->PartitionCount = PartitionCount; Datapath->Features = CXPLAT_DATAPATH_FEATURE_LOCAL_PORT_SHARING; CxPlatRefInitializeEx(&Datapath->RefCount, Datapath->PartitionCount); @@ -544,33 +558,20 @@ CxPlatSocketConfigureRss( #endif } -// -// Socket context interface. It abstracts a (generally per-processor) UDP socket -// and the corresponding logic/functionality like send and receive processing. -// - QUIC_STATUS -CxPlatSocketContextInitialize( - _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext, - _In_ const CXPLAT_UDP_CONFIG* Config, - _In_ const uint16_t PartitionIndex +CxPlatSocketContextSqeInitialize( + _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext ) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; - int Result = 0; - int Option = 0; - QUIC_ADDR MappedAddress = {0}; - socklen_t AssignedLocalAddressLength = 0; + CXPLAT_SOCKET* Binding = SocketContext->Binding; BOOLEAN ShutdownSqeInitialized = FALSE; BOOLEAN IoSqeInitialized = FALSE; BOOLEAN FlushTxInitialized = FALSE; - CXPLAT_SOCKET* Binding = SocketContext->Binding; - CXPLAT_DATAPATH* Datapath = Binding->Datapath; - - CXPLAT_DBG_ASSERT(PartitionIndex < Datapath->PartitionCount); - SocketContext->DatapathPartition = &Datapath->Partitions[PartitionIndex]; - CxPlatRefIncrement(&SocketContext->DatapathPartition->RefCount); + SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; + SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; + SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; if (!CxPlatSqeInitialize( SocketContext->DatapathPartition->EventQ, @@ -617,14 +618,64 @@ CxPlatSocketContextInitialize( } FlushTxInitialized = TRUE; + SocketContext->SqeInitialized = TRUE; + +Exit: + + if (QUIC_FAILED(Status)) { + if (ShutdownSqeInitialized) { + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe); + } + if (IoSqeInitialized) { + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe); + } + if (FlushTxInitialized) { + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe); + } + } + + return Status; +} + +// +// Socket context interface. It abstracts a (generally per-processor) UDP socket +// and the corresponding logic/functionality like send and receive processing. +// +QUIC_STATUS +CxPlatSocketContextInitialize( + _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext, + _In_ const CXPLAT_UDP_CONFIG* Config, + _In_ const uint16_t PartitionIndex, + _In_ CXPLAT_SOCKET_TYPE SocketType + ) +{ + QUIC_STATUS Status = QUIC_STATUS_SUCCESS; + int Result = 0; + int Option = 0; + QUIC_ADDR MappedAddress = {0}; + socklen_t AssignedLocalAddressLength = 0; + + CXPLAT_SOCKET* Binding = SocketContext->Binding; + CXPLAT_DATAPATH* Datapath = Binding->Datapath; + + CXPLAT_DBG_ASSERT(PartitionIndex < Datapath->PartitionCount); + SocketContext->DatapathPartition = &Datapath->Partitions[PartitionIndex]; + CxPlatRefIncrement(&SocketContext->DatapathPartition->RefCount); + + if (QUIC_FAILED(CxPlatSocketContextSqeInitialize(SocketContext)) || + SocketType == CXPLAT_SOCKET_TCP_SERVER) { + goto Exit; + } + // // Create datagram socket. // SocketContext->SocketFd = socket( AF_INET6, - SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, // TODO check if SOCK_CLOEXEC is required? - IPPROTO_UDP); + (SocketType == CXPLAT_SOCKET_UDP ? SOCK_DGRAM : SOCK_STREAM) | + SOCK_NONBLOCK | SOCK_CLOEXEC, // TODO check if SOCK_CLOEXEC is required? + SocketType == CXPLAT_SOCKET_UDP ? IPPROTO_UDP : IPPROTO_TCP); if (SocketContext->SocketFd == INVALID_SOCKET) { Status = errno; QuicTraceEvent( @@ -635,7 +686,6 @@ CxPlatSocketContextInitialize( "socket failed"); goto Exit; } - // // Set dual (IPv4 & IPv6) socket mode. // @@ -658,149 +708,130 @@ CxPlatSocketContextInitialize( goto Exit; } - // - // Set DON'T FRAG socket option. - // - // - // Windows: setsockopt IPPROTO_IP IP_DONTFRAGMENT TRUE. - // Linux: IP_DONTFRAGMENT option is not available. IP_MTU_DISCOVER/IPV6_MTU_DISCOVER - // is the apparent alternative. - // - Option = IP_PMTUDISC_PROBE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IP, - IP_MTU_DISCOVER, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IP_MTU_DISCOVER) failed"); - goto Exit; - } - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_MTU_DISCOVER, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_MTU_DISCOVER) failed"); - goto Exit; - } + if (SocketType == CXPLAT_SOCKET_UDP) { + // + // Set DON'T FRAG socket option. + // - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_DONTFRAG, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_DONTFRAG) failed"); - goto Exit; - } + // + // Windows: setsockopt IPPROTO_IP IP_DONTFRAGMENT TRUE. + // Linux: IP_DONTFRAGMENT option is not available. IP_MTU_DISCOVER/IPV6_MTU_DISCOVER + // is the apparent alternative. + // + Option = IP_PMTUDISC_PROBE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IP, + IP_MTU_DISCOVER, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IP_MTU_DISCOVER) failed"); + goto Exit; + } + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_MTU_DISCOVER, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_MTU_DISCOVER) failed"); + goto Exit; + } - // - // Set socket option to receive ancillary data about the incoming packets. - // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_DONTFRAG, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_DONTFRAG) failed"); + goto Exit; + } - // - // Windows: setsockopt IPPROTO_IPV6 IPV6_PKTINFO TRUE. - // Android: Returns EINVAL. IPV6_PKTINFO option is not present in documentation. - // IPV6_RECVPKTINFO seems like is the alternative. - // TODO: Check if this works as expected? - // - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_RECVPKTINFO, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_RECVPKTINFO) failed"); - goto Exit; - } + // + // Set socket option to receive ancillary data about the incoming packets. + // - // - // Set socket option to receive TOS (= DSCP + ECN) information from the - // incoming packet. - // - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_RECVTCLASS, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_RECVTCLASS) failed"); - goto Exit; - } + // + // Windows: setsockopt IPPROTO_IPV6 IPV6_PKTINFO TRUE. + // Android: Returns EINVAL. IPV6_PKTINFO option is not present in documentation. + // IPV6_RECVPKTINFO seems like is the alternative. + // TODO: Check if this works as expected? + // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_RECVPKTINFO, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_RECVPKTINFO) failed"); + goto Exit; + } - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IP, - IP_RECVTOS, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IP_RECVTOS) failed"); - goto Exit; - } + // + // Set socket option to receive TOS (= DSCP + ECN) information from the + // incoming packet. + // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_RECVTCLASS, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_RECVTCLASS) failed"); + goto Exit; + } -#ifdef UDP_GRO - if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { Option = TRUE; Result = setsockopt( SocketContext->SocketFd, - SOL_UDP, - UDP_GRO, + IPPROTO_IP, + IP_RECVTOS, (const void*)&Option, sizeof(Option)); if (Result == SOCKET_ERROR) { @@ -810,49 +841,43 @@ CxPlatSocketContextInitialize( "[data][%p] ERROR, %u, %s.", Binding, Status, - "setsockopt(UDP_GRO) failed"); + "setsockopt(IP_RECVTOS) failed"); goto Exit; } - } -#endif - // - // The socket is shared by multiple QUIC endpoints, so increase the receive - // buffer size. - // - Option = INT32_MAX; - Result = - setsockopt( - SocketContext->SocketFd, - SOL_SOCKET, - SO_RCVBUF, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(SO_RCVBUF) failed"); - goto Exit; - } + #ifdef UDP_GRO + if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + SOL_UDP, + UDP_GRO, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(UDP_GRO) failed"); + goto Exit; + } + } + #endif - // - // Only set SO_REUSEPORT on a server socket, otherwise the client could be - // assigned a server port (unless it's forcing sharing). - // - if (Config->Flags & CXPLAT_SOCKET_FLAG_SHARE || Config->RemoteAddress == NULL) { // - // The port is shared across processors. + // The socket is shared by multiple QUIC endpoints, so increase the receive + // buffer size. // - Option = TRUE; + Option = INT32_MAX; Result = setsockopt( SocketContext->SocketFd, SOL_SOCKET, - SO_REUSEPORT, + SO_RCVBUF, (const void*)&Option, sizeof(Option)); if (Result == SOCKET_ERROR) { @@ -862,9 +887,37 @@ CxPlatSocketContextInitialize( "[data][%p] ERROR, %u, %s.", Binding, Status, - "setsockopt(SO_REUSEPORT) failed"); + "setsockopt(SO_RCVBUF) failed"); goto Exit; } + + // + // Only set SO_REUSEPORT on a server socket, otherwise the client could be + // assigned a server port (unless it's forcing sharing). + // + if (Config->Flags & CXPLAT_SOCKET_FLAG_SHARE || Config->RemoteAddress == NULL) { + // + // The port is shared across processors. + // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + SOL_SOCKET, + SO_REUSEPORT, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(SO_REUSEPORT) failed"); + goto Exit; + } + } } CxPlatCopyMemory(&MappedAddress, &Binding->LocalAddress, sizeof(MappedAddress)); @@ -888,6 +941,10 @@ CxPlatSocketContextInitialize( goto Exit; } + QUIC_ADDR_STR LocalAddressStr; + QUIC_ADDR_STR RemoteAddressStr; + QuicAddrToString(&MappedAddress, &LocalAddressStr); + if (Config->RemoteAddress != NULL) { CxPlatZeroMemory(&MappedAddress, sizeof(MappedAddress)); CxPlatConvertToMappedV6(Config->RemoteAddress, &MappedAddress); @@ -895,13 +952,13 @@ CxPlatSocketContextInitialize( if (MappedAddress.Ipv6.sin6_family == QUIC_ADDRESS_FAMILY_INET6) { MappedAddress.Ipv6.sin6_family = AF_INET6; } - + QuicAddrToString(&MappedAddress, &RemoteAddressStr); Result = connect( SocketContext->SocketFd, &MappedAddress.Ip, sizeof(MappedAddress)); - if (Result == SOCKET_ERROR) { + if (Result == SOCKET_ERROR && errno != EINPROGRESS) { Status = errno; QuicTraceEvent( DatapathErrorStatus, @@ -911,7 +968,7 @@ CxPlatSocketContextInitialize( "connect failed"); goto Exit; } - Binding->Connected = TRUE; + Binding->Connected = SocketType != CXPLAT_SOCKET_TCP; } // @@ -952,22 +1009,28 @@ CxPlatSocketContextInitialize( Binding->LocalAddress.Ipv6.sin6_family = QUIC_ADDRESS_FAMILY_INET6; } - SocketContext->SqeInitialized = TRUE; + if (SocketType == CXPLAT_SOCKET_TCP_LISTENER) { + Result = + listen( + SocketContext->SocketFd, + 100); + if (Result == SOCKET_ERROR) { + int error = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + error, + "listen"); + goto Exit; + } + } Exit: if (QUIC_FAILED(Status)) { close(SocketContext->SocketFd); SocketContext->SocketFd = INVALID_SOCKET; - if (ShutdownSqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe); - } - if (IoSqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe); - } - if (FlushTxInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe); - } } return Status; @@ -1007,6 +1070,11 @@ CxPlatSocketContextUninitializeComplete( TxEntry)); } + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_LISTENER && SocketContext->AcceptSocket) { + SocketDelete(SocketContext->AcceptSocket); + SocketContext->AcceptSocket = NULL; + } + if (SocketContext->SocketFd != INVALID_SOCKET) { epoll_ctl(*SocketContext->DatapathPartition->EventQ, EPOLL_CTL_DEL, SocketContext->SocketFd, NULL); close(SocketContext->SocketFd); @@ -1040,6 +1108,25 @@ CxPlatSocketContextUninitialize( if (!SocketContext->IoStarted) { CxPlatSocketContextUninitializeComplete(SocketContext); } else { + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP || + SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_SERVER) { + // + // For TCP sockets, we should shutdown the socket before closing it. + // + SocketContext->Binding->DisconnectIndicated = TRUE; + if (shutdown(SocketContext->SocketFd, SHUT_RDWR) != 0) { + int Errno = errno; + if (Errno != ENOTCONN) { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Errno, + "shutdown"); + } + } + } + CxPlatRundownReleaseAndWait(&SocketContext->UpcallRundown); // Block until all upcalls complete. // @@ -1093,8 +1180,9 @@ SocketCreateUdp( ) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; - BOOLEAN IsServerSocket = Config->RemoteAddress == NULL; - uint16_t SocketCount = IsServerSocket ? (uint16_t)CxPlatProcMaxCount() : 1; + const BOOLEAN IsServerSocket = Config->RemoteAddress == NULL; + const BOOLEAN NumPerProcessorSockets = IsServerSocket && Datapath->PartitionCount > 1; + const uint16_t SocketCount = NumPerProcessorSockets ? (uint16_t)CxPlatProcMaxCount() : 1; CXPLAT_DBG_ASSERT(Datapath->UdpHandlers.Receive != NULL || Config->Flags & CXPLAT_SOCKET_FLAG_PCP); @@ -1122,8 +1210,10 @@ SocketCreateUdp( CxPlatZeroMemory(Binding, BindingLength); Binding->Datapath = Datapath; Binding->ClientContext = Config->CallbackContext; + Binding->NumPerProcessorSockets = NumPerProcessorSockets; Binding->HasFixedRemoteAddress = (Config->RemoteAddress != NULL); Binding->Mtu = CXPLAT_MAX_MTU; + Binding->Type = CXPLAT_SOCKET_UDP; CxPlatRefInitializeEx(&Binding->RefCount, SocketCount); if (Config->LocalAddress) { CxPlatConvertToMappedV6(Config->LocalAddress, &Binding->LocalAddress); @@ -1137,9 +1227,6 @@ SocketCreateUdp( for (uint32_t i = 0; i < SocketCount; i++) { Binding->SocketContexts[i].Binding = Binding; Binding->SocketContexts[i].SocketFd = INVALID_SOCKET; - Binding->SocketContexts[i].ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; - Binding->SocketContexts[i].IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; - Binding->SocketContexts[i].FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; CxPlatListInitializeHead(&Binding->SocketContexts[i].TxQueue); CxPlatLockInitialize(&Binding->SocketContexts[i].TxQueueLock); CxPlatRundownInitialize(&Binding->SocketContexts[i].UpcallRundown); @@ -1150,7 +1237,8 @@ SocketCreateUdp( CxPlatSocketContextInitialize( &Binding->SocketContexts[i], Config, - Config->RemoteAddress ? Config->PartitionIndex : (i % Datapath->PartitionCount)); + Config->RemoteAddress ? Config->PartitionIndex : (i % Datapath->PartitionCount), + Binding->Type); if (QUIC_FAILED(Status)) { goto Exit; } @@ -1197,6 +1285,126 @@ SocketCreateUdp( return Status; } + +_IRQL_requires_max_(PASSIVE_LEVEL) +QUIC_STATUS +CxPlatSocketCreateTcpInternal( + _In_ CXPLAT_DATAPATH* Datapath, + _In_ CXPLAT_SOCKET_TYPE Type, + _In_opt_ const QUIC_ADDR* LocalAddress, + _In_opt_ const QUIC_ADDR* RemoteAddress, + _In_opt_ void* RecvCallbackContext, + _Out_ CXPLAT_SOCKET** NewBinding + ) +{ + QUIC_STATUS Status; + uint16_t PartitionIndex; + BOOLEAN IsServerSocket = RemoteAddress == NULL; + + CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL); + + CXPLAT_SOCKET_CONTEXT* SocketContext = NULL; + uint32_t SocketLength = sizeof(CXPLAT_SOCKET) + sizeof(CXPLAT_SOCKET_CONTEXT); + CXPLAT_SOCKET* Binding = CXPLAT_ALLOC_PAGED(SocketLength, QUIC_POOL_SOCKET); + if (Binding == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "CXPLAT_SOCKET", + SocketLength); + Status = QUIC_STATUS_OUT_OF_MEMORY; + goto Exit; + } + + QuicTraceEvent( + DatapathCreated, + "[data][%p] Created, local=%!ADDR!, remote=%!ADDR!", + Binding, + CASTED_CLOG_BYTEARRAY(LocalAddress ? sizeof(*LocalAddress) : 0, LocalAddress), + CASTED_CLOG_BYTEARRAY(RemoteAddress ? sizeof(*RemoteAddress) : 0, RemoteAddress)); + + CxPlatZeroMemory(Binding, SocketLength); + Binding->Datapath = Datapath; + Binding->ClientContext = RecvCallbackContext; + Binding->HasFixedRemoteAddress = TRUE; + Binding->Mtu = CXPLAT_MAX_MTU; + Binding->Type = Type; + if (LocalAddress) { + CxPlatConvertToMappedV6(LocalAddress, &Binding->LocalAddress); + } else { + Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; + } + PartitionIndex = + RemoteAddress ? + ((uint16_t)(CxPlatProcCurrentNumber() % Datapath->PartitionCount)) : 0; + CxPlatRefInitializeEx(&Binding->RefCount, 1); + + SocketContext = &Binding->SocketContexts[0]; + SocketContext->Binding = Binding; + SocketContext->SocketFd = INVALID_SOCKET; + CxPlatListInitializeHead(&SocketContext->TxQueue); + CxPlatLockInitialize(&SocketContext->TxQueueLock); + CxPlatRundownInitialize(&SocketContext->UpcallRundown); + + CXPLAT_UDP_CONFIG Config = { + .LocalAddress = LocalAddress, + .RemoteAddress = RemoteAddress + }; + Status = + CxPlatSocketContextInitialize( + SocketContext, + &Config, + PartitionIndex, + Binding->Type); + if (QUIC_FAILED(Status)) { + goto Exit; + } + + if (IsServerSocket) { + // + // The return value is being ignored here, as if a system does not support + // bpf we still want the server to work. If this happens, the sockets will + // round robin, but each flow will be sent to the same socket, just not + // based on RSS. + // + (void)CxPlatSocketConfigureRss(SocketContext, 1); + } + + if (Type == CXPLAT_SOCKET_TCP_SERVER) { + *NewBinding = Binding; + Binding = NULL; + goto Exit; + } + + CxPlatConvertFromMappedV6(&Binding->LocalAddress, &Binding->LocalAddress); + Binding->LocalAddress.Ipv6.sin6_scope_id = 0; + + if (RemoteAddress != NULL) { + Binding->RemoteAddress = *RemoteAddress; + } else { + Binding->RemoteAddress.Ipv4.sin_port = 0; + } + + // + // Must set output pointer before starting receive path, as the receive path + // will try to use the output. + // + *NewBinding = Binding; + + CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_ADD, EPOLLIN | EPOLLOUT); + SocketContext->IoStarted = TRUE; + + Binding = NULL; + +Exit: + + if (Binding != NULL) { + CxPlatSocketDelete(Binding); + } + + return Status; +} + _IRQL_requires_max_(PASSIVE_LEVEL) QUIC_STATUS SocketCreateTcp( @@ -1207,12 +1415,14 @@ SocketCreateTcp( _Out_ CXPLAT_SOCKET** Socket ) { - UNREFERENCED_PARAMETER(Datapath); - UNREFERENCED_PARAMETER(LocalAddress); - UNREFERENCED_PARAMETER(RemoteAddress); - UNREFERENCED_PARAMETER(CallbackContext); - UNREFERENCED_PARAMETER(Socket); - return QUIC_STATUS_NOT_SUPPORTED; + return + CxPlatSocketCreateTcpInternal( + Datapath, + CXPLAT_SOCKET_TCP, + LocalAddress, + RemoteAddress, + CallbackContext, + Socket); } _IRQL_requires_max_(PASSIVE_LEVEL) @@ -1224,11 +1434,149 @@ SocketCreateTcpListener( _Out_ CXPLAT_SOCKET** Socket ) { - UNREFERENCED_PARAMETER(Datapath); - UNREFERENCED_PARAMETER(LocalAddress); - UNREFERENCED_PARAMETER(CallbackContext); - UNREFERENCED_PARAMETER(Socket); - return QUIC_STATUS_NOT_SUPPORTED; + QUIC_STATUS Status; + + CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL); + + CXPLAT_SOCKET_CONTEXT* SocketContext = NULL; + uint32_t SocketLength = sizeof(CXPLAT_SOCKET) + sizeof(CXPLAT_SOCKET_CONTEXT); + CXPLAT_SOCKET* Binding = CXPLAT_ALLOC_PAGED(SocketLength, QUIC_POOL_SOCKET); + if (Binding == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "CXPLAT_SOCKET", + SocketLength); + Status = QUIC_STATUS_OUT_OF_MEMORY; + goto Exit; + } + + QuicTraceEvent( + DatapathCreated, + "[data][%p] Created, local=%!ADDR!, remote=%!ADDR!", + Binding, + CASTED_CLOG_BYTEARRAY(LocalAddress ? sizeof(*LocalAddress) : 0, LocalAddress), + CASTED_CLOG_BYTEARRAY(0, NULL)); + + CxPlatZeroMemory(Binding, SocketLength); + Binding->Datapath = Datapath; + Binding->ClientContext = CallbackContext; + Binding->HasFixedRemoteAddress = FALSE; + Binding->Mtu = CXPLAT_MAX_MTU; + Binding->Type = CXPLAT_SOCKET_TCP_LISTENER; + if (LocalAddress) { + CxPlatConvertToMappedV6(LocalAddress, &Binding->LocalAddress); + if (Binding->LocalAddress.Ip.sa_family == AF_UNSPEC) { + Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; + } + } else { + Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; + } + CxPlatRefInitializeEx(&Binding->RefCount, 1); + + SocketContext = &Binding->SocketContexts[0]; + SocketContext->Binding = Binding; + SocketContext->SocketFd = INVALID_SOCKET; + CxPlatListInitializeHead(&SocketContext->TxQueue); + CxPlatLockInitialize(&SocketContext->TxQueueLock); + CxPlatRundownInitialize(&SocketContext->UpcallRundown); + + CXPLAT_UDP_CONFIG Config = { + .LocalAddress = LocalAddress, + .RemoteAddress = NULL + }; + Status = + CxPlatSocketContextInitialize( + SocketContext, + &Config, + 0, + Binding->Type); + if (QUIC_FAILED(Status)) { + goto Exit; + } + + *Socket = Binding; + + CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_ADD, EPOLLIN); + + SocketContext->IoStarted = TRUE; + Binding = NULL; + Status = QUIC_STATUS_SUCCESS; + +Exit: + + if (Binding != NULL) { + CxPlatSocketDelete(Binding); + } + + return Status; +} + +void +CxPlatSocketContextAcceptCompletion( + _In_ CXPLAT_SOCKET_CONTEXT* SocketContext, + _In_ CXPLAT_CQE* Cqe + ) +{ + UNREFERENCED_PARAMETER(Cqe); + CXPLAT_DATAPATH* Datapath = SocketContext->Binding->Datapath; + + QUIC_STATUS Status = + CxPlatSocketCreateTcpInternal( + Datapath, + CXPLAT_SOCKET_TCP_SERVER, + NULL, + NULL, + NULL, + (CXPLAT_SOCKET**)&SocketContext->AcceptSocket); + if (QUIC_FAILED(Status)) { + goto Error; + } + + SocketContext->AcceptSocket->SocketContexts[0].SocketFd = accept(SocketContext->SocketFd, NULL, NULL); + if (SocketContext->AcceptSocket->SocketContexts[0].SocketFd == INVALID_SOCKET) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "accept failed"); + fprintf(stderr, "Accept failure: %d\n", Status); + goto Error; + } + + CxPlatSocketContextSetEvents(&SocketContext->AcceptSocket->SocketContexts[0], EPOLL_CTL_ADD, EPOLLIN); + SocketContext->AcceptSocket->SocketContexts[0].IoStarted = TRUE; + Datapath->TcpHandlers.Accept( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + SocketContext->AcceptSocket, + &SocketContext->AcceptSocket->ClientContext); + + SocketContext->AcceptSocket = NULL; + +Error: + + if (SocketContext->AcceptSocket != NULL) { + SocketDelete(SocketContext->AcceptSocket); + SocketContext->AcceptSocket = NULL; + } +} + +void +CxPlatSocketContextConnectCompletion( + _In_ CXPLAT_SOCKET_CONTEXT* SocketContext, + _In_ CXPLAT_CQE* Cqe + ) +{ + UNREFERENCED_PARAMETER(Cqe); + CXPLAT_DATAPATH* Datapath = SocketContext->Binding->Datapath; + + Datapath->TcpHandlers.Connect( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + TRUE); } void @@ -1247,8 +1595,8 @@ SocketDelete( Socket->Uninitialized = TRUE; #endif - const uint32_t SocketCount = - Socket->HasFixedRemoteAddress ? 1 : Socket->Datapath->PartitionCount; + const uint16_t SocketCount = + Socket->NumPerProcessorSockets ? (uint16_t)CxPlatProcMaxCount() : 1; for (uint32_t i = 0; i < SocketCount; ++i) { CxPlatSocketContextUninitialize(&Socket->SocketContexts[i]); @@ -1288,18 +1636,32 @@ CxPlatSocketHandleErrors( ErrNum, "Socket error event"); - // - // Send unreachable notification to MsQuic if any related - // errors were received. - // - if (ErrNum == ECONNREFUSED || - ErrNum == EHOSTUNREACH || - ErrNum == ENETUNREACH) { - if (!SocketContext->Binding->PcpBinding) { - SocketContext->Binding->Datapath->UdpHandlers.Unreachable( + if (SocketContext->Binding->Type == CXPLAT_SOCKET_UDP) { + // + // Send unreachable notification to MsQuic if any related + // errors were received. + // + if (ErrNum == ECONNREFUSED || + ErrNum == EHOSTUNREACH || + ErrNum == ENETUNREACH) { + if (!SocketContext->Binding->PcpBinding) { + SocketContext->Binding->Datapath->UdpHandlers.Unreachable( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + &SocketContext->Binding->RemoteAddress); + } + } + } else if (ErrNum == ENOTSOCK || + ErrNum == EINTR || + ErrNum == ECANCELED || + ErrNum == ECONNABORTED || + ErrNum == ECONNRESET) { + if (!SocketContext->Binding->DisconnectIndicated) { + SocketContext->Binding->DisconnectIndicated = TRUE; + SocketContext->Binding->Datapath->TcpHandlers.Connect( SocketContext->Binding, SocketContext->Binding->ClientContext, - &SocketContext->Binding->RemoteAddress); + FALSE); } } } @@ -1597,15 +1959,97 @@ CxPlatSocketReceiveMessages( } } +void +CxPlatSocketTcpRecvComplete( + _In_ CXPLAT_SOCKET_CONTEXT* SocketContext + ) +{ + CXPLAT_DATAPATH_PARTITION* DatapathPartition = SocketContext->DatapathPartition; + DATAPATH_RX_IO_BLOCK* IoBlock = NULL; + + uint32_t RetryCount = 0; + do { + IoBlock = CxPlatPoolAlloc(&DatapathPartition->RecvBlockPool); + } while (IoBlock == NULL && ++RetryCount < 10); + if (IoBlock == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "DATAPATH_RX_IO_BLOCK", + 0); + goto Exit; + } + + IoBlock->OwningPool = &DatapathPartition->RecvBlockPool; + IoBlock->Route.State = RouteResolved; + IoBlock->Route.Queue = SocketContext; + IoBlock->RefCount = 0; + + uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset; + int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, CXPLAT_LARGE_IO_BUFFER_SIZE); + + if (NumberOfBytesTransferred == 0) { + if (!SocketContext->Binding->DisconnectIndicated) { + SocketContext->Binding->DisconnectIndicated = TRUE; + SocketContext->Binding->Datapath->TcpHandlers.Connect( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + FALSE); + } + goto Exit; + } else if (NumberOfBytesTransferred < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + errno, + "read failed"); + } + goto Exit; + } else { + DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1); + Datagram->IoBlock = IoBlock; + CXPLAT_RECV_DATA* Data = &Datagram->Data; + + Data->Next = NULL; + Data->Buffer = Buffer; + Data->BufferLength = NumberOfBytesTransferred; + Data->Route = &IoBlock->Route; + Data->PartitionIndex = SocketContext->DatapathPartition->PartitionIndex; + Data->TypeOfService = 0; + Data->Allocated = TRUE; + Data->Route->DatapathType = Data->DatapathType = CXPLAT_DATAPATH_TYPE_USER; + Data->QueuedOnConnection = FALSE; + IoBlock->RefCount++; + IoBlock = NULL; + + SocketContext->Binding->Datapath->TcpHandlers.Receive( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + Data); + } + +Exit: + + if (IoBlock) { + CxPlatPoolFree(&DatapathPartition->RecvBlockPool, IoBlock); + } +} + void CxPlatSocketReceive( _In_ CXPLAT_SOCKET_CONTEXT* SocketContext ) { - if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { - CxPlatSocketReceiveCoalesced(SocketContext); + if (SocketContext->Binding->Type == CXPLAT_SOCKET_UDP) { + if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { + CxPlatSocketReceiveCoalesced(SocketContext); + } else { + CxPlatSocketReceiveMessages(SocketContext); + } } else { - CxPlatSocketReceiveMessages(SocketContext); + CxPlatSocketTcpRecvComplete(SocketContext); } } @@ -1638,7 +2082,7 @@ SendDataAlloc( ) { CXPLAT_DBG_ASSERT(Socket != NULL); - CXPLAT_DBG_ASSERT(Config->MaxPacketSize <= MAX_UDP_PAYLOAD_LENGTH); + CXPLAT_DBG_ASSERT(Socket->Type != CXPLAT_SOCKET_UDP || Config->MaxPacketSize <= MAX_UDP_PAYLOAD_LENGTH); if (Config->Route->Queue == NULL) { Config->Route->Queue = &Socket->SocketContexts[0]; } @@ -1652,7 +2096,10 @@ SendDataAlloc( SendData->ClientBuffer.Buffer = SendData->Buffer; SendData->ClientBuffer.Length = 0; SendData->TotalSize = 0; - SendData->SegmentSize = Config->MaxPacketSize; + SendData->SegmentSize = + (Socket->Type != CXPLAT_SOCKET_UDP || + Socket->Datapath->Features & CXPLAT_DATAPATH_FEATURE_SEND_SEGMENTATION) + ? Config->MaxPacketSize : 0; SendData->BufferCount = 0; SendData->AlreadySentCount = 0; SendData->ControlBufferLength = 0; @@ -1837,6 +2284,13 @@ SocketSend( CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_MOD, EPOLLIN | EPOLLOUT); Status = QUIC_STATUS_SUCCESS; } else { + if (Socket->Type != CXPLAT_SOCKET_UDP) { + SocketContext->Binding->Datapath->TcpHandlers.SendComplete( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + errno, + SendData->TotalSize); + } CxPlatSendDataFree(SendData); } @@ -2000,6 +2454,34 @@ CxPlatSendDataSendMessages( return TRUE; } +BOOLEAN +CxPlatSendDataSendTcp( + _In_ CXPLAT_SEND_DATA* SendData + ) +{ + uint32_t TotalSize = SendData->TotalSize; + uint8_t *Buffer = SendData->Buffer; + while (TotalSize > 0) { + int BytesSent = + send( + SendData->SocketContext->SocketFd, + Buffer, + TotalSize, + 0); + if (BytesSent < 0) { + // forcibly send inline + if (errno == EAGAIN || errno == EWOULDBLOCK) { + continue; + } + return FALSE; + } + Buffer += BytesSent; + TotalSize -= BytesSent; + } + + return TRUE; +} + QUIC_STATUS CxPlatSendDataSend( _In_ CXPLAT_SEND_DATA* SendData @@ -2007,35 +2489,47 @@ CxPlatSendDataSend( { CXPLAT_DBG_ASSERT(SendData != NULL); CXPLAT_DBG_ASSERT(SendData->AlreadySentCount < CXPLAT_MAX_IO_BATCH_SIZE); + CXPLAT_SOCKET_TYPE SocketType = SendData->SocketContext->Binding->Type; QUIC_STATUS Status = QUIC_STATUS_SUCCESS; CXPLAT_SOCKET_CONTEXT* SocketContext = SendData->SocketContext; BOOLEAN Success = + SocketType == CXPLAT_SOCKET_UDP ? #ifdef UDP_SEGMENT - SendData->SegmentationSupported ? - CxPlatSendDataSendSegmented(SendData) : + SendData->SegmentationSupported ? + CxPlatSendDataSendSegmented(SendData) : #endif - CxPlatSendDataSendMessages(SendData); + CxPlatSendDataSendMessages(SendData) : + CxPlatSendDataSendTcp(SendData); if (!Success) { if (errno == EAGAIN || errno == EWOULDBLOCK) { Status = QUIC_STATUS_PENDING; } else { Status = errno; + if (SocketType == CXPLAT_SOCKET_UDP) { #ifdef UDP_SEGMENT - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - SocketContext->Binding, - Status, - "sendmsg (GSO) failed"); + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "sendmsg (GSO) failed"); #else - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - SocketContext->Binding, - Status, - "sendmmsg failed"); + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "sendmmsg failed"); #endif + } else { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "send failed"); + } if (Status == EIO && SocketContext->Binding->Datapath->Features & CXPLAT_DATAPATH_FEATURE_SEND_SEGMENTATION) { @@ -2106,6 +2600,13 @@ CxPlatSocketContextFlushTxQueue( CxPlatLockAcquire(&SocketContext->TxQueueLock); CxPlatListRemoveHead(&SocketContext->TxQueue); + if (SocketContext->Binding->Type != CXPLAT_SOCKET_UDP) { + SocketContext->Binding->Datapath->TcpHandlers.SendComplete( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + errno, + SendData->TotalSize); + } CxPlatSendDataFree(SendData); if (!CxPlatListIsEmpty(&SocketContext->TxQueue)) { SendData = @@ -2138,10 +2639,19 @@ CxPlatDataPathSocketProcessIoCompletion( CxPlatSocketHandleErrors(SocketContext); } if (EPOLLIN & Cqe->events) { - CxPlatSocketReceive(SocketContext); + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_LISTENER) { + CxPlatSocketContextAcceptCompletion(SocketContext, Cqe); + } else { + CxPlatSocketReceive(SocketContext); + } } if (EPOLLOUT & Cqe->events) { - CxPlatSocketContextFlushTxQueue(SocketContext, TRUE); + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP && !SocketContext->Binding->Connected) { + CxPlatSocketContextConnectCompletion(SocketContext, Cqe); + SocketContext->Binding->Connected = TRUE; + } else { + CxPlatSocketContextFlushTxQueue(SocketContext, TRUE); + } } CxPlatRundownRelease(&SocketContext->UpcallRundown); } diff --git a/src/platform/platform_internal.h b/src/platform/platform_internal.h index 70a6e2d759..4d26f842ec 100644 --- a/src/platform/platform_internal.h +++ b/src/platform/platform_internal.h @@ -119,6 +119,13 @@ typedef struct DATAPATH_IO_SQE { DATAPATH_SQE DatapathSqe; } DATAPATH_IO_SQE; +typedef enum CXPLAT_SOCKET_TYPE { + CXPLAT_SOCKET_UDP = 0, + CXPLAT_SOCKET_TCP_LISTENER = 1, + CXPLAT_SOCKET_TCP = 2, + CXPLAT_SOCKET_TCP_SERVER = 3 +} CXPLAT_SOCKET_TYPE; + #ifdef _KERNEL_MODE #define CXPLAT_BASE_REG_PATH L"\\Registry\\Machine\\System\\CurrentControlSet\\Services\\MsQuic\\Parameters\\" @@ -184,13 +191,6 @@ typedef struct CX_PLATFORM { } CX_PLATFORM; -typedef enum CXPLAT_SOCKET_TYPE { - CXPLAT_SOCKET_UDP = 0, - CXPLAT_SOCKET_TCP_LISTENER = 1, - CXPLAT_SOCKET_TCP = 2, - CXPLAT_SOCKET_TCP_SERVER = 3 -} CXPLAT_SOCKET_TYPE; - // // Represents a single IO completion port and thread for processing work that is // completed on a single processor. @@ -751,6 +751,8 @@ typedef struct QUIC_CACHEALIGN CXPLAT_SOCKET_CONTEXT { uint8_t Freed : 1; #endif + CXPLAT_SOCKET* AcceptSocket; + } CXPLAT_SOCKET_CONTEXT; // @@ -784,11 +786,27 @@ typedef struct CXPLAT_SOCKET { // BOOLEAN Connected : 1; + // + // Socket type. + // + uint8_t Type : 2; // CXPLAT_SOCKET_TYPE + + // + // Flag indicates the socket has more than one socket, affinitized to all + // the processors. + // + uint8_t NumPerProcessorSockets : 1; + // // Flag indicates the socket has a default remote destination. // BOOLEAN HasFixedRemoteAddress : 1; + // + // Flag indicates the socket indicated a disconnect event. + // + uint8_t DisconnectIndicated : 1; + // // Flag indicates the binding is being used for PCP. //