Skip to content

Commit

Permalink
refactor: use stl container in server (swoole#3253)
Browse files Browse the repository at this point in the history
* refactor: remove swUserWorker_node
use std::list instead of swUserWorker_node

* fix

* refactor: remove swListenPort node

* fix

* improve code

* fix

* improve: use vector instead of list

* improve: free memory in swServer_destory

* improve: delete the unused list header file

* fix
  • Loading branch information
huanghantao authored Apr 20, 2020
1 parent 79583d2 commit 76d7713
Show file tree
Hide file tree
Showing 9 changed files with 96 additions and 93 deletions.
12 changes: 2 additions & 10 deletions include/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ struct swReactorThread

struct swListenPort
{
swListenPort *next, *prev;

/**
* tcp socket listen backlog
*/
Expand Down Expand Up @@ -206,12 +204,6 @@ struct swListenPort
int (*onRead)(swReactor *reactor, swListenPort *port, swEvent *event);
};

struct swUserWorker_node
{
swUserWorker_node *next, *prev;
swWorker *worker;
};

struct swWorkerStopMessage
{
pid_t pid;
Expand Down Expand Up @@ -490,7 +482,7 @@ struct swServer
void *private_data_3;

swFactory factory;
swListenPort *listen_list;
std::vector<swListenPort*> *listen_list;
pthread_t heartbeat_pidt;

/**
Expand All @@ -507,7 +499,7 @@ struct swServer
* user process
*/
uint32_t user_worker_num;
swUserWorker_node *user_worker_list;
std::vector<swWorker*> *user_worker_list;
swHashMap *user_worker_map;
swWorker *user_workers;

Expand Down
15 changes: 7 additions & 8 deletions src/server/manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,13 @@ static sw_inline int swManager_spawn_user_workers(swServer *serv)

if (serv->user_worker_list)
{
swUserWorker_node *user_worker;
LL_FOREACH(serv->user_worker_list, user_worker)
for (auto worker : *serv->user_worker_list)
{
if (user_worker->worker->pipe_object)
if (worker->pipe_object)
{
swServer_store_pipe_fd(serv, user_worker->worker->pipe_object);
swServer_store_pipe_fd(serv, worker->pipe_object);
}
pid = swManager_spawn_user_worker(serv, user_worker->worker);
pid = swManager_spawn_user_worker(serv, worker);
if (pid < 0)
{
return SW_ERR;
Expand Down Expand Up @@ -205,11 +204,11 @@ int swManager_start(swServer *serv)
{
return SW_ERR;
}
swUserWorker_node *user_worker;

i = 0;
LL_FOREACH(serv->user_worker_list, user_worker)
for (auto worker : *serv->user_worker_list)
{
memcpy(&serv->user_workers[i], user_worker->worker, sizeof(swWorker));
memcpy(&serv->user_workers[i], worker, sizeof(swWorker));
if (swServer_worker_create(serv, &serv->user_workers[i]) < 0)
{
return SW_ERR;
Expand Down
80 changes: 48 additions & 32 deletions src/server/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,13 @@ static void swServer_check_port_type(swServer *serv, swListenPort *ls);

static void swServer_disable_accept(swServer *serv)
{
swListenPort *ls;

serv->enable_accept_timer = swoole_timer_add(SW_ACCEPT_RETRY_TIME * 1000, 0, swServer_enable_accept, serv);
if (serv->enable_accept_timer == nullptr)
{
return;
}

LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
//UDP
if (swSocket_is_dgram(ls->type))
Expand All @@ -70,10 +68,9 @@ static void swServer_disable_accept(swServer *serv)

static void swServer_enable_accept(swTimer *timer, swTimer_node *tnode)
{
swListenPort *ls;
swServer *serv = (swServer *) tnode->data;

LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (swSocket_is_dgram(ls->type))
{
Expand All @@ -87,8 +84,7 @@ static void swServer_enable_accept(swTimer *timer, swTimer_node *tnode)

void swServer_close_port(swServer *serv, enum swBool_type only_stream_port)
{
swListenPort *ls;
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (only_stream_port && swSocket_is_dgram(ls->type))
{
Expand Down Expand Up @@ -400,8 +396,7 @@ static int swServer_start_check(swServer *serv)
swWarn("serv->max_connection is exceed the SW_SESSION_LIST_SIZE, it's reset to %u", SW_SESSION_LIST_SIZE);
}
// package max length
swListenPort *ls;
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (ls->protocol.package_max_length < SW_BUFFER_MIN_SIZE)
{
Expand Down Expand Up @@ -433,9 +428,9 @@ static int swServer_start_check(swServer *serv)

void swServer_store_listen_socket(swServer *serv)
{
swListenPort *ls;
int sockfd;
LL_FOREACH(serv->listen_list, ls)

for (auto ls : *serv->listen_list)
{
sockfd = ls->socket->fd;
//save server socket to connection_list
Expand Down Expand Up @@ -560,10 +555,23 @@ int swServer_create_task_workers(swServer *serv)
}

/**
* only the memory of the swWorker structure is allocated, no process is fork
* @description:
* only the memory of the swWorker structure is allocated, no process is fork.
* called when the manager process start.
* @param swServer
* @return: SW_OK|SW_ERR
*/
int swServer_create_user_workers(swServer *serv)
{
/**
* if Swoole\Server::addProcess is called first,
* swServer::user_worker_list is initialized in the swServer_add_worker function
*/
if (serv->user_worker_list == nullptr)
{
serv->user_worker_list = new std::vector<swWorker *>;
}

serv->user_workers = (swWorker *) SwooleG.memory_pool->alloc(SwooleG.memory_pool, serv->user_worker_num * sizeof(swWorker));
if (serv->user_workers == NULL)
{
Expand Down Expand Up @@ -812,11 +820,10 @@ int swServer_start(swServer *serv)
*/
if (serv->user_worker_list)
{
swUserWorker_node *user_worker;
i = 0;
LL_FOREACH(serv->user_worker_list, user_worker)
for (auto worker : *serv->user_worker_list)
{
user_worker->worker->id = serv->worker_num + serv->task_worker_num + i;
worker->id = serv->worker_num + serv->task_worker_num + i;
i++;
}
}
Expand Down Expand Up @@ -983,12 +990,11 @@ int swServer_shutdown(swServer *serv)
{
swReactor *reactor = SwooleTG.reactor;
reactor->wait_exit = 1;
swListenPort *port;
LL_FOREACH(serv->listen_list, port)
for (auto ls : *serv->listen_list)
{
if (swSocket_is_stream(port->type))
if (swSocket_is_stream(ls->type))
{
reactor->del(reactor, port->socket);
reactor->del(reactor, ls->socket);
}
}
swServer_clear_timer(serv);
Expand Down Expand Up @@ -1028,11 +1034,21 @@ static int swServer_destory(swServer *serv)
swReactorThread_join(serv);
}

swListenPort *port;
LL_FOREACH(serv->listen_list, port)
for (auto ls : *serv->listen_list)
{
swPort_free(port);
swPort_free(ls);
}
delete serv->listen_list;
serv->listen_list = nullptr;

/**
* because the swWorker in user_worker_list is the memory allocated by emalloc,
* the efree function will be called when the user process is destructed,
* so there's no need to call the efree here.
*/
delete serv->user_worker_list;
serv->user_worker_list = nullptr;

//close log file
if (SwooleG.log_file != 0)
{
Expand Down Expand Up @@ -1641,16 +1657,13 @@ void swServer_master_onTimer(swTimer *timer, swTimer_node *tnode)

int swServer_add_worker(swServer *serv, swWorker *worker)
{
swUserWorker_node *user_worker = (swUserWorker_node *) sw_malloc(sizeof(swUserWorker_node));
if (!user_worker)
if (serv->user_worker_list == nullptr)
{
return SW_ERR;
serv->user_worker_list = new std::vector<swWorker *>;
}

serv->user_worker_num++;
user_worker->worker = worker;
serv->user_worker_list->push_back(worker);

LL_APPEND(serv->user_worker_list, user_worker);
if (!serv->user_worker_map)
{
serv->user_worker_map = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, NULL);
Expand Down Expand Up @@ -1739,7 +1752,7 @@ int swServer_add_systemd_socket(swServer *serv)
}
swServer_check_port_type(serv, ls);

LL_APPEND(serv->listen_list, ls);
serv->listen_list->push_back(ls);
serv->listen_port_num++;
count++;
}
Expand Down Expand Up @@ -1835,15 +1848,18 @@ swListenPort* swServer_add_port(swServer *serv, enum swSocket_type type, const c
}
swServer_check_port_type(serv, ls);
ls->socket_fd = ls->socket->fd;
LL_APPEND(serv->listen_list, ls);
if (serv->listen_list == nullptr)
{
serv->listen_list = new std::vector<swListenPort *>;
}
serv->listen_list->push_back(ls);
serv->listen_port_num++;
return ls;
}

int swServer_get_socket(swServer *serv, int port)
{
swListenPort *ls;
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (ls->port == port || port == 0)
{
Expand Down
17 changes: 7 additions & 10 deletions src/server/reactor_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ static uint32_t heartbeat_check_lasttime = 0;

static bool swServer_is_single(swServer *serv)
{
return serv->worker_num == 1 && serv->task_worker_num == 0 && serv->max_request == 0 && serv->user_worker_list == NULL;
return serv->worker_num == 1 && serv->task_worker_num == 0 && serv->max_request == 0 && serv->user_worker_list == nullptr;
}

int swReactorProcess_create(swServer *serv)
Expand Down Expand Up @@ -61,13 +61,12 @@ void swReactorProcess_free(swServer *serv)

int swReactorProcess_start(swServer *serv)
{
swListenPort *ls;
serv->single_thread = 1;

//listen TCP
if (serv->have_stream_sock == 1)
{
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (swSocket_is_dgram(ls->type))
{
Expand Down Expand Up @@ -157,17 +156,16 @@ int swReactorProcess_start(swServer *serv)
swSysWarn("gmalloc[server->user_workers] failed");
return SW_ERR;
}
swUserWorker_node *user_worker;
LL_FOREACH(serv->user_worker_list, user_worker)
for (auto worker : *serv->user_worker_list)
{
/**
* store the pipe object
*/
if (user_worker->worker->pipe_object)
if (worker->pipe_object)
{
swServer_store_pipe_fd(serv, user_worker->worker->pipe_object);
swServer_store_pipe_fd(serv, worker->pipe_object);
}
swManager_spawn_user_worker(serv, user_worker->worker);
swManager_spawn_user_worker(serv, worker);
}
}

Expand Down Expand Up @@ -331,10 +329,9 @@ static int swReactorProcess_loop(swProcessPool *pool, swWorker *worker)
return SW_ERR;
}

swListenPort *ls;
int fdtype;

LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
fdtype = swSocket_is_dgram(ls->type) ? SW_FD_DGRAM_SERVER : SW_FD_STREAM_SERVER;
#ifdef HAVE_REUSEPORT
Expand Down
19 changes: 8 additions & 11 deletions src/server/reactor_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,7 @@ static void swReactorThread_shutdown(swReactor *reactor)
//stop listen UDP Port
if (serv->have_dgram_sock == 1)
{
swListenPort *ls;
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (swSocket_is_dgram(ls->type))
{
Expand Down Expand Up @@ -619,9 +618,8 @@ void swReactorThread_set_protocol(swServer *serv, swReactor *reactor)
//Read
swReactor_set_handler(reactor, SW_FD_SESSION | SW_EVENT_READ, swReactorThread_onRead);

swListenPort *ls;
//listen the all tcp port
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (swSocket_is_dgram(ls->type)
#ifdef SW_SUPPORT_DTLS
Expand Down Expand Up @@ -853,22 +851,22 @@ int swReactorThread_start(swServer *serv)
#endif

//set listen socket options
swListenPort *ls;
LL_FOREACH(serv->listen_list, ls)
std::vector<swListenPort *>::iterator ls;
for (ls = serv->listen_list->begin(); ls != serv->listen_list->end(); ls++)
{
if (swSocket_is_dgram(ls->type))
if (swSocket_is_dgram((*ls)->type))
{
continue;
}
if (swPort_listen(ls) < 0)
if (swPort_listen(*ls) < 0)
{
_failed:
reactor->free(reactor);
SwooleTG.reactor = nullptr;
sw_free(reactor);
return SW_ERR;
}
reactor->add(reactor, ls->socket, SW_EVENT_READ);
reactor->add(reactor, (*ls)->socket, SW_EVENT_READ);
}

/**
Expand Down Expand Up @@ -996,8 +994,7 @@ static int swReactorThread_init(swServer *serv, swReactor *reactor, uint16_t rea
//listen UDP port
if (serv->have_dgram_sock == 1)
{
swListenPort *ls;
LL_FOREACH(serv->listen_list, ls)
for (auto ls : *serv->listen_list)
{
if (swSocket_is_stream(ls->type))
{
Expand Down
5 changes: 2 additions & 3 deletions src/server/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -507,10 +507,9 @@ void swWorker_stop(swWorker *worker)

if (serv->factory_mode == SW_MODE_BASE && swIsWorker())
{
swListenPort *port;
LL_FOREACH(serv->listen_list, port)
for (auto ls : *serv->listen_list)
{
reactor->del(reactor, port->socket);
reactor->del(reactor, ls->socket);
}
if (worker->pipe_master)
{
Expand Down
Loading

0 comments on commit 76d7713

Please sign in to comment.