Skip to content

Commit 4be976a

Browse files
author
hongchunhua
committed
fix:解决并行锁管理问题
Change-Id: I2bab6f8d258c9980fe36cf4b1b2ba2c3c5e3fe0e
1 parent 9bb47c2 commit 4be976a

9 files changed

+102
-80
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ endif()
3838
set(VERSION_FILE_PATH ${XIO_ROOT_PATH})
3939

4040
#版本
41-
set(VERSION_NUM "xio_1.7.0_release")
41+
set(VERSION_NUM "v1.0.0")
4242

4343
set(VERSION_MSG "const char XIO_VERSION_STRING[] = \"${VERSION_NUM}\"\;")
4444
file(WRITE ${VERSION_FILE_PATH}/version.c ${VERSION_MSG})

inc/arpc_api.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ enum arpc_ctrl_attr{
5858
ARPC_E_CTRL_MAX = (1<<31), /*! @brief 最大标记位*/
5959
};
6060

61+
/*!
62+
* @brief arpc版本
63+
*
64+
* @return char, version
65+
*
66+
*/
67+
const char *arpc_version();
68+
6169
/*!
6270
* @brief arpc消息框架全局初始化,支持参数设置
6371
*

src/session/arpc_client.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ arpc_session_handle_t arpc_client_create_session(const struct arpc_client_sessio
141141
ret = arpc_session_connect_for_client(session, 3*1000);//等待至少一条链路可用
142142
LOG_THEN_GOTO_TAG_IF_VAL_TRUE(ret, error_2, "client session connect server[%s] fail", client_ctx->uri);
143143
}
144-
144+
ARPC_LOG_NOTICE("ARPC version[%s].", arpc_version());
145145
ARPC_LOG_NOTICE("Create session[%p] success, work thread num[%u], rx num[%u]!!", session, idle_thread_num, rx_con_num);
146146
return (arpc_session_handle_t)session;
147147

src/session/arpc_com.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "arpc_response.h"
2727
#include "crc64.h"
2828

29+
static const char *version = "v1.0.0";
2930
struct aprc_paramter{
3031
uint16_t is_init;
3132
uint16_t pad;
@@ -34,6 +35,11 @@ struct aprc_paramter{
3435
struct aprc_option opt;
3536
};
3637

38+
const char *arpc_version()
39+
{
40+
return xio_version();
41+
}
42+
3743
static struct aprc_paramter g_param= {
3844
.is_init = 0,
3945
.thread_pool = NULL,

src/session/arpc_connection.c

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,7 @@ static int arpc_client_run_loop(void * thread_ctx)
593593
ctx->xio_con_ctx = NULL;
594594
}
595595
ctx->flags = 0;
596+
SET_FLAG(ctx->flags, ARPC_CONN_ATTR_TELL_LIVE);
596597
prctl(PR_SET_NAME, "share_thread");
597598
arpc_cond_notify(&ctx->cond);
598599
ARPC_LOG_NOTICE("xio connection[%u] on thread[%lu] exit now.", con->id, pthread_self());
@@ -677,6 +678,7 @@ static void arpc_tx_event_callback(struct arpc_connection *usr_conn)
677678
}
678679
arpc_mutex_unlock(&con->msg_lock);
679680
ARPC_LOG_TRACE("xio send msg on client, msg type:%d", msg->type);
681+
arpc_cond_lock(&msg->cond);
680682
switch (msg->type)
681683
{
682684
case ARPC_MSG_TYPE_REQ:
@@ -687,18 +689,21 @@ static void arpc_tx_event_callback(struct arpc_connection *usr_conn)
687689
case ARPC_MSG_TYPE_RSP:
688690
usr_conn->tx_rsp_count++;
689691
ret = xio_send_response(msg->tx_msg);
692+
arpc_cond_notify(&msg->cond);
690693
statistics_per_time(&msg->now, &usr_conn->tx_rsp_cmit, 3);
691694
break;
692695
case ARPC_MSG_TYPE_OW:
693696
usr_conn->tx_ow_count++;
694697
ret = xio_send_msg(con->xio_con, msg->tx_msg);
698+
arpc_cond_notify(&msg->cond);
695699
statistics_per_time(&msg->now, &usr_conn->tx_ow_cmit, 3);
696700
break;
697701
default:
698702
ret = ARPC_ERROR;
699703
ARPC_LOG_ERROR("unkown msg type[%d]", msg->type);
700704
break;
701705
}
706+
arpc_cond_unlock(&msg->cond);
702707
ARPC_LOG_TRACE("xio send msg end, msg type:%d", msg->type);
703708
if(ret){
704709
ARPC_LOG_ERROR("send msg[%d] fail, errno code[%u], err msg[%s].", msg->type, xio_errno(), xio_strerror(xio_errno()));

src/session/arpc_process_request.c

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "arpc_request.h"
2727
#include "arpc_response.h"
2828

29+
static const int32_t SEND_RSP_END_MAX_TIME_MS = 2*1000;
2930
static int request_msg_async_deal(void *usr_ctx);
3031

3132
int process_request_header(struct arpc_connection *con, struct xio_msg *msg)
@@ -60,7 +61,7 @@ int process_request_data(struct arpc_connection *con, struct xio_msg *req, int l
6061
uint32_t i;
6162
struct arpc_vmsg rev_iov;
6263
struct arpc_rsp usr_rsp_param;
63-
struct arpc_common_msg *rsp_hanlde;
64+
struct arpc_common_msg *rsp_msg;
6465
struct arpc_rsp_handle *rsp_fd_ex;
6566
struct arpc_thread_param *async_param;
6667
int ret;
@@ -83,14 +84,14 @@ int process_request_data(struct arpc_connection *con, struct xio_msg *req, int l
8384
ret = destroy_xio_msg_usr_buf(req, ops->free_cb, usr_ctx);
8485
LOG_THEN_RETURN_VAL_IF_TRUE((ret), ARPC_ERROR, "destroy_xio_msg_usr_buf fail.");
8586

86-
rsp_hanlde = get_common_msg(con, ARPC_MSG_TYPE_RSP);
87-
LOG_THEN_RETURN_VAL_IF_TRUE(!rsp_hanlde, ARPC_ERROR, "rsp_hanlde alloc null.");
88-
rsp_fd_ex = (struct arpc_rsp_handle*)rsp_hanlde->ex_data;
87+
rsp_msg = get_common_msg(con, ARPC_MSG_TYPE_RSP);
88+
LOG_THEN_RETURN_VAL_IF_TRUE(!rsp_msg, ARPC_ERROR, "rsp_msg alloc null.");
89+
rsp_fd_ex = (struct arpc_rsp_handle*)rsp_msg->ex_data;
8990
rsp_fd_ex->x_rsp_msg = req;//保存回复的结构体
90-
rsp_hanlde->attr.rsp_crc = attr.req_crc;//请求保存在回复体里
91-
rsp_hanlde->attr.req_crc = 0;
91+
rsp_msg->attr.rsp_crc = attr.req_crc;//请求保存在回复体里
92+
rsp_msg->attr.req_crc = 0;
9293
memset(&usr_rsp_param, 0, sizeof(struct arpc_rsp));
93-
usr_rsp_param.rsp_fd = (void *)rsp_hanlde;
94+
usr_rsp_param.rsp_fd = (void *)rsp_msg;
9495

9596
if(IS_SET(req->usr_flags, METHOD_ARPC_PROC_SYNC) && ops->proc_data_cb){
9697
ARPC_LOG_TRACE("process rx request msg with async.");
@@ -122,7 +123,7 @@ int process_request_data(struct arpc_connection *con, struct xio_msg *req, int l
122123
async_param->ops.proc_async_cb = ops->proc_async_cb;
123124
async_param->ops.release_rsp_cb = ops->release_rsp_cb;
124125
async_param->ops.proc_oneway_async_cb = NULL;
125-
async_param->rsp_ctx = rsp_hanlde;
126+
async_param->rsp_ctx = rsp_msg;
126127
async_param->rev_iov = rev_iov;
127128
async_param->req_msg = NULL;
128129
async_param->usr_ctx = usr_ctx;
@@ -140,20 +141,19 @@ int process_request_data(struct arpc_connection *con, struct xio_msg *req, int l
140141
free_msg_xio2arpc(&rev_iov, ops->free_cb, usr_ctx);
141142

142143
do_respone:
143-
/* attach request to response */
144+
/* 框架内回复,则不需要通知模式,也不需要加锁 */
144145
ARPC_LOG_TRACE("do respone request msg.");
145146
gettimeofday(&tx_time, NULL); // 线程安全
146147

