// CurlMulti.cpp

#include "pch.h"
#include "CurlMulti.h"
#include "CurlProvider.h"
namespace xbox
{
namespace httpclient
{

// XCurl doesn't support curl_multi_timeout, so use a small, fixed delay between calls to curl_multi_perform
#define PERFORM_DELAY_MS 50
#define POLL_TIMEOUT_MS 0
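// POLL_TIMEOUT_MS of 0 means curl_multi_poll/curl_multi_wait return immediately rather than blocking;
// pacing between perform calls is instead controlled by the task queue delay chosen in CurlMulti::Perform

// Allocates a CurlMulti with the libHttpClient allocator, creates the underlying curl multi handle, and
// creates a composite XTaskQueue on the provided work port to drive curl_multi_perform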
Result<HC_UNIQUE_PTR<CurlMulti>> CurlMulti::Initialize(XTaskQueuePortHandle workPort)
{
assert(workPort);
http_stl_allocator<CurlMulti> a{};
HC_UNIQUE_PTR<CurlMulti> multi{ new (a.allocate(1)) CurlMulti };
multi->m_curlMultiHandle = curl_multi_init();
if (!multi->m_curlMultiHandle)
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::Initialize: curl_multi_init failed");
return E_FAIL;
}
RETURN_IF_FAILED(XTaskQueueCreateComposite(workPort, workPort, &multi->m_queue));
return Result<HC_UNIQUE_PTR<CurlMulti>>{ std::move(multi) };
}
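
// Closes the task queue, fails any requests still tracked by this object, and releases the curl multi handle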
CurlMulti::~CurlMulti()
{
if (m_queue)
{
XTaskQueueCloseHandle(m_queue);
}
if (!m_easyRequests.empty())
{
HC_TRACE_WARNING(HTTPCLIENT, "CurlMulti::~CurlMulti: Failing all active requests.");
FailAllRequests(E_UNEXPECTED);
}
if (m_curlMultiHandle)
{
curl_multi_cleanup(m_curlMultiHandle);
}
}
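
// Adds an easy request to the multi handle and immediately schedules a perform callback.
// Fails with E_FAIL if CleanupAsync has already been called.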
HRESULT CurlMulti::AddRequest(HC_UNIQUE_PTR<CurlEasyRequest> easyRequest)
{
std::unique_lock<std::mutex> lock{ m_mutex };
if (m_cleanupAsyncBlock)
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::AddRequest: request added after cleanup");
return E_FAIL;
}
auto result = curl_multi_add_handle(m_curlMultiHandle, easyRequest->Handle());
if (result != CURLM_OK)
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::AddRequest: curl_multi_add_handle failed with CURLCode=%u", result);
return HrFromCurlm(result);
}
m_easyRequests.emplace(easyRequest->Handle(), std::move(easyRequest));
// Schedule the perform callback immediately
ScheduleTaskQueueCallback(std::move(lock), 0);
return S_OK;
}
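
// Begins asynchronous cleanup. Ownership of the CurlMulti is transferred to the XAsync operation; the object
// is destroyed in CleanupAsyncProvider's DoWork step once all pending task queue callbacks have drained.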
HRESULT CurlMulti::CleanupAsync(HC_UNIQUE_PTR<CurlMulti> multi, XAsyncBlock* async)
{
RETURN_IF_FAILED(XAsyncBegin(async, multi.get(), __FUNCTION__, __FUNCTION__, CleanupAsyncProvider));
multi.release();
return S_OK;
}
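
// XAsync provider backing CleanupAsync. Begin either schedules DoWork immediately (if no task queue callbacks
// are pending) or terminates the task queue and lets the final TaskQueueCallback schedule DoWork. DoWork reclaims
// ownership of the CurlMulti and destroys it before completing the async block.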
HRESULT CALLBACK CurlMulti::CleanupAsyncProvider(XAsyncOp op, const XAsyncProviderData* data)
{
CurlMulti* multi = static_cast<CurlMulti*>(data->context);
switch (op)
{
case XAsyncOp::Begin:
{
std::unique_lock<std::mutex> lock{ multi->m_mutex };
assert(multi->m_cleanupAsyncBlock == nullptr);
multi->m_cleanupAsyncBlock = data->async;
// If there are no pending task queue callbacks (and thus no pending HTTP requests), schedule cleanup immediately.
// If this condition is true, we're also guaranteed to be the final remaining reference to the CurlMulti.
bool cleanupImmediately = multi->m_taskQueueCallbacksPending == 0;
XTaskQueueHandle queue = multi->m_queue;
// Release the lock before going any further because both cleanup paths can lead to DoWork being scheduled and run before
// XAsyncOp::Begin completes and releases the lock naturally. Because DoWork destroys the CurlMulti object, it's no longer
// safe to access the multi object after this point.
multi = nullptr; // Set to null to avoid accidental use after this point
lock.unlock();
if (cleanupImmediately)
{
RETURN_IF_FAILED(XAsyncSchedule(data->async, 0));
}
else
{
// Terminate the XTaskQueue. Cleanup will be completed after completing remaining HTTP requests
RETURN_IF_FAILED(XTaskQueueTerminate(queue, false, nullptr, nullptr));
}
return S_OK;
}
case XAsyncOp::DoWork:
{
assert(multi->m_easyRequests.empty());
HC_UNIQUE_PTR<CurlMulti> reclaim{ multi };
// Ensure CurlMulti is destroyed (and thus curl_multi_cleanup is called) before completing asyncBlock
reclaim.reset();
XAsyncComplete(data->async, S_OK, 0);
return S_OK;
}
default:
{
return S_OK;
}
}
}
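
// Submits TaskQueueCallback to the work port after 'delay' milliseconds. Takes the caller's lock by rvalue
// reference so m_taskQueueCallbacksPending can be incremented before the lock is released and the callback submitted.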
void CurlMulti::ScheduleTaskQueueCallback(std::unique_lock<std::mutex>&& lock, uint32_t delay)
{
assert(lock.owns_lock());
m_taskQueueCallbacksPending++;
lock.unlock();
HRESULT hr = XTaskQueueSubmitDelayedCallback(m_queue, XTaskQueuePort::Work, delay, this, CurlMulti::TaskQueueCallback);
if (FAILED(hr))
{
// Treat errors scheduling the callback as cancellations by synchronously calling 'TaskQueueCallback' with canceled=true.
// Pending requests will be completed with an E_ABORT failure and m_taskQueueCallbacksPending will be appropriately updated.
TaskQueueCallback(this, true);
HC_TRACE_WARNING_HR(HTTPCLIENT, hr, "CurlMulti::ScheduleTaskQueueCallback: XTaskQueueSubmitDelayedCallback failed");
}
}
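
// Task queue callback that runs Perform. If the callback was canceled (task queue terminated during cleanup),
// all active requests are failed with E_ABORT. When the last pending callback completes and CleanupAsync has
// been called, the cleanup provider is scheduled outside of the lock because it may destroy this object.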
void CALLBACK CurlMulti::TaskQueueCallback(_In_opt_ void* context, _In_ bool canceled) noexcept
{
assert(context);
XAsyncBlock* cleanupAsyncBlock{ nullptr };
{
auto multi = static_cast<CurlMulti*>(context);
if (!canceled)
{
HRESULT hr = multi->Perform();
if (FAILED(hr))
{
HC_TRACE_ERROR_HR(HTTPCLIENT, hr, "CurlMulti::Perform failed. Failing all active requests.");
multi->FailAllRequests(hr);
}
}
else
{
multi->FailAllRequests(E_ABORT);
}
std::unique_lock<std::mutex> lock{ multi->m_mutex };
if (--multi->m_taskQueueCallbacksPending == 0 && multi->m_cleanupAsyncBlock)
{
// If CurlMulti::CleanupAsync was called and there are no remaining task queue callbacks, schedule cleanup now.
// We *MUST* schedule the cleanup outside of holding the lock though. Scheduling cleanup may free the CurlMulti
// object memory before we're done using it here and we don't want that to happen while we're holding the lock
// or still otherwise referencing the CurlMulti object memory
cleanupAsyncBlock = multi->m_cleanupAsyncBlock;
}
}
if (cleanupAsyncBlock)
{
// Must not reference CurlMulti object memory or hold CurlMulti object lock when scheduling this work.
HRESULT hr = XAsyncSchedule(cleanupAsyncBlock, 0);
if (FAILED(hr))
{
HC_TRACE_ERROR_HR(HTTPCLIENT, hr, "CurlMulti::TaskQueueCallback: Failed to schedule CleanupAsyncProvider!");
assert(false);
}
}
}
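
// Drives curl_multi_perform, completes transfers reported done by curl_multi_info_read, and reschedules itself
// while requests remain: immediately if curl reports work available, otherwise after PERFORM_DELAY_MS.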
HRESULT CurlMulti::Perform() noexcept
{
std::unique_lock<std::mutex> lock{ m_mutex };
int runningRequests{ 0 };
CURLMcode result = curl_multi_perform(m_curlMultiHandle, &runningRequests);
if (result != CURLM_OK)
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::Perform: curl_multi_perform failed with CURLCode=%u", result);
return HrFromCurlm(result);
}
int remainingMessages{ 1 }; // assume there is at least 1 message so loop is always entered
while (remainingMessages)
{
CURLMsg* message = curl_multi_info_read(m_curlMultiHandle, &remainingMessages);
if (message)
{
switch (message->msg)
{
case CURLMSG_DONE:
{
auto requestIter = m_easyRequests.find(message->easy_handle);
assert(requestIter != m_easyRequests.end());
result = curl_multi_remove_handle(m_curlMultiHandle, message->easy_handle);
if (result != CURLM_OK)
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::Perform: curl_multi_remove_handle failed with CURLCode=%u", result);
}
requestIter->second->Complete(message->data.result);
m_easyRequests.erase(requestIter);
}
break;
case CURLMSG_NONE:
case CURLMSG_LAST:
default:
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::Perform: Unrecognized CURLMsg!");
assert(false);
}
break;
}
}
}
if (runningRequests)
{
// Reschedule Perform if there are still running requests
int workAvailable{ 0 };
#if HC_PLATFORM == HC_PLATFORM_GDK || CURL_AT_LEAST_VERSION(7,66,0)
result = curl_multi_poll(m_curlMultiHandle, nullptr, 0, POLL_TIMEOUT_MS, &workAvailable);
#else
result = curl_multi_wait(m_curlMultiHandle, nullptr, 0, POLL_TIMEOUT_MS, &workAvailable);
#endif
if (result != CURLM_OK)
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::Perform: curl_multi_poll failed with CURLCode=%u", result);
return HrFromCurlm(result);
}
uint32_t delay = workAvailable ? 0 : PERFORM_DELAY_MS;
ScheduleTaskQueueCallback(std::move(lock), delay);
}
return S_OK;
}
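
// Removes every tracked easy handle from the multi handle and fails the corresponding requests with 'hr'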
void CurlMulti::FailAllRequests(HRESULT hr) noexcept
{
std::unique_lock<std::mutex> lock{ m_mutex };
if (!m_easyRequests.empty())
{
for (auto& pair : m_easyRequests)
{
auto result = curl_multi_remove_handle(m_curlMultiHandle, pair.first);
if (FAILED(HrFromCurlm(result)))
{
HC_TRACE_ERROR(HTTPCLIENT, "CurlMulti::FailAllRequests: curl_multi_remove_handle failed with CURLCode=%u", result);
}
pair.second->Fail(hr);
}
m_easyRequests.clear();
}
}
} // httpclient
} // xbox