Skip to content

Commit 3eb66b1

Browse files
committed
feat: support POST
1 parent bf6e268 commit 3eb66b1

File tree

2 files changed

+101
-7
lines changed

2 files changed

+101
-7
lines changed

sql/pg_net--0.1.sql

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,21 @@
11
create schema if not exists net;
22

3+
create domain http_method as text
4+
check (
5+
value ilike 'get' or
6+
value ilike 'post'
7+
);
8+
39
-- Store pending requests. The background worker reads from here
410
-- API: Private
511
create table net.http_request_queue(
612
id bigserial primary key,
13+
method http_method not null,
714
url text not null,
815
params jsonb not null,
916
headers jsonb not null,
17+
content_type text,
18+
body text,
1019
timeout_milliseconds int not null,
1120
-- Available for delete after this date
1221
delete_after timestamp not null
@@ -85,8 +94,8 @@ declare
8594
respone_rec net.http_response;
8695
begin
8796
-- Add to the request queue
88-
insert into net.http_request_queue(url, params, headers, timeout_milliseconds, delete_after)
89-
values (url, params, headers, timeout_milliseconds, timezone('utc', now()) + ttl)
97+
insert into net.http_request_queue(method, url, params, headers, content_type, body, timeout_milliseconds, delete_after)
98+
values ('GET', url, params, headers, null, null, timeout_milliseconds, timezone('utc', now()) + ttl)
9099
returning id
91100
into request_id;
92101

@@ -101,6 +110,53 @@ begin
101110
end
102111
$$;
103112

113+
-- Interface to make an async request
114+
-- API: Public
115+
create or replace function net.http_post(
116+
-- url for the request
117+
url text,
118+
-- ContentType of the POST request
119+
content_type text,
120+
-- body of the POST request
121+
body text,
122+
-- key/value pairs to be url encoded and appended to the `url`
123+
params jsonb DEFAULT '{}'::jsonb,
124+
-- key/values to be included in request headers
125+
headers jsonb DEFAULT '{}'::jsonb,
126+
-- the maximum number of milliseconds the request may take before being cancelled
127+
timeout_milliseconds int DEFAULT 1000,
128+
-- the minimum amount of time the response should be persisted
129+
ttl interval default '3 days',
130+
-- when `true`, return immediately. when `false` wait for the request to complete before returning
131+
async bool default true
132+
)
133+
-- request_id reference
134+
returns bigint
135+
strict
136+
volatile
137+
parallel safe
138+
language plpgsql
139+
as $$
140+
declare
141+
request_id bigint;
142+
respone_rec net.http_response;
143+
begin
144+
-- Add to the request queue
145+
insert into net.http_request_queue(method, url, params, headers, content_type, body, timeout_milliseconds, delete_after)
146+
values ('POST', url, params, headers, content_type, body, timeout_milliseconds, timezone('utc', now()) + ttl)
147+
returning id
148+
into request_id;
149+
150+
-- If request is async, return id immediately
151+
if async then
152+
return request_id;
153+
end if;
154+
155+
-- If sync, wait for the request to complete before returning
156+
perform net._await_response(request_id);
157+
return request_id;
158+
end
159+
$$;
104160

105161
-- Collect respones of an http request
106162
-- API: Public

src/worker.c

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "utils/jsonb.h"
2525

26+
#include <curl/curl.h>
2627
#include <curl/multi.h>
2728

2829
PG_MODULE_MAGIC;
@@ -102,7 +103,7 @@ header_cb(void *contents, size_t size, size_t nmemb, void *userp)
102103
return realsize;
103104
}
104105

105-
static int init(CURLM *cm, char *url, int64 id, HTAB *curlDataMap)
106+
static int init(CURLM *cm, char *method, char *url, char *contentType, char *reqBody, int64 id, HTAB *curlDataMap)
106107
{
107108
CURL *eh = curl_easy_init();
108109

@@ -120,6 +121,25 @@ static int init(CURLM *cm, char *url, int64 id, HTAB *curlDataMap)
120121
cdata->headers = headers;
121122
}
122123

124+
if (contentType) {
125+
struct curl_slist *reqHeaders = NULL;
126+
reqHeaders = curl_slist_append(reqHeaders, contentType);
127+
curl_easy_setopt(eh, CURLOPT_HTTPHEADER, reqHeaders);
128+
}
129+
130+
if (strcasecmp(method, "GET") == 0) {
131+
if (reqBody) {
132+
curl_easy_setopt(eh, CURLOPT_POSTFIELDS, reqBody);
133+
curl_easy_setopt(eh, CURLOPT_CUSTOMREQUEST, "GET");
134+
}
135+
} else if (strcasecmp(method, "POST") == 0) {
136+
if (reqBody) {
137+
curl_easy_setopt(eh, CURLOPT_POSTFIELDS, reqBody);
138+
}
139+
} else {
140+
elog(ERROR, "error: Unsupported request method %s\n", method);
141+
}
142+
123143
curl_easy_setopt(eh, CURLOPT_WRITEFUNCTION, body_cb);
124144
curl_easy_setopt(eh, CURLOPT_WRITEDATA, cdata->body);
125145
curl_easy_setopt(eh, CURLOPT_HEADERFUNCTION, header_cb);
@@ -204,7 +224,7 @@ worker_main(Datum main_arg)
204224

205225
appendStringInfo(&select_query, "\
206226
SELECT\
207-
q.id, q.url\
227+
q.id, q.method, q.url, q.content_type, q.body\
208228
FROM net.http_request_queue q \
209229
LEFT JOIN net.http_response r ON q.id = r.id \
210230
WHERE r.id IS NULL");
@@ -223,12 +243,30 @@ worker_main(Datum main_arg)
223243

224244
for (int j = 0; j < SPI_processed; j++)
225245
{
246+
StringInfoData content_type;
247+
226248
int64 id = DatumGetInt64(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 1, &tupIsNull));
227-
char *url = TextDatumGetCString(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 2, &tupIsNull));
249+
char *method = TextDatumGetCString(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 2, &tupIsNull));
250+
char *url = TextDatumGetCString(SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 3, &tupIsNull));
251+
Datum contentTypeBin;
252+
Datum bodyBin;
253+
char *contentType = NULL;
254+
char *body = NULL;
255+
256+
contentTypeBin = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 4, &tupIsNull);
257+
if (!tupIsNull) {
258+
initStringInfo(&content_type);
259+
appendStringInfo(&content_type, "Content-Type: %s", TextDatumGetCString(contentTypeBin));
260+
contentType = content_type.data;
261+
}
262+
bodyBin = SPI_getbinval(SPI_tuptable->vals[j], SPI_tuptable->tupdesc, 5, &tupIsNull);
263+
if (!tupIsNull) body = TextDatumGetCString(bodyBin);
264+
265+
elog(DEBUG1, "Making a %s request to %s with id %ld", method, url, id);
228266

229-
elog(DEBUG1, "Making a request to %s with id %ld", url, id);
267+
res = init(cm, method, url, contentType, body, id, curlDataMap);
230268

231-
res = init(cm, url, id, curlDataMap);
269+
/* pfree(content_type.data); */
232270

233271
if(res) {
234272
elog(ERROR, "error: init() returned %d\n", res);

0 commit comments

Comments
 (0)