diff --git a/backend/library/CMakeLists.txt b/backend/library/CMakeLists.txt index 4c7cf3c..ae13ade 100644 --- a/backend/library/CMakeLists.txt +++ b/backend/library/CMakeLists.txt @@ -20,5 +20,6 @@ add_library(pyhusky-backend-library-objs OBJECT logistic_regression.cpp word.cpp graph.cpp + svm.cpp ) set_property(TARGET pyhusky-backend-library-objs PROPERTY CXX_STANDARD 14) diff --git a/backend/library/svm.cpp b/backend/library/svm.cpp index 2fced95..d14e6a7 100644 --- a/backend/library/svm.cpp +++ b/backend/library/svm.cpp @@ -25,10 +25,6 @@ #include "husky/core/engine.hpp" #include "husky/core/utils.hpp" #include "husky/core/zmq_helpers.hpp" -#include "husky/lib/ml/data_loader.hpp" -#include "husky/lib/ml/feature_label.hpp" -#include "husky/lib/ml/parameter.hpp" -#include "husky/lib/ml/vector_linalg.hpp" #include "backend/itc.hpp" #include "backend/operation.hpp" @@ -38,17 +34,14 @@ namespace husky { -using husky::lib::ml::SparseFeatureLabel; using husky::lib::ml::ParameterBucket; using husky::lib::Aggregator; using husky::lib::AggregatorFactory; -typedef SparseFeatureLabel ObjT; +thread_local std::unordered_map> SVM_models; // how to get label and feature from data object -double get_y_(ObjT& this_obj) { return this_obj.get_label(); } -std::vector> get_X_(ObjT& this_obj) { return this_obj.get_feature(); } void PyHuskySVM::init_py_handlers() { PythonConnector::add_handler("SVMModel#SVM_load_pyhlist_py", SVM_load_pyhlist_handler); @@ -58,44 +51,113 @@ void PyHuskySVM::init_cpp_handlers() { WorkerDriver::add_handler("SVMModel#SVM_init_py", SVM_init_handler); WorkerDriver::add_handler("SVMModel#SVM_load_hdfs_py", SVM_load_hdfs_handler); WorkerDriver::add_handler("SVMModel#SVM_train_py", SVM_train_handler); + WorkerDriver::add_handler("SVMModel#SVM_test_py", SVM_test_handler); } -void PyHuskySVM::init_daemon_handlers() { ThreadConnector::add_handler("SVMModel#SVM_train", daemon_train_handler); } +void PyHuskySVM::init_daemon_handlers() { + 
ThreadConnector::add_handler("SVMModel#SVM_train", daemon_train_handler); + ThreadConnector::add_handler("SVMModel#SVM_test", daemon_train_handler); +} void PyHuskySVM::SVM_load_pyhlist_handler(PythonSocket& python_socket, ITCWorker& daemon_socket) { LOG_I << "start SVM_load_pyhlist"; - - LOG_I << "set_model"; + // override + std::string name = zmq_recv_string(python_socket.pipe_from_python); + std::string sparse = zmq_recv_string(python_socket.pipe_from_python); + + // create model + if (sparse == "true") { + PyHuskySVM::create_model_from_pyhuskylist(name, python_socket, daemon_socket); + } else { + PyHuskySVM::create_model_from_pyhuskylist(name, python_socket, daemon_socket); + } LOG_I << "finish SVM_load_pyhlist"; } +template +void PyHuskySVM::create_model_from_pyhuskylist(std::string name, PythonSocket& python_socket, ITCWorker& daemon_socket) { + using LabeledPointHObj = husky::lib::ml::LabeledPointHObj; + auto& load_list = husky::ObjListStore::create_objlist(name); + + int n_sample = std::stoi(zmq_recv_string(python_socket.pipe_from_python)); + + husky::lib::Aggregator n_feature_agg(0, [](int& a, const int& b) { a = std::max(a, b); }); + auto& ac = husky::lib::AggregatorFactory::get_channel(); + + int num_features = 0; + + for (int i = 0; i < n_sample; i++) { + int n = std::stoi(zmq_recv_string(python_socket.pipe_from_python)); + LabeledPointHObj this_obj(n); + for (int j = 0; j < n; j++) { + int X_idx = std::stoi(zmq_recv_string(python_socket.pipe_from_python)); + double X_elem = std::stod(zmq_recv_string(python_socket.pipe_from_python)); + this_obj.x.set(X_idx, X_elem); + num_features = std::max(num_features, X_idx + 1); + } + double y = std::stod(zmq_recv_string(python_socket.pipe_from_python)); + this_obj.y = y; + load_list.add_object(this_obj); + } + + n_feature_agg.update(num_features); + husky::lib::AggregatorFactory::sync(); + num_features = n_feature_agg.get_value(); + + list_execute(load_list, [&](LabeledPointHObj& this_obj) { + if 
(this_obj.x.get_feature_num() != num_features) { + this_obj.x.resize(num_features); + } + }); + + assert(num_features > 0); + SVM_models[name] = std::make_shared(num_features); +} + void PyHuskySVM::SVM_init_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket) { LOG_I << "SVM_init_handler"; } void PyHuskySVM::SVM_load_hdfs_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket) { LOG_I << "SVM_load_hdfs_handler"; - // overide - // Get Parameters sent from python + const std::string& url = op.get_param("url"); const std::string& name = op.get_param("list_name"); - auto& load_list = husky::ObjListFactory::create_objlist(name); + const std::string& format = op.get_param("format"); // load data - husky::lib::ml::DataLoader data_loader(husky::lib::ml::kLIBSVMFormat); - data_loader.load_info(url, load_list); - int num_features = data_loader.get_num_feature(); + const std::string& sparse = op.get_param("is_sparse"); + bool is_sparse = sparse == "true" ? true : false; + husky::lib::ml::DataFormat data_format = + format == "tsv" ? 
husky::lib::ml::kTSVFormat : husky::lib::ml::kLIBSVMFormat; + if (is_sparse) { + SVM_create_model_from_url(name, url, data_format); + } else { + SVM_create_model_from_url(name, url, data_format); + } + LOG_I << "create SVM Model"; +} + +template +void PyHuskySVM::train_SVM(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket) { + LOG_I << "start SVM_train"; + // Get Parameters sent from python + const std::string& name = op.get_param("list_name"); + using LabeledPointHObj = husky::lib::ml::LabeledPointHObj; + auto& load_list = husky::ObjListStore::get_objlist(name); // get model config parameters - double lambda = std::stod(husky::Context::get_param("lambda")); - int num_iter = std::stoi(husky::Context::get_param("n_iter")); + double lambda = std::stod(op.get_param("lambda")); + int num_iter = std::stoi(op.get_param("n_iter")); // initialize parameters - ParameterBucket param_list(num_features + 1); // scalar b and vector w + ParameterBucket& param_list = SVM_models[name]->init_param_list(); // scalar b and vector w + int num_features = SVM_models[name]->num_features; if (husky::Context::get_global_tid() == 0) { LOG_I << "num of params: " + std::to_string(param_list.get_num_param()); + LOG_I << "num of features: " << num_features; } // get the number of global records Aggregator num_samples_agg(0, [](int& a, const int& b) { a += b; }); @@ -107,7 +169,6 @@ void PyHuskySVM::SVM_load_hdfs_handler(const Operation& op, PythonSocket& python } // Aggregators for regulator, w square and loss - Aggregator regulator_agg(0.0, [](double& a, const double& b) { a += b; }); Aggregator sqr_w_agg(0.0, [](double& a, const double& b) { a += b; }); sqr_w_agg.to_reset_each_iter(); Aggregator loss_agg(0.0, [](double& a, const double& b) { a += b; }); @@ -117,19 +178,19 @@ void PyHuskySVM::SVM_load_hdfs_handler(const Operation& op, PythonSocket& python auto start = std::chrono::steady_clock::now(); for (int i = 0; i < num_iter; i++) { double sqr_w = 0.0; // ||w||^2 
- double regulator = 0.0; // prevent overfitting + double regulator = std::stod(op.get_param_or("C", "0")); // prevent overfitting // calculate w square - for (int idx = 1; idx <= num_features; idx++) { + for (int idx = 0; idx < num_features; idx++) { double w = param_list.param_at(idx); sqr_w += w * w; } // get local copy of parameters - std::vector bweight = param_list.get_all_param(); + husky::lib::Vector bweight = param_list.get_all_param(); // bweight, 0 - n-1: weights, n: intercept // calculate regulator - regulator = (sqr_w == 0) ? 1.0 : std::min(1.0, 1.0 / sqrt(sqr_w * lambda)); + regulator = regulator ? regulator : (sqr_w == 0) ? 1.0 : std::min(1.0, 1.0 / sqrt(sqr_w * lambda)); if (regulator < 1) { bweight *= regulator; sqr_w = 1 / lambda; @@ -139,7 +200,7 @@ void PyHuskySVM::SVM_load_hdfs_handler(const Operation& op, PythonSocket& python // regularize w in param_list if (husky::Context::get_global_tid() == 0) { - for (int idx = 1; idx < bweight.size(); idx++) { + for (int idx = 0; idx < num_features ; idx++) { double w = bweight[idx]; param_list.update(idx, (w - w / regulator - eta * w)); } @@ -147,32 +208,30 @@ void PyHuskySVM::SVM_load_hdfs_handler(const Operation& op, PythonSocket& python auto& ac = AggregatorFactory::get_channel(); // calculate gradient - husky::list_execute(load_list, {}, {&ac}, [&](ObjT& this_obj) { + husky::list_execute(load_list, {}, {&ac}, [&](LabeledPointHObj& this_obj) { double prod = 0; // prod = WX * y - double y = get_y_(this_obj); - std::vector> X = get_X_(this_obj); - for (auto& x : X) - prod += bweight[x.first] * x.second; + double y = this_obj.y; + auto X = this_obj.x; + for (auto it = X.begin_feaval(); it != X.end_feaval(); ++it) + prod += bweight[(*it).fea] * (*it).val; // bias - prod += bweight[0]; + prod += bweight[num_features]; prod *= y; if (prod < 1) { // the data point falls within the margin - for (auto& x : X) { - x.second *= y; // calculate the gradient for each parameter - param_list.update(x.first, eta * 
x.second / num_samples / lambda); + for (auto it = X.begin_feaval(); it != X.end_feaval(); ++it) { + auto x = *it; + x.val *= y; // calculate the gradient for each parameter + param_list.update(x.fea, eta * x.val / num_samples / lambda); } // update bias - param_list.update(0, eta * y / num_samples); + param_list.update(num_features, eta * y / num_samples); loss_agg.update(1 - prod); } sqr_w_agg.update(sqr_w); - regulator_agg.update(regulator); }); - int num_samples = num_samples_agg.get_value(); sqr_w = sqr_w_agg.get_value() / num_samples; - regulator = regulator_agg.get_value() / num_samples; double loss = lambda / 2 * sqr_w + loss_agg.get_value() / num_samples; if (husky::Context::get_global_tid() == 0) { LOG_I << "Iteration " + std::to_string(i + 1) + ": ||w|| = " + std::to_string(sqrt(sqr_w)) + ", loss = " + @@ -183,18 +242,83 @@ void PyHuskySVM::SVM_load_hdfs_handler(const Operation& op, PythonSocket& python // Show result if (husky::Context::get_global_tid() == 0) { - param_list.present(); + // param_list.present(); LOG_I << "Time per iter: " + - std::to_string(std::chrono::duration_cast>(end - start).count() / - num_iter); + std::to_string(std::chrono::duration_cast>(end - start).count() / num_iter); + LOG_I << "send back the parameter to pyhusky"; + BinStream result; + result << param_list.get_num_param(); + for (auto v : param_list.get_all_param()) { + result << v; + } + daemon_socket.sendmore("SVMModel#SVM_train"); + daemon_socket.send(std::move(result)); } + + LOG_I << "finish SVM_finish"; } void PyHuskySVM::SVM_train_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket) { - // override - LOG_I << "start SVM_train"; + const std::string& sparse = op.get_param("is_sparse"); + bool is_sparse = sparse == "true" ? 
true : false; + if (is_sparse) { + PyHuskySVM::train_SVM(op, python_socket, daemon_socket); + } else { + PyHuskySVM::train_SVM(op, python_socket, daemon_socket); + } +} - LOG_I << "finish SVM_finish"; +template +void PyHuskySVM::test_SVM(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket) { + using LabeledPointHObj = husky::lib::ml::LabeledPointHObj; + const std::string& format = op.get_param("format"); + husky::lib::ml::DataFormat data_format = + format == "tsv" ? husky::lib::ml::kTSVFormat : husky::lib::ml::kLIBSVMFormat; + const std::string& name = op.get_param("list_name"); + + auto& test_set = husky::ObjListStore::create_objlist(); + husky::lib::ml::load_data(op.get_param("url"), test_set, data_format); + + ParameterBucket& param_list = SVM_models[name]->get_param_list(); // scalar b and vector w + int num_features = SVM_models[name]->num_features; + + Aggregator accu_agg(0, [](int& a, const int& b) { a += b; }); + Aggregator num_test_agg(0, [](int& a, const int& b) { a += b; }); + auto& ac = AggregatorFactory::get_channel(); + auto bweight = param_list.get_all_param(); + list_execute(test_set, {}, {&ac}, [&](LabeledPointHObj& this_obj) { + double indicator = 0; + auto y = this_obj.y; + auto X = this_obj.x; + for (auto it = X.begin_feaval(); it != X.end_feaval(); it++) + indicator += bweight[(*it).fea] * (*it).val; + // bias + indicator += bweight[num_features]; + indicator *= y; // right prediction if positive (Wx+b and y have the same sign) + if (indicator >= 0) + accu_agg.update(1); + num_test_agg.update(1); + }); + + if (husky::Context::get_global_tid() == 0) { + BinStream result; + double accu = static_cast(accu_agg.get_value()) / num_test_agg.get_value(); + result << accu; + husky::LOG_I << "Accuracy rate on testing set: " << accu; + daemon_socket.sendmore("SVMModel#SVM_test"); + daemon_socket.send(std::move(result)); + } +} + + +void PyHuskySVM::SVM_test_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& 
daemon_socket) { + const std::string& sparse = op.get_param("is_sparse"); + bool is_sparse = sparse == "true" ? true : false; + if (is_sparse) { + PyHuskySVM::test_SVM(op, python_socket, daemon_socket); + } else { + PyHuskySVM::test_SVM(op, python_socket, daemon_socket); + } } void PyHuskySVM::daemon_train_handler(ITCDaemon& to_worker, BinStream& buffer) { diff --git a/backend/library/svm.hpp b/backend/library/svm.hpp index 1b845da..4fc334a 100644 --- a/backend/library/svm.hpp +++ b/backend/library/svm.hpp @@ -14,6 +14,9 @@ #pragma once +#include "husky/lib/ml/data_loader.hpp" +#include "husky/lib/ml/feature_label.hpp" +#include "husky/lib/ml/parameter.hpp" #include "husky/base/serialization.hpp" namespace husky { @@ -41,9 +44,62 @@ class PyHuskySVM { static void SVM_init_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket); static void SVM_load_hdfs_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket); static void SVM_train_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket); + static void SVM_test_handler(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket); // daemon handlers static void daemon_train_handler(ITCDaemon&, BinStream&); + + template + static void train_SVM(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket); + template + static void test_SVM(const Operation& op, PythonSocket& python_socket, ITCWorker& daemon_socket); + template + static void create_model_from_pyhuskylist(std::string name, PythonSocket& python_socket, ITCWorker& daemon_socket); }; // class PyHuskyML +class SVMModel { + public: + SVMModel(int num_features) + : num_features(num_features) { + } + + husky::lib::ml::ParameterBucket& init_param_list() { + if (param_list) { + delete param_list; + } + param_list = new husky::lib::ml::ParameterBucket(num_features + 1); + return *param_list; + } + + husky::lib::ml::ParameterBucket& get_param_list() { + 
return *param_list; + } + + ~SVMModel() { + if (param_list) { + delete param_list; + } + } + + int num_features; + husky::lib::ml::ParameterBucket* param_list = nullptr; +}; + +extern thread_local std::unordered_map> SVM_models; + +template +void SVM_create_model_from_url(std::string name, std::string url, husky::lib::ml::DataFormat data_format) { + husky::base::log_msg("create model name: " + name); + + using LabeledPointHObj = husky::lib::ml::LabeledPointHObj; + auto& load_list = husky::ObjListStore::create_objlist(name); + + // load data + int num_features = husky::lib::ml::load_data(url, load_list, data_format); + + // init model + assert(num_features > 0); + SVM_models[name] = std::make_shared(num_features); +} + } // namespace husky diff --git a/backend/register.cpp b/backend/register.cpp index db9bce5..214066a 100644 --- a/backend/register.cpp +++ b/backend/register.cpp @@ -18,6 +18,7 @@ #include "backend/library/graph.hpp" #include "backend/library/linear_regression.hpp" #include "backend/library/logistic_regression.hpp" +#include "backend/library/svm.hpp" #include "backend/library/word.hpp" namespace husky { @@ -25,7 +26,7 @@ namespace husky { void RegisterFunction::register_py_handlers() { PyHuskyFunctional::init_py_handlers(); PyHuskyLinearR::init_py_handlers(); - // PyHuskySVM::init_py_handlers(); + PyHuskySVM::init_py_handlers(); PyHuskyLogisticR::init_py_handlers(); PyHuskyWord::init_py_handlers(); PyHuskyGraph::init_py_handlers(); @@ -33,7 +34,7 @@ void RegisterFunction::register_py_handlers() { void RegisterFunction::register_cpp_handlers() { PyHuskyLinearR::init_cpp_handlers(); - // PyHuskySVM::init_cpp_handlers(); + PyHuskySVM::init_cpp_handlers(); PyHuskyLogisticR::init_cpp_handlers(); PyHuskyWord::init_cpp_handlers(); PyHuskyGraph::init_cpp_handlers(); @@ -42,7 +43,7 @@ void RegisterFunction::register_cpp_handlers() { void RegisterFunction::register_daemon_handlers() { PyHuskyFunctional::init_daemon_handlers(); 
PyHuskyLinearR::init_daemon_handlers(); - // PyHuskySVM::init_daemon_handlers(); + PyHuskySVM::init_daemon_handlers(); PyHuskyLogisticR::init_daemon_handlers(); PyHuskyWord::init_daemon_handlers(); PyHuskyGraph::init_daemon_handlers(); diff --git a/master/splitter_register.cpp b/master/splitter_register.cpp index 0b6bb8b..225185c 100644 --- a/master/splitter_register.cpp +++ b/master/splitter_register.cpp @@ -42,7 +42,7 @@ void splitter_register() { OperationSplitter::add_splitter("SVMModel#SVM_init_py", OperationSplitter::load); OperationSplitter::add_splitter("SVMModel#SVM_load_hdfs_py", OperationSplitter::load); OperationSplitter::add_splitter("SVMModel#SVM_load_pyhlist_py", OperationSplitter::simple_split); - OperationSplitter::add_splitter("SVMModel#SVM_train_py", OperationSplitter::load); + OperationSplitter::add_splitter("SVMModel#SVM_test_py", OperationSplitter::load); // Logistic Regression OperationSplitter::add_splitter("LogisticRegressionModel#LogisticR_init_py", OperationSplitter::load); diff --git a/python/pyhusky/backend/library/svm.py b/python/pyhusky/backend/library/svm.py index 6d8b74d..b8fb124 100644 --- a/python/pyhusky/backend/library/svm.py +++ b/python/pyhusky/backend/library/svm.py @@ -35,6 +35,7 @@ def register(): @staticmethod def load_pyhlist_prefunc(op): GlobalVar.xy_line_list = op.op_param[OperationParam.list_str] + GlobalVar.is_sparse = op.op_param["is_sparse"] GlobalVar.xy_line_store = [] @staticmethod @@ -49,12 +50,15 @@ def load_pyhlist_func(_, data): def load_pyhlist_end_postfunc(_): GlobalSocket.pipe_to_cpp.send("SVMModel#SVM_load_pyhlist_py") GlobalSocket.pipe_to_cpp.send(GlobalVar.xy_line_list) + GlobalSocket.pipe_to_cpp.send(GlobalVar.is_sparse) # send out the len of datatset + assert len(GlobalVar.xy_line_store) != 0 GlobalSocket.pipe_to_cpp.send(str(len(GlobalVar.xy_line_store))) for (X, y) in GlobalVar.xy_line_store: # send out the len of X GlobalSocket.pipe_to_cpp.send(str(len(X))) - for i in X: + for (i, v) in X: 
GlobalSocket.pipe_to_cpp.send(str(i)) + GlobalSocket.pipe_to_cpp.send(str(v)) GlobalSocket.pipe_to_cpp.send(str(y)) GlobalSocket.xy_line_store = [] diff --git a/python/pyhusky/backend/processor.py b/python/pyhusky/backend/processor.py index 4de55d1..24af0df 100644 --- a/python/pyhusky/backend/processor.py +++ b/python/pyhusky/backend/processor.py @@ -18,8 +18,8 @@ from pyhusky.common.binstream import BinStream def log_msg(msg): - print msg - sys.stdout.flush() + sys.stderr.write(msg + "\n") + sys.stderr.flush() def get_map_partition(node): map_partitions = [] diff --git a/python/pyhusky/backend/register.py b/python/pyhusky/backend/register.py index f474fad..3948ed7 100644 --- a/python/pyhusky/backend/register.py +++ b/python/pyhusky/backend/register.py @@ -15,6 +15,7 @@ import pyhusky.backend.library.functional as functional import pyhusky.backend.library.linear_regression as LinearR import pyhusky.backend.library.logistic_regression as LogisticR +import pyhusky.backend.library.svm as SVM import pyhusky.backend.library.word as word import pyhusky.backend.library.graph as graph @@ -23,5 +24,6 @@ def register_func(): functional.register_all() LinearR.register_all() LogisticR.register_all() + SVM.register_all() word.register_all() graph.register_all() diff --git a/python/pyhusky/frontend/library/register.py b/python/pyhusky/frontend/library/register.py index 458f8c6..eee4d89 100644 --- a/python/pyhusky/frontend/library/register.py +++ b/python/pyhusky/frontend/library/register.py @@ -14,6 +14,7 @@ from pyhusky.frontend.library.linear_regression_receiver import LinearRegressionModelReceiver from pyhusky.frontend.library.logistic_regression_receiver import LogisticRegressionModelReceiver +from pyhusky.frontend.library.svm_receiver import SVMReceiver from pyhusky.frontend.library.word_receiver import WordReceiver from pyhusky.frontend.library.graph_receiver import GraphReceiver @@ -22,3 +23,4 @@ def register(receiver_map): 
LogisticRegressionModelReceiver.register(receiver_map) WordReceiver.register(receiver_map) GraphReceiver.register(receiver_map) + SVMReceiver.register(receiver_map) diff --git a/python/pyhusky/frontend/library/svm.py b/python/pyhusky/frontend/library/svm.py index 9d73ee1..fd260aa 100644 --- a/python/pyhusky/frontend/library/svm.py +++ b/python/pyhusky/frontend/library/svm.py @@ -20,58 +20,68 @@ from pyhusky.frontend.huskylist import PyHuskyList class SVMModel(HuskyList): - def __init__(self, n_feature=-1): + def __init__(self, n_feature=-1, is_sparse="true"): assert isinstance(n_feature, int) super(SVMModel, self).__init__() self.list_name += "SVM" self.loaded = False self.trained = False + self.train_set_cached = False self.param = None self.intercept = None + self.is_sparse = is_sparse param = {"n_feature" : str(n_feature), OperationParam.list_str : self.list_name, + "is_sparse" : self.is_sparse, "Type" : "cpp"} - op = Operation("SVMModel#SVM_init_py", param, []) - scheduler.compute(op) + # op = Operation("SVMModel#SVM_init_py", param, []) + # scheduler.compute(op) + self.pending_op = Operation("SVMModel#SVM_init_py", param, []) - def load_hdfs(self, url): + def load_hdfs(self, url, fmat): assert isinstance(url, str) assert not self.loaded param = {"url" : url, + "format" : fmat, + "is_sparse" : self.is_sparse, OperationParam.list_str : self.list_name, "Type" : "cpp"} - op = Operation("SVMModel#SVM_load_hdfs_py", param, []) - scheduler.compute(op) + self.pending_op = Operation("SVMModel#SVM_load_hdfs_py", param, [self.pending_op]) + # scheduler.compute(op) self.loaded = True def load_pyhlist(self, xy_list): assert not self.loaded if isinstance(xy_list, PyHuskyList): - param = {OperationParam.list_str : self.list_name} - self.pending_op = Operation("SVMModel#SVM_load_pyhlist_py", param, [xy_list.pending_op]) + param = {OperationParam.list_str : self.list_name, "is_sparse" : self.is_sparse} + self.pending_op = Operation("SVMModel#SVM_load_pyhlist_py", param, 
[self.pending_op, xy_list.pending_op]) scheduler.compute(self.pending_op) + self.train_set_cached = True self.loaded = True else: return NotImplemented - def train(self, n_iter=10, alpha=0.1): + def train(self, n_iter=10, lam=0.1, C=None): assert self.loaded assert isinstance(n_iter, int) - assert isinstance(alpha, float) param = {"n_iter" : str(n_iter), - "alpha" : str(alpha), + "is_sparse" : self.is_sparse, + "lambda" : str(lam), OperationParam.list_str : self.list_name, "Type" : "cpp"} - op = Operation("SVMModel#SVM_train_py", param, []) - param_list = scheduler.compute_collect(op) - self.param = np.array(param_list[:-1]) - self.intercept = param_list[-1] - self.loaded = False + if C is not None: + param["C"] = str(C) + self.pending_op = Operation("SVMModel#SVM_train_py", param, [self.pending_op]\ + if not self.train_set_cached else [Operation("SVMModel#SVM_init_py", {"Type":"cpp"}, [])]) + self.train_set_cached = True + param_list = scheduler.compute_collect(self.pending_op) + self.param = np.array(param_list[:-1]) + self.intercept = param_list[-1] self.trained = True def get_param(self): @@ -94,3 +104,18 @@ def predict(self, X): assert hasattr(X, '__iter__') return np.dot(np.array(X), self.param) + self.intercept + + def test(self, url, fmat, is_sparse = None): + assert isinstance(url, str) + assert self.trained + + if is_sparse is None: + is_sparse = self.is_sparse + + param = {"url" : url, + "format" : str(fmat), + "is_sparse" : str(is_sparse), + OperationParam.list_str : self.list_name, + "Type" : "cpp"} + op = Operation("SVMModel#SVM_test_py", param, []) + return scheduler.compute_collect(op) diff --git a/python/pyhusky/frontend/library/svm_receiver.py b/python/pyhusky/frontend/library/svm_receiver.py index c697a46..cfc4dcc 100644 --- a/python/pyhusky/frontend/library/svm_receiver.py +++ b/python/pyhusky/frontend/library/svm_receiver.py @@ -19,6 +19,7 @@ def __init__(self): @staticmethod def register(receiver_map): receiver_map["SVMModel#SVM_train_py"]
= SVMReceiver.train_receiver + receiver_map["SVMModel#SVM_test_py"] = SVMReceiver.test_receiver @staticmethod def train_receiver(reply): @@ -30,3 +31,8 @@ def train_receiver(reply): param_v = reply.load_double() res.append(param_v) return res + + @staticmethod + def test_receiver(reply): + reply.load_int64() + return reply.load_double()