Skip to content

Commit 86477a5

Browse files
authored
H2 stream receives DATA (#221)
Hook up DATA decoder callbacks so they go through connection and stream to the end-user.
1 parent 7b23eea commit 86477a5

File tree

9 files changed

+210
-4
lines changed

9 files changed

+210
-4
lines changed

include/aws/http/private/connection_impl.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,6 @@ struct aws_http_connection {
6666
struct aws_channel_slot *channel_slot;
6767
struct aws_allocator *alloc;
6868
enum aws_http_version http_version;
69-
size_t initial_window_size;
7069

7170
aws_http_proxy_request_transform_fn *proxy_request_transform;
7271
void *user_data;

include/aws/http/private/h2_stream.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ int aws_h2_stream_on_decoder_headers_end(
9393
bool malformed,
9494
enum aws_http_header_block block_type);
9595

96+
int aws_h2_stream_on_decoder_data(struct aws_h2_stream *stream, struct aws_byte_cursor data);
9697
int aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *stream);
9798

9899
int aws_h2_stream_activate(struct aws_http_stream *stream);

source/h1_connection.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ static const struct aws_h1_decoder_vtable s_h1_decoder_vtable = {
113113
struct h1_connection {
114114
struct aws_http_connection base;
115115

116+
size_t initial_window_size;
117+
116118
/* Single task used repeatedly for sending data from streams. */
117119
struct aws_channel_task outgoing_stream_task;
118120

@@ -1257,7 +1259,6 @@ static struct h1_connection *s_connection_new(
12571259
connection->base.channel_handler.alloc = alloc;
12581260
connection->base.channel_handler.impl = connection;
12591261
connection->base.http_version = AWS_HTTP_VERSION_1_1;
1260-
connection->base.initial_window_size = initial_window_size;
12611262
connection->base.manual_window_management = manual_window_management;
12621263

12631264
/* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/
@@ -1266,6 +1267,8 @@ static struct h1_connection *s_connection_new(
12661267
/* 1 refcount for user */
12671268
aws_atomic_init_int(&connection->base.refcount, 1);
12681269

1270+
connection->initial_window_size = initial_window_size;
1271+
12691272
aws_h1_encoder_init(&connection->thread_data.encoder, alloc);
12701273

12711274
aws_channel_task_init(
@@ -1825,7 +1828,7 @@ static int s_handler_shutdown(
18251828

18261829
static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
18271830
struct h1_connection *connection = handler->impl;
1828-
return connection->base.initial_window_size;
1831+
return connection->initial_window_size;
18291832
}
18301833

18311834
static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {

source/h2_connection.c

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ static int s_decoder_on_headers_end(
7474
bool malformed,
7575
enum aws_http_header_block block_type,
7676
void *userdata);
77+
static int s_decoder_on_data(uint32_t stream_id, struct aws_byte_cursor data, void *userdata);
7778
static int s_decoder_on_end_stream(uint32_t stream_id, void *userdata);
7879
static int s_decoder_on_ping(uint8_t opaque_data[AWS_H2_PING_DATA_SIZE], void *userdata);
7980
static int s_decoder_on_settings(
@@ -107,6 +108,7 @@ static const struct aws_h2_decoder_vtable s_h2_decoder_vtable = {
107108
.on_headers_begin = s_decoder_on_headers_begin,
108109
.on_headers_i = s_decoder_on_headers_i,
109110
.on_headers_end = s_decoder_on_headers_end,
111+
.on_data = s_decoder_on_data,
110112
.on_end_stream = s_decoder_on_end_stream,
111113
.on_ping = s_decoder_on_ping,
112114
.on_settings = s_decoder_on_settings,
@@ -187,6 +189,7 @@ static struct aws_h2_connection *s_connection_new(
187189
bool server) {
188190

189191
(void)server;
192+
(void)initial_window_size; /* #TODO use this for our initial settings */
190193

191194
struct aws_h2_connection *connection = aws_mem_calloc(alloc, 1, sizeof(struct aws_h2_connection));
192195
if (!connection) {
@@ -199,7 +202,6 @@ static struct aws_h2_connection *s_connection_new(
199202
connection->base.channel_handler.alloc = alloc;
200203
connection->base.channel_handler.impl = connection;
201204
connection->base.http_version = AWS_HTTP_VERSION_2;
202-
connection->base.initial_window_size = initial_window_size;
203205
/* Init the next stream id (server must use even ids, client odd [RFC 7540 5.1.1])*/
204206
connection->base.next_stream_id = (server ? 2 : 1);
205207
connection->base.manual_window_management = manual_window_management;
@@ -713,6 +715,26 @@ int s_decoder_on_headers_end(
713715
return AWS_OP_SUCCESS;
714716
}
715717

718+
int s_decoder_on_data(uint32_t stream_id, struct aws_byte_cursor data, void *userdata) {
719+
struct aws_h2_connection *connection = userdata;
720+
721+
/* #TODO Update connection's flow-control window */
722+
723+
/* Pass data to stream */
724+
struct aws_h2_stream *stream;
725+
if (s_get_active_stream_for_incoming_frame(connection, stream_id, AWS_H2_FRAME_T_DATA, &stream)) {
726+
return AWS_OP_ERR;
727+
}
728+
729+
if (stream) {
730+
if (aws_h2_stream_on_decoder_data(stream, data)) {
731+
return AWS_OP_ERR;
732+
}
733+
}
734+
735+
return AWS_OP_SUCCESS;
736+
}
737+
716738
int s_decoder_on_end_stream(uint32_t stream_id, void *userdata) {
717739
struct aws_h2_connection *connection = userdata;
718740

source/h2_stream.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,32 @@ int aws_h2_stream_on_decoder_headers_end(
430430
return AWS_OP_SUCCESS;
431431
}
432432

433+
int aws_h2_stream_on_decoder_data(struct aws_h2_stream *stream, struct aws_byte_cursor data) {
434+
AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
435+
436+
if (s_check_state_allows_frame_type(stream, AWS_H2_FRAME_T_DATA)) {
437+
return s_send_rst_and_close_stream(stream, aws_last_error());
438+
}
439+
440+
if (!stream->thread_data.received_main_headers) {
441+
/* #TODO Not 100% sure whether this is Stream Error or Connection Error. */
442+
AWS_H2_STREAM_LOG(ERROR, stream, "Malformed message, received DATA before main HEADERS");
443+
return s_send_rst_and_close_stream(stream, AWS_ERROR_HTTP_PROTOCOL_ERROR);
444+
}
445+
446+
/* #TODO Update stream's flow-control window */
447+
448+
if (stream->base.on_incoming_body) {
449+
if (stream->base.on_incoming_body(&stream->base, &data, stream->base.user_data)) {
450+
AWS_H2_STREAM_LOGF(
451+
ERROR, stream, "Incoming body callback raised error, %s", aws_error_name(aws_last_error()));
452+
return AWS_OP_ERR;
453+
}
454+
}
455+
456+
return AWS_OP_SUCCESS;
457+
}
458+
433459
int aws_h2_stream_on_decoder_end_stream(struct aws_h2_stream *stream) {
434460
AWS_PRECONDITION_ON_CHANNEL_THREAD(stream);
435461

tests/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,8 @@ add_test_case(h2_client_stream_ignores_some_frames_received_soon_after_closing)
339339
#TODO add_test_case(h2_client_stream_err_receive_info_headers_after_main)
340340
#TODO add_test_case(h2_client_stream_receive_trailing_headers)
341341
#TODO add_test_case(h2_client_stream_err_receive_trailing_before_main)
342+
add_test_case(h2_client_stream_receive_data)
343+
add_test_case(h2_client_stream_err_receive_data_before_headers)
342344

343345

344346
add_test_case(server_new_destroy)

tests/h2_test_helper.c

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
#include "h2_test_helper.h"
1717

1818
#include <aws/http/private/h2_decoder.h>
19+
#include <aws/io/stream.h>
1920
#include <aws/testing/io_testing_channel.h>
2021

2122
/*******************************************************************************
@@ -522,6 +523,35 @@ int h2_fake_peer_send_frame(struct h2_fake_peer *peer, struct aws_h2_frame *fram
522523
return AWS_OP_SUCCESS;
523524
}
524525

526+
int h2_fake_peer_send_data_frame(
527+
struct h2_fake_peer *peer,
528+
uint32_t stream_id,
529+
struct aws_byte_cursor data,
530+
bool end_stream) {
531+
532+
struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(peer->alloc, &data);
533+
ASSERT_NOT_NULL(body_stream);
534+
535+
struct aws_io_message *msg = aws_channel_acquire_message_from_pool(
536+
peer->testing_channel->channel, AWS_IO_MESSAGE_APPLICATION_DATA, g_aws_channel_max_fragment_size);
537+
ASSERT_NOT_NULL(msg);
538+
539+
bool body_complete;
540+
ASSERT_SUCCESS(aws_h2_encode_data_frame(
541+
&peer->encoder, stream_id, body_stream, end_stream, 0, &msg->message_data, &body_complete));
542+
543+
ASSERT_TRUE(body_complete);
544+
ASSERT_TRUE(msg->message_data.len != 0);
545+
546+
ASSERT_SUCCESS(testing_channel_push_read_message(peer->testing_channel, msg));
547+
aws_input_stream_destroy(body_stream);
548+
return AWS_OP_SUCCESS;
549+
}
550+
551+
int h2_fake_peer_send_data_frame_str(struct h2_fake_peer *peer, uint32_t stream_id, const char *data, bool end_stream) {
552+
return h2_fake_peer_send_data_frame(peer, stream_id, aws_byte_cursor_from_c_str(data), end_stream);
553+
}
554+
525555
int h2_fake_peer_send_connection_preface(struct h2_fake_peer *peer, struct aws_h2_frame *settings) {
526556
if (!peer->is_server) {
527557
/* Client must first send magic string */

tests/h2_test_helper.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,22 @@ int h2_fake_peer_decode_messages_from_testing_channel(struct h2_fake_peer *peer)
147147
*/
148148
int h2_fake_peer_send_frame(struct h2_fake_peer *peer, struct aws_h2_frame *frame);
149149

150+
/**
151+
* Encode the entire byte cursor into a single DATA frame.
152+
* Fails if the cursor is too large for this to work.
153+
*/
154+
int h2_fake_peer_send_data_frame(
155+
struct h2_fake_peer *peer,
156+
uint32_t stream_id,
157+
struct aws_byte_cursor data,
158+
bool end_stream);
159+
160+
/**
161+
* Encode the entire string into a single DATA frame.
162+
* Fails if the string is too large for this to work.
163+
*/
164+
int h2_fake_peer_send_data_frame_str(struct h2_fake_peer *peer, uint32_t stream_id, const char *data, bool end_stream);
165+
150166
/**
151167
* Peer sends the connection preface with specified settings.
152168
* Takes ownership of frame and destroys after sending

tests/test_h2_client.c

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -451,3 +451,110 @@ TEST_CASE(h2_client_stream_ignores_some_frames_received_soon_after_closing) {
451451
client_stream_tester_clean_up(&stream_tester);
452452
return s_tester_clean_up();
453453
}
454+
455+
/* Test receiving a response with DATA frames */
456+
TEST_CASE(h2_client_stream_receive_data) {
457+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
458+
459+
/* fake peer sends connection preface */
460+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
461+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
462+
463+
/* send request */
464+
struct aws_http_message *request = aws_http_message_new_request(allocator);
465+
ASSERT_NOT_NULL(request);
466+
467+
struct aws_http_header request_headers_src[] = {
468+
DEFINE_HEADER(":method", "GET"),
469+
DEFINE_HEADER(":scheme", "https"),
470+
DEFINE_HEADER(":path", "/"),
471+
};
472+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
473+
474+
struct client_stream_tester stream_tester;
475+
ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, request));
476+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
477+
478+
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);
479+
480+
/* fake peer sends response headers */
481+
struct aws_http_header response_headers_src[] = {
482+
DEFINE_HEADER(":status", "200"),
483+
};
484+
485+
struct aws_http_headers *response_headers = aws_http_headers_new(allocator);
486+
aws_http_headers_add_array(response_headers, response_headers_src, AWS_ARRAY_SIZE(response_headers_src));
487+
488+
struct aws_h2_frame *response_frame =
489+
aws_h2_frame_new_headers(allocator, stream_id, response_headers, false /*end_stream*/, 0, NULL);
490+
ASSERT_SUCCESS(h2_fake_peer_send_frame(&s_tester.peer, response_frame));
491+
492+
/* fake peer sends response body */
493+
const char *body_src = "hello";
494+
ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, body_src, true /*end_stream*/));
495+
496+
/* validate that client received complete response */
497+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
498+
ASSERT_TRUE(stream_tester.complete);
499+
ASSERT_INT_EQUALS(AWS_ERROR_SUCCESS, stream_tester.on_complete_error_code);
500+
ASSERT_INT_EQUALS(200, stream_tester.response_status);
501+
ASSERT_SUCCESS(s_compare_headers(response_headers, stream_tester.response_headers));
502+
ASSERT_TRUE(aws_byte_buf_eq_c_str(&stream_tester.response_body, body_src));
503+
504+
ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection));
505+
506+
/* clean up */
507+
aws_http_headers_release(response_headers);
508+
aws_http_message_release(request);
509+
client_stream_tester_clean_up(&stream_tester);
510+
return s_tester_clean_up();
511+
}
512+
513+
/* A message is malformed if DATA is received before HEADERS */
514+
TEST_CASE(h2_client_stream_err_receive_data_before_headers) {
515+
ASSERT_SUCCESS(s_tester_init(allocator, ctx));
516+
517+
/* fake peer sends connection preface */
518+
ASSERT_SUCCESS(h2_fake_peer_send_connection_preface_default_settings(&s_tester.peer));
519+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
520+
521+
/* send request */
522+
struct aws_http_message *request = aws_http_message_new_request(allocator);
523+
ASSERT_NOT_NULL(request);
524+
525+
struct aws_http_header request_headers_src[] = {
526+
DEFINE_HEADER(":method", "GET"),
527+
DEFINE_HEADER(":scheme", "https"),
528+
DEFINE_HEADER(":path", "/"),
529+
};
530+
aws_http_message_add_header_array(request, request_headers_src, AWS_ARRAY_SIZE(request_headers_src));
531+
532+
struct client_stream_tester stream_tester;
533+
ASSERT_SUCCESS(s_stream_tester_init(&stream_tester, request));
534+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
535+
536+
uint32_t stream_id = aws_http_stream_get_id(stream_tester.stream);
537+
538+
/* fake peer sends response body BEFORE any response headers */
539+
const char *body_src = "hello";
540+
ASSERT_SUCCESS(h2_fake_peer_send_data_frame_str(&s_tester.peer, stream_id, body_src, true /*end_stream*/));
541+
542+
/* validate that stream completed with error */
543+
testing_channel_drain_queued_tasks(&s_tester.testing_channel);
544+
ASSERT_TRUE(stream_tester.complete);
545+
ASSERT_INT_EQUALS(AWS_ERROR_HTTP_PROTOCOL_ERROR, stream_tester.on_complete_error_code);
546+
547+
/* a stream error should not affect the connection */
548+
ASSERT_TRUE(aws_http_connection_is_open(s_tester.connection));
549+
550+
/* validate that stream sent RST_STREAM */
551+
ASSERT_SUCCESS(h2_fake_peer_decode_messages_from_testing_channel(&s_tester.peer));
552+
struct h2_decoded_frame *rst_stream_frame = h2_decode_tester_latest_frame(&s_tester.peer.decode);
553+
ASSERT_INT_EQUALS(AWS_H2_FRAME_T_RST_STREAM, rst_stream_frame->type);
554+
ASSERT_UINT_EQUALS(AWS_H2_ERR_PROTOCOL_ERROR, rst_stream_frame->error_code);
555+
556+
/* clean up */
557+
aws_http_message_release(request);
558+
client_stream_tester_clean_up(&stream_tester);
559+
return s_tester_clean_up();
560+
}

0 commit comments

Comments
 (0)