From eb610cc7ec70ee8389833a7bacdfcd4e2bd99f9b Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 4 Oct 2023 17:06:01 -0700 Subject: [PATCH 1/9] Linux TCP support --- src/platform/datapath_epoll.c | 886 ++++++++++++++++++++++++---------- 1 file changed, 637 insertions(+), 249 deletions(-) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 0127285857..118e3e11fe 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -339,6 +339,8 @@ CxPlatDataPathCalculateFeatureSupport( Datapath->RecvBlockSize = Datapath->RecvBlockBufferOffset + CXPLAT_SMALL_IO_BUFFER_SIZE; } + + Datapath->Features |= CXPLAT_DATAPATH_FEATURE_TCP; } void @@ -375,6 +377,14 @@ DataPathInitialize( return QUIC_STATUS_INVALID_PARAMETER; } } + if (TcpCallbacks != NULL) { + if (TcpCallbacks->Accept == NULL || + TcpCallbacks->Connect == NULL || + TcpCallbacks->Receive == NULL || + TcpCallbacks->SendComplete == NULL) { + return QUIC_STATUS_INVALID_PARAMETER; + } + } if (!CxPlatWorkersLazyStart(Config)) { return QUIC_STATUS_OUT_OF_MEMORY; @@ -401,6 +411,10 @@ DataPathInitialize( if (UdpCallbacks) { Datapath->UdpHandlers = *UdpCallbacks; } + if (TcpCallbacks) { + Datapath->TcpHandlers = *TcpCallbacks; + } + Datapath->PartitionCount = PartitionCount; Datapath->Features = CXPLAT_DATAPATH_FEATURE_LOCAL_PORT_SHARING; CxPlatRefInitializeEx(&Datapath->RefCount, Datapath->PartitionCount); @@ -553,7 +567,8 @@ QUIC_STATUS CxPlatSocketContextInitialize( _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext, _In_ const CXPLAT_UDP_CONFIG* Config, - _In_ const uint16_t PartitionIndex + _In_ const uint16_t PartitionIndex, + _In_ CXPLAT_SOCKET_TYPE SocketType ) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; @@ -623,8 +638,9 @@ CxPlatSocketContextInitialize( SocketContext->SocketFd = socket( AF_INET6, - SOCK_DGRAM | SOCK_NONBLOCK | SOCK_CLOEXEC, // TODO check if SOCK_CLOEXEC is required? - IPPROTO_UDP); + (SocketType == CXPLAT_SOCKET_UDP ? SOCK_DGRAM : SOCK_STREAM) | + SOCK_NONBLOCK | SOCK_CLOEXEC, // TODO check if SOCK_CLOEXEC is required? + SocketType == CXPLAT_SOCKET_UDP ? IPPROTO_UDP : IPPROTO_TCP); if (SocketContext->SocketFd == INVALID_SOCKET) { Status = errno; QuicTraceEvent( @@ -658,149 +674,130 @@ CxPlatSocketContextInitialize( goto Exit; } - // - // Set DON'T FRAG socket option. - // - // - // Windows: setsockopt IPPROTO_IP IP_DONTFRAGMENT TRUE. - // Linux: IP_DONTFRAGMENT option is not available. IP_MTU_DISCOVER/IPV6_MTU_DISCOVER - // is the apparent alternative. - // - Option = IP_PMTUDISC_PROBE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IP, - IP_MTU_DISCOVER, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IP_MTU_DISCOVER) failed"); - goto Exit; - } - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_MTU_DISCOVER, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_MTU_DISCOVER) failed"); - goto Exit; - } + if (SocketType == CXPLAT_SOCKET_UDP) { + // + // Set DON'T FRAG socket option. + // - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_DONTFRAG, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_DONTFRAG) failed"); - goto Exit; - } + // + // Windows: setsockopt IPPROTO_IP IP_DONTFRAGMENT TRUE. + // Linux: IP_DONTFRAGMENT option is not available. IP_MTU_DISCOVER/IPV6_MTU_DISCOVER + // is the apparent alternative. + // + Option = IP_PMTUDISC_PROBE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IP, + IP_MTU_DISCOVER, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IP_MTU_DISCOVER) failed"); + goto Exit; + } + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_MTU_DISCOVER, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_MTU_DISCOVER) failed"); + goto Exit; + } - // - // Set socket option to receive ancillary data about the incoming packets. - // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_DONTFRAG, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_DONTFRAG) failed"); + goto Exit; + } - // - // Windows: setsockopt IPPROTO_IPV6 IPV6_PKTINFO TRUE. - // Android: Returns EINVAL. IPV6_PKTINFO option is not present in documentation. - // IPV6_RECVPKTINFO seems like is the alternative. - // TODO: Check if this works as expected? - // - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_RECVPKTINFO, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_RECVPKTINFO) failed"); - goto Exit; - } + // + // Set socket option to receive ancillary data about the incoming packets. + // - // - // Set socket option to receive TOS (= DSCP + ECN) information from the - // incoming packet. - // - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IPV6, - IPV6_RECVTCLASS, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IPV6_RECVTCLASS) failed"); - goto Exit; - } + // + // Windows: setsockopt IPPROTO_IPV6 IPV6_PKTINFO TRUE. + // Android: Returns EINVAL. IPV6_PKTINFO option is not present in documentation. + // IPV6_RECVPKTINFO seems like is the alternative. + // TODO: Check if this works as expected? + // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_RECVPKTINFO, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_RECVPKTINFO) failed"); + goto Exit; + } - Option = TRUE; - Result = - setsockopt( - SocketContext->SocketFd, - IPPROTO_IP, - IP_RECVTOS, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(IP_RECVTOS) failed"); - goto Exit; - } + // + // Set socket option to receive TOS (= DSCP + ECN) information from the + // incoming packet. + // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + IPPROTO_IPV6, + IPV6_RECVTCLASS, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(IPV6_RECVTCLASS) failed"); + goto Exit; + } -#ifdef UDP_GRO - if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { Option = TRUE; Result = setsockopt( SocketContext->SocketFd, - SOL_UDP, - UDP_GRO, + IPPROTO_IP, + IP_RECVTOS, (const void*)&Option, sizeof(Option)); if (Result == SOCKET_ERROR) { @@ -810,49 +807,43 @@ CxPlatSocketContextInitialize( "[data][%p] ERROR, %u, %s.", Binding, Status, - "setsockopt(UDP_GRO) failed"); + "setsockopt(IP_RECVTOS) failed"); goto Exit; } - } -#endif - // - // The socket is shared by multiple QUIC endpoints, so increase the receive - // buffer size. - // - Option = INT32_MAX; - Result = - setsockopt( - SocketContext->SocketFd, - SOL_SOCKET, - SO_RCVBUF, - (const void*)&Option, - sizeof(Option)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "setsockopt(SO_RCVBUF) failed"); - goto Exit; - } + #ifdef UDP_GRO + if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + SOL_UDP, + UDP_GRO, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(UDP_GRO) failed"); + goto Exit; + } + } + #endif - // - // Only set SO_REUSEPORT on a server socket, otherwise the client could be - // assigned a server port (unless it's forcing sharing). - // - if (Config->Flags & CXPLAT_SOCKET_FLAG_SHARE || Config->RemoteAddress == NULL) { // - // The port is shared across processors. + // The socket is shared by multiple QUIC endpoints, so increase the receive + // buffer size. // - Option = TRUE; + Option = INT32_MAX; Result = setsockopt( SocketContext->SocketFd, SOL_SOCKET, - SO_REUSEPORT, + SO_RCVBUF, (const void*)&Option, sizeof(Option)); if (Result == SOCKET_ERROR) { @@ -862,9 +853,37 @@ CxPlatSocketContextInitialize( "[data][%p] ERROR, %u, %s.", Binding, Status, - "setsockopt(SO_REUSEPORT) failed"); + "setsockopt(SO_RCVBUF) failed"); goto Exit; } + + // + // Only set SO_REUSEPORT on a server socket, otherwise the client could be + // assigned a server port (unless it's forcing sharing). + // + if (Config->Flags & CXPLAT_SOCKET_FLAG_SHARE || Config->RemoteAddress == NULL) { + // + // The port is shared across processors. + // + Option = TRUE; + Result = + setsockopt( + SocketContext->SocketFd, + SOL_SOCKET, + SO_REUSEPORT, + (const void*)&Option, + sizeof(Option)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "setsockopt(SO_REUSEPORT) failed"); + goto Exit; + } + } } CxPlatCopyMemory(&MappedAddress, &Binding->LocalAddress, sizeof(MappedAddress)); @@ -872,32 +891,9 @@ CxPlatSocketContextInitialize( MappedAddress.Ipv6.sin6_family = AF_INET6; } - Result = - bind( - SocketContext->SocketFd, - &MappedAddress.Ip, - sizeof(MappedAddress)); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "bind failed"); - goto Exit; - } - - if (Config->RemoteAddress != NULL) { - CxPlatZeroMemory(&MappedAddress, sizeof(MappedAddress)); - CxPlatConvertToMappedV6(Config->RemoteAddress, &MappedAddress); - - if (MappedAddress.Ipv6.sin6_family == QUIC_ADDRESS_FAMILY_INET6) { - MappedAddress.Ipv6.sin6_family = AF_INET6; - } - + if (SocketType != CXPLAT_SOCKET_TCP_SERVER) { Result = - connect( + bind( SocketContext->SocketFd, &MappedAddress.Ip, sizeof(MappedAddress)); @@ -908,48 +904,94 @@ CxPlatSocketContextInitialize( "[data][%p] ERROR, %u, %s.", Binding, Status, - "connect failed"); + "bind failed"); goto Exit; } - Binding->Connected = TRUE; - } - // - // If no specific local port was indicated, then the stack just - // assigned this socket a port. We need to query it and use it for - // all the other sockets we are going to create. - // - AssignedLocalAddressLength = sizeof(Binding->LocalAddress); - Result = - getsockname( - SocketContext->SocketFd, - (struct sockaddr *)&Binding->LocalAddress, - &AssignedLocalAddressLength); - if (Result == SOCKET_ERROR) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "getsockname failed"); - goto Exit; - } + QUIC_ADDR_STR LocalAddressStr; + QUIC_ADDR_STR RemoteAddressStr; + QuicAddrToString(&MappedAddress, &LocalAddressStr); + + if (Config->RemoteAddress != NULL) { + CxPlatZeroMemory(&MappedAddress, sizeof(MappedAddress)); + CxPlatConvertToMappedV6(Config->RemoteAddress, &MappedAddress); + + if (MappedAddress.Ipv6.sin6_family == QUIC_ADDRESS_FAMILY_INET6) { + MappedAddress.Ipv6.sin6_family = AF_INET6; + } + QuicAddrToString(&MappedAddress, &RemoteAddressStr); + Result = + connect( + SocketContext->SocketFd, + &MappedAddress.Ip, + sizeof(MappedAddress)); + if (Result == SOCKET_ERROR && errno != EINPROGRESS) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "connect failed"); + goto Exit; + } + Binding->Connected = SocketType != CXPLAT_SOCKET_TCP; + } -#if DEBUG - if (Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port != 0) { - CXPLAT_DBG_ASSERT(Config->LocalAddress->Ipv4.sin_port == Binding->LocalAddress.Ipv4.sin_port); - } else if (Config->RemoteAddress && Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port == 0) { // - // A client socket being assigned the same port as a remote socket causes issues later - // in the datapath and binding paths. Check to make sure this case was not given to us. + // If no specific local port was indicated, then the stack just + // assigned this socket a port. We need to query it and use it for + // all the other sockets we are going to create. // - CXPLAT_DBG_ASSERT(Binding->LocalAddress.Ipv4.sin_port != Config->RemoteAddress->Ipv4.sin_port); - } + AssignedLocalAddressLength = sizeof(Binding->LocalAddress); + Result = + getsockname( + SocketContext->SocketFd, + (struct sockaddr *)&Binding->LocalAddress, + &AssignedLocalAddressLength); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "getsockname failed"); + goto Exit; + } + +#if DEBUG + if (Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port != 0) { + CXPLAT_DBG_ASSERT(Config->LocalAddress->Ipv4.sin_port == Binding->LocalAddress.Ipv4.sin_port); + } else if (Config->RemoteAddress && Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port == 0) { + // + // A client socket being assigned the same port as a remote socket causes issues later + // in the datapath and binding paths. Check to make sure this case was not given to us. + // + CXPLAT_DBG_ASSERT(Binding->LocalAddress.Ipv4.sin_port != Config->RemoteAddress->Ipv4.sin_port); + } #endif - if (Binding->LocalAddress.Ipv6.sin6_family == AF_INET6) { - Binding->LocalAddress.Ipv6.sin6_family = QUIC_ADDRESS_FAMILY_INET6; + if (Binding->LocalAddress.Ipv6.sin6_family == AF_INET6) { + Binding->LocalAddress.Ipv6.sin6_family = QUIC_ADDRESS_FAMILY_INET6; + } + + if (SocketType == CXPLAT_SOCKET_TCP_LISTENER) { + Result = + listen( + SocketContext->SocketFd, + 100); + if (Result == SOCKET_ERROR) { + int error = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + error, + "listen"); + goto Exit; + } + } } SocketContext->SqeInitialized = TRUE; @@ -1093,8 +1135,9 @@ SocketCreateUdp( ) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; - BOOLEAN IsServerSocket = Config->RemoteAddress == NULL; - uint16_t SocketCount = IsServerSocket ? (uint16_t)CxPlatProcMaxCount() : 1; + const BOOLEAN IsServerSocket = Config->RemoteAddress == NULL; + const BOOLEAN NumPerProcessorSockets = IsServerSocket && Datapath->PartitionCount > 1; + const uint16_t SocketCount = NumPerProcessorSockets ? (uint16_t)CxPlatProcMaxCount() : 1; CXPLAT_DBG_ASSERT(Datapath->UdpHandlers.Receive != NULL || Config->Flags & CXPLAT_SOCKET_FLAG_PCP); @@ -1122,8 +1165,10 @@ SocketCreateUdp( CxPlatZeroMemory(Binding, BindingLength); Binding->Datapath = Datapath; Binding->ClientContext = Config->CallbackContext; + Binding->NumPerProcessorSockets = NumPerProcessorSockets; Binding->HasFixedRemoteAddress = (Config->RemoteAddress != NULL); Binding->Mtu = CXPLAT_MAX_MTU; + Binding->Type = CXPLAT_SOCKET_UDP; CxPlatRefInitializeEx(&Binding->RefCount, SocketCount); if (Config->LocalAddress) { CxPlatConvertToMappedV6(Config->LocalAddress, &Binding->LocalAddress); @@ -1150,7 +1195,8 @@ SocketCreateUdp( CxPlatSocketContextInitialize( &Binding->SocketContexts[i], Config, - Config->RemoteAddress ? Config->PartitionIndex : (i % Datapath->PartitionCount)); + Config->RemoteAddress ? Config->PartitionIndex : (i % Datapath->PartitionCount), + Binding->Type); if (QUIC_FAILED(Status)) { goto Exit; } @@ -1197,6 +1243,123 @@ SocketCreateUdp( return Status; } + +_IRQL_requires_max_(PASSIVE_LEVEL) +QUIC_STATUS +CxPlatSocketCreateTcpInternal( + _In_ CXPLAT_DATAPATH* Datapath, + _In_ CXPLAT_SOCKET_TYPE Type, + _In_opt_ const QUIC_ADDR* LocalAddress, + _In_opt_ const QUIC_ADDR* RemoteAddress, + _In_opt_ void* RecvCallbackContext, + _Out_ CXPLAT_SOCKET** NewBinding + ) +{ + QUIC_STATUS Status; + uint16_t PartitionIndex; + BOOLEAN IsServerSocket = RemoteAddress == NULL; + + CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL); + + CXPLAT_SOCKET_CONTEXT* SocketContext = NULL; + uint32_t SocketLength = sizeof(CXPLAT_SOCKET) + sizeof(CXPLAT_SOCKET_CONTEXT); + CXPLAT_SOCKET* Binding = CXPLAT_ALLOC_PAGED(SocketLength, QUIC_POOL_SOCKET); + if (Binding == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "CXPLAT_SOCKET", + SocketLength); + Status = QUIC_STATUS_OUT_OF_MEMORY; + goto Exit; + } + + QuicTraceEvent( + DatapathCreated, + "[data][%p] Created, local=%!ADDR!, remote=%!ADDR!", + Binding, + CASTED_CLOG_BYTEARRAY(LocalAddress ? sizeof(*LocalAddress) : 0, LocalAddress), + CASTED_CLOG_BYTEARRAY(RemoteAddress ? sizeof(*RemoteAddress) : 0, RemoteAddress)); + + CxPlatZeroMemory(Binding, SocketLength); + Binding->Datapath = Datapath; + Binding->ClientContext = RecvCallbackContext; + Binding->HasFixedRemoteAddress = TRUE; + Binding->Mtu = CXPLAT_MAX_MTU; + Binding->Type = Type; + if (LocalAddress) { + CxPlatConvertToMappedV6(LocalAddress, &Binding->LocalAddress); + } else { + Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; + } + PartitionIndex = + RemoteAddress ? + ((uint16_t)(CxPlatProcCurrentNumber() % Datapath->PartitionCount)) : 0; + CxPlatRefInitializeEx(&Binding->RefCount, 1); + + SocketContext = &Binding->SocketContexts[0]; + SocketContext->Binding = Binding; + SocketContext->SocketFd = INVALID_SOCKET; + SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; + SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; + SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; + CxPlatListInitializeHead(&SocketContext->TxQueue); + CxPlatLockInitialize(&SocketContext->TxQueueLock); + CxPlatRundownInitialize(&SocketContext->UpcallRundown); + + CXPLAT_UDP_CONFIG Config = { + .LocalAddress = LocalAddress, + .RemoteAddress = RemoteAddress + }; + Status = + CxPlatSocketContextInitialize( + SocketContext, + &Config, + PartitionIndex, + Binding->Type); + if (QUIC_FAILED(Status)) { + goto Exit; + } + + if (IsServerSocket) { + // + // The return value is being ignored here, as if a system does not support + // bpf we still want the server to work. If this happens, the sockets will + // round robin, but each flow will be sent to the same socket, just not + // based on RSS. + // + (void)CxPlatSocketConfigureRss(SocketContext, 1); + } + + CxPlatConvertFromMappedV6(&Binding->LocalAddress, &Binding->LocalAddress); + Binding->LocalAddress.Ipv6.sin6_scope_id = 0; + + if (RemoteAddress != NULL) { + Binding->RemoteAddress = *RemoteAddress; + } else { + Binding->RemoteAddress.Ipv4.sin_port = 0; + } + + // + // Must set output pointer before starting receive path, as the receive path + // will try to use the output. + // + *NewBinding = Binding; + + CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_ADD, EPOLLIN | EPOLLOUT); + SocketContext->IoStarted = TRUE; + + Binding = NULL; + +Exit: + + if (Binding != NULL) { + CxPlatSocketDelete(Binding); + } + + return Status; +} + _IRQL_requires_max_(PASSIVE_LEVEL) QUIC_STATUS SocketCreateTcp( @@ -1207,12 +1370,14 @@ SocketCreateTcp( _Out_ CXPLAT_SOCKET** Socket ) { - UNREFERENCED_PARAMETER(Datapath); - UNREFERENCED_PARAMETER(LocalAddress); - UNREFERENCED_PARAMETER(RemoteAddress); - UNREFERENCED_PARAMETER(CallbackContext); - UNREFERENCED_PARAMETER(Socket); - return QUIC_STATUS_NOT_SUPPORTED; + return + CxPlatSocketCreateTcpInternal( + Datapath, + CXPLAT_SOCKET_TCP, + LocalAddress, + RemoteAddress, + CallbackContext, + Socket); } _IRQL_requires_max_(PASSIVE_LEVEL) @@ -1224,11 +1389,156 @@ SocketCreateTcpListener( _Out_ CXPLAT_SOCKET** Socket ) { - UNREFERENCED_PARAMETER(Datapath); - UNREFERENCED_PARAMETER(LocalAddress); - UNREFERENCED_PARAMETER(CallbackContext); - UNREFERENCED_PARAMETER(Socket); - return QUIC_STATUS_NOT_SUPPORTED; + QUIC_STATUS Status; + // int Result; + // int Option; + + CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL); + + CXPLAT_SOCKET_CONTEXT* SocketContext = NULL; + uint32_t SocketLength = sizeof(CXPLAT_SOCKET) + sizeof(CXPLAT_SOCKET_CONTEXT); + CXPLAT_SOCKET* Binding = CXPLAT_ALLOC_PAGED(SocketLength, QUIC_POOL_SOCKET); + if (Binding == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "CXPLAT_SOCKET", + SocketLength); + Status = QUIC_STATUS_OUT_OF_MEMORY; + goto Exit; + } + + QuicTraceEvent( + DatapathCreated, + "[data][%p] Created, local=%!ADDR!, remote=%!ADDR!", + Binding, + CASTED_CLOG_BYTEARRAY(LocalAddress ? sizeof(*LocalAddress) : 0, LocalAddress), + CASTED_CLOG_BYTEARRAY(0, NULL)); + + CxPlatZeroMemory(Binding, SocketLength); + Binding->Datapath = Datapath; + Binding->ClientContext = CallbackContext; + Binding->HasFixedRemoteAddress = FALSE; + Binding->Mtu = CXPLAT_MAX_MTU; + Binding->Type = CXPLAT_SOCKET_TCP_LISTENER; + if (LocalAddress) { + CxPlatConvertToMappedV6(LocalAddress, &Binding->LocalAddress); + if (Binding->LocalAddress.Ip.sa_family == AF_UNSPEC) { + Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; + } + } else { + Binding->LocalAddress.Ip.sa_family = QUIC_ADDRESS_FAMILY_INET6; + } + CxPlatRefInitializeEx(&Binding->RefCount, 1); + + SocketContext = &Binding->SocketContexts[0]; + SocketContext->Binding = Binding; + SocketContext->SocketFd = INVALID_SOCKET; + SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; + SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; + SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; + CxPlatListInitializeHead(&SocketContext->TxQueue); + CxPlatLockInitialize(&SocketContext->TxQueueLock); + CxPlatRundownInitialize(&SocketContext->UpcallRundown); + + CXPLAT_UDP_CONFIG Config = { + .LocalAddress = LocalAddress, + .RemoteAddress = NULL + }; + Status = + CxPlatSocketContextInitialize( + SocketContext, + &Config, + 0, + Binding->Type); + if (QUIC_FAILED(Status)) { + goto Exit; + } + + *Socket = Binding; + + CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_ADD, EPOLLIN | EPOLLOUT); + + SocketContext->IoStarted = TRUE; + Binding = NULL; + Status = QUIC_STATUS_SUCCESS; + +Exit: + + if (Binding != NULL) { + CxPlatSocketDelete(Binding); + } + + return Status; +} + +void +CxPlatSocketContextAcceptCompletion( + _In_ CXPLAT_SOCKET_CONTEXT* SocketContext, + _In_ CXPLAT_CQE* Cqe + ) +{ + UNREFERENCED_PARAMETER(Cqe); + CXPLAT_DATAPATH* Datapath = SocketContext->Binding->Datapath; + + if (!CxPlatRundownAcquire(&SocketContext->UpcallRundown)) { + return; + } + + QUIC_STATUS Status = + CxPlatSocketCreateTcpInternal( + Datapath, + CXPLAT_SOCKET_TCP_SERVER, + NULL, + NULL, + NULL, + (CXPLAT_SOCKET**)&SocketContext->AcceptSocket); + if (QUIC_FAILED(Status)) { + goto Error; + } + + SocketContext->AcceptSocket->SocketContexts[0].SocketFd = accept(SocketContext->SocketFd, NULL, NULL); + + CxPlatSocketContextSetEvents(&SocketContext->AcceptSocket->SocketContexts[0], EPOLL_CTL_ADD, EPOLLIN); + SocketContext->AcceptSocket->SocketContexts[0].IoStarted = TRUE; + Datapath->TcpHandlers.Accept( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + SocketContext->AcceptSocket, + &SocketContext->AcceptSocket->ClientContext); + + SocketContext->AcceptSocket = NULL; + +Error: + + if (SocketContext->AcceptSocket != NULL) { + SocketDelete(SocketContext->AcceptSocket); + SocketContext->AcceptSocket = NULL; + } + + CxPlatRundownRelease(&SocketContext->UpcallRundown); +} + +void +CxPlatSocketContextConnectCompletion( + _In_ CXPLAT_SOCKET_CONTEXT* SocketContext, + _In_ CXPLAT_CQE* Cqe + ) +{ + UNREFERENCED_PARAMETER(Cqe); + CXPLAT_DATAPATH* Datapath = SocketContext->Binding->Datapath; + + if (!CxPlatRundownAcquire(&SocketContext->UpcallRundown)) { + return; + } + + Datapath->TcpHandlers.Connect( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + TRUE); + // TODO: error case? + + CxPlatRundownRelease(&SocketContext->UpcallRundown); } void @@ -1247,8 +1557,8 @@ SocketDelete( Socket->Uninitialized = TRUE; #endif - const uint32_t SocketCount = - Socket->HasFixedRemoteAddress ? 1 : Socket->Datapath->PartitionCount; + const uint16_t SocketCount = + Socket->NumPerProcessorSockets ? (uint16_t)CxPlatProcMaxCount() : 1; for (uint32_t i = 0; i < SocketCount; ++i) { CxPlatSocketContextUninitialize(&Socket->SocketContexts[i]); @@ -1597,15 +1907,84 @@ CxPlatSocketReceiveMessages( } } +void +CxPlatSocketTcpRecvComplete( + _In_ CXPLAT_SOCKET_CONTEXT* SocketContext + ) +{ + CXPLAT_DATAPATH_PARTITION* DatapathPartition = SocketContext->DatapathPartition; + DATAPATH_RX_IO_BLOCK* IoBlock = NULL; + + uint32_t RetryCount = 0; + do { + IoBlock = CxPlatPoolAlloc(&DatapathPartition->RecvBlockPool); + } while (IoBlock == NULL && ++RetryCount < 10); + if (IoBlock == NULL) { + QuicTraceEvent( + AllocFailure, + "Allocation of '%s' failed. (%llu bytes)", + "DATAPATH_RX_IO_BLOCK", + 0); + goto Exit; + } + + IoBlock->OwningPool = &DatapathPartition->RecvBlockPool; + IoBlock->Route.State = RouteResolved; + IoBlock->Route.Queue = SocketContext; + IoBlock->RefCount = 0; + + uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset; + int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, CXPLAT_LARGE_IO_BUFFER_SIZE); + + if (NumberOfBytesTransferred <= 0) { + SocketContext->Binding->DisconnectIndicated = TRUE; + SocketContext->Binding->Datapath->TcpHandlers.Connect( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + FALSE); + } else { + DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1); + Datagram->IoBlock = IoBlock; + CXPLAT_RECV_DATA* Data = &Datagram->Data; + + Data->Next = NULL; + Data->Buffer = Buffer; + Data->BufferLength = NumberOfBytesTransferred; + Data->Route = &IoBlock->Route; + Data->PartitionIndex = SocketContext->DatapathPartition->PartitionIndex; + Data->TypeOfService = 0; + Data->Allocated = TRUE; + Data->Route->DatapathType = Data->DatapathType = CXPLAT_DATAPATH_TYPE_USER; + Data->QueuedOnConnection = FALSE; + IoBlock->RefCount++; + IoBlock = NULL; + + SocketContext->Binding->Datapath->TcpHandlers.Receive( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + Data); + } + +Exit: + + if (IoBlock) { + CxPlatPoolFree(&DatapathPartition->RecvBlockPool, IoBlock); + } +} + void CxPlatSocketReceive( _In_ CXPLAT_SOCKET_CONTEXT* SocketContext ) { - if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { - CxPlatSocketReceiveCoalesced(SocketContext); + if (SocketContext->Binding->Type == CXPLAT_SOCKET_UDP) { + if (SocketContext->DatapathPartition->Datapath->Features & CXPLAT_DATAPATH_FEATURE_RECV_COALESCING) { + CxPlatSocketReceiveCoalesced(SocketContext); + } else { + CxPlatSocketReceiveMessages(SocketContext); + } } else { - CxPlatSocketReceiveMessages(SocketContext); + CxPlatSocketTcpRecvComplete(SocketContext); } } @@ -2138,10 +2517,19 @@ CxPlatDataPathSocketProcessIoCompletion( CxPlatSocketHandleErrors(SocketContext); } if (EPOLLIN & Cqe->events) { - CxPlatSocketReceive(SocketContext); + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_LISTENER) { + CxPlatSocketContextAcceptCompletion(SocketContext, Cqe); + } else { + CxPlatSocketReceive(SocketContext); + } } if (EPOLLOUT & Cqe->events) { - CxPlatSocketContextFlushTxQueue(SocketContext, TRUE); + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP && !SocketContext->Binding->Connected) { + CxPlatSocketContextConnectCompletion(SocketContext, Cqe); + SocketContext->Binding->Connected = TRUE; + } else { + CxPlatSocketContextFlushTxQueue(SocketContext, TRUE); + } } CxPlatRundownRelease(&SocketContext->UpcallRundown); } From ec8fb56ffe2cd9605acf642deee1747599ce8e7f Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Thu, 5 Oct 2023 09:39:51 -0700 Subject: [PATCH 2/9] add definitions --- src/platform/platform_internal.h | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/platform/platform_internal.h b/src/platform/platform_internal.h index 70a6e2d759..4d26f842ec 100644 --- a/src/platform/platform_internal.h +++ b/src/platform/platform_internal.h @@ -119,6 +119,13 @@ typedef struct DATAPATH_IO_SQE { DATAPATH_SQE DatapathSqe; } DATAPATH_IO_SQE; +typedef enum CXPLAT_SOCKET_TYPE { + CXPLAT_SOCKET_UDP = 0, + CXPLAT_SOCKET_TCP_LISTENER = 1, + CXPLAT_SOCKET_TCP = 2, + CXPLAT_SOCKET_TCP_SERVER = 3 +} CXPLAT_SOCKET_TYPE; + #ifdef _KERNEL_MODE #define CXPLAT_BASE_REG_PATH L"\\Registry\\Machine\\System\\CurrentControlSet\\Services\\MsQuic\\Parameters\\" @@ -184,13 +191,6 @@ typedef struct CX_PLATFORM { } CX_PLATFORM; -typedef enum CXPLAT_SOCKET_TYPE { - CXPLAT_SOCKET_UDP = 0, - CXPLAT_SOCKET_TCP_LISTENER = 1, - CXPLAT_SOCKET_TCP = 2, - CXPLAT_SOCKET_TCP_SERVER = 3 -} CXPLAT_SOCKET_TYPE; - // // Represents a single IO completion port and thread for processing work that is // completed on a single processor. @@ -751,6 +751,8 @@ typedef struct QUIC_CACHEALIGN CXPLAT_SOCKET_CONTEXT { uint8_t Freed : 1; #endif + CXPLAT_SOCKET* AcceptSocket; + } CXPLAT_SOCKET_CONTEXT; // @@ -784,11 +786,27 @@ typedef struct CXPLAT_SOCKET { // BOOLEAN Connected : 1; + // + // Socket type. + // + uint8_t Type : 2; // CXPLAT_SOCKET_TYPE + + // + // Flag indicates the socket has more than one socket, affinitized to all + // the processors. + // + uint8_t NumPerProcessorSockets : 1; + // // Flag indicates the socket has a default remote destination. // BOOLEAN HasFixedRemoteAddress : 1; + // + // Flag indicates the socket indicated a disconnect event. + // + uint8_t DisconnectIndicated : 1; + // // Flag indicates the binding is being used for PCP. // From 19b3f38df887ce1374cf36221012d996e5a5b0c1 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Thu, 5 Oct 2023 12:00:32 -0700 Subject: [PATCH 3/9] cleanup & perf test onboarding --- scripts/RemoteTests.json | 40 ++++++++++++++++++++++++ src/platform/datapath_epoll.c | 58 +++++++++++++++++++++++++++++------ 2 files changed, 88 insertions(+), 10 deletions(-) diff --git a/scripts/RemoteTests.json b/scripts/RemoteTests.json index 586dc25a7f..485ad86221 100644 --- a/scripts/RemoteTests.json +++ b/scripts/RemoteTests.json @@ -93,6 +93,26 @@ "Formats": ["{0} kbps"], "RegressionThreshold": "-60.0" }, + { + "TestName": "TcpThroughputUp", + "Local" : { + "Platform": "linux", + "Tls": ["openssl", "openssl3"], + "Arch": ["x64", "arm"], + "Exe": "secnetperf", + "Arguments": "-exec:maxtput -test:Throughput -target:$RemoteAddress -uni:1 -timed:1 -upload:12000 -tcp:1" + }, + "Variables": [ + ], + "SkipKernel": true, + "AllowLoopback": true, + "Iterations": 5, + "RemoteReadyMatcher": "Started!", + "ResultsMatcher": ".*@ (.*) kbps.*", + "FailureDefault": "Result: 0 bytes @ 0 kbps (0.0 ms).", + "Formats": ["{0} kbps"], + "RegressionThreshold": "-60.0" + }, { "TestName": "ThroughputDown", "Local": { @@ -165,6 +185,26 @@ "Formats": ["{0} kbps"], "RegressionThreshold": "-60.0" }, + { + "TestName": "TcpThroughputDown", + "Local" : { + "Platform": "linux", + "Tls": ["openssl", "openssl3"], + "Arch": ["x64", "arm"], + "Exe": "secnetperf", + "Arguments": "-exec:maxtput -test:Throughput -target:$RemoteAddress -uni:1 -timed:1 -download:12000 -tcp:1" + }, + "Variables": [ + ], + "SkipKernel": true, + "AllowLoopback": true, + "Iterations": 5, + "RemoteReadyMatcher": "Started!", + "ResultsMatcher": ".*@ (.*) kbps.*", + "FailureDefault": "Result: 0 bytes @ 0 kbps (0.0 ms).", + "Formats": ["{0} kbps"], + "RegressionThreshold": "-60.0" + }, { "TestName": "RPS", "Local": { diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 118e3e11fe..e6f28286e4 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -632,6 +632,11 @@ CxPlatSocketContextInitialize( } FlushTxInitialized = TRUE; + + if (SocketType == CXPLAT_SOCKET_TCP_SERVER) { + return Status; + } + // // Create datagram socket. // @@ -651,7 +656,6 @@ CxPlatSocketContextInitialize( "socket failed"); goto Exit; } - // // Set dual (IPv4 & IPv6) socket mode. // @@ -1082,6 +1086,25 @@ CxPlatSocketContextUninitialize( if (!SocketContext->IoStarted) { CxPlatSocketContextUninitializeComplete(SocketContext); } else { + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP || + SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_SERVER) { + // + // For TCP sockets, we should shutdown the socket before closing it. + // + SocketContext->Binding->DisconnectIndicated = TRUE; + if (shutdown(SocketContext->SocketFd, SHUT_RDWR) != 0) { + int Errno = errno; + if (Errno != ENOTCONN) { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Errno, + "shutdown"); + } + } + } + CxPlatRundownReleaseAndWait(&SocketContext->UpcallRundown); // Block until all upcalls complete. // @@ -1320,6 +1343,10 @@ CxPlatSocketCreateTcpInternal( if (QUIC_FAILED(Status)) { goto Exit; } + if (Binding->Type == CXPLAT_SOCKET_TCP_SERVER) { + *NewBinding = Binding; + return Status; + } if (IsServerSocket) { // @@ -1390,8 +1417,6 @@ SocketCreateTcpListener( ) { QUIC_STATUS Status; - // int Result; - // int Option; CXPLAT_DBG_ASSERT(Datapath->TcpHandlers.Receive != NULL); @@ -1457,7 +1482,7 @@ SocketCreateTcpListener( *Socket = Binding; - CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_ADD, EPOLLIN | EPOLLOUT); + CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_ADD, EPOLLIN); SocketContext->IoStarted = TRUE; Binding = NULL; @@ -1936,12 +1961,25 @@ CxPlatSocketTcpRecvComplete( uint8_t* Buffer = (uint8_t*)IoBlock + DatapathPartition->Datapath->RecvBlockBufferOffset; int NumberOfBytesTransferred = read(SocketContext->SocketFd, Buffer, CXPLAT_LARGE_IO_BUFFER_SIZE); - if (NumberOfBytesTransferred <= 0) { - SocketContext->Binding->DisconnectIndicated = TRUE; - SocketContext->Binding->Datapath->TcpHandlers.Connect( - SocketContext->Binding, - SocketContext->Binding->ClientContext, - FALSE); + if (NumberOfBytesTransferred == 0) { + if (!SocketContext->Binding->DisconnectIndicated) { + SocketContext->Binding->DisconnectIndicated = TRUE; + SocketContext->Binding->Datapath->TcpHandlers.Connect( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + FALSE); + } + goto Exit; + } else if (NumberOfBytesTransferred < 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + errno, + "read failed"); + } + goto Exit; } else { DATAPATH_RX_PACKET* Datagram = (DATAPATH_RX_PACKET*)(IoBlock + 1); Datagram->IoBlock = IoBlock; From 41d71445fdab85c2fccaad17397854ce92a7cfab Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Thu, 5 Oct 2023 16:54:14 -0700 Subject: [PATCH 4/9] implement send feature --- src/platform/datapath_epoll.c | 85 ++++++++++++++++++++++++++--------- 1 file changed, 64 insertions(+), 21 deletions(-) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index e6f28286e4..013bcc8558 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -2055,7 +2055,7 @@ SendDataAlloc( ) { CXPLAT_DBG_ASSERT(Socket != NULL); - CXPLAT_DBG_ASSERT(Config->MaxPacketSize <= MAX_UDP_PAYLOAD_LENGTH); + CXPLAT_DBG_ASSERT(Socket->Type != CXPLAT_SOCKET_UDP || Config->MaxPacketSize <= MAX_UDP_PAYLOAD_LENGTH); if (Config->Route->Queue == NULL) { Config->Route->Queue = &Socket->SocketContexts[0]; } @@ -2069,7 +2069,10 @@ SendDataAlloc( SendData->ClientBuffer.Buffer = SendData->Buffer; SendData->ClientBuffer.Length = 0; SendData->TotalSize = 0; - SendData->SegmentSize = Config->MaxPacketSize; + SendData->SegmentSize = + (Socket->Type != CXPLAT_SOCKET_UDP || + Socket->Datapath->Features & CXPLAT_DATAPATH_FEATURE_SEND_SEGMENTATION) + ? Config->MaxPacketSize : 0; SendData->BufferCount = 0; SendData->AlreadySentCount = 0; SendData->ControlBufferLength = 0; @@ -2417,6 +2420,30 @@ CxPlatSendDataSendMessages( return TRUE; } +BOOLEAN +CxPlatSendDataSendTcp( + _In_ CXPLAT_SEND_DATA* SendData + ) +{ + uint32_t TotalSize = SendData->TotalSize; + uint8_t *Buffer = SendData->Buffer; + while (TotalSize > 0) { + int BytesSent = + send( + SendData->SocketContext->SocketFd, + Buffer, + TotalSize, + 0); + if (BytesSent < 0) { + return FALSE; + } + Buffer += BytesSent; + TotalSize -= BytesSent; + } + + return TRUE; +} + QUIC_STATUS CxPlatSendDataSend( _In_ CXPLAT_SEND_DATA* SendData @@ -2424,35 +2451,47 @@ CxPlatSendDataSend( { CXPLAT_DBG_ASSERT(SendData != NULL); CXPLAT_DBG_ASSERT(SendData->AlreadySentCount < CXPLAT_MAX_IO_BATCH_SIZE); + CXPLAT_SOCKET_TYPE SocketType = SendData->SocketContext->Binding->Type; QUIC_STATUS Status = QUIC_STATUS_SUCCESS; CXPLAT_SOCKET_CONTEXT* SocketContext = SendData->SocketContext; BOOLEAN Success = + SocketType == CXPLAT_SOCKET_UDP ? #ifdef UDP_SEGMENT - SendData->SegmentationSupported ? - CxPlatSendDataSendSegmented(SendData) : + SendData->SegmentationSupported ? + CxPlatSendDataSendSegmented(SendData) : #endif - CxPlatSendDataSendMessages(SendData); + CxPlatSendDataSendMessages(SendData) : + CxPlatSendDataSendTcp(SendData); if (!Success) { if (errno == EAGAIN || errno == EWOULDBLOCK) { Status = QUIC_STATUS_PENDING; } else { Status = errno; + if (SocketType == CXPLAT_SOCKET_UDP) { #ifdef UDP_SEGMENT - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - SocketContext->Binding, - Status, - "sendmsg (GSO) failed"); + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "sendmsg (GSO) failed"); #else - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - SocketContext->Binding, - Status, - "sendmmsg failed"); + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "sendmmsg failed"); #endif + } else { + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "send failed"); + } if (Status == EIO && SocketContext->Binding->Datapath->Features & CXPLAT_DATAPATH_FEATURE_SEND_SEGMENTATION) { @@ -2477,10 +2516,14 @@ CxPlatSendDataSend( Status == EHOSTUNREACH || Status == ENETUNREACH) { if (!SocketContext->Binding->PcpBinding) { - SocketContext->Binding->Datapath->UdpHandlers.Unreachable( - SocketContext->Binding, - SocketContext->Binding->ClientContext, - &SocketContext->Binding->RemoteAddress); + if (SocketType == CXPLAT_SOCKET_UDP) { + SocketContext->Binding->Datapath->UdpHandlers.Unreachable( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + &SocketContext->Binding->RemoteAddress); + } else { + // TODO - TCP error + } } } } From b3f71e93f2571de4de79263fafbf7c81e53bfbd0 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Fri, 6 Oct 2023 12:16:15 -0700 Subject: [PATCH 5/9] use recv complete --- scripts/TcpTests.json | 19 +++++++++++ src/platform/datapath_epoll.c | 60 ++++++++++++++++++++++++----------- 2 files changed, 61 insertions(+), 18 deletions(-) diff --git a/scripts/TcpTests.json b/scripts/TcpTests.json index 886fe4a1c2..527c4ce7d0 100644 --- a/scripts/TcpTests.json +++ b/scripts/TcpTests.json @@ -22,6 +22,25 @@ "ResultsMatcher": ".*@ (.*) kbps.*", "Formats": ["{0} kbps"], "RegressionThreshold": "-8.0" + }, + { + "TestName": "TcpThroughputUp", + "Local" : { + "Platform": "linux", + "Tls": ["openssl", "openssl3"], + "Arch": ["x64", "arm"], + "Exe": "secnetperf", + "Arguments": "-test:Throughput -target:$RemoteAddress -bind:$LocalAddress:4434 -ip:4 -uni:1 -upload:2000000000 -tcp:1" + }, + "Variables": [ + ], + "AllowLoopback": true, + "Iterations": 5, + "RemoteReadyMatcher": "Started!", + "ResultsMatcher": ".*@ (.*) kbps.*", + "FailureDefault": "Result: 0 bytes @ 0 kbps (0.0 ms).", + "Formats": ["{0} kbps"], + "RegressionThreshold": "-60.0" } ] } diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 013bcc8558..b485557186 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -1623,18 +1623,32 @@ CxPlatSocketHandleErrors( ErrNum, "Socket error event"); - // - // Send unreachable notification to MsQuic if any related - // errors were received. - // - if (ErrNum == ECONNREFUSED || - ErrNum == EHOSTUNREACH || - ErrNum == ENETUNREACH) { - if (!SocketContext->Binding->PcpBinding) { - SocketContext->Binding->Datapath->UdpHandlers.Unreachable( + if (SocketContext->Binding->Type == CXPLAT_SOCKET_UDP) { + // + // Send unreachable notification to MsQuic if any related + // errors were received. + // + if (ErrNum == ECONNREFUSED || + ErrNum == EHOSTUNREACH || + ErrNum == ENETUNREACH) { + if (!SocketContext->Binding->PcpBinding) { + SocketContext->Binding->Datapath->UdpHandlers.Unreachable( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + &SocketContext->Binding->RemoteAddress); + } + } + } else if (ErrNum == ENOTSOCK || + ErrNum == EINTR || + ErrNum == ECANCELED || + ErrNum == ECONNABORTED || + ErrNum == ECONNRESET) { + if (!SocketContext->Binding->DisconnectIndicated) { + SocketContext->Binding->DisconnectIndicated = TRUE; + SocketContext->Binding->Datapath->TcpHandlers.Connect( SocketContext->Binding, SocketContext->Binding->ClientContext, - &SocketContext->Binding->RemoteAddress); + FALSE); } } } @@ -2257,6 +2271,13 @@ SocketSend( CxPlatSocketContextSetEvents(SocketContext, EPOLL_CTL_MOD, EPOLLIN | EPOLLOUT); Status = QUIC_STATUS_SUCCESS; } else { + if (Socket->Type != CXPLAT_SOCKET_UDP) { + SocketContext->Binding->Datapath->TcpHandlers.SendComplete( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + errno, + SendData->TotalSize); + } CxPlatSendDataFree(SendData); } @@ -2516,14 +2537,10 @@ CxPlatSendDataSend( Status == EHOSTUNREACH || Status == ENETUNREACH) { if (!SocketContext->Binding->PcpBinding) { - if (SocketType == CXPLAT_SOCKET_UDP) { - SocketContext->Binding->Datapath->UdpHandlers.Unreachable( - SocketContext->Binding, - SocketContext->Binding->ClientContext, - &SocketContext->Binding->RemoteAddress); - } else { - // TODO - TCP error - } + SocketContext->Binding->Datapath->UdpHandlers.Unreachable( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + &SocketContext->Binding->RemoteAddress); } } } @@ -2566,6 +2583,13 @@ CxPlatSocketContextFlushTxQueue( CxPlatLockAcquire(&SocketContext->TxQueueLock); CxPlatListRemoveHead(&SocketContext->TxQueue); + if (SocketContext->Binding->Type != CXPLAT_SOCKET_UDP) { + SocketContext->Binding->Datapath->TcpHandlers.SendComplete( + SocketContext->Binding, + SocketContext->Binding->ClientContext, + errno, + SendData->TotalSize); + } CxPlatSendDataFree(SendData); if (!CxPlatListIsEmpty(&SocketContext->TxQueue)) { SendData = From b787fcfbae2e68e4aed34cc8a17c7298ef5bae1c Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Mon, 9 Oct 2023 18:10:19 -0700 Subject: [PATCH 6/9] forcibly send inline --- src/platform/datapath_epoll.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index b485557186..29a1df533f 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -2456,6 +2456,10 @@ CxPlatSendDataSendTcp( TotalSize, 0); if (BytesSent < 0) { + // forcibly send inline + if (errno == EAGAIN || errno == EWOULDBLOCK) { + continue; + } return FALSE; } Buffer += BytesSent; From c93c7eec19e5c7c29c83c2ea9ca59be12f602be1 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Tue, 10 Oct 2023 16:40:05 -0700 Subject: [PATCH 7/9] solving time issue? --- src/platform/datapath_epoll.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 29a1df533f..b18eab7730 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -1053,6 +1053,11 @@ CxPlatSocketContextUninitializeComplete( TxEntry)); } + if (SocketContext->Binding->Type == CXPLAT_SOCKET_TCP_LISTENER && SocketContext->AcceptSocket) { + SocketDelete(SocketContext->AcceptSocket); + SocketContext->AcceptSocket = NULL; + } + if (SocketContext->SocketFd != INVALID_SOCKET) { epoll_ctl(*SocketContext->DatapathPartition->EventQ, EPOLL_CTL_DEL, SocketContext->SocketFd, NULL); close(SocketContext->SocketFd); From d48c760b83ae24903e9e4dbdef141766a9b260d3 Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 11 Oct 2023 15:24:35 -0700 Subject: [PATCH 8/9] clean up eventfd --- src/platform/datapath_epoll.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index b18eab7730..3871141f09 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -634,6 +634,7 @@ CxPlatSocketContextInitialize( if (SocketType == CXPLAT_SOCKET_TCP_SERVER) { + SocketContext->SqeInitialized = TRUE; return Status; } From 5ef63a419571c55f1dc7ff9f7c50dcc2e2f1937f Mon Sep 17 00:00:00 2001 From: ami-GS <1991.daiki@gmail.com> Date: Wed, 11 Oct 2023 16:56:27 -0700 Subject: [PATCH 9/9] cleanup --- src/platform/datapath_epoll.c | 279 +++++++++++++++++----------------- 1 file changed, 143 insertions(+), 136 deletions(-) diff --git a/src/platform/datapath_epoll.c b/src/platform/datapath_epoll.c index 3871141f09..eacd550484 100644 --- a/src/platform/datapath_epoll.c +++ b/src/platform/datapath_epoll.c @@ -558,34 +558,20 @@ CxPlatSocketConfigureRss( #endif } -// -// Socket context interface. It abstracts a (generally per-processor) UDP socket -// and the corresponding logic/functionality like send and receive processing. -// - QUIC_STATUS -CxPlatSocketContextInitialize( - _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext, - _In_ const CXPLAT_UDP_CONFIG* Config, - _In_ const uint16_t PartitionIndex, - _In_ CXPLAT_SOCKET_TYPE SocketType +CxPlatSocketContextSqeInitialize( + _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext ) { QUIC_STATUS Status = QUIC_STATUS_SUCCESS; - int Result = 0; - int Option = 0; - QUIC_ADDR MappedAddress = {0}; - socklen_t AssignedLocalAddressLength = 0; + CXPLAT_SOCKET* Binding = SocketContext->Binding; BOOLEAN ShutdownSqeInitialized = FALSE; BOOLEAN IoSqeInitialized = FALSE; BOOLEAN FlushTxInitialized = FALSE; - CXPLAT_SOCKET* Binding = SocketContext->Binding; - CXPLAT_DATAPATH* Datapath = Binding->Datapath; - - CXPLAT_DBG_ASSERT(PartitionIndex < Datapath->PartitionCount); - SocketContext->DatapathPartition = &Datapath->Partitions[PartitionIndex]; - CxPlatRefIncrement(&SocketContext->DatapathPartition->RefCount); + SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; + SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; + SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; if (!CxPlatSqeInitialize( SocketContext->DatapathPartition->EventQ, @@ -632,10 +618,53 @@ CxPlatSocketContextInitialize( } FlushTxInitialized = TRUE; + SocketContext->SqeInitialized = TRUE; + +Exit: + + if (QUIC_FAILED(Status)) { + if (ShutdownSqeInitialized) { + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe); + } + if (IoSqeInitialized) { + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe); + } + if (FlushTxInitialized) { + CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe); + } + } + + return Status; +} + +// +// Socket context interface. It abstracts a (generally per-processor) UDP socket +// and the corresponding logic/functionality like send and receive processing. +// +QUIC_STATUS +CxPlatSocketContextInitialize( + _Inout_ CXPLAT_SOCKET_CONTEXT* SocketContext, + _In_ const CXPLAT_UDP_CONFIG* Config, + _In_ const uint16_t PartitionIndex, + _In_ CXPLAT_SOCKET_TYPE SocketType + ) +{ + QUIC_STATUS Status = QUIC_STATUS_SUCCESS; + int Result = 0; + int Option = 0; + QUIC_ADDR MappedAddress = {0}; + socklen_t AssignedLocalAddressLength = 0; + + CXPLAT_SOCKET* Binding = SocketContext->Binding; + CXPLAT_DATAPATH* Datapath = Binding->Datapath; - if (SocketType == CXPLAT_SOCKET_TCP_SERVER) { - SocketContext->SqeInitialized = TRUE; - return Status; + CXPLAT_DBG_ASSERT(PartitionIndex < Datapath->PartitionCount); + SocketContext->DatapathPartition = &Datapath->Partitions[PartitionIndex]; + CxPlatRefIncrement(&SocketContext->DatapathPartition->RefCount); + + if (QUIC_FAILED(CxPlatSocketContextSqeInitialize(SocketContext)) || + SocketType == CXPLAT_SOCKET_TCP_SERVER) { + goto Exit; } // @@ -896,125 +925,112 @@ CxPlatSocketContextInitialize( MappedAddress.Ipv6.sin6_family = AF_INET6; } - if (SocketType != CXPLAT_SOCKET_TCP_SERVER) { + Result = + bind( + SocketContext->SocketFd, + &MappedAddress.Ip, + sizeof(MappedAddress)); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "bind failed"); + goto Exit; + } + + QUIC_ADDR_STR LocalAddressStr; + QUIC_ADDR_STR RemoteAddressStr; + QuicAddrToString(&MappedAddress, &LocalAddressStr); + + if (Config->RemoteAddress != NULL) { + CxPlatZeroMemory(&MappedAddress, sizeof(MappedAddress)); + CxPlatConvertToMappedV6(Config->RemoteAddress, &MappedAddress); + + if (MappedAddress.Ipv6.sin6_family == QUIC_ADDRESS_FAMILY_INET6) { + MappedAddress.Ipv6.sin6_family = AF_INET6; + } + QuicAddrToString(&MappedAddress, &RemoteAddressStr); Result = - bind( + connect( SocketContext->SocketFd, &MappedAddress.Ip, sizeof(MappedAddress)); - if (Result == SOCKET_ERROR) { + if (Result == SOCKET_ERROR && errno != EINPROGRESS) { Status = errno; QuicTraceEvent( DatapathErrorStatus, "[data][%p] ERROR, %u, %s.", Binding, Status, - "bind failed"); + "connect failed"); goto Exit; } + Binding->Connected = SocketType != CXPLAT_SOCKET_TCP; + } - QUIC_ADDR_STR LocalAddressStr; - QUIC_ADDR_STR RemoteAddressStr; - QuicAddrToString(&MappedAddress, &LocalAddressStr); - - if (Config->RemoteAddress != NULL) { - CxPlatZeroMemory(&MappedAddress, sizeof(MappedAddress)); - CxPlatConvertToMappedV6(Config->RemoteAddress, &MappedAddress); - - if (MappedAddress.Ipv6.sin6_family == QUIC_ADDRESS_FAMILY_INET6) { - MappedAddress.Ipv6.sin6_family = AF_INET6; - } - QuicAddrToString(&MappedAddress, &RemoteAddressStr); - Result = - connect( - SocketContext->SocketFd, - &MappedAddress.Ip, - sizeof(MappedAddress)); - if (Result == SOCKET_ERROR && errno != EINPROGRESS) { - Status = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - Status, - "connect failed"); - goto Exit; - } - Binding->Connected = SocketType != CXPLAT_SOCKET_TCP; - } + // + // If no specific local port was indicated, then the stack just + // assigned this socket a port. We need to query it and use it for + // all the other sockets we are going to create. + // + AssignedLocalAddressLength = sizeof(Binding->LocalAddress); + Result = + getsockname( + SocketContext->SocketFd, + (struct sockaddr *)&Binding->LocalAddress, + &AssignedLocalAddressLength); + if (Result == SOCKET_ERROR) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + Binding, + Status, + "getsockname failed"); + goto Exit; + } +#if DEBUG + if (Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port != 0) { + CXPLAT_DBG_ASSERT(Config->LocalAddress->Ipv4.sin_port == Binding->LocalAddress.Ipv4.sin_port); + } else if (Config->RemoteAddress && Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port == 0) { // - // If no specific local port was indicated, then the stack just - // assigned this socket a port. We need to query it and use it for - // all the other sockets we are going to create. + // A client socket being assigned the same port as a remote socket causes issues later + // in the datapath and binding paths. Check to make sure this case was not given to us. // - AssignedLocalAddressLength = sizeof(Binding->LocalAddress); + CXPLAT_DBG_ASSERT(Binding->LocalAddress.Ipv4.sin_port != Config->RemoteAddress->Ipv4.sin_port); + } +#endif + + if (Binding->LocalAddress.Ipv6.sin6_family == AF_INET6) { + Binding->LocalAddress.Ipv6.sin6_family = QUIC_ADDRESS_FAMILY_INET6; + } + + if (SocketType == CXPLAT_SOCKET_TCP_LISTENER) { Result = - getsockname( + listen( SocketContext->SocketFd, - (struct sockaddr *)&Binding->LocalAddress, - &AssignedLocalAddressLength); + 100); if (Result == SOCKET_ERROR) { - Status = errno; + int error = errno; QuicTraceEvent( DatapathErrorStatus, "[data][%p] ERROR, %u, %s.", Binding, - Status, - "getsockname failed"); + error, + "listen"); goto Exit; } - -#if DEBUG - if (Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port != 0) { - CXPLAT_DBG_ASSERT(Config->LocalAddress->Ipv4.sin_port == Binding->LocalAddress.Ipv4.sin_port); - } else if (Config->RemoteAddress && Config->LocalAddress && Config->LocalAddress->Ipv4.sin_port == 0) { - // - // A client socket being assigned the same port as a remote socket causes issues later - // in the datapath and binding paths. Check to make sure this case was not given to us. - // - CXPLAT_DBG_ASSERT(Binding->LocalAddress.Ipv4.sin_port != Config->RemoteAddress->Ipv4.sin_port); - } -#endif - - if (Binding->LocalAddress.Ipv6.sin6_family == AF_INET6) { - Binding->LocalAddress.Ipv6.sin6_family = QUIC_ADDRESS_FAMILY_INET6; - } - - if (SocketType == CXPLAT_SOCKET_TCP_LISTENER) { - Result = - listen( - SocketContext->SocketFd, - 100); - if (Result == SOCKET_ERROR) { - int error = errno; - QuicTraceEvent( - DatapathErrorStatus, - "[data][%p] ERROR, %u, %s.", - Binding, - error, - "listen"); - goto Exit; - } - } } - SocketContext->SqeInitialized = TRUE; - Exit: if (QUIC_FAILED(Status)) { close(SocketContext->SocketFd); SocketContext->SocketFd = INVALID_SOCKET; - if (ShutdownSqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->ShutdownSqe.Sqe); - } - if (IoSqeInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->IoSqe.Sqe); - } - if (FlushTxInitialized) { - CxPlatSqeCleanup(SocketContext->DatapathPartition->EventQ, &SocketContext->FlushTxSqe.Sqe); - } } return Status; @@ -1211,9 +1227,6 @@ SocketCreateUdp( for (uint32_t i = 0; i < SocketCount; i++) { Binding->SocketContexts[i].Binding = Binding; Binding->SocketContexts[i].SocketFd = INVALID_SOCKET; - Binding->SocketContexts[i].ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; - Binding->SocketContexts[i].IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; - Binding->SocketContexts[i].FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; CxPlatListInitializeHead(&Binding->SocketContexts[i].TxQueue); CxPlatLockInitialize(&Binding->SocketContexts[i].TxQueueLock); CxPlatRundownInitialize(&Binding->SocketContexts[i].UpcallRundown); @@ -1329,9 +1342,6 @@ CxPlatSocketCreateTcpInternal( SocketContext = &Binding->SocketContexts[0]; SocketContext->Binding = Binding; SocketContext->SocketFd = INVALID_SOCKET; - SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; - SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; - SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; CxPlatListInitializeHead(&SocketContext->TxQueue); CxPlatLockInitialize(&SocketContext->TxQueueLock); CxPlatRundownInitialize(&SocketContext->UpcallRundown); @@ -1349,10 +1359,6 @@ CxPlatSocketCreateTcpInternal( if (QUIC_FAILED(Status)) { goto Exit; } - if (Binding->Type == CXPLAT_SOCKET_TCP_SERVER) { - *NewBinding = Binding; - return Status; - } if (IsServerSocket) { // @@ -1364,6 +1370,12 @@ CxPlatSocketCreateTcpInternal( (void)CxPlatSocketConfigureRss(SocketContext, 1); } + if (Type == CXPLAT_SOCKET_TCP_SERVER) { + *NewBinding = Binding; + Binding = NULL; + goto Exit; + } + CxPlatConvertFromMappedV6(&Binding->LocalAddress, &Binding->LocalAddress); Binding->LocalAddress.Ipv6.sin6_scope_id = 0; @@ -1465,9 +1477,6 @@ SocketCreateTcpListener( SocketContext = &Binding->SocketContexts[0]; SocketContext->Binding = Binding; SocketContext->SocketFd = INVALID_SOCKET; - SocketContext->ShutdownSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_SHUTDOWN; - SocketContext->IoSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_IO; - SocketContext->FlushTxSqe.CqeType = CXPLAT_CQE_TYPE_SOCKET_FLUSH_TX; CxPlatListInitializeHead(&SocketContext->TxQueue); CxPlatLockInitialize(&SocketContext->TxQueueLock); CxPlatRundownInitialize(&SocketContext->UpcallRundown); @@ -1512,10 +1521,6 @@ CxPlatSocketContextAcceptCompletion( UNREFERENCED_PARAMETER(Cqe); CXPLAT_DATAPATH* Datapath = SocketContext->Binding->Datapath; - if (!CxPlatRundownAcquire(&SocketContext->UpcallRundown)) { - return; - } - QUIC_STATUS Status = CxPlatSocketCreateTcpInternal( Datapath, @@ -1529,6 +1534,17 @@ CxPlatSocketContextAcceptCompletion( } SocketContext->AcceptSocket->SocketContexts[0].SocketFd = accept(SocketContext->SocketFd, NULL, NULL); + if (SocketContext->AcceptSocket->SocketContexts[0].SocketFd == INVALID_SOCKET) { + Status = errno; + QuicTraceEvent( + DatapathErrorStatus, + "[data][%p] ERROR, %u, %s.", + SocketContext->Binding, + Status, + "accept failed"); + fprintf(stderr, "Accept failure: %d\n", Status); + goto Error; + } CxPlatSocketContextSetEvents(&SocketContext->AcceptSocket->SocketContexts[0], EPOLL_CTL_ADD, EPOLLIN); SocketContext->AcceptSocket->SocketContexts[0].IoStarted = TRUE; @@ -1546,8 +1562,6 @@ CxPlatSocketContextAcceptCompletion( SocketDelete(SocketContext->AcceptSocket); SocketContext->AcceptSocket = NULL; } - - CxPlatRundownRelease(&SocketContext->UpcallRundown); } void @@ -1559,17 +1573,10 @@ CxPlatSocketContextConnectCompletion( UNREFERENCED_PARAMETER(Cqe); CXPLAT_DATAPATH* Datapath = SocketContext->Binding->Datapath; - if (!CxPlatRundownAcquire(&SocketContext->UpcallRundown)) { - return; - } - Datapath->TcpHandlers.Connect( SocketContext->Binding, SocketContext->Binding->ClientContext, TRUE); - // TODO: error case? - - CxPlatRundownRelease(&SocketContext->UpcallRundown); } void