Skip to content

Commit

Permalink
modify:The upload speed limit is now processed using a new mode, netw…
Browse files Browse the repository at this point in the history
…ork events + algorithm events, and will not affect other network clients.
  • Loading branch information
xengine-qyt committed Oct 25, 2024
1 parent 525c446 commit 94a1f64
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 20 deletions.
34 changes: 29 additions & 5 deletions XEngine_Source/StorageModule_Session/Session_Define.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,22 +283,32 @@ extern "C" bool Session_UPStroage_Destory();
类型:常量字符指针
可空:N
意思:输入文件地址
参数.四:nFileSize
参数.四:xhSpeed
In/Out:Out
类型:整数型
可空:N
意思:输入限速句柄
参数.五:nFileSize
In/Out:Out
类型:整数型
可空:N
意思:输入文件大小
参数.:bRewrite
参数.:bRewrite
In/Out:In
类型:整数型
可空:N
意思:是否允许覆写
参数.六:nPosStart
参数.七:nSpeedLimit
In/Out:In
类型:整数型
可空:Y
意思:输入上传限速速率
参数.八:nPosStart
In/Out:In
类型:整数型
可空:Y
意思:输入起始位置
参数.:nPostEnd
参数.:nPostEnd
In/Out:In
类型:整数型
可空:Y
Expand All @@ -308,7 +318,7 @@ extern "C" bool Session_UPStroage_Destory();
意思:是否成功
备注:
*********************************************************************/
extern "C" bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, __int64x nFileSize, bool bRewrite, int nPosStart = 0, int nPostEnd = 0);
extern "C" bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, XHANDLE xhSpeed, __int64x nFileSize, bool bRewrite, int nSpeedLimit = 0, int nPosStart = 0, int nPostEnd = 0);
/********************************************************************
函数名称:Session_UPStroage_GetInfo
函数功能:获取上传客户端信息
Expand All @@ -329,6 +339,20 @@ extern "C" bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuc
*********************************************************************/
extern "C" bool Session_UPStroage_GetInfo(LPCXSTR lpszClientAddr, SESSION_STORAGEINFO* pSt_StorageInfo);
/********************************************************************
函数名称:Session_UPStroage_GetSpeed
函数功能:获得速率限制句柄
参数.一:lpszClientAddr
In/Out:In
类型:常量字符指针
可空:N
意思:输入要操作的客户端
返回值
类型:句柄
意思:返回速率句柄
备注:
*********************************************************************/
extern "C" XHANDLE Session_UPStroage_GetSpeed(LPCXSTR lpszClientAddr);
/********************************************************************
函数名称:Session_UPStroage_Write
函数功能:写入数据到文件
参数.一:lpszClientAddr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,32 @@ bool CSession_UPStroage::Session_UPStroage_Destory()
类型:常量字符指针
可空:N
意思:输入文件地址
参数.四:nFileSize
参数.四:xhSpeed
In/Out:Out
类型:整数型
可空:N
意思:输入限速句柄
参数.五:nFileSize
In/Out:Out
类型:整数型
可空:N
意思:输入文件大小
参数.:bRewrite
参数.:bRewrite
In/Out:In
类型:整数型
可空:N
意思:是否允许覆写
参数.六:nPosStart
参数.七:nSpeedLimit
In/Out:In
类型:整数型
可空:Y
意思:输入上传限速速率
参数.八:nPosStart
In/Out:In
类型:整数型
可空:Y
意思:输入起始位置
参数.:nPostEnd
参数.:nPostEnd
In/Out:In
类型:整数型
可空:Y
Expand All @@ -107,7 +117,7 @@ bool CSession_UPStroage::Session_UPStroage_Destory()
意思:是否成功
备注:
*********************************************************************/
bool CSession_UPStroage::Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, __int64x nFileSize, bool bRewrite, int nPosStart /* = 0 */, int nPostEnd /* = 0 */)
bool CSession_UPStroage::Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, XHANDLE xhSpeed, __int64x nFileSize, bool bRewrite, int nSpeedLimit, int nPosStart , int nPostEnd )
{
Session_IsErrorOccur = false;

Expand All @@ -132,9 +142,11 @@ bool CSession_UPStroage::Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXST
SESSION_STORAGEUPLOADER st_Client;
memset(&st_Client, '\0', sizeof(SESSION_STORAGEUPLOADER));
//填充下载信息
st_Client.xhSpeed = xhSpeed;
st_Client.st_StorageInfo.ullPosStart = nPosStart;
st_Client.st_StorageInfo.ullPosEnd = nPostEnd;
st_Client.st_StorageInfo.ullCount = nFileSize;
st_Client.st_StorageInfo.nLimit = nSpeedLimit;
_tcsxcpy(st_Client.st_StorageInfo.tszBuckKey, lpszBuckKey);
_tcsxcpy(st_Client.st_StorageInfo.tszFileDir, lpszFileDir);
_tcsxcpy(st_Client.st_StorageInfo.tszClientAddr, lpszClientAddr);
Expand Down Expand Up @@ -234,6 +246,43 @@ bool CSession_UPStroage::Session_UPStroage_GetInfo(LPCXSTR lpszClientAddr, SESSI
return true;
}
/********************************************************************
函数名称:Session_UPStroage_GetSpeed
函数功能:获得速率限制句柄
参数.一:lpszClientAddr
In/Out:In
类型:常量字符指针
可空:N
意思:输入要操作的客户端
返回值
类型:句柄
意思:返回速率句柄
备注:
*********************************************************************/
XHANDLE CSession_UPStroage::Session_UPStroage_GetSpeed(LPCXSTR lpszClientAddr)
{
Session_IsErrorOccur = false;

if ((NULL == lpszClientAddr))
{
Session_IsErrorOccur = true;
Session_dwErrorCode = ERROR_STORAGE_MODULE_SESSION_PARAMENT;
return NULL;
}

st_Locker.lock_shared();
unordered_map<string, SESSION_STORAGEUPLOADER>::iterator stl_MapIterator = stl_MapStroage.find(lpszClientAddr);
if (stl_MapIterator == stl_MapStroage.end())
{
Session_IsErrorOccur = true;
Session_dwErrorCode = ERROR_STORAGE_MODULE_SESSION_NOTFOUND;
st_Locker.unlock_shared();
return NULL;
}
XHANDLE xhSpeed = stl_MapIterator->second.xhSpeed;;
st_Locker.unlock_shared();
return xhSpeed;
}
/********************************************************************
函数名称:Session_UPStroage_Write
函数功能:写入数据到文件
参数.一:lpszClientAddr
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ typedef struct
{
time_t nTimeStart;
SESSION_STORAGEINFO st_StorageInfo;
XHANDLE xhSpeed;
}SESSION_STORAGEUPLOADER;

class CSession_UPStroage
Expand All @@ -24,8 +25,9 @@ class CSession_UPStroage
public:
bool Session_UPStroage_Init(int nMaxConnect, bool bUPResume = false);
bool Session_UPStroage_Destory();
bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, __int64x nFileSize, bool bRewrite, int nPosStart = 0, int nPostEnd = 0);
bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, XHANDLE xhSpeed, __int64x nFileSize, bool bRewrite, int nSpeedLimit = 0, int nPosStart = 0, int nPostEnd = 0);
bool Session_UPStroage_GetInfo(LPCXSTR lpszClientAddr, SESSION_STORAGEINFO* pSt_StorageInfo);
XHANDLE Session_UPStroage_GetSpeed(LPCXSTR lpszClientAddr);
bool Session_UPStroage_Write(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, int nMsgLen);
bool Session_UPStroage_Exist(LPCXSTR lpszClientAddr);
bool Session_UPStorage_GetAll(SESSION_STORAGEINFO*** pppSt_StorageInfo, int* pInt_ListCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ EXPORTS
Session_UPStroage_Destory
Session_UPStroage_Insert
Session_UPStroage_GetInfo
Session_UPStroage_GetSpeed
Session_UPStroage_Write
Session_UPStroage_Exist
Session_UPStorage_GetAll
Expand Down
8 changes: 6 additions & 2 deletions XEngine_Source/StorageModule_Session/pch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,18 @@ extern "C" bool Session_UPStroage_Destory()
{
return m_UPStorage.Session_UPStroage_Destory();
}
extern "C" bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, __int64x nFileSize, bool bRewrite, int nPosStart, int nPostEnd)
extern "C" bool Session_UPStroage_Insert(LPCXSTR lpszClientAddr, LPCXSTR lpszBuckKey, LPCXSTR lpszFileDir, XHANDLE xhSpeed, __int64x nFileSize, bool bRewrite, int nSpeedLimit, int nPosStart, int nPostEnd)
{
return m_UPStorage.Session_UPStroage_Insert(lpszClientAddr, lpszBuckKey, lpszFileDir, nFileSize, bRewrite, nPosStart, nPostEnd);
return m_UPStorage.Session_UPStroage_Insert(lpszClientAddr, lpszBuckKey, lpszFileDir, xhSpeed, nFileSize, bRewrite, nSpeedLimit, nPosStart, nPostEnd);
}
extern "C" bool Session_UPStroage_GetInfo(LPCXSTR lpszClientAddr, SESSION_STORAGEINFO * pSt_StorageInfo)
{
return m_UPStorage.Session_UPStroage_GetInfo(lpszClientAddr, pSt_StorageInfo);
}
extern "C" XHANDLE Session_UPStroage_GetSpeed(LPCXSTR lpszClientAddr)
{
return m_UPStorage.Session_UPStroage_GetSpeed(lpszClientAddr);
}
extern "C" bool Session_UPStroage_Write(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, int nMsgLen)
{
return m_UPStorage.Session_UPStroage_Write(lpszClientAddr, lpszMsgBuffer, nMsgLen);
Expand Down
7 changes: 1 addition & 6 deletions XEngine_Source/XEngine_StorageApp/StorageApp_Network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,6 @@ void CALLBACK XEngine_Callback_UPLoaderRecv(LPCXSTR lpszClientAddr, XSOCKET hSoc
}
}
SocketOpt_HeartBeat_ActiveAddrEx(xhHBUPLoader, lpszClientAddr);

int nCount = 0;
__int64u nTimeWait = 0;
Session_UPStorage_GetAll(NULL, &nCount);
Algorithm_Calculation_SleepFlow(xhLimit, &nTimeWait, st_ServiceCfg.st_XLimit.nMaxUPLoader, nCount, nMsgLen);
std::this_thread::sleep_for(std::chrono::microseconds(nTimeWait));
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_DEBUG, _X("上传客户端:%s,投递包成功,大小:%d"), lpszClientAddr, nMsgLen);
}
void CALLBACK XEngine_Callback_UPLoaderLeave(LPCXSTR lpszClientAddr, XSOCKET hSocket, XPVOID lParam)
Expand Down Expand Up @@ -212,6 +206,7 @@ bool XEngine_Net_CloseClient(LPCXSTR lpszClientAddr, int nLeaveType, int nClient
NetCore_TCPXCore_CloseForClientEx(xhNetUPLoader, lpszClientAddr);
SocketOpt_HeartBeat_DeleteAddrEx(xhHBUPLoader, lpszClientAddr);
}
Algorithm_Calculation_Close(Session_UPStroage_GetSpeed(lpszClientAddr));
Session_UPStroage_Delete(lpszClientAddr);
HttpProtocol_Server_CloseClinetEx(xhUPHttp, lpszClientAddr);
OPenSsl_Server_CloseClientEx(xhUPSsl, lpszClientAddr);
Expand Down
31 changes: 30 additions & 1 deletion XEngine_Source/XEngine_StorageApp/StorageApp_UPLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ XHTHREAD CALLBACK XEngine_UPLoader_HTTPThread(XPVOID lParam)
}
return 0;
}
void CALLBACK XEngine_UPLoader_UPFlow(XHANDLE xhToken, bool bSDFlow, bool bRVFlow, bool bTime, __int64u nSDFlow, __int64u nRVFlow, __int64u nTimeFlow, XPVOID lParam)
{
XCHAR tszIPAddr[128] = {};
_tcsxcpy(tszIPAddr, (LPCXSTR)lParam);
if (bSDFlow)
{
NetCore_TCPXCore_PasueRecvEx(xhNetUPLoader, tszIPAddr, false);
}
else
{
NetCore_TCPXCore_PasueRecvEx(xhNetUPLoader, tszIPAddr, true);
}
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("上传客户端:%s,接受数据标志:%d,当前平均流量:%llu"), tszIPAddr, bSDFlow, nSDFlow);
}
bool XEngine_Task_HttpUPLoader(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, int nMsgLen, RFCCOMPONENTS_HTTP_REQPARAM* pSt_HTTPParam, XCHAR** pptszListHdr, int nHdrCount)
{
int nSDLen = 2048;
Expand Down Expand Up @@ -292,7 +306,21 @@ bool XEngine_Task_HttpUPLoader(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, in
}
SystemApi_File_CreateMutilFolder(tszTmpPath);
}
if (!Session_UPStroage_Insert(lpszClientAddr, st_StorageBucket.tszBuckKey, tszFileDir, nPosCount, st_StorageBucket.st_PermissionFlags.bRewrite, nPosStart, nPosEnd))
XHANDLE xhUPSpeed = NULL;
if (nLimit > 0 || (st_ServiceCfg.st_XLimit.bLimitMode && st_ServiceCfg.st_XLimit.nMaxUPLoader > 0))
{
//处理限速情况
XCHAR* ptszIPClient = (XCHAR*)malloc(MAX_PATH);
memset(ptszIPClient, '\0', MAX_PATH);
_tcsxcpy(ptszIPClient, lpszClientAddr);

nLimit = nLimit == 0 ? st_ServiceCfg.st_XLimit.nMaxUPLoader : nLimit;
xhUPSpeed = Algorithm_Calculation_Create();
Algorithm_Calculation_PassiveOPen(xhUPSpeed, XEngine_UPLoader_UPFlow, nLimit, 0, 0, false, ptszIPClient);
NetCore_TCPXCore_PasueRecvEx(xhNetUPLoader, lpszClientAddr, false);
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("上传客户端:%s,上传限速被启用,文件:%s,限速:%d"), lpszClientAddr, tszFileDir, nLimit);
}
if (!Session_UPStroage_Insert(lpszClientAddr, st_StorageBucket.tszBuckKey, tszFileDir, xhUPSpeed, nPosCount, st_StorageBucket.st_PermissionFlags.bRewrite, nLimit, nPosStart, nPosEnd))
{
st_HDRParam.bIsClose = true;
st_HDRParam.nHttpCode = 500;
Expand Down Expand Up @@ -354,6 +382,7 @@ bool XEngine_Task_HttpUPLoader(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, in
{
Session_UPStroage_Write(lpszClientAddr, lpszMsgBuffer, nMsgLen);
}
Algorithm_Calculation_ADDSDFlow(Session_UPStroage_GetSpeed(lpszClientAddr), nMsgLen);
HttpProtocol_Server_GetRecvModeEx(xhUPHttp, lpszClientAddr, &nRVMode, &nRVCount, &nHDSize);
if (nHDSize >= nRVCount)
{
Expand Down
1 change: 1 addition & 0 deletions XEngine_Source/XEngine_StorageApp/StorageApp_UPLoader.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once

XHTHREAD CALLBACK XEngine_UPLoader_HTTPThread(XPVOID lParam);
void CALLBACK XEngine_UPLoader_UPFlow(XHANDLE xhToken, bool bSDFlow, bool bRVFlow, bool bTime, XPVOID lParam);
bool XEngine_Task_HttpUPLoader(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, int nMsgLen, RFCCOMPONENTS_HTTP_REQPARAM* pSt_HTTPParam, XCHAR** pptszListHdr, int nHdrCount);

0 comments on commit 94a1f64

Please sign in to comment.