147-
rsp_hanlde->attr.tx_sec= tx_time.tv_sec;
148-
rsp_hanlde->attr.tx_usec= tx_time.tv_usec;
149-
rsp_hanlde->attr.conn_id = con->id;
148+
rsp_msg->attr.tx_sec= tx_time.tv_sec;
149+
rsp_msg->attr.tx_usec= tx_time.tv_usec;
150+
rsp_msg->attr.conn_id = con->id;
150151

151-
ret = arpc_init_response(rsp_hanlde);
152+
ret = arpc_init_response(rsp_msg);
152153
LOG_ERROR_IF_VAL_TRUE(ret, "arpc_init_response fail.");
153-
if(!ret){
154-
ret = arpc_connection_async_send(rsp_hanlde->conn, rsp_hanlde);
155-
LOG_ERROR_IF_VAL_TRUE(ret, "arpc_connection_async_send fail.");
156-
}
154+
ret = xio_send_response(rsp_msg->tx_msg);
155+
LOG_ERROR_IF_VAL_TRUE(ret, "xio_send_response fail.");
156+
157157
return ret;
158158
}
159159

@@ -162,17 +162,17 @@ static int request_msg_async_deal(void *usr_ctx)
162162
struct arpc_thread_param *async = (struct arpc_thread_param *)usr_ctx;
163163
struct arpc_rsp rsp;
164164
int ret;
165-
struct arpc_common_msg *rsp_fd;
165+
struct arpc_common_msg *rsp_msg;
166166
struct arpc_rsp_handle *rsp_fd_ex;
167167
struct timeval tx_time;
168168

