diff --git a/example/config.yaml b/example/config.yaml index 1992f97b..bb82a33d 100644 --- a/example/config.yaml +++ b/example/config.yaml @@ -1,5 +1,7 @@ general: name: NameFromConfig + base_path: /tmp/jam_node + modules_dir: modules metrics: enabled: true @@ -17,11 +19,16 @@ logging: groups: - name: main sink: console - level: info + level: trace is_fallback: true children: - name: jam children: + - name: modules + children: + - name: example_module + - name: synchronizer_module + - name: networking_module - name: injector - name: application - name: rpc diff --git a/scripts/asn1.py b/scripts/asn1.py index b92967cb..f39a121a 100755 --- a/scripts/asn1.py +++ b/scripts/asn1.py @@ -185,10 +185,10 @@ def asn_sequence_of(t): if T == "U8": if fixed: if type(size) is int: - return "qtils::BytesN<%u>" % size + return "qtils::ByteArr<%u>" % size else: return "::jam::ConfigVec" % c_dash(size) - return "qtils::Bytes" + return "qtils::ByteVec" if fixed: if isinstance(size, str): return "::jam::ConfigVec<%s, Config::Field::%s>" % (T, c_dash(size)) @@ -412,7 +412,8 @@ def __init__(self, cpp_namespace: str, path: str): "#include ", "#include ", "", - "#include ", + "#include ", + "#include ", "#include ", "#include ", "", @@ -468,7 +469,8 @@ def __init__(self, cpp_namespace: str, name: str, path: str, module: str): "#include ", "#include ", "", - "#include ", + "#include ", + "#include ", "", *includes, "#include ", diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 87a038bb..45ee63fe 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -4,7 +4,9 @@ # SPDX-License-Identifier: Apache-2.0 # -include_directories(${CMAKE_CURRENT_SOURCE_DIR}) +include_directories(${CMAKE_SOURCE_DIR}) +include_directories(${CMAKE_SOURCE_DIR}/src) +include_directories(${CMAKE_BINARY_DIR}/generated) # Executables (should contain `main()` function) add_subdirectory(executable) diff --git a/src/app/CMakeLists.txt b/src/app/CMakeLists.txt index 023e1824..c1b37f01 100644 --- a/src/app/CMakeLists.txt +++ b/src/app/CMakeLists.txt @@ -42,6 +42,7 @@ target_link_libraries(app_configurator add_library(app_state_manager SHARED impl/state_manager_impl.cpp) target_link_libraries(app_state_manager + qtils::qtils logger ) diff --git a/src/app/configuration.cpp b/src/app/configuration.cpp index d2f1aa10..9c9ab1ac 100644 --- a/src/app/configuration.cpp +++ b/src/app/configuration.cpp @@ -8,19 +8,31 @@ namespace jam::app { Configuration::Configuration() - : version_("undefined"), name_("unnamed"), metrics_endpoint_() {} + : version_("undefined"), + name_("unnamed"), + metrics_{ + .endpoint{}, + .enabled{}, + } {} - std::string Configuration::nodeVersion() const { + const std::string &Configuration::nodeVersion() const { return version_; } - std::string Configuration::nodeName() const { + const std::string &Configuration::nodeName() const { return name_; } - std::optional Configuration::metricsEndpoint() - const { - return metrics_endpoint_; + const std::filesystem::path &Configuration::basePath() const { + return base_path_; + } + + const std::filesystem::path &Configuration::modulesDir() const { + return modules_dir_; + } + + const Configuration::MetricsConfig &Configuration::metrics() const { + return metrics_; } } // namespace jam::app diff --git a/src/app/configuration.hpp b/src/app/configuration.hpp index 2132356b..9e335bca 100644 --- a/src/app/configuration.hpp +++ b/src/app/configuration.hpp @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -16,24 +17,32 @@ namespace jam::app { public: using Endpoint = boost::asio::ip::tcp::endpoint; + struct MetricsConfig { + Endpoint endpoint; + std::optional enabled; + }; + Configuration(); // /// Generate yaml-file with actual config // virtual void generateConfigFile() const = 0; - [[nodiscard]] std::string nodeVersion() const; - [[nodiscard]] std::string nodeName() const; + [[nodiscard]] virtual const std::string &nodeVersion() const; + [[nodiscard]] virtual const std::string &nodeName() const; + [[nodiscard]] virtual const std::filesystem::path &basePath() const; + [[nodiscard]] virtual const std::filesystem::path &modulesDir() const; - [[nodiscard]] std::optional metricsEndpoint() const; + [[nodiscard]] virtual const MetricsConfig &metrics() const; private: friend class Configurator; // for external configure std::string version_; std::string name_; + std::filesystem::path base_path_; + std::filesystem::path modules_dir_; - Endpoint metrics_endpoint_; - std::optional metrics_enabled_; + MetricsConfig metrics_; }; } // namespace jam::app diff --git a/src/app/configurator.cpp b/src/app/configurator.cpp index 32cc5325..b2b9ce6b 100644 --- a/src/app/configurator.cpp +++ b/src/app/configurator.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include "app/build_version.hpp" #include "app/configuration.hpp" @@ -26,6 +27,8 @@ OUTCOME_CPP_DEFINE_CATEGORY(jam::app, Configurator::Error, e) { return "CLI Arguments parse failed"; case E::ConfigFileParseFailed: return "Config file parse failed"; + case E::InvalidValue: + return "Result config has invalid values"; } BOOST_UNREACHABLE_RETURN("Unknown log::Error"); } @@ -74,8 +77,8 @@ namespace jam::app { config_->version_ = buildVersion(); config_->name_ = "noname"; - config_->metrics_endpoint_ = {boost::asio::ip::address_v4::any(), 9615}; - config_->metrics_enabled_ = std::nullopt; + config_->metrics_.endpoint = {boost::asio::ip::address_v4::any(), 9615}; + config_->metrics_.enabled = std::nullopt; namespace po = boost::program_options; @@ -83,23 +86,25 @@ namespace jam::app { po::options_description general_options("General options", 120, 100); general_options.add_options() - ("help,h", "show this help message") - ("version,v", "show version information") - ("name,n", po::value(), "set name of node") - ("config,c", po::value(), "optional, filepath to load configuration from. Overrides default config values") + ("help,h", "Show this help message.") + ("version,v", "Show version information.") + ("base_path", po::value(), "Set base path. All relative paths will be resolved based on this path.") + ("config,c", po::value(), "Optional. Filepath to load configuration from. Overrides default configuration values.") + ("modules_dir", po::value(), "Set path to directory containing modules.") + ("name,n", po::value(), "Set name of node.") ("log,l", po::value>(), "Sets a custom logging filter.\n" - "Syntax is `=`, e.g. -llibp2p=off.\n" - "Log levels (most to least verbose) are trace, debug, verbose, info, warn, error, critical, off.\n" - "By default, all targets log `info`.\n" - "The global log level can be set with -l.") + "Syntax: =, e.g., -llibp2p=off.\n" + "Log levels: trace, debug, verbose, info, warn, error, critical, off.\n" + "Default: all targets log at `info`.\n" + "Global log level can be set with: -l.") ; po::options_description metrics_options("Metric options"); metrics_options.add_options() - ("prometheus-disable", "set to disable OpenMetrics") - ("prometheus-host", po::value(), "address for OpenMetrics over HTTP") - ("prometheus-port", po::value(), "port for OpenMetrics over HTTP") + ("prometheus_disable", "Set to disable OpenMetrics.") + ("prometheus_host", po::value(), "Set address for OpenMetrics over HTTP.") + ("prometheus_port", po::value(), "Set port for OpenMetrics over HTTP.") ; // clang-format on @@ -191,7 +196,8 @@ namespace jam::app { } outcome::result> Configurator::calculateConfig( - std::shared_ptr logger) { + qtils::SharedRef logger) { + logger_ = std::move(logger); OUTCOME_TRY(initGeneralConfig()); OUTCOME_TRY(initOpenMetricsConfig()); @@ -214,13 +220,49 @@ namespace jam::app { file_has_error_ = true; } } + auto base_path = section["base_path"]; + if (base_path.IsDefined()) { + if (base_path.IsScalar()) { + auto value = base_path.as(); + config_->base_path_ = value; + } else { + file_errors_ << "E: Value 'general.base_path' must be scalar\n"; + file_has_error_ = true; + } + } + auto modules_dir = section["modules_dir"]; + if (modules_dir.IsDefined()) { + if (modules_dir.IsScalar()) { + auto value = modules_dir.as(); + config_->modules_dir_ = value; + } else { + file_errors_ << "E: Value 'general.modules_dir' must be scalar\n"; + file_has_error_ = true; + } + } } else { - file_errors_ << "E: Section 'general' defined, but is not scalar\n"; + file_errors_ << "E: Section 'general' defined, but is not map\n"; file_has_error_ = true; } } } + if (file_has_error_) { + std::string path; + find_argument( + cli_values_map_, "config", [&](const std::string &value) { + path = value; + }); + SL_ERROR(logger_, "Config file `{}` has some problems:", path); + std::istringstream iss(file_errors_.str()); + std::string line; + while (std::getline(iss, line)) { + SL_ERROR(logger_, " {}", std::string_view(line).substr(3)); + } + return Error::ConfigFileParseFailed; + } + + // Adjust by CLI arguments bool fail; fail = false; @@ -228,10 +270,47 @@ namespace jam::app { cli_values_map_, "name", [&](const std::string &value) { config_->name_ = value; }); + find_argument( + cli_values_map_, "base_path", [&](const std::string &value) { + config_->base_path_ = value; + }); + find_argument( + cli_values_map_, "modules_dir", [&](const std::string &value) { + config_->modules_dir_ = value; + }); if (fail) { return Error::CliArgsParseFailed; } + // Check values + if (not config_->base_path_.is_absolute()) { + SL_ERROR(logger_, + "The 'base_path' must be defined as absolute: {}", + config_->base_path_.c_str()); + return Error::InvalidValue; + } + if (not is_directory(config_->base_path_)) { + SL_ERROR(logger_, + "The 'base_path' does not exist or is not a directory: {}", + config_->base_path_.c_str()); + return Error::InvalidValue; + } + current_path(config_->base_path_); + + auto make_absolute = [&](const std::filesystem::path &path) { + return weakly_canonical(config_->base_path_.is_absolute() + ? path + : (config_->base_path_ / path)); + }; + + config_->modules_dir_ = make_absolute(config_->modules_dir_); + if (not is_directory(config_->modules_dir_)) { + SL_ERROR(logger_, + "The 'modules_dir' does not exist or is not a directory: {}", + config_->modules_dir_.c_str()); + return Error::InvalidValue; + } + return outcome::success(); } @@ -245,9 +324,9 @@ namespace jam::app { if (enabled.IsScalar()) { auto value = enabled.as(); if (value == "true") { - config_->metrics_enabled_ = true; + config_->metrics_.enabled = true; } else if (value == "false") { - config_->metrics_enabled_ = false; + config_->metrics_.enabled = false; } else { file_errors_ << "E: Value 'network.metrics.enabled' has wrong value. " @@ -267,10 +346,10 @@ namespace jam::app { boost::beast::error_code ec; auto address = boost::asio::ip::address::from_string(value, ec); if (!ec) { - config_->metrics_endpoint_ = { - address, config_->metrics_endpoint_.port()}; - if (not config_->metrics_enabled_.has_value()) { - config_->metrics_enabled_ = true; + config_->metrics_.endpoint = { + address, config_->metrics_.endpoint.port()}; + if (not config_->metrics_.enabled.has_value()) { + config_->metrics_.enabled = true; } } else { file_errors_ << "E: Value 'network.metrics.host' defined, " @@ -288,11 +367,11 @@ namespace jam::app { if (port.IsScalar()) { auto value = port.as(); if (value > 0 and value <= 65535) { - config_->metrics_endpoint_ = { - config_->metrics_endpoint_.address(), + config_->metrics_.endpoint = { + config_->metrics_.endpoint.address(), static_cast(value)}; - if (not config_->metrics_enabled_.has_value()) { - config_->metrics_enabled_ = true; + if (not config_->metrics_.enabled.has_value()) { + config_->metrics_.enabled = true; } } else { file_errors_ << "E: Value 'network.metrics.port' defined, " @@ -317,17 +396,17 @@ namespace jam::app { fail = false; find_argument( - cli_values_map_, "prometheus-host", [&](const std::string &value) { + cli_values_map_, "prometheus_host", [&](const std::string &value) { boost::beast::error_code ec; auto address = boost::asio::ip::address::from_string(value, ec); if (!ec) { - config_->metrics_endpoint_ = {address, - config_->metrics_endpoint_.port()}; - if (not config_->metrics_enabled_.has_value()) { - config_->metrics_enabled_ = true; + config_->metrics_.endpoint = {address, + config_->metrics_.endpoint.port()}; + if (not config_->metrics_.enabled.has_value()) { + config_->metrics_.enabled = true; } } else { - std::cerr << "Option --prometheus-host has invalid value\n" + std::cerr << "Option --prometheus_host has invalid value\n" << "Try run with option '--help' for more information\n"; fail = true; } @@ -338,15 +417,15 @@ namespace jam::app { fail = false; find_argument( - cli_values_map_, "prometheus-port", [&](const uint16_t &value) { + cli_values_map_, "prometheus_port", [&](const uint16_t &value) { if (value > 0 and value <= 65535) { - config_->metrics_endpoint_ = {config_->metrics_endpoint_.address(), + config_->metrics_.endpoint = {config_->metrics_.endpoint.address(), static_cast(value)}; - if (not config_->metrics_enabled_.has_value()) { - config_->metrics_enabled_ = true; + if (not config_->metrics_.enabled.has_value()) { + config_->metrics_.enabled = true; } } else { - std::cerr << "Option --prometheus-port has invalid value\n" + std::cerr << "Option --prometheus_port has invalid value\n" << "Try run with option '--help' for more information\n"; fail = true; } @@ -355,11 +434,11 @@ namespace jam::app { return Error::CliArgsParseFailed; } - if (find_argument(cli_values_map_, "prometheus-disabled")) { - config_->metrics_enabled_ = false; + if (find_argument(cli_values_map_, "prometheus_disabled")) { + config_->metrics_.enabled = false; }; - if (not config_->metrics_enabled_.has_value()) { - config_->metrics_enabled_ = false; + if (not config_->metrics_.enabled.has_value()) { + config_->metrics_.enabled = false; } return outcome::success(); diff --git a/src/app/configurator.hpp b/src/app/configurator.hpp index 7e76e2bc..e403bb58 100644 --- a/src/app/configurator.hpp +++ b/src/app/configurator.hpp @@ -5,11 +5,11 @@ #pragma once -#include - #include +#include #include #include +#include #include #include "injector/dont_inject.hpp" @@ -29,6 +29,7 @@ namespace jam::app { enum class Error : uint8_t { CliArgsParseFailed, ConfigFileParseFailed, + InvalidValue, }; DONT_INJECT(Configurator); @@ -51,7 +52,7 @@ namespace jam::app { outcome::result getLoggingConfig(); outcome::result> calculateConfig( - std::shared_ptr logger); + qtils::SharedRef logger); private: outcome::result initGeneralConfig(); @@ -62,6 +63,7 @@ namespace jam::app { const char **env_; std::shared_ptr config_; + std::shared_ptr logger_; std::optional config_file_; bool file_has_warn_ = false; diff --git a/src/app/impl/application_impl.cpp b/src/app/impl/application_impl.cpp index 34b6e1e5..d17649d9 100644 --- a/src/app/impl/application_impl.cpp +++ b/src/app/impl/application_impl.cpp @@ -27,12 +27,12 @@ namespace jam::app { } ApplicationImpl::ApplicationImpl( - std::shared_ptr logsys, - std::shared_ptr config, - std::shared_ptr state_manager, - std::shared_ptr watchdog, - std::shared_ptr metrics_exposer, - std::shared_ptr system_clock, + qtils::SharedRef logsys, + qtils::SharedRef config, + qtils::SharedRef state_manager, + qtils::SharedRef watchdog, + qtils::SharedRef metrics_exposer, + qtils::SharedRef system_clock, std::shared_ptr) : logger_(logsys->getLogger("Application", "application")), app_config_(std::move(config)), @@ -42,15 +42,13 @@ namespace jam::app { system_clock_(std::move(system_clock)), metrics_registry_(metrics::createRegistry()) { // Metric for exposing name and version of node - constexpr auto buildInfoMetricName = "jam_build_info"; - metrics_registry_->registerGaugeFamily( - buildInfoMetricName, - "A metric with a constant '1' value labeled by name, version"); - auto metric_build_info = metrics_registry_->registerGaugeMetric( - buildInfoMetricName, - {{"name", app_config_->nodeName()}, - {"version", app_config_->nodeVersion()}}); - metric_build_info->set(1); + metrics::GaugeHelper( + "jam_build_info", + "A metric with a constant '1' value labeled by name, version", + std::map{ + {"name", app_config_->nodeName()}, + {"version", app_config_->nodeVersion()}}) + ->set(1); } void ApplicationImpl::run() { diff --git a/src/app/impl/application_impl.hpp b/src/app/impl/application_impl.hpp index dab28749..73dcf5e2 100644 --- a/src/app/impl/application_impl.hpp +++ b/src/app/impl/application_impl.hpp @@ -8,6 +8,8 @@ #include +#include + #include #include "app/application.hpp" @@ -75,23 +77,23 @@ namespace jam::app { class ApplicationImpl final : public Application { public: - ApplicationImpl(std::shared_ptr logsys, - std::shared_ptr config, - std::shared_ptr state_manager, - std::shared_ptr watchdog, - std::shared_ptr metrics_exposer, - std::shared_ptr system_clock, + ApplicationImpl(qtils::SharedRef logsys, + qtils::SharedRef config, + qtils::SharedRef state_manager, + qtils::SharedRef watchdog, + qtils::SharedRef metrics_exposer, + qtils::SharedRef system_clock, std::shared_ptr); void run() override; private: - std::shared_ptr logger_; - std::shared_ptr app_config_; - std::shared_ptr state_manager_; - std::shared_ptr watchdog_; - std::shared_ptr metrics_exposer_; - std::shared_ptr system_clock_; + qtils::SharedRef logger_; + qtils::SharedRef app_config_; + qtils::SharedRef state_manager_; + qtils::SharedRef watchdog_; + qtils::SharedRef metrics_exposer_; + qtils::SharedRef system_clock_; // Metrics std::unique_ptr metrics_registry_; diff --git a/src/app/impl/state_manager_impl.cpp b/src/app/impl/state_manager_impl.cpp index f5e2b73e..2d3b4b97 100644 --- a/src/app/impl/state_manager_impl.cpp +++ b/src/app/impl/state_manager_impl.cpp @@ -92,7 +92,7 @@ namespace jam::app { } StateManagerImpl::StateManagerImpl( - std::shared_ptr logging_system) + qtils::SharedRef logging_system) : logger_(logging_system->getLogger("StateManager", "application")), logging_system_(std::move(logging_system)) { shuttingDownSignalsEnable(); diff --git a/src/app/impl/state_manager_impl.hpp b/src/app/impl/state_manager_impl.hpp index 1965e3f8..9f39388f 100644 --- a/src/app/impl/state_manager_impl.hpp +++ b/src/app/impl/state_manager_impl.hpp @@ -6,12 +6,14 @@ #pragma once +#include "app/state_manager.hpp" + #include -#include #include #include -#include "app/state_manager.hpp" +#include + #include "utils/ctor_limiters.hpp" namespace soralog { @@ -28,7 +30,7 @@ namespace jam::app { public StateManager, public std::enable_shared_from_this { public: - StateManagerImpl(std::shared_ptr logging_system); + StateManagerImpl(qtils::SharedRef logging_system); ~StateManagerImpl() override; @@ -65,8 +67,8 @@ namespace jam::app { void shutdownRequestWaiting(); - std::shared_ptr logger_; - std::shared_ptr logging_system_; + qtils::SharedRef logger_; + qtils::SharedRef logging_system_; std::atomic state_ = State::Init; diff --git a/src/crypto/bandersnatch.hpp b/src/crypto/bandersnatch.hpp index aa2ed006..bf80220c 100644 --- a/src/crypto/bandersnatch.hpp +++ b/src/crypto/bandersnatch.hpp @@ -10,13 +10,13 @@ #include #include -#include +#include namespace jam::crypto::bandersnatch { - using Output = qtils::BytesN; - using Public = qtils::BytesN; - using RingCommitment = qtils::BytesN; - using RingSignature = qtils::BytesN; + using Output = qtils::ByteArr; + using Public = qtils::ByteArr; + using RingCommitment = qtils::ByteArr; + using RingSignature = qtils::ByteArr; inline std::optional output(qtils::BytesIn signature) { Output output; diff --git a/src/crypto/blake.hpp b/src/crypto/blake.hpp index cd228697..1d9f2851 100644 --- a/src/crypto/blake.hpp +++ b/src/crypto/blake.hpp @@ -8,10 +8,11 @@ #include #include +#include namespace jam::crypto { struct Blake { - using Hash = qtils::BytesN<32>; + using Hash = qtils::ByteArr<32>; blake2b_state state; Blake() { blake2b_init(&state, sizeof(Hash)); diff --git a/src/crypto/ed25519.hpp b/src/crypto/ed25519.hpp index afa39d3e..d765335c 100644 --- a/src/crypto/ed25519.hpp +++ b/src/crypto/ed25519.hpp @@ -5,13 +5,13 @@ */ #include -#include +#include namespace jam::crypto::ed25519 { - using Secret = qtils::BytesN; - using Public = qtils::BytesN; - using KeyPair = qtils::BytesN; - using Signature = qtils::BytesN; + using Secret = qtils::ByteArr; + using Public = qtils::ByteArr; + using KeyPair = qtils::ByteArr; + using Signature = qtils::ByteArr; using Message = qtils::BytesIn; inline std::optional sign(const KeyPair &keypair, diff --git a/src/crypto/keccak.hpp b/src/crypto/keccak.hpp index f9ab2f57..5cf086a6 100644 --- a/src/crypto/keccak.hpp +++ b/src/crypto/keccak.hpp @@ -4,7 +4,7 @@ #pragma once -#include +#include /** * Keccak hash @@ -12,7 +12,7 @@ namespace jam::crypto { struct Keccak { - using Hash32 = qtils::BytesN<32>; + using Hash32 = qtils::ByteArr<32>; uint64_t state[5][5] = {}; size_t blockOff = 0; static constexpr size_t HASH_LEN = 32; diff --git a/src/executable/jam_node.cpp b/src/executable/jam_node.cpp index 23f06493..60023ee4 100644 --- a/src/executable/jam_node.cpp +++ b/src/executable/jam_node.cpp @@ -7,8 +7,6 @@ #include #include #include -#include -#include #include #include @@ -18,8 +16,8 @@ #include "app/configuration.hpp" #include "app/configurator.hpp" #include "injector/node_injector.hpp" -#include "loaders/impl/example_loader.hpp" #include "log/logger.hpp" +#include "loaders/loader.hpp" #include "modules/module_loader.hpp" #include "se/subscription.hpp" @@ -41,25 +39,48 @@ namespace { int run_node(std::shared_ptr logsys, std::shared_ptr appcfg) { auto injector = std::make_unique(logsys, appcfg); + // Load modules std::deque> loaders; { auto logger = logsys->getLogger("Modules", "jam"); - const std::string path("modules"); + const std::string path(appcfg->modulesDir()); jam::modules::ModuleLoader module_loader(path); - auto modules = module_loader.get_modules(); - if (modules.has_error()) { + auto modules_res = module_loader.get_modules(); + if (modules_res.has_error()) { SL_CRITICAL(logger, "Failed to load modules from path: {}", path); return EXIT_FAILURE; } + auto &modules = modules_res.value(); + + for (const auto &module : modules) { + SL_INFO(logger, + "Found module '{}', path: {}", + module->get_module_info(), + module->get_path()); - for (const auto &module : modules.value()) { auto loader = injector->register_loader(module); - if (loader) { - loaders.emplace_back(std::move(loader)); + + // Skip unsupported + if (not loader) { + SL_WARN(logger, + "Module '{}' has unsupported loader '{}'; Skipped", + module->get_module_info(), + module->get_loader_id()); + continue; } + + // Init module + SL_INFO(logger, + "Module '{}' loaded by '{}'", + module->get_module_info(), + module->get_loader_id()); + loaders.emplace_back(std::move(loader)); } + + // Notify about all modules are loaded + // se_manager->notify(jam::EventTypes::LoadingIsFinished); } auto logger = logsys->getLogger("Main", jam::log::defaultGroupName); @@ -134,7 +155,7 @@ int main(int argc, const char **argv, const char **env) { std::make_shared(std::move(logging_system)); }); - // Parse CLI args for help, version and config + // Parse remaining args if (auto res = app_configurator->step2(); res.has_value()) { if (res.value()) { return EXIT_SUCCESS; diff --git a/src/injector/node_injector.cpp b/src/injector/node_injector.cpp index 0989844c..213b2470 100644 --- a/src/injector/node_injector.cpp +++ b/src/injector/node_injector.cpp @@ -17,6 +17,8 @@ #include #include #include +#include +#include #include "app/configuration.hpp" #include "app/impl/application_impl.hpp" @@ -30,7 +32,7 @@ #include "metrics/impl/prometheus/handler_impl.hpp" #include "modules/module.hpp" #include "se/impl/async_dispatcher_impl.hpp" -#include "se/subscription_fwd.hpp" +#include "se/subscription.hpp" namespace { namespace di = boost::di; @@ -60,7 +62,7 @@ namespace { di::bind.to(logsys), di::bind.to(), di::bind.to(), - di::bind.to>(), + di::bind.to>(), di::bind.to([](const auto &injector) { return metrics::Exposer::Configuration{ {boost::asio::ip::address_v4::from_string("127.0.0.1"), 7777} @@ -116,27 +118,35 @@ namespace jam::injector { .template create>(); auto logger = logsys->getLogger("Modules", "jam"); + std::unique_ptr loader{}; + if ("ExampleLoader" == module->get_loader_id()) { - auto loader = - pimpl_->injector_ - .template create>(); - loader->start(module); - - if (auto info = loader->module_info()) { - SL_INFO(logger, "> Module: {} [{}]", *info, module->get_path()); - } else { - SL_ERROR(logger, - "> No module info for: {} [{}]", - module->get_loader_id(), - module->get_path()); - } - return std::unique_ptr(loader.release()); + loader = pimpl_->injector_ + .create>(); + } else if ("NetworkingLoader" == module->get_loader_id()) { + loader = pimpl_->injector_ + .create>(); + } else if ("SynchronizerLoader" == module->get_loader_id()) { + loader = pimpl_->injector_ + .create>(); + } else { + SL_CRITICAL(logger, + "> No loader found for: {} [{}]", + module->get_loader_id(), + module->get_path()); + return {}; } - SL_CRITICAL(logger, - "> No loader found for: {} [{}]", - module->get_loader_id(), - module->get_path()); - return {}; + loader->start(module); + + if (auto info = loader->module_info()) { + SL_INFO(logger, "> Module: {} [{}]", *info, module->get_path()); + } else { + SL_ERROR(logger, + "> No module info for: {} [{}]", + module->get_loader_id(), + module->get_path()); + } + return std::unique_ptr(loader.release()); } } // namespace jam::injector diff --git a/src/injector/node_injector.hpp b/src/injector/node_injector.hpp index 8538df0e..7fd7ed2e 100644 --- a/src/injector/node_injector.hpp +++ b/src/injector/node_injector.hpp @@ -21,7 +21,6 @@ namespace jam::app { namespace jam::loaders { class Loader; - class ExampleLoader; } // namespace jam::loaders namespace jam::modules { diff --git a/src/jam_types/types.tmp.hpp b/src/jam_types/types.tmp.hpp new file mode 100644 index 00000000..4c62c553 --- /dev/null +++ b/src/jam_types/types.tmp.hpp @@ -0,0 +1,86 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include + +namespace jam { + + // stub types. must be refactored in future + + struct Stub {}; + + // blockchain types + + using BlockNumber = uint32_t; + using BlockHash = test_vectors::HeaderHash; + + struct BlockIndex { + BlockNumber number; + BlockHash hash; + }; + + using Block = test_vectors::Block; + using BlockHeader = test_vectors::Header; + + // networking types + + using PeerId = qtils::Tagged; // STUB + + /// Direction, in which to retrieve ordered data + enum class Direction : uint8_t { + /// from child to parent + ASCENDING = 0, + /// from parent to canonical child + DESCENDING = 1 + }; + + /// Request for blocks to another peer + struct BlocksRequest { + /// start from this block + BlockIndex from{}; + /// sequence direction + Direction direction{}; + /// maximum number of blocks to return; an implementation defined maximum is + /// used when unspecified + std::optional max{}; + bool multiple_justifications = true; + }; + + struct BlockAnnounce { + BlockAnnounce(const BlockAnnounce &) = delete; + }; + +} // namespace jam + +SCALE_DEFINE_ENUM_VALUE_RANGE(jam, + Direction, + jam::Direction::ASCENDING, + jam::Direction::DESCENDING); + +template <> +struct fmt::formatter { + constexpr auto parse(format_parse_context &ctx) -> decltype(ctx.begin()) { + auto it = ctx.begin(), end = ctx.end(); + if (it != end && *it != '}') { + throw format_error("invalid format"); + } + return it; + } + + template + auto format(const jam::Stub &, FormatContext &ctx) const + -> decltype(ctx.out()) { + return fmt::format_to(ctx.out(), "stub"); + } +}; + +template +struct fmt::formatter> : formatter {}; + diff --git a/src/loaders/impl/example_loader.hpp b/src/loaders/impl/example_loader.hpp index 30855779..4d3b9a90 100644 --- a/src/loaders/impl/example_loader.hpp +++ b/src/loaders/impl/example_loader.hpp @@ -7,8 +7,8 @@ #pragma once #include -#include +#include #include #include "loaders/loader.hpp" @@ -22,46 +22,109 @@ namespace jam::loaders { : public std::enable_shared_from_this, public Loader, public modules::ExampleModuleLoader { - struct __T {}; - std::shared_ptr logsys_; + std::shared_ptr> on_init_complete_; - using InitCompleteSubscriber = BaseSubscriber<__T>; - std::shared_ptr on_init_complete_; + std::shared_ptr> on_loading_finished_; + + std::shared_ptr>> + on_request_; + + std::shared_ptr>> + on_response_; + + std::shared_ptr>> + on_notification_; public: - ExampleLoader(std::shared_ptr logsys, - std::shared_ptr se_manager) + ExampleLoader(qtils::SharedRef logsys, + qtils::SharedRef se_manager) : Loader(std::move(logsys), std::move(se_manager)) {} ExampleLoader(const ExampleLoader &) = delete; ExampleLoader &operator=(const ExampleLoader &) = delete; - ~ExampleLoader() = default; + ~ExampleLoader() override = default; - void start(std::shared_ptr module) { + void start(std::shared_ptr module) override { set_module(module); - auto function = + auto module_accessor = get_module() ->getFunctionFromLibrary< std::weak_ptr, - std::shared_ptr, + modules::ExampleModuleLoader &, std::shared_ptr>("query_module_instance"); - auto se_manager = get_se_manager(); - if (function) { - auto module_internal = (*function)(shared_from_this(), logsys_); - on_init_complete_ = se::SubscriberCreator<__T>::template create< - EventTypes::kOnTestOperationComplete>( - *se_manager, - SubscriptionEngineHandlers::kTest, - [module_internal](auto &) { - if (auto m = module_internal.lock()) { - m->on_loaded_success(); - } - }); - - se_manager->notify(jam::EventTypes::kOnTestOperationComplete); + if (not module_accessor) { + return; } + + auto module_internal = (*module_accessor)(*this, logsys_); + + on_init_complete_ = se::SubscriberCreator::template create< + EventTypes::ExampleModuleIsLoaded>( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal](auto &) { + if (auto m = module_internal.lock()) { + m->on_loaded_success(); + } + }); + + on_loading_finished_ = + se::SubscriberCreator::template create< + EventTypes::LoadingIsFinished>( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal](auto &) { + if (auto m = module_internal.lock()) { + m->on_loading_is_finished(); + } + }); + + on_request_ = se::SubscriberCreator>:: + template create( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal](auto &msg) { + if (auto m = module_internal.lock()) { + m->on_request(msg); + } + }); + + on_response_ = se::SubscriberCreator>:: + template create( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal](auto &msg) { + if (auto m = module_internal.lock()) { + m->on_response(msg); + } + }); + + on_notification_ = + se::SubscriberCreator>:: + template create( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal](auto &msg) { + if (auto m = module_internal.lock()) { + m->on_notify(msg); + } + }); + + se_manager_->notify(jam::EventTypes::ExampleModuleIsLoaded); + } + + void dispatch_request(std::shared_ptr s) override { + se_manager_->notify(jam::EventTypes::ExampleRequest, s); + } + + void dispatch_response(std::shared_ptr s) override { + se_manager_->notify(jam::EventTypes::ExampleResponse, s); + } + + void dispatch_notify(std::shared_ptr s) override { + se_manager_->notify(jam::EventTypes::ExampleNotification, s); } }; } // namespace jam::loaders diff --git a/src/loaders/impl/networking_loader.hpp b/src/loaders/impl/networking_loader.hpp new file mode 100644 index 00000000..abceee8a --- /dev/null +++ b/src/loaders/impl/networking_loader.hpp @@ -0,0 +1,128 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include + +#include "loaders/loader.hpp" +#include "log/logger.hpp" +#include "modules/networking/networking.hpp" +#include "modules/shared/networking_types.tmp.hpp" +#include "se/subscription.hpp" + +namespace jam::loaders { + + class NetworkingLoader final + : public std::enable_shared_from_this, + public Loader, + public modules::NetworkingLoader { + log::Logger logger_; + + std::shared_ptr> on_init_complete_; + + std::shared_ptr> on_loading_finished_; + + std::shared_ptr< + BaseSubscriber>> + on_block_request_; + + public: + NetworkingLoader(std::shared_ptr logsys, + std::shared_ptr se_manager) + : Loader(std::move(logsys), std::move(se_manager)), + logger_(logsys_->getLogger("Networking", "networking_module")) {} + + NetworkingLoader(const NetworkingLoader &) = delete; + NetworkingLoader &operator=(const NetworkingLoader &) = delete; + + ~NetworkingLoader() override = default; + + void start(std::shared_ptr module) override { + set_module(module); + auto module_accessor = + get_module() + ->getFunctionFromLibrary, + modules::NetworkingLoader &, + std::shared_ptr>( + "query_module_instance"); + + if (not module_accessor) { + return; + } + + auto module_internal = (*module_accessor)(*this, logsys_); + + on_init_complete_ = se::SubscriberCreator::template create< + EventTypes::NetworkingIsLoaded>( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal, this](auto &) { + if (auto m = module_internal.lock()) { + SL_TRACE(logger_, "Handle NetworkingIsLoaded"); + m->on_loaded_success(); + } + }); + + on_loading_finished_ = + se::SubscriberCreator::template create< + EventTypes::LoadingIsFinished>( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal, this](auto &) { + if (auto m = module_internal.lock()) { + SL_TRACE(logger_, "Handle LoadingIsFinished"); + m->on_loading_is_finished(); + } + }); + + on_block_request_ = se::SubscriberCreator< + qtils::Empty, + std::shared_ptr>:: + template create( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal, this](auto &, const auto &msg) { + if (auto m = module_internal.lock()) { + SL_TRACE( + logger_, "Handle BlockRequest; rid={}", msg->ctx.rid); + m->on_block_request(msg); + } + }); + + + se_manager_->notify(jam::EventTypes::NetworkingIsLoaded); + } + + void dispatch_peer_connected( + std::shared_ptr msg) override { + SL_TRACE(logger_, "Dispatch PeerConnected; peer={}", msg->peer); + se_manager_->notify(jam::EventTypes::PeerConnected, msg); + } + + void dispatch_peer_disconnected( + std::shared_ptr msg) override { + SL_TRACE(logger_, "Dispatch PeerDisconnected; peer={}", msg->peer); + se_manager_->notify(jam::EventTypes::PeerDisconnected, msg); + } + + void dispatch_block_announce( + std::shared_ptr msg) override { + SL_TRACE(logger_, "Dispatch BlockAnnounceReceived"); + se_manager_->notify(jam::EventTypes::BlockAnnounceReceived, msg); + } + + void dispatch_block_response( + std::shared_ptr msg) override { + SL_TRACE(logger_, "Dispatch BlockResponse; rid={}", msg->ctx.rid); + se_manager_->notify(jam::EventTypes::BlockResponse, std::move(msg)); + } + }; +} // namespace jam::loaders diff --git a/src/loaders/impl/synchronizer_loader.hpp b/src/loaders/impl/synchronizer_loader.hpp new file mode 100644 index 00000000..bc1eb2b5 --- /dev/null +++ b/src/loaders/impl/synchronizer_loader.hpp @@ -0,0 +1,114 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +#include +#include + +#include "loaders/loader.hpp" +#include "log/logger.hpp" +#include "modules/synchronizer/synchronizer.hpp" +#include "se/subscription.hpp" + +namespace jam::loaders { + + class SynchronizerLoader final + : public std::enable_shared_from_this, + public Loader, + public modules::SynchronizerLoader { + log::Logger logger_; + + using InitCompleteSubscriber = BaseSubscriber; + std::shared_ptr on_init_complete_; + + std::shared_ptr< + BaseSubscriber>> + on_block_announce_; + + std::shared_ptr< + BaseSubscriber>> + on_block_response_; + + public: + SynchronizerLoader(std::shared_ptr logsys, + std::shared_ptr se_manager) + : Loader(std::move(logsys), std::move(se_manager)), + logger_(logsys_->getLogger("Synchronizer", "synchronizer_module")) {} + + SynchronizerLoader(const SynchronizerLoader &) = delete; + SynchronizerLoader &operator=(const SynchronizerLoader &) = delete; + + ~SynchronizerLoader() override = default; + + void start(std::shared_ptr module) override { + set_module(module); + auto module_accessor = + get_module() + ->getFunctionFromLibrary< + std::weak_ptr, + modules::SynchronizerLoader &, + std::shared_ptr>("query_module_instance"); + + if (not module_accessor) { + return; + } + + auto module_internal = (*module_accessor)(*this, logsys_); + + on_init_complete_ = se::SubscriberCreator::template create< + EventTypes::SynchronizerIsLoaded>( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal, this](auto &) { + if (auto m = module_internal.lock()) { + SL_TRACE(logger_, "Handle SynchronizerIsLoaded"); + m->on_loaded_success(); + } + }); + + on_block_announce_ = se::SubscriberCreator< + qtils::Empty, + std::shared_ptr>:: + template create( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal, this](auto &, const auto &msg) { + if (auto m = module_internal.lock()) { + SL_TRACE(logger_, "Handle BlockAnnounceReceived"); + m->on_block_announce(msg); + } + }); + + on_block_response_ = se::SubscriberCreator< + qtils::Empty, + std::shared_ptr>:: + template create( + *se_manager_, + SubscriptionEngineHandlers::kTest, + [module_internal, this](auto &, const auto &msg) { + if (auto m = module_internal.lock()) { + SL_TRACE( + logger_, "Handle BlockResponse; rid={}", msg->ctx.rid); + m->on_block_response(msg); + } + }); + + se_manager_->notify(jam::EventTypes::SynchronizerIsLoaded); + } + + void dispatch_block_request( + std::shared_ptr msg) override { + SL_TRACE(logger_, "Dispatch BlockRequest; rid={}", msg->ctx.rid); + se_manager_->notify(jam::EventTypes::BlockRequest, msg); + } + }; + +} // namespace jam::loaders diff --git a/src/loaders/loader.hpp b/src/loaders/loader.hpp index 6b99e817..dd8aa9ba 100644 --- a/src/loaders/loader.hpp +++ b/src/loaders/loader.hpp @@ -10,6 +10,8 @@ #include #include +#include + #include "modules/module.hpp" #include "se/subscription_fwd.hpp" @@ -61,13 +63,11 @@ namespace jam::loaders { module_ = module; } - std::shared_ptr get_se_manager() { - return se_manager_; - } + protected: + qtils::SharedRef logsys_; + qtils::SharedRef se_manager_; private: std::shared_ptr module_; - std::shared_ptr logsys_; - std::shared_ptr se_manager_; }; } // namespace jam::loaders diff --git a/src/log/logger.hpp b/src/log/logger.hpp index c35952e4..ec618276 100644 --- a/src/log/logger.hpp +++ b/src/log/logger.hpp @@ -11,7 +11,7 @@ #include #include -#include +#include #include #include #include @@ -23,7 +23,7 @@ namespace jam::log { using soralog::Level; - using Logger = qtils::StrictSharedPtr; + using Logger = qtils::SharedRef; enum class Error : uint8_t { WRONG_LEVEL = 1, WRONG_GROUP, WRONG_LOGGER }; diff --git a/src/metrics/impl/exposer_impl.cpp b/src/metrics/impl/exposer_impl.cpp index 3a4ec7f8..26d07d93 100644 --- a/src/metrics/impl/exposer_impl.cpp +++ b/src/metrics/impl/exposer_impl.cpp @@ -27,18 +27,21 @@ namespace jam::metrics { context_{std::make_shared()}, config_{std::move(config)}, session_config_{session_config} { - auto registry = metrics::createRegistry(); - registry->setHandler(*handler.get()); - setHandler(handler); + if (config_->metrics().enabled == true) { + auto registry = metrics::createRegistry(); + registry->setHandler(*handler.get()); + setHandler(handler); - state_manager->takeControl(*this); + state_manager->takeControl(*this); + } } bool ExposerImpl::prepare() { + BOOST_ASSERT(config_->metrics().enabled == true); try { acceptor_ = jam::api::acceptOnFreePort( context_, - config_->metricsEndpoint().value_or(Endpoint{}), + config_->metrics().endpoint, jam::api::kDefaultPortTolerance, logger_); } catch (const boost::wrapexcept &exception) { @@ -61,6 +64,7 @@ namespace jam::metrics { } bool ExposerImpl::start() { + BOOST_ASSERT(config_->metrics().enabled == true); BOOST_ASSERT(acceptor_); if (!acceptor_->is_open()) { @@ -70,7 +74,7 @@ namespace jam::metrics { logger_->info( "Listening for new connections on {}:{}", - config_->metricsEndpoint().value_or(Endpoint{}).address().to_string(), + config_->metrics().endpoint.address().to_string(), acceptor_->local_endpoint().port()); acceptOnce(); diff --git a/src/modules/CMakeLists.txt b/src/modules/CMakeLists.txt index 59a0069b..8aef530b 100644 --- a/src/modules/CMakeLists.txt +++ b/src/modules/CMakeLists.txt @@ -90,4 +90,10 @@ target_link_libraries(modules # -------------- Modules -------------- # Example module -add_subdirectory(example) \ No newline at end of file +add_subdirectory(example) + +# Networking module +add_subdirectory(networking) + +# Blockchain synchronizer module +add_subdirectory(synchronizer) diff --git a/src/modules/example/example.cpp b/src/modules/example/example.cpp index 5eb1c505..9bc1306a 100644 --- a/src/modules/example/example.cpp +++ b/src/modules/example/example.cpp @@ -7,12 +7,31 @@ #include "modules/example/example.hpp" namespace jam::modules { - // ExampleModule::ExampleModule( - // qtils::StrictSharedPtr loader, - // qtils::StrictSharedPtr logging_system) - // : loader_(loader), - // logger_(logging_system->getLogger("ExampleModule", "example_module")) + ExampleModuleImpl::ExampleModuleImpl( + ExampleModuleLoader &loader, + qtils::SharedRef logging_system) + : loader_(loader), + logsys_(std::move(logging_system)), + logger_(logsys_->getLogger("ExampleModule", "example_module")) {} - // {} + void ExampleModuleImpl::on_loaded_success() { + SL_INFO(logger_, "Loaded success"); + } + + void ExampleModuleImpl::on_loading_is_finished() { + SL_INFO(logger_, "Loading is finished"); + } + + void ExampleModuleImpl::on_request(std::shared_ptr s) { + SL_INFO(logger_, "Received request: {}", *s); + } + + void ExampleModuleImpl::on_response(std::shared_ptr s) { + SL_INFO(logger_, "Received response: {}", *s); + } + + void ExampleModuleImpl::on_notify(std::shared_ptr s) { + SL_INFO(logger_, "Received notification: {}", *s); + } } // namespace jam::modules diff --git a/src/modules/example/example.hpp b/src/modules/example/example.hpp index ad5fe697..ab550c86 100644 --- a/src/modules/example/example.hpp +++ b/src/modules/example/example.hpp @@ -6,36 +6,30 @@ #pragma once -#include -#include -#include +#include +#include namespace jam::modules { - class ExampleModule; - struct ExampleModuleLoader { - virtual ~ExampleModuleLoader() = default; - }; - struct ExampleModule { - virtual ~ExampleModule() = default; - virtual void on_loaded_success() = 0; - }; -} // namespace jam::modules + class ExampleModuleImpl final : public jam::modules::ExampleModule { + jam::modules::ExampleModuleLoader &loader_; + qtils::SharedRef logsys_; + jam::log::Logger logger_; -// class BlockTree; + public: + ExampleModuleImpl(jam::modules::ExampleModuleLoader &loader, + qtils::SharedRef logsys); -namespace jam::modules { + void on_loaded_success() override; - // class ExampleModule : public Singleton { - // public: - // static std::shared_ptr instance; - // CREATE_SHARED_METHOD(ExampleModule); + void on_loading_is_finished() override; - // ExampleModule(qtils::StrictSharedPtr loader, - // qtils::StrictSharedPtr logging_system); + void on_request(std::shared_ptr s) override; + + void on_response(std::shared_ptr s) override; + + void on_notify(std::shared_ptr s) override; + }; - // qtils::StrictSharedPtr loader_; - // log::Logger logger_; - // }; } // namespace jam::modules diff --git a/src/modules/example/interfaces.hpp b/src/modules/example/interfaces.hpp new file mode 100644 index 00000000..518d662b --- /dev/null +++ b/src/modules/example/interfaces.hpp @@ -0,0 +1,31 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace jam::modules { + + struct ExampleModuleLoader { + virtual ~ExampleModuleLoader() = default; + + virtual void dispatch_request(std::shared_ptr) = 0; + virtual void dispatch_response(std::shared_ptr) = 0; + virtual void dispatch_notify(std::shared_ptr) = 0; + }; + + struct ExampleModule { + virtual ~ExampleModule() = default; + virtual void on_loaded_success() = 0; + virtual void on_loading_is_finished() = 0; + + virtual void on_request(std::shared_ptr) = 0; + virtual void on_response(std::shared_ptr) = 0; + virtual void on_notify(std::shared_ptr) = 0; + }; + +} // namespace jam::modules diff --git a/src/modules/example/module.cpp b/src/modules/example/module.cpp index 7120918f..df396fc0 100644 --- a/src/modules/example/module.cpp +++ b/src/modules/example/module.cpp @@ -17,33 +17,23 @@ MODULE_C_API const char *module_info() { return "ExampleModule v1.0"; } -class ExampleModuleImpl final : public jam::modules::ExampleModule { - std::shared_ptr loader_; - std::shared_ptr logger_; - - public: - ExampleModuleImpl(std::shared_ptr loader, - std::shared_ptr logger) - : loader_(std::move(loader)), logger_(std::move(logger)) {} - - void on_loaded_success() override { - auto l = logger_->getLogger("ExampleModule", "jam"); - SL_INFO(l, "Loaded success"); - } -}; -static std::shared_ptr exmpl_mod; +static std::shared_ptr module_instance; +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wreturn-type-c-linkage" MODULE_C_API std::weak_ptr query_module_instance( - std::shared_ptr loader, + jam::modules::ExampleModuleLoader& loader, std::shared_ptr logger) { - if (!exmpl_mod) { - exmpl_mod = std::make_shared(std::move(loader), - std::move(logger)); + if (!module_instance) { + module_instance = std::make_shared( + loader, std::move(logger)); } - return exmpl_mod; + return module_instance; } MODULE_C_API void release_module_instance() { - exmpl_mod.reset(); + module_instance.reset(); } + +#pragma GCC diagnostic pop diff --git a/src/modules/module.hpp b/src/modules/module.hpp index 496a4fe5..0e5b5d2e 100644 --- a/src/modules/module.hpp +++ b/src/modules/module.hpp @@ -13,26 +13,35 @@ #include #include +#include + namespace jam::modules { - class Module final : public std::enable_shared_from_this { + class Module final : public std::enable_shared_from_this, + NonCopyable { public: Module(Module &&) = default; // Static method for Module object creation static std::shared_ptr create( const std::string &path, + const std::string &module_info, std::unique_ptr handle, const std::string &loader_id) { return std::shared_ptr( - new Module(path, std::move(handle), loader_id)); + new Module(path, module_info, std::move(handle), loader_id)); } - // Getter for library path + // Getter for the library path const std::string &get_path() const { return path_; } + // Getter for module info + const std::string &get_module_info() const { + return module_info_; + } + // Getter for loader Id const std::string &get_loader_id() const { return loader_id_; @@ -51,16 +60,19 @@ namespace jam::modules { private: Module(const std::string &path, + const std::string &module_info, std::unique_ptr handle, const std::string &loader_id) - : path_(path), handle_(std::move(handle)), loader_id_(loader_id) {} + : path_(path), + module_info_(module_info), + handle_(std::move(handle)), + loader_id_(loader_id) {} std::string path_; // Library path + std::string module_info_; // Module Info std::unique_ptr handle_; // Library handle std::string loader_id_; // Loader ID - Module(const Module &) = delete; - Module &operator=(const Module &) = delete; Module &operator=(Module &&) = delete; }; diff --git a/src/modules/module_loader.cpp b/src/modules/module_loader.cpp index dbd7f3b6..35c6de70 100644 --- a/src/modules/module_loader.cpp +++ b/src/modules/module_loader.cpp @@ -19,6 +19,10 @@ OUTCOME_CPP_DEFINE_CATEGORY(jam::modules, ModuleLoader::Error, e) { return COMPONENT_NAME ": library doesn't provide loader_id function"; case E::UnexpectedLoaderId: return COMPONENT_NAME ": unexpected loader id"; + case E::NoModuleInfoExport: + return COMPONENT_NAME ": library doesn't provide module_info function"; + case E::UnexpectedModuleInfo: + return COMPONENT_NAME ": unexpected module info"; } return COMPONENT_NAME ": unknown error"; } @@ -71,7 +75,20 @@ namespace jam::modules { return Error::UnexpectedLoaderId; } - auto module = Module::create(module_path, std::move(handle), loader_id); + typedef const char *(*ModuleInfoFunc)(); + ModuleInfoFunc module_info_func = + (ModuleInfoFunc)dlsym(handle.get(), "module_info"); + + if (!loader_id_func) { + return Error::NoModuleInfoExport; + } + + const char *module_info = module_info_func(); + if (!module_info) { + return Error::UnexpectedModuleInfo; + } + + auto module = Module::create(module_path, module_info, std::move(handle), loader_id); modules.push_back(module); return outcome::success(); } diff --git a/src/modules/module_loader.hpp b/src/modules/module_loader.hpp index daae0d8e..a52bf88b 100644 --- a/src/modules/module_loader.hpp +++ b/src/modules/module_loader.hpp @@ -32,6 +32,8 @@ namespace jam::modules { OpenLibraryFailed, NoLoaderIdExport, UnexpectedLoaderId, + NoModuleInfoExport, + UnexpectedModuleInfo, }; explicit ModuleLoader(const std::string &dir_path) : dir_path_(dir_path) {} diff --git a/src/modules/networking/CMakeLists.txt b/src/modules/networking/CMakeLists.txt new file mode 100644 index 00000000..6f0993ac --- /dev/null +++ b/src/modules/networking/CMakeLists.txt @@ -0,0 +1,19 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_jam_module(networking + SOURCE + networking.cpp + INCLUDE_DIRS + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/src + ${CMAKE_BINARY_DIR}/generated + DEFINITIONS + SOME_FLAG=1 + LIBRARIES + qtils::qtils + soralog::soralog +) \ No newline at end of file diff --git a/src/modules/networking/interfaces.hpp b/src/modules/networking/interfaces.hpp new file mode 100644 index 00000000..6c292488 --- /dev/null +++ b/src/modules/networking/interfaces.hpp @@ -0,0 +1,40 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include + +namespace jam::modules { + + struct NetworkingLoader { + virtual ~NetworkingLoader() = default; + + virtual void dispatch_peer_connected( + std::shared_ptr msg) = 0; + + virtual void dispatch_peer_disconnected( + std::shared_ptr msg) = 0; + + virtual void dispatch_block_announce( + std::shared_ptr msg) = 0; + + virtual void dispatch_block_response( + std::shared_ptr msg) = 0; + }; + + struct Networking { + virtual ~Networking() = default; + + virtual void on_loaded_success() = 0; + + virtual void on_loading_is_finished() = 0; + + virtual void on_block_request( + std::shared_ptr msg) = 0; + }; + +} // namespace jam::modules \ No newline at end of file diff --git a/src/modules/networking/module.cpp b/src/modules/networking/module.cpp new file mode 100644 index 00000000..4ac11f20 --- /dev/null +++ b/src/modules/networking/module.cpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#define MODULE_C_API extern "C" __attribute__((visibility("default"))) +#define MODULE_API __attribute__((visibility("default"))) + +MODULE_C_API const char *loader_id() { + return "NetworkingLoader"; +} + +MODULE_C_API const char *module_info() { + return "Networking v0.0"; +} + +static std::shared_ptr module_instance; + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wreturn-type-c-linkage" + +MODULE_C_API std::weak_ptr query_module_instance( + jam::modules::NetworkingLoader &loader, + std::shared_ptr logsys) { + if (!module_instance) { + module_instance = std::make_shared( + loader, std::move(logsys)); + } + return module_instance; +} + +MODULE_C_API void release_module_instance() { + module_instance.reset(); +} + +#pragma GCC diagnostic pop diff --git a/src/modules/networking/networking.cpp b/src/modules/networking/networking.cpp new file mode 100644 index 00000000..d7418453 --- /dev/null +++ b/src/modules/networking/networking.cpp @@ -0,0 +1,41 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + + +#include "modules/networking/networking.hpp" + + +namespace jam::modules { + + NetworkingImpl::NetworkingImpl( + NetworkingLoader &loader, + qtils::SharedRef logging_system) + : loader_(loader), + logger_(logging_system->getLogger("Networking", "networking_module")) {} + + void NetworkingImpl::on_loaded_success() { + SL_INFO(logger_, "Loaded success"); + } + + void NetworkingImpl::on_loading_is_finished() { + SL_INFO(logger_, "Loading is finished"); + + // tmp entry point for experiments + auto x = std::make_shared(); + loader_.dispatch_block_announce(std::move(x)); + } + + void NetworkingImpl::on_block_request( + std::shared_ptr msg) { + SL_INFO(logger_, "Block requested"); + + // tmp entry point for experiments + auto x = std::make_shared( + messages::BlockResponseMessage{.ctx = msg->ctx, .result = Block{}}); + loader_.dispatch_block_response(std::move(x)); + }; + +} // namespace jam::modules diff --git a/src/modules/networking/networking.hpp b/src/modules/networking/networking.hpp new file mode 100644 index 00000000..5c5b1f0a --- /dev/null +++ b/src/modules/networking/networking.hpp @@ -0,0 +1,37 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace jam::modules { + + class NetworkingImpl final : public Singleton, public Networking { + public: + static std::shared_ptr instance; + CREATE_SHARED_METHOD(NetworkingImpl); + + NetworkingImpl(NetworkingLoader &loader, + qtils::SharedRef logging_system); + + void on_loaded_success() override; + + void on_loading_is_finished() override; + + void on_block_request( + std::shared_ptr msg) override; + + private: + NetworkingLoader &loader_; + log::Logger logger_; + }; + +} // namespace jam::modules diff --git a/src/modules/shared/networking_types.tmp.hpp b/src/modules/shared/networking_types.tmp.hpp new file mode 100644 index 00000000..703713df --- /dev/null +++ b/src/modules/shared/networking_types.tmp.hpp @@ -0,0 +1,47 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "jam_types/types.tmp.hpp" +#include "utils/request_id.hpp" + +namespace jam::messages { + + struct PeerConnectedMessage { + PeerId peer; + // address? + // initial view? + }; + + struct PeerDisconnectedMessage { + PeerId peer; + // reason? + }; + + struct BlockAnnounce { + BlockHeader header; + PeerId peer; + }; + + struct BlockAnnounceMessage { + BlockAnnounce header; + PeerId peer; + }; + + struct BlockRequestMessage { + RequestCxt ctx; + BlocksRequest request; + PeerId peer; + }; + + struct BlockResponseMessage { + RequestCxt ctx; + outcome::result result; + PeerId peer; + }; + +} \ No newline at end of file diff --git a/src/modules/shared/synchronizer_types.tmp.hpp b/src/modules/shared/synchronizer_types.tmp.hpp new file mode 100644 index 00000000..549d3348 --- /dev/null +++ b/src/modules/shared/synchronizer_types.tmp.hpp @@ -0,0 +1,18 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include "jam_types/types.tmp.hpp" + +namespace jam::messages { + + struct BlockDiscoveredMessage { + BlockIndex index; + PeerId peer; + }; + +} \ No newline at end of file diff --git a/src/modules/synchronizer/CMakeLists.txt b/src/modules/synchronizer/CMakeLists.txt new file mode 100644 index 00000000..11333fb7 --- /dev/null +++ b/src/modules/synchronizer/CMakeLists.txt @@ -0,0 +1,19 @@ +# +# Copyright Quadrivium LLC +# All Rights Reserved +# SPDX-License-Identifier: Apache-2.0 +# + +add_jam_module(synchronizer + SOURCE + synchronizer.cpp + INCLUDE_DIRS + ${CMAKE_SOURCE_DIR} + ${CMAKE_SOURCE_DIR}/src + ${CMAKE_BINARY_DIR}/generated + DEFINITIONS + SOME_FLAG=1 + LIBRARIES + qtils::qtils + soralog::soralog +) \ No newline at end of file diff --git a/src/modules/synchronizer/interfaces.hpp b/src/modules/synchronizer/interfaces.hpp new file mode 100644 index 00000000..0fe7baf6 --- /dev/null +++ b/src/modules/synchronizer/interfaces.hpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include + +namespace jam::modules { + + struct SynchronizerLoader { + virtual ~SynchronizerLoader() = default; + + virtual void dispatch_block_request( + std::shared_ptr msg) = 0; + }; + + struct Synchronizer { + virtual ~Synchronizer() = default; + virtual void on_loaded_success() = 0; + + /// New block discovered by block announce + /// Expected from a network subsystem + virtual void on_block_announce( + std::shared_ptr msg) = 0; + + /// New block discovered (i.e., by peer's state view update) + virtual void on_block_index_discovered( + std::shared_ptr msg) = 0; + + /// BlockResponse has received + virtual void on_block_response( + std::shared_ptr msg) = 0; + }; + +} // namespace jam::modules diff --git a/src/modules/synchronizer/module.cpp b/src/modules/synchronizer/module.cpp new file mode 100644 index 00000000..1859301d --- /dev/null +++ b/src/modules/synchronizer/module.cpp @@ -0,0 +1,39 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#define MODULE_C_API extern "C" __attribute__((visibility("default"))) +#define MODULE_API __attribute__((visibility("default"))) + +MODULE_C_API const char *loader_id() { + return "SynchronizerLoader"; +} + +MODULE_C_API const char *module_info() { + return "Synchronizer v0.0"; +} + +static std::shared_ptr module_instance; + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wreturn-type-c-linkage" + +MODULE_C_API std::weak_ptr query_module_instance( + jam::modules::SynchronizerLoader& loader, + std::shared_ptr logsys) { + if (!module_instance) { + module_instance = std::make_shared( + loader, std::move(logsys)); + } + return module_instance; +} + +MODULE_C_API void release_module_instance() { + module_instance.reset(); +} + +#pragma GCC diagnostic pop diff --git a/src/modules/synchronizer/synchronizer.cpp b/src/modules/synchronizer/synchronizer.cpp new file mode 100644 index 00000000..89e8c218 --- /dev/null +++ b/src/modules/synchronizer/synchronizer.cpp @@ -0,0 +1,61 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + + +#include "modules/synchronizer/synchronizer.hpp" + +#include "modules/shared/networking_types.tmp.hpp" +#include "modules/shared/synchronizer_types.tmp.hpp" + + +namespace jam::modules { + + SynchronizerImpl::SynchronizerImpl( + SynchronizerLoader &loader, + qtils::SharedRef logging_system) + : loader_(loader), + logger_( + logging_system->getLogger("Synchronizer", "synchronizer_module")) {} + + void SynchronizerImpl::on_loaded_success() { + SL_INFO(logger_, "Loaded success"); + } + + void SynchronizerImpl::on_block_index_discovered( + std::shared_ptr msg) { + SL_INFO(logger_, "Block discovered"); + }; + + void SynchronizerImpl::on_block_announce( + std::shared_ptr msg) { + SL_INFO(logger_, "Block announced"); + + // tmp + static const size_t s = reinterpret_cast(this); + static size_t n = 0; + auto x = std::make_shared( + messages::BlockRequestMessage{.ctx = {{s, ++n}}}); + + // block_response_callbacks_.emplace(x->ctx.rid, [&](auto& msg) { + // SL_INFO(logger_, "Block response has been handled; rid={}", + // msg->ctx.rid); + // }); + loader_.dispatch_block_request(std::move(x)); + }; + + void SynchronizerImpl::on_block_response( + std::shared_ptr msg) { + auto it = block_response_callbacks_.find(msg->ctx.rid); + if (it == block_response_callbacks_.end()) { + SL_TRACE(logger_, "Received a response to someone else's request"); + return; + } + + SL_INFO(logger_, "Block response is received; rid={}", msg->ctx.rid); + // it->second(msg); + } + +} // namespace jam::modules diff --git a/src/modules/synchronizer/synchronizer.hpp b/src/modules/synchronizer/synchronizer.hpp new file mode 100644 index 00000000..cc79629c --- /dev/null +++ b/src/modules/synchronizer/synchronizer.hpp @@ -0,0 +1,48 @@ +/** + * Copyright Quadrivium LLC + * All Rights Reserved + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace jam::modules { + + class SynchronizerImpl final : public Singleton, + public Synchronizer { + public: + static std::shared_ptr instance; + CREATE_SHARED_METHOD(SynchronizerImpl); + + SynchronizerImpl(SynchronizerLoader &loader, + qtils::SharedRef logging_system); + + void on_loaded_success() override; + + void on_block_index_discovered( + std::shared_ptr msg) override; + + void on_block_announce( + std::shared_ptr msg) override; + + void on_block_response( + std::shared_ptr msg) override; + + private: + SynchronizerLoader &loader_; + log::Logger logger_; + + std::unordered_map< + RequestId, + std::function msg)>> + block_response_callbacks_; + }; + +} // namespace jam::modules diff --git a/src/se/CMakeLists.txt b/src/se/CMakeLists.txt index fe524f69..0039e32a 100644 --- a/src/se/CMakeLists.txt +++ b/src/se/CMakeLists.txt @@ -6,16 +6,18 @@ add_library(se_async async_dispatcher.cpp - subscription.cpp - ) +) target_link_libraries(se_async - ) + logger + fmt::fmt +) add_library(se_sync sync_dispatcher.cpp - subscription.cpp - ) +) target_link_libraries(se_sync - ) + logger + fmt::fmt +) diff --git a/src/se/async_dispatcher.cpp b/src/se/async_dispatcher.cpp index a7426326..04d52089 100644 --- a/src/se/async_dispatcher.cpp +++ b/src/se/async_dispatcher.cpp @@ -12,9 +12,7 @@ namespace jam::se { std::shared_ptr getDispatcher() { - return std::make_shared< - AsyncDispatcher>(); + return std::make_shared>(); } } // namespace jam::se diff --git a/src/se/impl/async_dispatcher_impl.hpp b/src/se/impl/async_dispatcher_impl.hpp index b0283b6a..a7754de0 100644 --- a/src/se/impl/async_dispatcher_impl.hpp +++ b/src/se/impl/async_dispatcher_impl.hpp @@ -9,20 +9,13 @@ #include "common.hpp" #include "dispatcher.hpp" #include "thread_handler.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::se { template - class AsyncDispatcher final : public Dispatcher { + class AsyncDispatcher final : public Dispatcher, NonCopyable, NonMovable { public: - // Disable copying - AsyncDispatcher(const AsyncDispatcher &) = delete; - AsyncDispatcher &operator=(const AsyncDispatcher &) = delete; - - // Disable moving - AsyncDispatcher(AsyncDispatcher &&) = delete; - AsyncDispatcher &operator=(AsyncDispatcher &&) = delete; - static constexpr uint32_t kHandlersCount = kCount; static constexpr uint32_t kPoolThreadsCount = kPoolSize; diff --git a/src/se/impl/common.hpp b/src/se/impl/common.hpp index 0ec374d9..4dc0145a 100644 --- a/src/se/impl/common.hpp +++ b/src/se/impl/common.hpp @@ -11,6 +11,8 @@ #include #include +#include "utils/ctor_limiters.hpp" + namespace jam::se::utils { /** @@ -141,7 +143,7 @@ namespace jam::se::utils { * between threads. It's similar to a manual reset event, where one thread * can wait until another thread signals the event. */ - class WaitForSingleObject final { + class WaitForSingleObject final : NonCopyable, NonMovable { std::condition_variable wait_cv_; ///< Condition variable for waiting std::mutex wait_m_; ///< Mutex for synchronization bool flag_; ///< Flag that represents the state (true = not signaled, false @@ -153,12 +155,6 @@ namespace jam::se::utils { */ WaitForSingleObject() : flag_{true} {} - // Deleted copy and move operations to prevent improper synchronization - WaitForSingleObject(WaitForSingleObject &&) = delete; - WaitForSingleObject(const WaitForSingleObject &) = delete; - WaitForSingleObject &operator=(WaitForSingleObject &&) = delete; - WaitForSingleObject &operator=(const WaitForSingleObject &) = delete; - /** * @brief Waits for the object to be signaled with a timeout * diff --git a/src/se/impl/scheduler_impl.hpp b/src/se/impl/scheduler_impl.hpp index 9e8d6ed2..b02558be 100644 --- a/src/se/impl/scheduler_impl.hpp +++ b/src/se/impl/scheduler_impl.hpp @@ -19,10 +19,11 @@ #include "common.hpp" #include "scheduler.hpp" +#include "utils/ctor_limiters.hpp" namespace jam::se { - class SchedulerBase : public IScheduler { + class SchedulerBase : public IScheduler, NonCopyable, NonMovable { private: using Time = std::chrono::high_resolution_clock; using Timepoint = std::chrono::time_point