@@ -98,6 +98,25 @@ void binaryFramesEncoder(boolean mask) throws Exception {
98
98
client .close ();
99
99
}
100
100
101
+ @ Timeout (300 )
102
+ @ ValueSource (booleans = {true , false })
103
+ @ ParameterizedTest
104
+ void binaryFramesBulkEncoder (boolean mask ) throws Exception {
105
+ int maxFrameSize = 1000 ;
106
+ Channel s = server = nettyServer (new BinaryFramesTestServerHandler (), mask , false );
107
+ BinaryFramesEncoderClientBulkHandler clientHandler =
108
+ new BinaryFramesEncoderClientBulkHandler (maxFrameSize );
109
+ Channel client =
110
+ webSocketCallbacksClient (s .localAddress (), mask , true , maxFrameSize , clientHandler );
111
+
112
+ WebSocketFrameFactory .BulkEncoder encoder = clientHandler .onHandshakeCompleted ().join ();
113
+ Assertions .assertThat (encoder ).isNotNull ();
114
+
115
+ CompletableFuture <Void > onComplete = clientHandler .startFramesExchange ();
116
+ onComplete .join ();
117
+ client .close ();
118
+ }
119
+
101
120
@ Timeout (300 )
102
121
@ MethodSource ("maskingArgs" )
103
122
@ ParameterizedTest
@@ -444,6 +463,162 @@ protected void initChannel(SocketChannel ch) {
444
463
}
445
464
}
446
465
466
+ static class BinaryFramesEncoderClientBulkHandler
467
+ implements WebSocketCallbacksHandler , WebSocketFrameListener {
468
+ private final CompletableFuture <WebSocketFrameFactory .BulkEncoder > onHandshakeComplete =
469
+ new CompletableFuture <>();
470
+ private final CompletableFuture <Void > onFrameExchangeComplete = new CompletableFuture <>();
471
+ private WebSocketFrameFactory .BulkEncoder binaryFrameEncoder ;
472
+ private final int framesCount ;
473
+ private int receivedFrames ;
474
+ private int sentFrames ;
475
+ private ByteBuf outBuffer ;
476
+ private volatile ChannelHandlerContext ctx ;
477
+
478
+ BinaryFramesEncoderClientBulkHandler (int maxFrameSize ) {
479
+ this .framesCount = maxFrameSize ;
480
+ }
481
+
482
+ @ Override
483
+ public WebSocketFrameListener exchange (
484
+ ChannelHandlerContext ctx , WebSocketFrameFactory webSocketFrameFactory ) {
485
+ this .binaryFrameEncoder = webSocketFrameFactory .bulkEncoder ();
486
+ return this ;
487
+ }
488
+
489
+ @ Override
490
+ public void onChannelRead (
491
+ ChannelHandlerContext ctx , boolean finalFragment , int rsv , int opcode , ByteBuf payload ) {
492
+ if (!finalFragment ) {
493
+ onFrameExchangeComplete .completeExceptionally (
494
+ new AssertionError ("received non-final frame: " + finalFragment ));
495
+ payload .release ();
496
+ return ;
497
+ }
498
+ if (rsv != 0 ) {
499
+ onFrameExchangeComplete .completeExceptionally (
500
+ new AssertionError ("received frame with non-zero rsv: " + rsv ));
501
+ payload .release ();
502
+ return ;
503
+ }
504
+ if (opcode != WebSocketProtocol .OPCODE_BINARY ) {
505
+ onFrameExchangeComplete .completeExceptionally (
506
+ new AssertionError ("received non-binary frame: " + Long .toHexString (opcode )));
507
+ payload .release ();
508
+ return ;
509
+ }
510
+
511
+ int readableBytes = payload .readableBytes ();
512
+
513
+ int expectedSize = receivedFrames ;
514
+ if (expectedSize != readableBytes ) {
515
+ onFrameExchangeComplete .completeExceptionally (
516
+ new AssertionError (
517
+ "received frame of unexpected size: "
518
+ + expectedSize
519
+ + ", actual: "
520
+ + readableBytes ));
521
+ payload .release ();
522
+ return ;
523
+ }
524
+
525
+ for (int i = 0 ; i < readableBytes ; i ++) {
526
+ byte b = payload .readByte ();
527
+ if (b != (byte ) 0xFE ) {
528
+ onFrameExchangeComplete .completeExceptionally (
529
+ new AssertionError ("received frame with unexpected content: " + Long .toHexString (b )));
530
+ payload .release ();
531
+ return ;
532
+ }
533
+ }
534
+ payload .release ();
535
+ if (++receivedFrames == framesCount ) {
536
+ onFrameExchangeComplete .complete (null );
537
+ }
538
+ }
539
+
540
+ @ Override
541
+ public void onChannelWritabilityChanged (ChannelHandlerContext ctx ) {
542
+ boolean writable = ctx .channel ().isWritable ();
543
+ if (sentFrames > 0 && writable ) {
544
+ int toSend = framesCount - sentFrames ;
545
+ if (toSend > 0 ) {
546
+ sendFrames (ctx , toSend );
547
+ }
548
+ }
549
+ }
550
+
551
+ @ Override
552
+ public void onOpen (ChannelHandlerContext ctx ) {
553
+ this .ctx = ctx ;
554
+ int bufferSize = 4 * framesCount ;
555
+ this .outBuffer = ctx .alloc ().buffer (bufferSize , bufferSize );
556
+ onHandshakeComplete .complete (binaryFrameEncoder );
557
+ }
558
+
559
+ @ Override
560
+ public void onClose (ChannelHandlerContext ctx ) {
561
+ ByteBuf out = outBuffer ;
562
+ if (out != null ) {
563
+ outBuffer = null ;
564
+ out .release ();
565
+ }
566
+ if (!onFrameExchangeComplete .isDone ()) {
567
+ onFrameExchangeComplete .completeExceptionally (new ClosedChannelException ());
568
+ }
569
+ }
570
+
571
+ @ Override
572
+ public void onExceptionCaught (ChannelHandlerContext ctx , Throwable cause ) {
573
+ if (!onFrameExchangeComplete .isDone ()) {
574
+ onFrameExchangeComplete .completeExceptionally (cause );
575
+ }
576
+ }
577
+
578
+ CompletableFuture <WebSocketFrameFactory .BulkEncoder > onHandshakeCompleted () {
579
+ return onHandshakeComplete ;
580
+ }
581
+
582
+ CompletableFuture <Void > startFramesExchange () {
583
+ ChannelHandlerContext c = ctx ;
584
+ c .executor ().execute (() -> sendFrames (c , framesCount - sentFrames ));
585
+ return onFrameExchangeComplete ;
586
+ }
587
+
588
+ private void sendFrames (ChannelHandlerContext c , int toSend ) {
589
+ WebSocketFrameFactory .BulkEncoder frameEncoder = binaryFrameEncoder ;
590
+ for (int frameIdx = 0 ; frameIdx < toSend ; frameIdx ++) {
591
+ if (!c .channel ().isOpen ()) {
592
+ return ;
593
+ }
594
+ int payloadSize = sentFrames ;
595
+ int frameSize = frameEncoder .sizeofBinaryFrame (payloadSize );
596
+ ByteBuf out = outBuffer ;
597
+ if (frameSize > out .capacity () - out .writerIndex ()) {
598
+ int readableBytes = out .readableBytes ();
599
+ int bufferSize = 4 * framesCount ;
600
+ outBuffer = c .alloc ().buffer (bufferSize , bufferSize );
601
+ if (c .channel ().bytesBeforeUnwritable () < readableBytes ) {
602
+ c .writeAndFlush (out , c .voidPromise ());
603
+ if (!c .channel ().isWritable ()) {
604
+ return ;
605
+ }
606
+ } else {
607
+ c .write (out , c .voidPromise ());
608
+ }
609
+ out = outBuffer ;
610
+ }
611
+ int mask = frameEncoder .encodeBinaryFramePrefix (out , payloadSize );
612
+ for (int payloadIdx = 0 ; payloadIdx < payloadSize ; payloadIdx ++) {
613
+ out .writeByte (0xFE );
614
+ }
615
+ frameEncoder .maskBinaryFrame (out , mask , payloadSize );
616
+ sentFrames ++;
617
+ }
618
+ c .flush ();
619
+ }
620
+ }
621
+
447
622
static class BinaryFramesEncoderClientHandler
448
623
implements WebSocketCallbacksHandler , WebSocketFrameListener {
449
624
private final CompletableFuture <WebSocketFrameFactory .Encoder > onHandshakeComplete =
0 commit comments