169169
LOG_THEN_RETURN_VAL_IF_TRUE(!async, ARPC_ERROR, "async null.");
170170
LOG_THEN_RETURN_VAL_IF_TRUE(!async->ops.proc_async_cb, ARPC_ERROR, "request proc_async_cb null.");
171171
LOG_THEN_RETURN_VAL_IF_TRUE(!async->rsp_ctx, ARPC_ERROR, "request rsp context is null.");
172172

173-
rsp_fd = (struct arpc_common_msg *)async->rsp_ctx;
173+
rsp_msg = (struct arpc_common_msg *)async->rsp_ctx;
174174
memset(&rsp, 0, sizeof (struct arpc_rsp));
175-
rsp.rsp_fd = (void*)rsp_fd;
175+
rsp.rsp_fd = (void*)rsp_msg;
176176

177177
ARPC_LOG_TRACE("process request msg with async.");
178178
ret = async->ops.proc_async_cb(&async->rev_iov, &rsp, async->usr_ctx);
@@ -185,21 +185,26 @@ static int request_msg_async_deal(void *usr_ctx)
185185
async->rev_iov.vec = NULL;
186186
}
187187

188-
rsp_fd_ex = (struct arpc_rsp_handle*)rsp_fd->ex_data;
188+
rsp_fd_ex = (struct arpc_rsp_handle*)rsp_msg->ex_data;
189189
rsp_fd_ex->rsp_usr_iov = rsp.rsp_iov;
190190
rsp_fd_ex->rsp_usr_ctx = rsp.rsp_ctx;
191191
rsp_fd_ex->release_rsp_cb = async->ops.release_rsp_cb;
192192

