Skip to content

Commit

Permalink
stream cancellation, close the QUIC stream
Browse files Browse the repository at this point in the history
- confusion about the nghttp3 API. We should close
  the QUIC stream on cancel and not use the nghttp3
  calls intended to be invoked when the QUIC stream
  was closed by the peer.
  • Loading branch information
icing committed May 9, 2024
1 parent 84784ac commit 60177af
Showing 1 changed file with 49 additions and 52 deletions.
101 changes: 49 additions & 52 deletions lib/vquic/curl_ngtcp2.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,8 @@ struct h3_stream_ctx {

#define H3_STREAM_CTX(ctx,data) ((struct h3_stream_ctx *)(\
data? Curl_hash_offt_get(&(ctx)->streams, (data)->id) : NULL))
#define H3_STREAM_CTX_ID(ctx,id) ((struct h3_stream_ctx *)(\
Curl_hash_offt_get(&(ctx)->streams, (id))))

static void h3_stream_ctx_free(struct h3_stream_ctx *stream)
{
Expand Down Expand Up @@ -223,28 +225,37 @@ static CURLcode h3_data_setup(struct Curl_cfilter *cf,
return CURLE_OK;
}

static void cf_ngtcp2_stream_close(struct Curl_cfilter *cf,
struct Curl_easy *data,
struct h3_stream_ctx *stream)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
DEBUGASSERT(data);
DEBUGASSERT(stream);
if(!stream->closed && ctx->qconn && ctx->h3conn) {
CURLcode result;

nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
stream->closed = TRUE;
(void)ngtcp2_conn_shutdown_stream(ctx->qconn, 0, stream->id,
NGHTTP3_H3_REQUEST_CANCELLED);
result = cf_progress_egress(cf, data, NULL);
if(result)
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] cancel stream -> %d",
stream->id, result);
}
}

static void h3_data_done(struct Curl_cfilter *cf, struct Curl_easy *data)
{
struct cf_ngtcp2_ctx *ctx = cf->ctx;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
CURLcode result;

(void)cf;
if(stream) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] easy handle is done",
stream->id);
if(ctx->h3conn && !stream->closed) {
nghttp3_conn_shutdown_stream_read(ctx->h3conn, stream->id);
nghttp3_conn_close_stream(ctx->h3conn, stream->id,
NGHTTP3_H3_REQUEST_CANCELLED);
nghttp3_conn_set_stream_user_data(ctx->h3conn, stream->id, NULL);
ngtcp2_conn_set_stream_user_data(ctx->qconn, stream->id, NULL);
stream->closed = TRUE;
result = cf_progress_egress(cf, data, NULL);
if(result)
CURL_TRC_CF(data, cf, "data_done, flush egress -> %d", result);
}

cf_ngtcp2_stream_close(cf, data, stream);
Curl_hash_offt_remove(&ctx->streams, data->id);
}
}
Expand Down Expand Up @@ -472,42 +483,23 @@ static int cb_recv_stream_data(ngtcp2_conn *tconn, uint32_t flags,
nghttp3_ssize nconsumed;
int fin = (flags & NGTCP2_STREAM_DATA_FLAG_FIN) ? 1 : 0;
struct Curl_easy *data = stream_user_data;
struct h3_stream_ctx *stream = H3_STREAM_CTX(ctx, data);
(void)offset;
(void)data;

nconsumed =
nghttp3_conn_read_stream(ctx->h3conn, stream_id, buf, buflen, fin);
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd",
stream_id, buflen, nconsumed);
if(!data)
data = CF_DATA_CURRENT(cf);
if(data)
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] read_stream(len=%zu) -> %zd",
stream_id, buflen, nconsumed);
if(nconsumed < 0) {
struct Curl_easy *cdata = CF_DATA_CURRENT(cf);
bool fatal = cf_ngtcp2_h3_err_is_fatal((int)nconsumed);
/* consume all bytes */
ngtcp2_conn_extend_max_stream_offset(tconn, stream_id, buflen);
ngtcp2_conn_extend_max_offset(tconn, buflen);

if(fatal) {
CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] fatal nghttp3 error on "
"nghttp3_conn_read_stream() -> %" CURL_PRId64,
stream_id, (curl_int64_t)nconsumed);
struct h3_stream_ctx *stream = H3_STREAM_CTX_ID(ctx, stream_id);
if(data && stream) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] error on known stream, "
"reset=%d, closed=%d",
stream_id, stream->reset, stream->closed);
}
if(!data) {
CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] discard data for unknown "
"stream", stream_id);
return 0;
}
else if(NGHTTP3_ERR_H3_STREAM_CREATION_ERROR == (int)nconsumed) {
CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] ignoring stream created "
"by peer", stream_id);
return 0;
}
else if(stream && stream->reset) {
CURL_TRC_CF(cdata, cf, "[%" CURL_PRId64 "] discard data for reset "
"stream", stream_id);
return 0;
}
cf_ngtcp2_h3_err_set(cf, cdata, (int)nconsumed);
return NGTCP2_ERR_CALLBACK_FAILURE;
}

Expand Down Expand Up @@ -895,16 +887,13 @@ static void h3_xfer_write_resp(struct Curl_cfilter *cf,
{

/* If we already encountered an error, skip further writes */
if(!stream->xfer_result)
if(!stream->xfer_result) {
stream->xfer_result = Curl_xfer_write_resp(data, buf, blen, eos);
/* If the transfer write is errored, we do not want any more data */
if(stream->xfer_result) {
struct cf_ngtcp2_ctx *ctx = cf->ctx;
CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes "
"of data, cancelling stream",
stream->id, stream->xfer_result, blen);
nghttp3_conn_close_stream(ctx->h3conn, stream->id,
NGHTTP3_H3_REQUEST_CANCELLED);
/* If the transfer write is errored, we do not want any more data */
if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%"CURL_PRId64"] error %d writing %zu bytes "
"of data", stream->id, stream->xfer_result, blen);
}
}
}

Expand Down Expand Up @@ -1226,6 +1215,7 @@ static ssize_t cf_ngtcp2_recv(struct Curl_cfilter *cf, struct Curl_easy *data,

if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id);
cf_ngtcp2_stream_close(cf, data, stream);
*err = stream->xfer_result;
nread = -1;
goto out;
Expand Down Expand Up @@ -1527,6 +1517,13 @@ static ssize_t cf_ngtcp2_send(struct Curl_cfilter *cf, struct Curl_easy *data,
}
stream = H3_STREAM_CTX(ctx, data);
}
else if(stream->xfer_result) {
CURL_TRC_CF(data, cf, "[%" CURL_PRId64 "] xfer write failed", stream->id);
cf_ngtcp2_stream_close(cf, data, stream);
*err = stream->xfer_result;
sent = -1;
goto out;
}
else if(stream->upload_blocked_len) {
/* the data in `buf` has already been submitted or added to the
* buffers, but have been EAGAINed on the last invocation. */
Expand Down

0 comments on commit 60177af

Please sign in to comment.