Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions data/Setup/setup_test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
setup_01:
host: ${{host()}}
url: /roles?$cache{test1}_$cache{test2}_$cache{test3}_$cache{test4}
method: GET
detail: test
headers:
Content-Type: application/json;
Token: $cache{admin_token}
# params json file, data
requestType: data
# empty or True will run
is_run:
data:
dependence_case: True
dependence_case_data:
- case_id: setup_sql
dependent_data:
- dependent_type: sqlData
jsonpath: $.name
set_cache: test1
- dependent_type: sqlData
jsonpath: $.id
set_cache: test2
- case_id: setup_func
dependent_data:
- dependent_type: funcData
jsonpath: $.random_int
set_cache: test3
- dependent_type: funcData
jsonpath: $.query_test.id
set_cache: test4
assert:
status_code: 200
sql:
setup_sql:
- select count(*) from user
- select * from user

setup_func:
- random_int()
- query_test(2)
2 changes: 2 additions & 0 deletions utils/other_tools/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class DependentType(Enum):
REQUEST = 'request'
SQL_DATA = 'sqlData'
CACHE = "cache"
FUNCTION = 'function'


class Assert(BaseModel):
Expand Down Expand Up @@ -119,6 +120,7 @@ class TestCase(BaseModel):
dependence_case_data: Optional[Union[None, List["DependentCaseData"], Text]] = None
sql: List = None
setup_sql: List = None
setup_func: List = None
status_code: Optional[int] = None
teardown_sql: Optional[List] = None
teardown: Union[List["TearDown"], None] = None
Expand Down
13 changes: 13 additions & 0 deletions utils/read_files_tools/get_yaml_data_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def case_process(
"sql": self.get_sql(key, values),
"assert_data": self.get_assert(key, values),
"setup_sql": self.setup_sql(values),
"setup_func": self.setup_func(values),
"teardown": self.tear_down(values),
"teardown_sql": self.teardown_sql(values),
"sleep": self.time_sleep(values),
Expand Down Expand Up @@ -359,6 +360,18 @@ def setup_sql(cls, case_data: Dict) -> Union[list, None]:
except KeyError:
return None

@classmethod
def setup_func(cls, case_data: Dict) -> Union[list, None]:
"""
get setup func
:return:
"""
try:
_setup_sql = case_data['setup_func']
return _setup_sql
except KeyError:
return None

@classmethod
def tear_down(cls, case_data: Dict) -> Union[Dict, None]:
"""
Expand Down
44 changes: 43 additions & 1 deletion utils/requests_tool/dependent_case.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from jsonpath import jsonpath
from utils.requests_tool.request_control import RequestControl
from utils.mysql_tool.mysql_control import SetUpMySQL
from utils.requests_tool.setup_func_control import CustomFunc
from utils.read_files_tools.regular_control import regular, cache_regular
from utils.other_tools.jsonpath_date_replace import jsonpath_replace
from utils.logging_tool.log_control import WARNING
Expand Down Expand Up @@ -133,6 +134,40 @@ def _dependent_type_for_sql(
else:
WARNING.logger.warning("检查到数据库开关为关闭状态,请确认配置")

def _dependent_type_for_func(
self,
setup_func: List,
dependence_case_data: "DependentCaseData",
jsonpath_dates: Dict) -> None:
"""
func type, get data from custom function
@param setup_func: setup fucntion
@param dependence_case_data: dependent data
@param jsonpath_dates: dependent case data
@return:
"""

if setup_func is not None:
setup_func = ast.literal_eval(cache_regular(str(setup_func)))
func_data = CustomFunc().setup_func_data(func=setup_func)
dependent_data = dependence_case_data.dependent_data
for i in dependent_data:
_jsonpath = i.jsonpath
jsonpath_data = self.jsonpath_data(obj=func_data, expr=_jsonpath)
_set_value = self.set_cache_value(i)
_replace_key = self.replace_key(i)
if _set_value is not None:
CacheHandler.update_cache(cache_name=_set_value, value=jsonpath_data[0])
# Cache(_set_value).set_caches(jsonpath_data[0])
if _replace_key is not None:
jsonpath_dates[_replace_key] = jsonpath_data[0]
self.url_replace(
replace_key=_replace_key,
jsonpath_dates=jsonpath_dates,
jsonpath_data=jsonpath_data,
)


def dependent_handler(
self,
_jsonpath: Text,
Expand Down Expand Up @@ -169,6 +204,7 @@ def is_dependent(self) -> Union[Dict, bool]:
# 获取依赖用例数据
_dependence_case_dates = self.__yaml_case.dependence_case_data
_setup_sql = self.__yaml_case.setup_sql
_setup_func = self.__yaml_case.setup_func
# 判断是否有依赖
if _dependent_type is True:
# 读取依赖相关的用例数据
Expand All @@ -178,11 +214,17 @@ def is_dependent(self) -> Union[Dict, bool]:
for dependence_case_data in _dependence_case_dates:
_case_id = dependence_case_data.case_id
# 判断依赖数据为sql,case_id需要写成self,否则程序中无法获取case_id
if _case_id == 'self':
if _case_id == 'setup_sql':
self._dependent_type_for_sql(
setup_sql=_setup_sql,
dependence_case_data=dependence_case_data,
jsonpath_dates=jsonpath_dates)
elif _case_id == 'setup_func':
self._dependent_type_for_func(
setup_func=_setup_func,
dependence_case_data=dependence_case_data,
jsonpath_dates=jsonpath_dates
)
else:
re_data = regular(str(self.get_cache(_case_id)))
re_data = ast.literal_eval(cache_regular(str(re_data)))
Expand Down
58 changes: 58 additions & 0 deletions utils/requests_tool/setup_func_control.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import random
from utils.logging_tool.log_control import WARNING, ERROR
import re
from typing import Union, List, Dict
import ast
from utils.read_files_tools.regular_control import cache_regular
from utils.mysql_tool.mysql_control import MysqlDB


class CustomFunc:
"""
customfunc for setup_func
"""
def __init__(self) -> None:
pass


@classmethod
def random_int(cls) -> int:
"""
:return: random int
"""
_data = random.randint(0, 5000)
return _data


@classmethod
def query_test(cls, id) -> dict:
sql = f'select * from user where id = {id}'

data = MysqlDB().query(sql)[0]

return data


def setup_func_data(self, func: Union[List, None]) -> Dict:

func = ast.literal_eval(cache_regular(str(func)))

try:
data = {}
if func is not None:
for f in func:
func_name = f.split("(")[0]
value_name = f.split("(")[1][:-1]
if value_name == "":
value_data = getattr(CustomFunc(), func_name)()
else:
value_data = getattr(CustomFunc(), func_name)(*value_name.split(","))

data[func_name] = value_data

return data
except Exception as e:
ERROR.logger.error("func execute failed!!")
raise e