193193
if (!IS_SET(rsp.flags, METHOD_CALLER_ASYNC)) {
194194
gettimeofday(&tx_time, NULL); // 线程安全
195-
rsp_fd->attr.tx_sec= tx_time.tv_sec;
196-
rsp_fd->attr.tx_usec= tx_time.tv_usec;
197-
ret = arpc_init_response(rsp_fd);
198-
LOG_ERROR_IF_VAL_TRUE(ret, "arpc_do_respone fail.");
199-
if(!ret){
200-
ret = arpc_connection_async_send(rsp_fd->conn, rsp_fd);
201-
LOG_ERROR_IF_VAL_TRUE(ret, "arpc_do_respone fail.");
195+
arpc_cond_lock(&rsp_msg->cond);
196+
rsp_msg->attr.tx_sec= tx_time.tv_sec;
197+
rsp_msg->attr.tx_usec= tx_time.tv_usec;
198+
ret = arpc_init_response(rsp_msg);
199+
LOG_ERROR_IF_VAL_TRUE(ret, "arpc_init_response fail.");
200+
ret = arpc_connection_async_send(rsp_msg->conn, rsp_msg);
201+
if(!ret) {
202+
ret = arpc_cond_wait_timeout(&rsp_msg->cond, SEND_RSP_END_MAX_TIME_MS); // 默认等待
203+
LOG_ERROR_IF_VAL_TRUE(ret, "rsp send timeout fail, conn[%u], msg[%p].", rsp_msg->conn->id, rsp_msg);
204+
}else{
205+
ARPC_LOG_ERROR("arpc_connection_async_send fail, conn[%u], msg[%p].", rsp_msg->conn->id, rsp_msg);
202206
}
207+
arpc_cond_unlock(&rsp_msg->cond);
203208
}
204209
SAFE_FREE_MEM(async->rev_iov.head);
205210
SAFE_FREE_MEM(async);

src/session/arpc_request.c

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -204,10 +204,9 @@ int arpc_send_oneway_msg(const arpc_session_handle_t fd, struct arpc_vmsg *send,
204204
ret = convert_msg_arpc2xio(send, &req->out, &req_msg->attr);
205205
LOG_THEN_GOTO_TAG_IF_VAL_TRUE(ret, free_common_msg, "convert xio msg fail.");
206206
req->user_context = req_msg;
207-
if (!ow_msg->clean_send_cb){
207+
/*if (!ow_msg->clean_send_cb){
208208
req->flags |= XIO_MSG_FLAG_IMM_SEND_COMP;
209-
}
210-
209+
}*/
211210
arpc_cond_lock(&req_msg->cond);
212211
while(send_cnt < 1) {
213212
send_cnt++;
@@ -224,11 +223,8 @@ int arpc_send_oneway_msg(const arpc_session_handle_t fd, struct arpc_vmsg *send,
224223
if (!ow_msg->clean_send_cb){
225224
ret = arpc_cond_wait_timeout(&req_msg->cond, SEND_ONEWAY_END_MAX_TIME); // 默认等待
226225
arpc_cond_unlock(&req_msg->cond);
227-
if (!ret){
228-
free_msg_arpc2xio(&req->out);
229-
put_common_msg(req_msg); //un lock
230-
}else{
231-
free_msg_arpc2xio(&req->out);
226+
free_msg_arpc2xio(&req->out);
227+
if (ret){
232228
ARPC_LOG_ERROR("wait oneway msg send complete timeout fail, msg keep."); // TODO 释放超时的资源
233229
}
234230
}else{
@@ -263,7 +259,6 @@ int arpc_oneway_send_complete(struct arpc_common_msg *ow_msg)
263259
free_msg_arpc2xio(&ow_msg->xio_msg.out);
264260
put_common_msg(ow_msg); //un lock
265261
}else{
266-
arpc_cond_notify(&ow_msg->cond);
267262
arpc_cond_unlock(&ow_msg->cond);
268263
}
269264
ARPC_LOG_DEBUG("send end complete.");

0 commit comments

Comments
 (0)