Skip to content

Commit 0318532

Browse files
committed
Removed query queue, save only one query at the time
1 parent 7876e3a commit 0318532

File tree

4 files changed

+45
-29
lines changed

4 files changed

+45
-29
lines changed

source/async_postgres.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ namespace async_postgres {
8989

9090
struct Connection {
9191
pg::conn conn;
92-
std::queue<Query> queries;
92+
std::optional<Query> query;
9393
std::optional<ResetEvent> reset_event;
9494
GLua::AutoReference on_notify;
9595

@@ -113,7 +113,7 @@ namespace async_postgres {
113113
void process_notifications(GLua::ILuaInterface* lua, Connection* state);
114114

115115
// query.cpp
116-
void process_queries(GLua::ILuaInterface* lua, Connection* state);
116+
void process_query(GLua::ILuaInterface* lua, Connection* state);
117117

118118
// result.cpp
119119
void create_result_table(GLua::ILuaInterface* lua, PGresult* result);

source/main.cpp

+17-7
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace async_postgres::lua {
2323

2424
async_postgres::process_reset(lua, state);
2525
async_postgres::process_notifications(lua, state);
26-
async_postgres::process_queries(lua, state);
26+
async_postgres::process_query(lua, state);
2727
}
2828

2929
return 0;
@@ -47,14 +47,18 @@ namespace async_postgres::lua {
4747
lua->CheckType(3, GLua::Type::Function);
4848

4949
auto state = lua_connection_state();
50+
if (state->query) {
51+
throw std::runtime_error("query already in progress");
52+
}
5053

5154
async_postgres::SimpleCommand command = {lua->GetString(2)};
5255
async_postgres::Query query = {std::move(command)};
53-
if (!lua->IsType(3, GLua::Type::Nil)) {
56+
57+
if (lua->IsType(3, GLua::Type::Function)) {
5458
query.callback = GLua::AutoReference(lua, 3);
5559
}
5660

57-
state->queries.push(std::move(query));
61+
state->query = std::move(query);
5862
return 0;
5963
}
6064

@@ -65,15 +69,21 @@ namespace async_postgres::lua {
6569
lua->CheckType(4, GLua::Type::Function);
6670

6771
auto state = lua_connection_state();
72+
if (state->query) {
73+
throw std::runtime_error("query already in progress");
74+
}
6875

6976
async_postgres::ParameterizedCommand command = {
7077
lua->GetString(2),
7178
async_postgres::array_to_params(lua, 3),
7279
};
73-
7480
async_postgres::Query query = {std::move(command)};
75-
query.callback = GLua::AutoReference(lua, 4);
76-
state->queries.push(std::move(query));
81+
82+
if (lua->IsType(4, GLua::Type::Function)) {
83+
query.callback = GLua::AutoReference(lua, 4);
84+
}
85+
86+
state->query = std::move(query);
7787

7888
return 0;
7989
}
@@ -82,7 +92,7 @@ namespace async_postgres::lua {
8292
lua->CheckType(1, async_postgres::connection_meta);
8393

8494
GLua::AutoReference callback;
85-
if (!lua->IsType(2, GLua::Type::Nil)) {
95+
if (lua->IsType(2, GLua::Type::Function)) {
8696
callback = GLua::AutoReference(lua, 2);
8797
}
8898

source/notifications.cpp

+1-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,7 @@ void async_postgres::process_notifications(GLua::ILuaInterface* lua,
1313
return;
1414
}
1515

16-
if (state->queries.empty() &&
17-
check_socket_status(state->conn.get()).read_ready &&
16+
if (!state->query && check_socket_status(state->conn.get()).read_ready &&
1817
PQconsumeInput(state->conn.get()) == 0) {
1918
// we consumed input
2019
// but there was some error

source/query.cpp

+25-18
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,23 @@ inline bool send_query(PGconn* conn, Query& query) {
1818
return false;
1919
}
2020

21-
void query_failed(GLua::ILuaInterface* lua, PGconn* conn, Query& query) {
21+
// This function will remove the query from the connection state
22+
// and call the callback with the error message
23+
void query_failed(GLua::ILuaInterface* lua, Connection* state) {
24+
if (!state->query) {
25+
return;
26+
}
27+
28+
auto query = std::move(*state->query);
29+
lua->Msg("[async_postgres] query failed: current query %d\n",
30+
state->query.has_value());
31+
32+
state->query.reset();
33+
2234
if (query.callback) {
2335
query.callback.Push();
2436
lua->PushBool(false);
25-
lua->PushString(PQerrorMessage(conn));
37+
lua->PushString(PQerrorMessage(state->conn.get()));
2638
pcall(lua, 2, 0);
2739
}
2840
}
@@ -54,42 +66,37 @@ bool bad_result(PGresult* result) {
5466
status == PGRES_FATAL_ERROR;
5567
}
5668

57-
void async_postgres::process_queries(GLua::ILuaInterface* lua,
58-
Connection* state) {
59-
if (state->queries.empty()) {
69+
void async_postgres::process_query(GLua::ILuaInterface* lua,
70+
Connection* state) {
71+
if (!state->query || state->reset_event) {
6072
// no queries to process
61-
return;
62-
}
63-
64-
if (state->reset_event) {
6573
// don't process queries while reconnecting
6674
return;
6775
}
6876

69-
auto& query = state->queries.front();
77+
auto& query = state->query.value();
7078
if (!query.sent) {
7179
if (!send_query(state->conn.get(), query)) {
72-
query_failed(lua, state->conn.get(), query);
73-
state->queries.pop();
74-
return process_queries(lua, state);
80+
query_failed(lua, state);
81+
return process_query(lua, state);
7582
}
7683

7784
query.sent = true;
7885
query.flushed = PQflush(state->conn.get()) == 0;
7986
}
8087

8188
if (!poll_query(state->conn.get(), query)) {
82-
query_failed(lua, state->conn.get(), query);
83-
state->queries.pop();
84-
return process_queries(lua, state);
89+
query_failed(lua, state);
90+
return process_query(lua, state);
8591
}
8692

8793
while (PQisBusy(state->conn.get()) == 0) {
8894
auto result = pg::getResult(state->conn);
8995
if (!result) {
9096
// query is done
91-
state->queries.pop();
92-
return process_queries(lua, state);
97+
// TODO: remove query if we have a final result
98+
state->query.reset();
99+
return process_query(lua, state);
93100
}
94101

95102
if (query.callback) {

0 commit comments

Comments
 (0)