Commit

[2023.01.17 CoCheLab] commit-1
All code involving FIFO/LRU/LFU has been written; the final version will be uploaded once the tests pass and the configuration files are written.
DarriusL committed Jan 17, 2024
1 parent 3f8c7c4 commit 7a7b817
Showing 10 changed files with 268 additions and 58 deletions.
6 changes: 6 additions & 0 deletions README.md
@@ -124,6 +124,12 @@ If you need to use the provided processed data set, download it to path : ./data

----------------------------------------------------

Conventional:

- [ ] FIFO
- [ ] LRU
- [ ] LFU

CL-based:

- [x] CL4SRec[[1]](#ref1)
163 changes: 157 additions & 6 deletions Room/officer.py
@@ -16,6 +16,8 @@ def get_save_path(cfg):
'''
if cfg['net']['type'].lower() in ['caser','psac_gen']:
return './data/saved/' + cfg['net']['type'].lower() + '/' + cfg['dataset']['type'] + '/' + f'{cfg["net"]["d"]}_{cfg["net"]["n_kernels"]}/model.model';
elif cfg['net']['type'].lower() in ['fifo','lru','lfu']:
return './data/saved/' + cfg['net']['type'].lower() + '/' + cfg['dataset']['type'] + '/model.model';
elif cfg['net']['is_norm_fist']:
norm_type = 'pre';
else:
@@ -490,7 +492,7 @@ def _caching_and_cal_qoe_trafficload(self, data, cache_size_list):
TrafficLoad = {};
for cache_size in cache_size_list:
cache_num = int(np.round(self.bs_storagy * cache_size));
#cache
#Get the cache corresponding to cache size
if len(alter_dict) <= cache_num:
cache_set = set(alter_dict.keys());
else:
@@ -520,9 +522,6 @@ def test(self, dataset):
-----------
dataset:dict
Processed dataset
dataset_raw:dict
original dataset
'''
#Adjust the number of dataloader workers according to the operating system
if platform.system().lower() == 'linux':
@@ -576,11 +575,11 @@ def test(self, dataset):
logger.debug(f'pre_types:{len(Counter(next_req_pre.reshape(-1).tolist()))}');

#calculate hit rate and ndcg
hitrate, ndcg = self._cal_hitrate_and_ndcg_atk(test_data.clone(), next_req_pre.clone(), next_req.clone(), self.metrics_at_k);
hitrate, ndcg = self._cal_hitrate_and_ndcg_atk(test_data, next_req_pre, next_req, self.metrics_at_k);
HitRate += hitrate;
NDCG += ndcg;
#calculate qoe and traffic load
qoe, trafficload = self._caching_and_cal_qoe_trafficload(data.clone(), self.cache_size);
qoe, trafficload = self._caching_and_cal_qoe_trafficload(data, self.cache_size);
for cs in self.cache_size:
result['QoE'][cs] += qoe[cs];
qoe_batch.append(qoe[cs]);
@@ -643,4 +642,156 @@ def _report_result(self, result):
f'--------------------------------------\n'\
f'cache_size - QoE -- TrafficLoad \n' + str_show2;
logger.info(str);
return str;

class ConventionalTester():
'''Tester for conventional algorithms: FIFO, LRU, LFU
Parameters:
-----------
'''
def __init__(self, config, model) -> None:
util.set_attr(self, config['test']);
self.model = model;
self.cfg = config;
if self.save:
self.save_path, _ = os.path.split(self.model_save_path);
else:
self.save_path = './cache/unsaved_data/[' + util.get_date('_') + ']';
if not os.path.exists(self.save_path):
os.makedirs(self.save_path);

def _caching_and_cal_qoe_trafficload(self, data, cache_size_list):
'''Calculate QoE and Traffic Load at each cache size
Parameters:
-----------
data:torch.Tensor
(batch_size, req_len)
cache_size_list:list
Returns:
--------
QoE:dict
TrafficLoad:dict
'''
batch_size, req_len = data.shape;
#cache the item
self.model.clear();
for batch_id in range(batch_size):
#su:(slide_len, T)
su = data[batch_id, :].unfold(-1, self.slide_T + 1, self.slide_T)[:, :self.slide_T];
self.model.update(su.reshape(1, -1));
QoE = {};
TrafficLoad = {};
for cache_size in cache_size_list:
cache_num = int(np.round(self.bs_storagy * cache_size));
#Get the cache corresponding to cache size
cache_set = self.model.generate_subcache(cache_num);
logger.debug('ConventionalTester._caching_and_cal_qoe_trafficload\n'
f'cache_num: {cache_num} - len(cache_set): {len(cache_set)}')
#calculate qoe and trafficload
qoe, userload, allload = 0, 0, 0;
for batch_id in range(batch_size):
#R:real data set
R = set(data[batch_id, :].unfold(-1, self.slide_T + 1, self.slide_T)[:, -1].tolist());
if len(cache_set & R) > (req_len - data[batch_id, :].eq(0).sum().item())*self.cache_satisfaction_ratio:
qoe += 1;
userload += len(R - cache_set);
allload += len(R);
QoE[cache_size] = qoe/batch_size;
TrafficLoad[cache_size] = userload/allload;
return QoE, TrafficLoad;

def test(self, dataset):
'''Test the conventional model: FIFO/LRU/LFU
Notes:
Conventional algorithms have no predictive capability,
so hit rate and NDCG are not calculated here.
'''
#Adjust the number of dataloader workers according to the operating system
if platform.system().lower() == 'linux':
num_workers = self.cfg['linux_fast_num_workers'];
else:
num_workers = 0;
result = {'QoE':{}, 'TrafficLoad':{}};
#initial
for cs in self.cache_size:
result['QoE'][cs] = 0;
result['TrafficLoad'][cs] = 0;
test_loader = generator.get_dataloader(dataset, self.cfg['net'], num_workers, self.batch_size, shuffle = True, mode = 'test');
n_step = int(np.ceil(dataset['u_num']/self.batch_size)) * 2;
t_reason = 0;
for i in range(n_step):
t = time.time();
qoe_batch, tl_batch = [], [];
#test_data:(batch_size, seq_len)
#next_req:(batch_size, 1)
_, test_data, next_req = iter(test_loader).__next__();
test_data, next_req = test_data.to(self.device), next_req.to(self.device);
#next_req_logits:(batch_size, req_types)
with torch.no_grad():
t_r = time.time();
next_req_logits = self.model(test_data);
t_reason += (time.time() - t_r);
#next_req:(batch_size)
next_req_pre = next_req_logits.argmax(dim = -1);
logger.debug(f'pre_types:{len(Counter(next_req_pre.tolist()))}');
#new data:(batch_size, seq_len + 1)
data = torch.cat((test_data, next_req.unsqueeze(-1)), dim = -1);
#calculate qoe and traffic load
qoe, trafficload = self._caching_and_cal_qoe_trafficload(data, self.cache_size);
for cs in self.cache_size:
result['QoE'][cs] += qoe[cs];
qoe_batch.append(qoe[cs]);
result['TrafficLoad'][cs] += trafficload[cs];
tl_batch.append(trafficload[cs]);
self._report_batch_result(t, i, n_step, qoe_batch, tl_batch);
for cs in self.cache_size:
result['QoE'][cs] /= n_step;
result['TrafficLoad'][cs] /= n_step;
result['ReasonTime'] = t_reason/n_step;
json_util.jsonsave(result, self.save_path + '/test_result.json');
return self._report_result(result);

def _report_batch_result(self, t_start, cur_step, n_step, qoe, trafficload):
'''Report batch test result
'''

str_show2 = '';
for it in range(len(self.cache_size)):
str_show2 += f' {self.cache_size[it]:.1f} - {qoe[it]:.6f} -- {trafficload[it]:.6f} \n'
logger.info(
f'batch test[{cur_step+1}/{n_step}] time consuming: {util.s2hms(time.time() - t_start)}\n'
f'--------------------------------------\n'
f'[{self.model.type}]Result of batch test\n'
f'--------------------------------------\n'
f'--------------------------------------\n'
f'Performance report of the model\n'
f'--------------------------------------\n'
f'cache_size - QoE -- TrafficLoad \n' + str_show2
)

def _report_result(self, result):
'''Report the test result
'''

str_show2 = '';
for it in self.cache_size:
str_show2 += f' {it:.1f} - {result["QoE"][it]:.6f} -- {result["TrafficLoad"][it]:.6f} \n'
str = f'[{self.model.type}]Result of test\n'\
f'--------------------------------------\n'\
f'Reasoning Time consuming:{result["ReasonTime"]}\n'\
f'--------------------------------------\n'\
f'--------------------------------------\n'\
f'Performance report of the model\n'\
f'--------------------------------------\n'\
f'cache_size - QoE -- TrafficLoad \n' + str_show2;
logger.info(str);
return str;
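The core of the new tester is the QoE / traffic-load bookkeeping in `_caching_and_cal_qoe_trafficload` above. The standalone sketch below reproduces that logic on a toy batch; the values for `slide_T`, `cache_satisfaction_ratio` and the cached item ids are hypothetical and not taken from any config in this commit.

```python
# Standalone sketch of the QoE / traffic-load metric in
# ConventionalTester._caching_and_cal_qoe_trafficload.
# slide_T, cache_satisfaction_ratio and cache_set are hypothetical toy values.
import torch

slide_T = 3
cache_satisfaction_ratio = 0.2
cache_set = {2, 5, 7}                                    # hypothetical cached item ids

data = torch.tensor([[1, 2, 3, 5, 7, 2, 9, 4, 5, 6]])    # one toy request sequence, 0 = padding
batch_size, req_len = data.shape

qoe = userload = allload = 0
for b in range(batch_size):
    # windows of length slide_T + 1 with stride slide_T; the last column of
    # each window is the "next request" the cache should serve
    windows = data[b, :].unfold(-1, slide_T + 1, slide_T)
    R = set(windows[:, -1].tolist())                     # requested items
    valid_len = req_len - data[b, :].eq(0).sum().item()  # non-padding length
    if len(cache_set & R) > valid_len * cache_satisfaction_ratio:
        qoe += 1                                         # user counted as satisfied
    userload += len(R - cache_set)                       # items the cache could not serve
    allload += len(R)

print(qoe / batch_size, userload / allload)              # QoE, traffic load
```

In short, QoE is the fraction of sequences whose cache hits exceed the satisfaction threshold, and traffic load is the share of requested items that still have to be fetched from outside the cache.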
23 changes: 21 additions & 2 deletions Room/work.py
Expand Up @@ -4,7 +4,7 @@

import torch, time,os
from lib import glb_var, json_util, util, callback
from Room.officer import Trainer, Tester, get_save_path
from Room.officer import Trainer, Tester, ConventionalTester, get_save_path
from model import *
from data import *

@@ -77,13 +77,23 @@ def run_work(config_path, mode = 'train'):
config['train']['model_save_path'] = get_save_path(config);
logger.info(f"Updata save path:[{config['train']['model_save_path']}]")
json_util.jsonsave(config, config_path);
elif mode == 'test' and config['net']['type'].lower() in ['fifo', 'lru', 'lfu']:
config['test']['model_save_path'] = get_save_path(config);
logger.info(f"Updata save path:[{config['test']['model_save_path']}]")
json_util.jsonsave(config, config_path);

report(config, lab_cfg);
result = None;
if mode == 'train':
if config['net']['type'].lower() in ['fifo', 'lru', 'lfu']:
logger.error(f"{config['net']['type']} is not supported using {mode} mode");
raise RuntimeError;
trainer = Trainer(config, model);
run_train(trainer, dataset);
elif mode == 'train_and_test':
if config['net']['type'].lower() in ['fifo', 'lru', 'lfu']:
logger.error(f"{config['net']['type']} is not supported using {mode} mode");
raise RuntimeError;
trainer = Trainer(config, model);
run_train(trainer, dataset);
if config['test']['gpu_is_available']:
@@ -97,7 +107,10 @@ glb_var.set_value('device', torch.device("cuda:0" if torch.cuda.is_available() else "cpu"));
glb_var.set_value('device', torch.device("cuda:0" if torch.cuda.is_available() else "cpu"));
else:
glb_var.set_value('device', torch.device("cpu"));
tester = Tester(config, model);
if config['net']['type'].lower() in ['fifo', 'lru', 'lfu']:
tester = ConventionalTester(config, model);
else:
tester = Tester(config, model);
result = run_test(tester, dataset);
else:
logger.error(f'Unrecognized Mode [{mode}], acceptable:(train/test/train_and_test)');
@@ -166,6 +179,12 @@ def generate_model(cfg):
model = Caser(net_cfg_dict, cfg['train']['batch_size']);
elif net_cfg_dict['type'].lower() == 'egpc':
model = EGPC(net_cfg_dict);
elif net_cfg_dict['type'].lower() == 'fifo':
model = FIFO(net_cfg_dict);
elif net_cfg_dict['type'].lower() == 'lru':
model = LRU(net_cfg_dict);
elif net_cfg_dict['type'].lower() == 'lfu':
model = LFU(net_cfg_dict);
else:
logger.error(f'Unrecognized Mode [{net_cfg_dict["type"].lower()}]');
raise callback.CustomException('ModelTypeError');
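With this dispatch, FIFO/LRU/LFU only run through the test path; `train` and `train_and_test` raise `RuntimeError`. A minimal usage sketch, assuming the commit's `run_work` entry point is called from a script at the repository root:

```python
# Hedged usage sketch: conventional policies go through ConventionalTester,
# so only mode='test' is valid for them.
from Room.work import run_work

run_work('./config/fifo/fifo_ml1m.json', mode='test')
```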
32 changes: 3 additions & 29 deletions config/fifo/fifo_ml1m.json
@@ -8,43 +8,16 @@
"email_reminder": false,
"dataset": {
"type": "ml1m",
"path": "./data/datasets/process/complete/ml_devide_55.data",
"path": "./data/datasets/process/complete/ml.data",
"crop_or_fill": false,
"fill_mask": 0,
"limit_length": 55
},
"train": {
"batch_size": 256,
"max_epoch": 1000,
"valid_step": 10,
"stop_train_step_valid_not_improve": 50,
"gpu_is_available": false,
"use_amp": false,
"optimizer_type": "adam",
"learning_rate": 0.001,
"weight_decay": 1e-08,
"betas": [
0.9,
0.999
],
"use_lr_schedule": false,
"lr_max": 1e-05,
"metric_less": true,
"save": true,
"model_save_path": "./data/saved/caser/ml1m/64_8/model.model",
"end_save": false
},
"test": {
"batch_size": 256,
"cache_satisfaction_ratio": 0.2,
"bs_storagy": 1000,
"slide_T": 3,
"alter_topk": 10,
"metrics_at_k": [
5,
10,
20
],
"cache_size": [
0.1,
0.2,
@@ -58,6 +31,7 @@
1.0
],
"gpu_is_available": false,
"save": true
"save": true,
"model_save_path":"./"
}
}
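The trimmed config keeps only what the conventional tester needs. Each relative `cache_size` entry is turned into an absolute item budget using `bs_storagy`, as in `_caching_and_cal_qoe_trafficload` above; a quick check of the resulting budgets:

```python
# cache_num = int(np.round(bs_storagy * cache_size)) for a few of the
# cache_size values listed in this config.
import numpy as np

bs_storagy = 1000
for cache_size in [0.1, 0.2, 1.0]:
    print(cache_size, int(np.round(bs_storagy * cache_size)))   # 100, 200, 1000
```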
2 changes: 1 addition & 1 deletion data/generator.py
@@ -181,7 +181,7 @@ def __getitem__(self, index):
return self.data[index].squeeze(0).to(torch.int64);

def get_dataloader(dataset, net_cfg, num_workers, loader_batch, shuffle = False, mode = 'train'):
if net_cfg['is_cl_method']:
if net_cfg['is_cl_method'] or net_cfg['type'].lower() in ['fifo', 'lru', 'lfu']:
return torch.utils.data.DataLoader(NextReqDataSet(dataset, mode = mode),
num_workers = num_workers,
pin_memory = True,
6 changes: 5 additions & 1 deletion model/__init__.py
@@ -8,4 +8,8 @@
from model.framework.psac import *
from model.framework.caser import *
from model.framework.egpc import *
__all__ = ['CL4SRec', 'Duo4SRec', 'EC4SRec', "PSAC_gen", "Caser", "EGPC"];
from model.framework.fifo import *
from model.framework.lfu import *
from model.framework.lru import *

__all__ = ['CL4SRec', 'Duo4SRec', 'EC4SRec', "PSAC_gen", "Caser", "EGPC", "FIFO", "LFU", "LRU"];
12 changes: 10 additions & 2 deletions model/framework/base.py
@@ -18,20 +18,28 @@ def __init__(self, cache_cfg) -> None:
util.set_attr(self, cache_cfg);
self.cache = deque(maxlen = self.bs_storagy);

def extend(self, cache:deque, __iterable:Iterable, is_unique:bool) -> None:
def _extend(self, cache:deque, __iterable:Iterable, is_unique:bool) -> None:
if is_unique:
for item in __iterable:
if item in cache:
cache.remove(item);
cache.extend(__iterable);

def check_unique(self, __iterable:Iterable) -> int:
def _check_unique(self, __iterable:Iterable) -> int:
n = int(0);
for item in __iterable:
if item not in self.cache:
n += 1;
return n;

def update(self) -> None:
logger.error('Method must be implemented by a subclass before it is called');
raise NotImplementedError;

def clear(self) -> None:
logger.error('Method must be implemented by a subclass before it is called');
raise NotImplementedError;

def generate_subcache(self, n) -> set:
logger.error('Method must be implemented by a subclass before it is called');
raise NotImplementedError;
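The base class only provides the bounded deque and the uniqueness helpers; `update`, `clear` and `generate_subcache` are left to the concrete policies. The new `model/framework/fifo.py`, `lru.py` and `lfu.py` are not expanded in this view, so the sketch below is only an illustrative guess at a FIFO policy built on such a base: the `Cache` stand-in and its config keys are assumptions, and only the caching side is shown (the forward/prediction call used by `test()` is omitted).

```python
# Illustrative guess at a FIFO policy over the deque-based base class above.
# The Cache stand-in and its config keys are assumptions; the real
# model/framework/fifo.py is not shown in this commit view.
from collections import deque

class Cache:
    """Hypothetical stand-in for the base class in model/framework/base.py."""
    def __init__(self, cache_cfg) -> None:
        self.bs_storagy = cache_cfg['bs_storagy']
        self.type = cache_cfg.get('type', 'fifo')
        self.cache = deque(maxlen=self.bs_storagy)

class FIFO(Cache):
    """First-in-first-out: unique items are appended and the deque's maxlen
    silently evicts the oldest entry when the cache is full."""
    def update(self, reqs) -> None:
        # reqs: tensor-like of item ids, e.g. the slice passed by
        # ConventionalTester._caching_and_cal_qoe_trafficload
        for item in reqs.reshape(-1).tolist():
            if item not in self.cache:
                self.cache.append(item)

    def clear(self) -> None:
        self.cache.clear()

    def generate_subcache(self, n) -> set:
        # the n most recently admitted items form the size-n sub-cache
        return set(list(self.cache)[-n:])
```

An LRU variant would additionally move an already-cached item to the tail on every hit, and an LFU variant would rank items by per-item request counts instead of insertion order.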
