diff --git a/FWCore/Services/src/SiteLocalConfigService.cc b/FWCore/Services/src/SiteLocalConfigService.cc index 4cd5d619ec0de..9f5e22b0cb198 100644 --- a/FWCore/Services/src/SiteLocalConfigService.cc +++ b/FWCore/Services/src/SiteLocalConfigService.cc @@ -576,10 +576,12 @@ namespace edm { "Specify the file containing the site local config. Empty string will load from default directory."); desc.addOptionalUntracked("overrideSourceCacheTempDir"); desc.addOptionalUntracked("overrideSourceCacheMinFree"); - desc.addOptionalUntracked("overrideSourceCacheHintDir"); + desc.addOptionalUntracked("overrideSourceCacheHintDir") + ->setComment("Set cache hint. See AdaptorConfig plugin for valid values."); desc.addOptionalUntracked("overrideSourceCloneCacheHintDir") ->setComment("Provide an alternate cache hint for fast cloning."); - desc.addOptionalUntracked("overrideSourceReadHint"); + desc.addOptionalUntracked("overrideSourceReadHint") + ->setComment("Set read hint. See AdaptorConfig plugin for valid values."); desc.addOptionalUntracked >("overrideSourceNativeProtocols"); desc.addOptionalUntracked("overrideSourceTTreeCacheSize"); desc.addOptionalUntracked("overrideSourceTimeout"); diff --git a/IOPool/TFileAdaptor/src/TFileAdaptor.cc b/IOPool/TFileAdaptor/src/TFileAdaptor.cc index cba5b7a91287f..34e02eaeea11e 100644 --- a/IOPool/TFileAdaptor/src/TFileAdaptor.cc +++ b/IOPool/TFileAdaptor/src/TFileAdaptor.cc @@ -5,11 +5,14 @@ #include "FWCore/ParameterSet/interface/ConfigurationDescriptions.h" #include "FWCore/ParameterSet/interface/ParameterSet.h" #include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/ParameterSet/interface/PluginDescription.h" #include "FWCore/Reflection/interface/SetClassParsing.h" #include "FWCore/ServiceRegistry/interface/Service.h" #include "FWCore/Utilities/interface/EDMException.h" #include "Utilities/StorageFactory/interface/StorageAccount.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" +#include 
"Utilities/StorageFactory/interface/StorageProxyMaker.h" +#include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h" #include #include @@ -68,32 +71,26 @@ bool TFileAdaptor::native(char const* proto) const { } TFileAdaptor::TFileAdaptor(edm::ParameterSet const& pset, edm::ActivityRegistry& ar) - : enabled_(true), - doStats_(true), + : enabled_(pset.getUntrackedParameter("enable")), + doStats_(pset.getUntrackedParameter("stats")), enablePrefetching_(false), - cacheHint_("auto-detect"), - readHint_("auto-detect"), - tempDir_(), - minFree_(0), + // values set in the site local config or in SiteLocalConfigService override + // any values set here for this service. + // These parameters here are needed only for backward compatibility + // for WMDM tools until we switch to only using the site local config for this info. + cacheHint_(pset.getUntrackedParameter("cacheHint")), + readHint_(pset.getUntrackedParameter("readHint")), + tempDir_(pset.getUntrackedParameter("tempDir")), + minFree_(pset.getUntrackedParameter("tempMinFree")), + native_(pset.getUntrackedParameter>("native")), + // end of section of values overridden by SiteLocalConfigService timeout_(0U), - debugLevel_(0U), - native_() { - if (!(enabled_ = pset.getUntrackedParameter("enable", enabled_))) + debugLevel_(0U) { + if (not enabled_) return; using namespace edm::storage; StorageFactory* f = StorageFactory::getToModify(); - doStats_ = pset.getUntrackedParameter("stats", doStats_); - - // values set in the site local config or in SiteLocalConfigService override - // any values set here for this service. - // These parameters here are needed only for backward compatibility - // for WMDM tools until we switch to only using the site local config for this info. 
- cacheHint_ = pset.getUntrackedParameter("cacheHint", cacheHint_); - readHint_ = pset.getUntrackedParameter("readHint", readHint_); - tempDir_ = pset.getUntrackedParameter("tempDir", f->tempPath()); - minFree_ = pset.getUntrackedParameter("tempMinFree", f->tempMinFree()); - native_ = pset.getUntrackedParameter >("native", native_); ar.watchPostEndJob(this, &TFileAdaptor::termination); @@ -161,6 +158,15 @@ TFileAdaptor::TFileAdaptor(edm::ParameterSet const& pset, edm::ActivityRegistry& // tell where to save files. f->setTempDir(tempDir_, minFree_); + // forward generic storage proxy makers + { + std::vector> makers; + for (auto const& pset : pset.getUntrackedParameter>("storageProxies")) { + makers.push_back(StorageProxyMakerFactory::get()->create(pset.getUntrackedParameter("type"), pset)); + } + f->setStorageProxyMakers(std::move(makers)); + } + // set our own root plugins TPluginManager* mgr = gROOT->GetPluginManager(); @@ -203,15 +209,49 @@ TFileAdaptor::TFileAdaptor(edm::ParameterSet const& pset, edm::ActivityRegistry& } void TFileAdaptor::fillDescriptions(edm::ConfigurationDescriptions& descriptions) { + using namespace edm::storage; edm::ParameterSetDescription desc; - desc.addOptionalUntracked("enable"); - desc.addOptionalUntracked("stats"); - desc.addOptionalUntracked("cacheHint"); - desc.addOptionalUntracked("readHint"); - desc.addOptionalUntracked("tempDir"); - desc.addOptionalUntracked("tempMinFree"); - desc.addOptionalUntracked >("native"); + desc.addUntracked("enable", true)->setComment("Enable or disable TFileAdaptor behavior"); + desc.addUntracked("stats", true); + desc.addUntracked("cacheHint", "auto-detect") + ->setComment( + "Hint for read caching. Possible values: 'application-only', 'storage-only', 'lazy-download', 'auto-detect'. " + "The value from the SiteLocalConfigService overrides the value set here. 
In addition, if the " + "SiteLocalConfigService has prefetching enabled, the default hint is 'application-only'."); + desc.addUntracked("readHint", "auto-detect") + ->setComment( + "Hint for reading itself. Possible values: 'direct-unbuffered', 'read-ahead-buffered', 'auto-detect'. The " + "value from SiteLocalConfigService overrides the value set here."); + desc.addUntracked("tempDir", StorageFactory::defaultTempDir()) + ->setComment( + "Colon-separated list of directories that storage implementations downloading the full file could place the " + "file. The value from SiteLocalConfigService overrides the value set here."); + desc.addUntracked("tempMinFree", StorageFactory::defaultMinTempFree()) + ->setComment( + "Minimum amount of space in GB required for a temporary data directory specified in tempDir. The value from " + "SiteLocalConfigService overrides the value set here."); + desc.addUntracked>("native", {}) + ->setComment( + "Set of protocols for which to use a native ROOT storage implementation instead of CMSSW's StorageFactory. " + "Valid " + "values are 'file', 'http', 'ftp', 'dcache', 'dcap', 'gsidcap', 'root', or 'all' to prefer ROOT for all " + "protocols. The value from SiteLocalConfigService overrides the value set here."); + + edm::ParameterSetDescription proxyMakerDesc; + proxyMakerDesc.addNode(edm::PluginDescription("type", false)); + std::vector proxyMakerDefaults; + desc.addVPSetUntracked("storageProxies", proxyMakerDesc, proxyMakerDefaults) + ->setComment( + "Ordered list of Storage proxies the real Storage object is wrapped into. The real Storage is wrapped into " + "the first element of the list, then that proxy is wrapped into the second element of the list and so on. " + "Only after this wrapping are the LocalCacheFile (lazy-download) and statistics accounting ('stats' " + "parameter) proxies applied."); + descriptions.add("AdaptorConfig", desc); + descriptions.setComment( + "AdaptorConfig Service is used to configure the TFileAdaptor. 
If enabled, the TFileAdaptor registers " + "TStorageFactoryFile as a handler for various protocols. The StorageFactory facility provides custom storage " + "access implementations for these protocols, as well as statistics accounting."); } // Write current Storage statistics on a ostream diff --git a/IOPool/TFileAdaptor/src/TFileAdaptor.h b/IOPool/TFileAdaptor/src/TFileAdaptor.h index 99f53d4844ab8..cc9c66c60d577 100644 --- a/IOPool/TFileAdaptor/src/TFileAdaptor.h +++ b/IOPool/TFileAdaptor/src/TFileAdaptor.h @@ -44,9 +44,9 @@ class TFileAdaptor { std::string readHint_; std::string tempDir_; double minFree_; + std::vector native_; unsigned int timeout_; unsigned int debugLevel_; - std::vector native_; }; namespace edm { diff --git a/Utilities/DCacheAdaptor/plugins/DCacheStorageMaker.cc b/Utilities/DCacheAdaptor/plugins/DCacheStorageMaker.cc index 57dd180c5147e..d259e9a297233 100644 --- a/Utilities/DCacheAdaptor/plugins/DCacheStorageMaker.cc +++ b/Utilities/DCacheAdaptor/plugins/DCacheStorageMaker.cc @@ -41,8 +41,7 @@ namespace edm::storage { else mode |= IOFlags::OpenUnbuffered; - auto file = std::make_unique(normalise(proto, path), mode); - return f->wrapNonLocalFile(std::move(file), proto, std::string(), mode); + return std::make_unique(normalise(proto, path), mode); } void stagein(const std::string &proto, const std::string &path, const AuxSettings &aux) const override { @@ -77,6 +76,8 @@ namespace edm::storage { return true; } + UseLocalFile usesLocalFile() const override { return UseLocalFile::kNo; } + private: void setTimeout(unsigned int timeout) const { if (timeout != 0) diff --git a/Utilities/DavixAdaptor/plugins/DavixStorageMaker.cc b/Utilities/DavixAdaptor/plugins/DavixStorageMaker.cc index 67ce3ad0212c8..bd8f949ab4b47 100644 --- a/Utilities/DavixAdaptor/plugins/DavixStorageMaker.cc +++ b/Utilities/DavixAdaptor/plugins/DavixStorageMaker.cc @@ -15,10 +15,8 @@ namespace edm::storage { const std::string &path, int mode, AuxSettings const &aux) const override 
{ - const StorageFactory *f = StorageFactory::get(); std::string newurl((proto == "web" ? "http" : proto) + ":" + path); - auto file = std::make_unique(newurl, mode); - return f->wrapNonLocalFile(std::move(file), proto, std::string(), mode); + return std::make_unique(newurl, mode); } bool check(const std::string &proto, @@ -43,6 +41,8 @@ namespace edm::storage { } return true; } + + UseLocalFile usesLocalFile() const override { return UseLocalFile::kNo; } }; } // namespace edm::storage diff --git a/Utilities/StorageFactory/BuildFile.xml b/Utilities/StorageFactory/BuildFile.xml index 310b419ddef13..81ff78ffde11d 100644 --- a/Utilities/StorageFactory/BuildFile.xml +++ b/Utilities/StorageFactory/BuildFile.xml @@ -1,3 +1,4 @@ + diff --git a/Utilities/StorageFactory/README.md b/Utilities/StorageFactory/README.md index f5fc0d107f866..80f47073ba3d3 100644 --- a/Utilities/StorageFactory/README.md +++ b/Utilities/StorageFactory/README.md @@ -11,7 +11,7 @@ Factory interface for constructing `edm::storage::Storage` instances. Also provi `StorageFactory` provides two implementations of `edm::storage::Storage` classes which can be used to wrap around any other `Storage` object. ### `edm::storage::LocalCacheFile` -Does memory mapped caching of the wrapped `Storage` object. This is only applied if `CACHE_HINT_LAZY_DOWNLOAD` is set for `cacheHint` or the protocol handling code explicit passes `IOFlags::OpenWrap` to `StorageFactory::wrapNonLocalFile`. The wrapping does not happen if the Storage is open for writing nor if the Storage is associated with a file on the local file system. +Does memory mapped caching of the wrapped `Storage` object. This is only applied if `CACHE_HINT_LAZY_DOWNLOAD` is set for `cacheHint` or the protocol handling code explicit passes `IOFlags::OpenWrap` to `StorageFactory::wrapNonLocalFile`. The wrapping does not happen if the Storage is open for writing nor if the Storage is associated with a file on the local file system. 
Note that files using the `file:` protocol _can_ end up using `LocalCacheFile` if the path is determined to be on a non-local file system. ### `edm::storage::StorageAccountProxy` This wraps the `Storage` object and provides per protocol accounting information (e.g. number of bytes read) to `edm::storage::StorageAccount`. This is only used if `StorageFactory::accounting()` returns `true`. @@ -27,16 +27,66 @@ A singleton used to aggragate statistics about all storage calls for each protoc ### `edm::storage::StorageAccount::StorageClassToken` Each protocol is associated to a token for quick lookup. + +## Generic storage proxies + +This facility resembles the `edm::storage::LocalCacheFile` and `edm::storage::StorageAccountProxy` in the way that `edm::storage::Storage` objects constructed by the concrete `edm::storage::StorageMaker` are wrapped into other `edm::storage::Storage` objects. + +The proxies are configured via `TFileAdaptor`'s `storageProxies` `VPSet` configuration parameter. The proxies are wrapped in the order they are specified in the `VPSet`, i.e. the first element wraps the concrete `edm::storage::Storage`, second element wraps the first element etc. The `edm::storage::StorageAccountProxy` and `edm::storage::LocalCacheFile` wrap the last storage proxy according to their usual behavior. + +Each concrete proxy comes with two classes, the proxy class itself (inheriting from the `edm::storage::StorageProxyBase`) and a maker class (inheriting from the `edm::storage::StorageProxyMaker`). This "factory of factories" pattern is used because a maker is created once per job (in `TFileAdaptor`), and the maker object is used to create a proxy object for each file. + +### Concrete proxy classes + +The convention is to use the proxy class name as the plugin name for the maker, as the proxy is really what the user would care for. The headings of the subsections correspond to the plugin names. 
+ +#### `StorageTracerProxy` + +The `edm::storage::StorageTracerProxy` (and the corresponding `edm::storage::StorageTracerProxyMaker`) produces a text file with a trace of all IO operations at the `StorageFactory` level. The behavior of each concrete `Storage` object (such as further splitting of read requests in `XrdAdaptor`) is not captured in these tracers. The structure of the trace file is described in a preamble in the trace file. + +The plugin has a configuration parameter for a pattern for the trace files. The pattern must contain at least one `%I`. The maker has an atomic counter for the files, and all occurrences of `%I` are replaced with the value of that counter for the given file. + +There is an `edmStorageTracer.py` script for doing some analyses of the traces. + +The `StorageTracerProxy` also provides a way to correlate the trace entries with the rest of the framework via [MessageLogger](../../FWCore/MessageService/Readme.md) messages. These messages are issued with the DEBUG severity and `IOTrace` category. There are additional, higher-level messages as part of the `PoolSource`. To see these messages, compile the `Utilities/Storage` and `IOPool/Input` packages with `USER_CXXFLAGS="-DEDM_ML_DEBUG"`, and customize the MessageLogger configuration along +```py +process.MessageLogger.cerr.threshold = "DEBUG" +process.MessageLogger.debugModules = ["*"] +process.MessageLogger.IOTrace = dict() +``` + +#### `StorageAddLatencyProxy` + +The `edm::storage::StorageAddLatencyProxy` (and the corresponding `edm::storage::StorageAddLatencyProxyMaker`) can be used to add artificial latency to the IO operations. The plugin has configuration parameters for latencies of singular reads, vector reads, singular writes, and vector writes. + +If used together with `StorageTracerProxy` to e.g. simulate the behavior of high-latency storage systems with local files, the `storageProxies` `VPSet` should have `StorageAddLatencyProxy` first, followed by `StorageTracerProxy`. 
+ +### Other components + +#### `edm::storage::StorageProxyBase` + +Inherits from `edm::storage::Storage` and is the base class for the proxy classes. + +#### `edm::storage::StorageProxyMaker` + +Base class for the proxy makers. + + ## Related classes in other packages ### TStorageFactoryFile Inherits from `TFile` but uses `edm::storage::Storage` instances when doing the actual read/write operations. The class explicitly uses `"tstoragefile"` when communicating with `edm::storage::StorageAccount`. -### TFileAdaptor -TFileAdaptor is a cmsRun Service. It explicitly registers the use of `TStorageFactoryFile` with ROOT's `TFile::Open` system. The parameters passed to `TFileAdaptor` are relayed to `edm::storage::StorageFactory` to setup the defaults for the job. +### `TFileAdaptor` -### CondorStatusService +`TFileAdaptor` is a cmsRun Service (with a plugin name of `AdaptorConfig`, see [IOPool/TFileAdaptor/README.md](../../IOPool/TFileAdaptor/README.md)). It explicitly registers the use of `TStorageFactoryFile` with ROOT's `TFile::Open` system. The parameters passed to `TFileAdaptor` are relayed to `edm::storage::StorageFactory` to setup the defaults for the job. + +### `CondorStatusService` Sends condor _Chirp_ messages periodically from cmsRun. These include the most recent aggregated `edm::storage::StorageAccount` information for all protocols being used except for the `"tstoragefile"` protocol. -### StatisticsSenderService +### `StatisticsSenderService` A cmsRun Service which sends out UDP packets about the state of the system. The information is sent when a primary file closes and includes the recent aggregated `edm::storage::StorageAccount` information for all protocols being used except for the `"tstoragefile"` protocol. + +### `XrdAdaptor` + +A `edm::storage::Storage` implementation for xrootd (see [Utilities/XrdAdaptor/README.md](../../Utilities/XrdAdaptor/README.md)). 
diff --git a/Utilities/StorageFactory/interface/StorageFactory.h b/Utilities/StorageFactory/interface/StorageFactory.h index 87604ce4cac12..aa51cd50194c2 100644 --- a/Utilities/StorageFactory/interface/StorageFactory.h +++ b/Utilities/StorageFactory/interface/StorageFactory.h @@ -5,12 +5,16 @@ #include "Utilities/StorageFactory/interface/LocalFileSystem.h" #include "Utilities/StorageFactory/interface/IOTypes.h" #include "Utilities/StorageFactory/interface/IOFlags.h" -#include + #include +#include +#include + #include "oneapi/tbb/concurrent_unordered_map.h" namespace edm::storage { class Storage; + class StorageProxyMaker; class StorageFactory { public: enum CacheHint { CACHE_HINT_APPLICATION, CACHE_HINT_STORAGE, CACHE_HINT_LAZY_DOWNLOAD, CACHE_HINT_AUTO_DETECT }; @@ -20,6 +24,10 @@ namespace edm::storage { static const StorageFactory *get(void); static StorageFactory *getToModify(void); + // in GB + static double defaultMinTempFree() { return 4.; } + static std::string defaultTempDir(); + ~StorageFactory(void); // implicit copy constructor @@ -45,15 +53,12 @@ namespace edm::storage { std::string tempPath(void) const; double tempMinFree(void) const; + void setStorageProxyMakers(std::vector> makers); + void stagein(const std::string &url) const; - std::unique_ptr open(const std::string &url, int mode = IOFlags::OpenRead) const; + std::unique_ptr open(const std::string &url, const int mode = IOFlags::OpenRead) const; bool check(const std::string &url, IOOffset *size = nullptr) const; - std::unique_ptr wrapNonLocalFile(std::unique_ptr s, - const std::string &proto, - const std::string &path, - int mode) const; - private: typedef oneapi::tbb::concurrent_unordered_map> MakerTable; @@ -61,6 +66,14 @@ namespace edm::storage { StorageMaker *getMaker(const std::string &proto) const; StorageMaker *getMaker(const std::string &url, std::string &protocol, std::string &rest) const; + // Returns + // - Storage 's' possibly wrapped in LocalCacheFile + // - bool telling if 
LocalCacheFile is used + std::tuple, bool> wrapNonLocalFile(std::unique_ptr s, + const std::string &proto, + const std::string &path, + const int mode) const; + mutable MakerTable m_makers; CacheHint m_cacheHint; ReadHint m_readHint; @@ -72,6 +85,7 @@ namespace edm::storage { unsigned int m_timeout; unsigned int m_debugLevel; LocalFileSystem m_lfs; + std::vector> m_storageProxyMakers_; static StorageFactory s_instance; }; } // namespace edm::storage diff --git a/Utilities/StorageFactory/interface/StorageMaker.h b/Utilities/StorageFactory/interface/StorageMaker.h index db16784c62351..a35f85120fdad 100644 --- a/Utilities/StorageFactory/interface/StorageMaker.h +++ b/Utilities/StorageFactory/interface/StorageMaker.h @@ -24,6 +24,8 @@ namespace edm::storage { } }; + enum class UseLocalFile { kYes, kCheckFromPath, kNo }; + StorageMaker() = default; virtual ~StorageMaker() = default; // implicit copy constructor @@ -38,6 +40,9 @@ namespace edm::storage { const std::string &path, const AuxSettings &aux, IOOffset *size = nullptr) const; + + // If the file being read in the end is a local file or not + virtual UseLocalFile usesLocalFile() const = 0; }; } // namespace edm::storage #endif // STORAGE_FACTORY_STORAGE_MAKER_H diff --git a/Utilities/StorageFactory/interface/StorageProxyMaker.h b/Utilities/StorageFactory/interface/StorageProxyMaker.h new file mode 100644 index 0000000000000..ee70158836b23 --- /dev/null +++ b/Utilities/StorageFactory/interface/StorageProxyMaker.h @@ -0,0 +1,20 @@ +#ifndef Utilities_StorageFactory_StorageProxyMaker_h +#define Utilities_StorageFactory_StorageProxyMaker_h + +#include +#include + +namespace edm::storage { + class Storage; + + // Base class for makers of generic Storage proxies + class StorageProxyMaker { + public: + StorageProxyMaker() = default; + virtual ~StorageProxyMaker(); + + virtual std::unique_ptr wrap(std::string const& url, std::unique_ptr storage) const = 0; + }; +} // namespace edm::storage + +#endif diff --git 
a/Utilities/StorageFactory/interface/StorageProxyMakerFactory.h b/Utilities/StorageFactory/interface/StorageProxyMakerFactory.h new file mode 100644 index 0000000000000..de9e6a6cc58e8 --- /dev/null +++ b/Utilities/StorageFactory/interface/StorageProxyMakerFactory.h @@ -0,0 +1,14 @@ +#ifndef Utilities_StorageFactory_StorageProxyMakerFactory_h +#define Utilities_StorageFactory_StorageProxyMakerFactory_h + +#include "FWCore/ParameterSet/interface/ParameterSetfwd.h" +#include "FWCore/PluginManager/interface/PluginFactory.h" + +#include + +namespace edm::storage { + class StorageProxyMaker; + using StorageProxyMakerFactory = edmplugin::PluginFactory; +}; // namespace edm::storage + +#endif diff --git a/Utilities/StorageFactory/plugins/BuildFile.xml b/Utilities/StorageFactory/plugins/BuildFile.xml index 6de5d7bb6ecb7..c8791c77d1445 100644 --- a/Utilities/StorageFactory/plugins/BuildFile.xml +++ b/Utilities/StorageFactory/plugins/BuildFile.xml @@ -3,3 +3,14 @@ + + + + + + + + + + + diff --git a/Utilities/StorageFactory/plugins/HttpStorageMaker.cc b/Utilities/StorageFactory/plugins/HttpStorageMaker.cc index 7cb52bfc68498..503372c04594f 100644 --- a/Utilities/StorageFactory/plugins/HttpStorageMaker.cc +++ b/Utilities/StorageFactory/plugins/HttpStorageMaker.cc @@ -18,6 +18,8 @@ namespace edm::storage { return RemoteFile::get(localfd, temp, (char **)curlopts, mode); } + + UseLocalFile usesLocalFile() const override { return UseLocalFile::kYes; } }; } // namespace edm::storage diff --git a/Utilities/StorageFactory/plugins/LocalStorageMaker.cc b/Utilities/StorageFactory/plugins/LocalStorageMaker.cc index 5df2105f989d7..b45ebf6ab7ffa 100644 --- a/Utilities/StorageFactory/plugins/LocalStorageMaker.cc +++ b/Utilities/StorageFactory/plugins/LocalStorageMaker.cc @@ -24,8 +24,7 @@ namespace edm::storage { else mode |= IOFlags::OpenUnbuffered; - auto file = std::make_unique(path, mode); - return f->wrapNonLocalFile(std::move(file), proto, path, mode); + return std::make_unique(path, 
mode); } bool check(const std::string & /*proto*/, @@ -41,6 +40,8 @@ namespace edm::storage { return true; } + + UseLocalFile usesLocalFile() const override { return UseLocalFile::kCheckFromPath; } }; } // namespace edm::storage diff --git a/Utilities/StorageFactory/plugins/StorageAddLatencyProxy.cc b/Utilities/StorageFactory/plugins/StorageAddLatencyProxy.cc new file mode 100644 index 0000000000000..9bf125644a278 --- /dev/null +++ b/Utilities/StorageFactory/plugins/StorageAddLatencyProxy.cc @@ -0,0 +1,130 @@ +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/transform.h" +#include "Utilities/StorageFactory/interface/Storage.h" +#include "Utilities/StorageFactory/interface/StorageProxyMaker.h" + +#include +#include +#include + +namespace edm::storage { + class StorageAddLatencyProxy : public Storage { + public: + struct LatencyConfig { + unsigned int read; + unsigned int readv; + unsigned int write; + unsigned int writev; + }; + + StorageAddLatencyProxy(LatencyConfig latency, std::unique_ptr storage) + : latency_(latency), baseStorage_(std::move(storage)) {} + + IOSize read(void* into, IOSize n) override { + auto const result = baseStorage_->read(into, n); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.read)); + return result; + } + + IOSize read(void* into, IOSize n, IOOffset pos) override { + auto const result = baseStorage_->read(into, n, pos); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.read)); + return result; + } + + IOSize readv(IOBuffer* into, IOSize n) override { + auto const result = baseStorage_->readv(into, n); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.readv)); + return result; + } + + IOSize readv(IOPosBuffer* into, IOSize n) override { + auto const result = baseStorage_->readv(into, n); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.readv)); + return result; + } 
+ + IOSize write(const void* from, IOSize n) override { + auto const result = baseStorage_->write(from, n); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.write)); + return result; + } + + IOSize write(const void* from, IOSize n, IOOffset pos) override { + auto const result = baseStorage_->write(from, n, pos); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.write)); + return result; + } + + IOSize writev(const IOBuffer* from, IOSize n) override { + auto const result = baseStorage_->writev(from, n); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.writev)); + return result; + } + + IOSize writev(const IOPosBuffer* from, IOSize n) override { + auto const result = baseStorage_->writev(from, n); + std::this_thread::sleep_for(std::chrono::microseconds(latency_.writev)); + return result; + } + + IOOffset position(IOOffset offset, Relative whence) override { return baseStorage_->position(offset, whence); } + + void resize(IOOffset size) override { return baseStorage_->resize(size); } + + void flush() override { return baseStorage_->flush(); } + + void close() override { return baseStorage_->close(); } + + bool prefetch(const IOPosBuffer* what, IOSize n) override { return baseStorage_->prefetch(what, n); } + + private: + LatencyConfig latency_; + std::unique_ptr baseStorage_; + }; + + class StorageAddLatencyProxyMaker : public StorageProxyMaker { + public: + StorageAddLatencyProxyMaker(edm::ParameterSet const& pset) + : latency_{.read = pset.getUntrackedParameter("read"), + .readv = pset.getUntrackedParameter("readv"), + .write = pset.getUntrackedParameter("write"), + .writev = pset.getUntrackedParameter("writev")}, + exclude_(vector_transform(pset.getUntrackedParameter>("exclude"), + [](std::string const& pattern) { return std::regex(pattern); })) {} + + static void fillPSetDescription(edm::ParameterSetDescription& iDesc) { + iDesc.addUntracked("read", 0)->setComment( + "Add this many microseconds of latency to singular 
reads"); + iDesc.addUntracked("readv", 0)->setComment("Add this many microseconds of latency to vector reads"); + iDesc.addUntracked("write", 0) + ->setComment("Add this many microseconds of latency to singular writes"); + iDesc.addUntracked("writev", 0) + ->setComment("Add this many microseconds of latency to vector writes"); + iDesc.addUntracked>("exclude", {}) + ->setComment( + "Latency is not added to the operations on the files whose URLs have a part that matches to any of the " + "regexes in this parameter"); + } + + std::unique_ptr wrap(std::string const& url, std::unique_ptr storage) const override { + for (auto const& pattern : exclude_) { + if (std::regex_search(url, pattern)) { + return storage; + } + } + return std::make_unique(latency_, std::move(storage)); + } + + private: + StorageAddLatencyProxy::LatencyConfig const latency_; + std::vector const exclude_; + }; +} // namespace edm::storage + +#include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h" +#include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h" +DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory, + edm::storage::StorageAddLatencyProxyMaker, + "StorageAddLatencyProxy"); diff --git a/Utilities/StorageFactory/plugins/StorageTracerProxy.cc b/Utilities/StorageFactory/plugins/StorageTracerProxy.cc new file mode 100644 index 0000000000000..e7a74ddb91004 --- /dev/null +++ b/Utilities/StorageFactory/plugins/StorageTracerProxy.cc @@ -0,0 +1,261 @@ +#include "FWCore/Concurrency/interface/ThreadSafeOutputFileStream.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" +#include "FWCore/ParameterSet/interface/ParameterSet.h" +#include "FWCore/ParameterSet/interface/ParameterSetDescription.h" +#include "FWCore/Utilities/interface/EDMException.h" +#include "Utilities/StorageFactory/interface/Storage.h" +#include "Utilities/StorageFactory/interface/StorageProxyMaker.h" + +#include +#include +#include +#include +#include +#include + +#include +#include + 
+namespace edm::storage { + class StorageTracerProxy : public Storage { + static constexpr std::string_view kOpen = "o"; + static constexpr std::string_view kRead = "r"; + static constexpr std::string_view kReadv = "rv"; + static constexpr std::string_view kReadvElement = "rve"; + static constexpr std::string_view kWrite = "w"; + static constexpr std::string_view kWritev = "wv"; + static constexpr std::string_view kWritevElement = "wve"; + static constexpr std::string_view kPosition = "s"; + static constexpr std::string_view kPrefetch = "p"; + static constexpr std::string_view kPrefetchElement = "pe"; + static constexpr std::string_view kResize = "rsz"; + static constexpr std::string_view kFlush = "f"; + static constexpr std::string_view kClose = "c"; + + public: + StorageTracerProxy(unsigned id, + std::string const& tracefile, + std::string const& storageUrl, + std::unique_ptr storage) + : file_(tracefile), baseStorage_(std::move(storage)), traceId_(id) { + using namespace std::literals::string_literals; + file_.write( + "# Format\n"s + "# --------\n"s + "# prefixes\n"s + "# #: comment\n"s + + fmt::format("# {}: file open\n", kOpen) + fmt::format("# {}: singular read\n", kRead) + + fmt::format("# {}: vector read\n", kReadv) + + fmt::format("# {}: vector read element of the preceding '{}' line\n", kReadvElement, kReadv) + + fmt::format("# {}: singular write\n", kWrite) + fmt::format("# {}: vector write\n", kWritev) + + fmt::format("# {}: vector write element of the preceding '{}' line\n", kWritevElement, kWritev) + + fmt::format("# {}: position (seek)\n", kPosition) + fmt::format("# {}: prefetch\n", kPrefetch) + + fmt::format("# {}: prefetch element of the preceding '{}' line\n", kPrefetch, kPrefetchElement) + + fmt::format("# {}: resize\n", kResize) + fmt::format("# {}: flush\n", kFlush) + + fmt::format("# {}: close\n", kClose) + "# --------\n"s + "# line formats\n"s + + fmt::format("# {} \n", kOpen) + + fmt::format("# {} \n", kRead) + + fmt::format( + "# {} \n", 
+ kReadv) + + fmt::format("# {} \n", kReadvElement) + + fmt::format("# {} \n", kWrite) + + fmt::format( + "# {} \n", + kWritev) + + fmt::format("# {} \n", kWritevElement) + + fmt::format("# {} \n", kPosition) + + fmt::format("# {} \n", + kPrefetch) + + fmt::format("# {} \n", kPrefetchElement) + + fmt::format("# {} \n", kResize) + + fmt::format("# {} \n", kFlush) + + fmt::format("# {} \n", kClose) + "# --------\n"s); + auto const entryId = idCounter_.fetch_add(1); + file_.write(fmt::format("{} {} {} {}\n", + kOpen, + entryId, + std::chrono::round(now().time_since_epoch()).count(), + storageUrl)); + LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, entryId); + } + + IOSize read(void* into, IOSize n) override { + auto const offset = baseStorage_->position(); + auto const [result, message] = operate([this, into, n]() { return baseStorage_->read(into, n); }); + file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, offset, n, result)); + return result; + } + + IOSize read(void* into, IOSize n, IOOffset pos) override { + auto const [result, message] = operate([this, into, n, pos]() { return baseStorage_->read(into, n, pos); }); + file_.write(fmt::format("{} {} {} {} {}\n", kRead, message, pos, n, result)); + return result; + } + + IOSize readv(IOBuffer* into, IOSize n) override { + auto offset = baseStorage_->position(); + auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); }); + std::string elements; + IOSize total = 0; + for (IOSize i = 0; i < n; ++i) { + elements += fmt::format("{} {} {} {}\n", kReadvElement, i, offset, into[i].size()); + total += into[i].size(); + offset += into[i].size(); + } + file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements); + return result; + } + + IOSize readv(IOPosBuffer* into, IOSize n) override { + auto const [result, message] = operate([this, into, n]() { return baseStorage_->readv(into, n); }); + std::string elements; + IOSize total = 0; + for 
(IOSize i = 0; i < n; ++i) { + elements += fmt::format("{} {} {} {}\n", kReadvElement, i, into[i].offset(), into[i].size()); + total += into[i].size(); + } + file_.write(fmt::format("{} {} {} {} {}\n", kReadv, message, total, result, n) + elements); + return result; + } + + IOSize write(const void* from, IOSize n) override { + auto const offset = baseStorage_->position(); + auto const [result, message] = operate([this, from, n]() { return baseStorage_->write(from, n); }); + file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, offset, n, result)); + return result; + } + + IOSize write(const void* from, IOSize n, IOOffset pos) override { + auto const [result, message] = operate([this, from, n, pos]() { return baseStorage_->write(from, n, pos); }); + file_.write(fmt::format("{} {} {} {} {}\n", kWrite, message, pos, n, result)); + return result; + } + + IOSize writev(const IOBuffer* from, IOSize n) override { + auto offset = baseStorage_->position(); + auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); }); + std::string elements; + IOSize total = 0; + for (IOSize i = 0; i < n; ++i) { + elements += fmt::format("{} {} {} {}\n", kWritevElement, i, offset, from[i].size()); + total += from[i].size(); + offset += from[i].size(); + } + file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements); + return result; + } + + IOSize writev(const IOPosBuffer* from, IOSize n) override { + auto const [result, message] = operate([this, from, n]() { return baseStorage_->writev(from, n); }); + std::string elements; + IOSize total = 0; + for (IOSize i = 0; i < n; ++i) { + elements += fmt::format("{} {} {} {}\n", kWritevElement, i, from[i].offset(), from[i].size()); + total += from[i].size(); + } + file_.write(fmt::format("{} {} {} {} {}\n", kWritev, message, total, result, n) + elements); + return result; + } + + IOOffset position(IOOffset offset, Relative whence) override { + auto const [result, 
message] = + operate([this, offset, whence]() { return baseStorage_->position(offset, whence); }); + file_.write(fmt::format("{} {} {} {}\n", kPosition, message, offset, static_cast(whence))); + return result; + } + + void resize(IOOffset size) override { + auto const message = operate([this, size]() { return baseStorage_->resize(size); }); + file_.write(fmt::format("{} {} {}\n", kResize, message, size)); + } + + void flush() override { + auto const message = operate([this]() { return baseStorage_->flush(); }); + file_.write(fmt::format("{} {}\n", kFlush, message)); + } + + void close() override { + auto const message = operate([this]() { return baseStorage_->close(); }); + file_.write(fmt::format("{} {}\n", kClose, message)); + } + + bool prefetch(const IOPosBuffer* what, IOSize n) override { + auto const [value, message] = operate([this, what, n]() { return baseStorage_->prefetch(what, n); }); + std::string elements; + IOSize total = 0; + for (IOSize i = 0; i < n; ++i) { + elements += fmt::format("{} {} {} {}\n", kPrefetchElement, i, what[i].offset(), what[i].size()); + total += what[i].size(); + } + file_.write(fmt::format("{} {} {} {} {}\n", kPrefetch, message, total, n, value) + elements); + return value; + } + + private: + template + auto operate(F&& func) -> std::tuple { + auto const id = idCounter_.fetch_add(1); + auto const begin = now(); + auto const result = func(); + auto const end = now(); + LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id); + return std::tuple(result, + fmt::format("{} {} {}", + id, + std::chrono::round(begin.time_since_epoch()).count(), + std::chrono::round(end - begin).count())); + } + + template + requires std::is_same_v, void> + auto operate(F&& func) -> std::string { + auto const id = idCounter_.fetch_add(1); + auto const begin = now(); + func(); + auto const end = now(); + LogTrace("IOTrace").format("IOTrace {} id {}", traceId_, id); + return fmt::format("{} {} {}", + id, + 
std::chrono::round(begin.time_since_epoch()).count(), + std::chrono::round(end - begin).count()); + } + + static std::chrono::time_point now() { return std::chrono::steady_clock::now(); } + + ThreadSafeOutputFileStream file_; + std::unique_ptr baseStorage_; + std::atomic idCounter_{0}; + unsigned int const traceId_; + }; + + class StorageTracerProxyMaker : public StorageProxyMaker { + public: + StorageTracerProxyMaker(edm::ParameterSet const& pset) + : filenamePattern_(pset.getUntrackedParameter("traceFilePattern")) { + if (filenamePattern_.find("%I") == std::string::npos) { + throw edm::Exception(edm::errors::Configuration) << "traceFilePattern did not contain '%I'"; + } + } + + static void fillPSetDescription(edm::ParameterSetDescription& iDesc) { + iDesc.addUntracked("traceFilePattern", "trace_%I.txt") + ->setComment( + "Pattern for the output trace file names. Must contain '%I' for the counter of different files."); + } + + std::unique_ptr wrap(std::string const& url, std::unique_ptr storage) const override { + auto value = fileCounter_.fetch_add(1); + std::string fname = filenamePattern_; + boost::replace_all(fname, "%I", std::to_string(value)); + return std::make_unique(value, fname, url, std::move(storage)); + } + + private: + mutable std::atomic fileCounter_{0}; + std::string const filenamePattern_; + }; +} // namespace edm::storage + +#include "FWCore/ParameterSet/interface/ValidatedPluginMacros.h" +#include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h" +DEFINE_EDM_VALIDATED_PLUGIN(edm::storage::StorageProxyMakerFactory, + edm::storage::StorageTracerProxyMaker, + "StorageTracerProxy"); diff --git a/Utilities/StorageFactory/scripts/edmStorageTrace.py b/Utilities/StorageFactory/scripts/edmStorageTrace.py new file mode 100755 index 0000000000000..cd0a61e5e90e6 --- /dev/null +++ b/Utilities/StorageFactory/scripts/edmStorageTrace.py @@ -0,0 +1,466 @@ +#!/usr/bin/env python3 + +import argparse +from collections import namedtuple + +class 
Entries: + common = ["id", "timestamp"] + Open = namedtuple("Open", common+["filename"]) + common.append("duration") + Read = namedtuple("Read", common + ["offset", "requested", "actual"]) + Readv = namedtuple("Readv", common + ["requested", "actual", "elements"]) + ReadvElement = namedtuple("ReadvElements", ["index", "offset", "requested"]) + Write = namedtuple("Write", common + ["offset", "requested", "actual"]) + Writev = namedtuple("Writev", common + ["requested", "actual", "elements"]) + WritevElement = namedtuple("WritevElements", ["index", "offset", "requested"]) + Position = namedtuple("Position", common + ["offset", "whence"]) + Prefetch = namedtuple("Prefech", common + ["requested", "elements", "supported"]) + PrefetchElement = namedtuple("PrefetchElements", ["index", "offset", "requested"]) + Resize = namedtuple("Resize", common+["size"]) + Flush = namedtuple("Flush", common) + Close = namedtuple("Close", common) + + nameToType = dict( + o = Open, + r = Read, + rv = Readv, + rve = ReadvElement, + w = Write, + wv = Writev, + wve = WritevElement, + s = Position, + p = Prefetch, + pe = PrefetchElement, + rsz = Resize, + f = Flush, + c = Close + ) + + @staticmethod + def make(name, args): + def convert(x): + try: + return int(x) + except ValueError: + if x == "true" or x == "false": + return bool(x) + return x + return Entries.nameToType.get(name)._make([convert(x) for x in args]) + + @staticmethod + def isVector(obj): + return isinstance(obj, Entries.Readv) or isinstance(obj, Entries.Writev) or isinstance(obj, Entries.Prefetch) + @staticmethod + def isVectorElement(obj): + return isinstance(obj, Entries.ReadvElement) or isinstance(obj, Entries.WritevElement) or isinstance(obj, Entries.PrefetchElement) + +Chunk = namedtuple("Chunk", ("begin", "end")) + +def readlog(f): + ret = [] + vectorOp = None + vectorOpElements = [] + for line in f: + line = line.strip().rstrip() + if len(line) == 0: + continue + if line[0] == "#": + continue + content = line.split(" ") 
+ entry = Entries.make(content[0], content[1:]) + + # Collect elements of vector operations into the vector operation objects + if Entries.isVectorElement(entry): + if vectorOp is None: + raise Exception("vectorOp should not have been None") + vectorOpElements.append(entry) + continue + if vectorOp is not None: + if vectorOp.elements != len(vectorOpElements): + raise Exception(f"Vector operation {vectorOp} should have {vectorOp.elements} elements, but {len(vectorOpElements)} were found from the trace log") + ret.append(vectorOp._replace(elements = vectorOpElements)) + vectorOp = None + vectorOpElements = [] + if Entries.isVector(entry): + if vectorOp is not None: + raise Exception(f"vectorOp should have been None, was {vectorOp}") + if len(vectorOpElements) != 0: + raise Exception("vectorOpElements should have been empty") + vectorOp = entry + continue + + # Non-vector entries + ret.append(entry) + # last vector op, if there is one + if vectorOp is not None: + ret.append(vectorOp._replace(elements = vectorOpElements)) + + if len(ret) == 0: + raise Exception("Trace is empty") + if not isinstance(ret[-1], Entries.Close): + raise Exception(f"Last trace entry was {ret[-1].__class__.__name__} instead of Close, the trace is likely incomplete") + + return ret + +def format_bytes(num): + for unit in ["B", "kB", "MB", "GB"]: + if num < 1024.0: + return f"{num:3.2f} {unit}" + num /= 1024.0 + return f"{num:.2f} TB" + +def format_duration(num, unit): + if unit == "us": + units = ["us", "ms"] + elif unit == "ms": + units = ["ms"] + else: + raise Exception(f"Unknown time unit {unit}") + + for unit in units: + if num < 1000.0: + return f"{num:3.2f} {unit}" + num /= 1000.0 + + if num < 60.0: + return f"{num:.1f} s" + + minutes, seconds = divmod(num, 60) + if minutes < 60.0: + return f"{minutes:.0f} min {seconds:.1f} s ({num:.1f} s)" + hours, minutes = divmod(minutes, 60) + return f"{hours:.0f} h {minutes:.0f} min {seconds:.1f} s ({num:.1f} s)" + +#################### +# Read order 
analysis +#################### +def analyzeReadOrder(logEntries): + read_total = 0 + read_backward = 0 + readv_total = 0 + readv_backward = 0 + prevOffset = 0 + for entry in logEntries: + if isinstance(entry, Entries.Read): + read_total += 1 + if entry.offset < prevOffset: + read_backward += 1 + prevOffset = entry.offset + entry.requested + elif isinstance(entry, Entries.Readv): + readv_total += 1 + if entry.elements[0].offset < prevOffset: + readv_backward += 1 + prevOffset = entry.elements[-1].offset + entry.elements[-1].requested + print(f"Read order analysis") + if read_total > 0: + print(f"Singular reads") + print(f" All reads {read_total}") + print(f" Reads with smaller offset than previous {read_backward}") + print(f" Backward fraction {read_backward/float(read_total)*100} %") + if readv_total > 0: + print(f"Vector reads") + print(f" All reads {readv_total}") + print(f" Reads with smaller offset than previous {readv_backward}") + print(f" Backward fraction {readv_backward/float(readv_total)*100} %") + +#################### +# Read overlaps analysis +#################### +def processReadOverlaps(read_chunks): + """Takes a list of Chunks + + Returns an OverlapResult, that has the following members + - total_bytes: Total number of bytes read + - overlap_bytes: Number of bytes that whose reading could theoretically be avoided + - total_count: Number of singular reads plus number of vector read elements + - overlap_count: Number of reads that could theoretically be avoided + + The unique amount of bytes read can be obtained as total_bytes - overlap_bytes + + N reads that overlap with each other, total_count is increased by N, + and overlap_count by N-1. 
+ """ + # smallest begin first, and among them, largest end first + read_chunks.sort(key=lambda x: (x.begin, -x.end)) + + read_total_bytes = 0 + read_unique_bytes = 0 + + prev = read_chunks[0] + read_total_bytes = prev.end-prev.begin + read_total_count = 1 + + read_unique_bytes = 0 + read_overlap_count = 0 + + for chunk in read_chunks[1:]: + read_total_bytes += chunk.end-chunk.begin + read_total_count += 1 + if chunk.begin >= prev.end: + read_unique_bytes += prev.end-prev.begin + prev = chunk + else: + read_overlap_count += 1 + if chunk.end > prev.end: + prev = Chunk(prev.begin, chunk.end) + read_unique_bytes += prev.end-prev.begin + + OverlapResult = namedtuple("OverlapResult", ("total_bytes", "overlap_bytes", "total_count", "overlap_count")) + return OverlapResult(read_total_bytes, read_total_bytes-read_unique_bytes, read_total_count, read_overlap_count) + +def analyzeReadOverlaps(logEntries, args): + read_chunks = [] + for entry in logEntries: + if isinstance(entry, Entries.Read): + read_chunks.append(Chunk(entry.offset, entry.offset+entry.requested)) + elif isinstance(entry, Entries.Readv): + for element in entry.elements: + read_chunks.append(Chunk(element.offset, element.offset+element.requested)) + + result = processReadOverlaps(read_chunks) + print(f"Analysis of overlapping reads") + print(f"Total") + print(f" Number of reads (singular or vector elements) {result.total_count}") + print(f" Bytes read {format_bytes(result.total_bytes)}") + print(f"Overlaps") + print(f" Number of reads that could overlapped with another read {result.overlap_count}") + print(f" Fraction of all reads {result.overlap_count/float(result.total_count)*100} %") + print(f" Bytes read that had been already read {format_bytes(result.overlap_bytes)}") + print(f" Fraction of all bytes {result.overlap_bytes/float(result.total_bytes)*100} %") + +#################### +# Summary +#################### +class Counter(object): + def __init__(self, typ): + self._type = typ + self.count = 0 + 
self.duration = 0 + self.requested = 0 + self.actual = 0 + self.elements = 0 + + def type(self): + return self._type + + def accumulate(self, entry): + if isinstance(entry, self._type): + self.count += 1 + for f in ["duration", "requested", "actual"]: + if hasattr(entry, f): + setattr(self, f, getattr(self, f) + getattr(entry, f)) + if hasattr(entry, "elements"): + self.elements += len(entry.elements) + +def print_summary(header, counter): + if counter.count == 0: + return + + print(header) + print(f" Number {counter.count}") + if hasattr(counter.type(), "elements"): + print(f" Elements {counter.elements}") + if hasattr(counter.type(), "requested"): + print(f" Requested {format_bytes(counter.requested)}") + if hasattr(counter.type(), "actual"): + print(f" Actual {format_bytes(counter.actual)}") + + print(f" Duration {format_duration(counter.duration, 'us')}") + + if hasattr(counter.type(), "requested"): + print(f" Average size {format_bytes(counter.requested/float(counter.count))}") + print(f" Average bandwidth {format_bytes(counter.requested/(counter.duration/float(1000000)))}/s") + + print(f" Average latency {format_duration(counter.duration/float(counter.count), 'us')}") + + +def summary(logEntries): + quantities = [ + ("Singular reads", Counter(Entries.Read)), + ("Vector reads", Counter(Entries.Readv)), + ("Singular writes", Counter(Entries.Write)), + ("Vector writes", Counter(Entries.Writev)), + ("Seeks", Counter(Entries.Position)), + ("Prefetches", Counter(Entries.Prefetch)), + ("Flushes", Counter(Entries.Flush)) + ] + for entry in logEntries: + if isinstance(entry, Entries.Open): + print(f"Summary for file {entry.filename}") + + for h, q in quantities: + q.accumulate(entry) + + for h, q in quantities: + print_summary(h, q) + +#################### +# Main function +#################### +def main(logEntries, args): + if args.summary: + summary(logEntries) + print() + if args.readOrder: + analyzeReadOrder(logEntries) + print() + if args.readOverlaps: + 
analyzeReadOverlaps(logEntries, args) + print() + pass + +#################### +# Unit tests +#################### +import unittest +class TestHelper(unittest.TestCase): + def test_format_bytes(self): + self.assertEqual(format_bytes(10), "10.00 B") + self.assertEqual(format_bytes(1023), "1023.00 B") + self.assertEqual(format_bytes(1024), "1.00 kB") + self.assertEqual(format_bytes(1024*1023), "1023.00 kB") + self.assertEqual(format_bytes(1024*1024), "1.00 MB") + self.assertEqual(format_bytes(1024*1024*1023), "1023.00 MB") + self.assertEqual(format_bytes(1024*1024*1024), "1.00 GB") + + def test_format_duration(self): + self.assertEqual(format_duration(10, "us"), "10.00 us") + self.assertEqual(format_duration(999, "us"), "999.00 us") + self.assertEqual(format_duration(1000, "us"), "1.00 ms") + self.assertEqual(format_duration(1, "ms"), "1.00 ms") + self.assertEqual(format_duration(999, "ms"), "999.00 ms") + self.assertEqual(format_duration(1000, "ms"), "1.0 s") + self.assertEqual(format_duration(59*1000, "ms"), "59.0 s") + self.assertEqual(format_duration(60*1000, "ms"), "1 min 0.0 s (60.0 s)") + self.assertEqual(format_duration(59*60*1000, "ms"), "59 min 0.0 s (3540.0 s)") + self.assertEqual(format_duration(60*60*1000, "ms"), "1 h 0 min 0.0 s (3600.0 s)") + self.assertEqual(format_duration(90*60*1000, "ms"), "1 h 30 min 0.0 s (5400.0 s)") + self.assertEqual(format_duration(90*60*1000+345, "ms"), "1 h 30 min 0.3 s (5400.3 s)") + + def test_processReadOverlaps(self): + chunks = [ + Chunk(0, 5), + Chunk(5, 10), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 10) + self.assertEqual(result.overlap_bytes, 0) + self.assertEqual(result.total_count, 2) + self.assertEqual(result.overlap_count, 0) + + chunks = [ + Chunk(0, 10), + Chunk(5, 10), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 15) + self.assertEqual(result.overlap_bytes, 5) + self.assertEqual(result.total_count, 2) + 
self.assertEqual(result.overlap_count, 1) + + chunks = [ + Chunk(0, 10), + Chunk(5, 10), + Chunk(0, 5), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 20) + self.assertEqual(result.overlap_bytes, 10) + self.assertEqual(result.total_count, 3) + self.assertEqual(result.overlap_count, 2) + + chunks = [ + Chunk(0, 10), + Chunk(5, 15), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 20) + self.assertEqual(result.overlap_bytes, 5) + self.assertEqual(result.total_count, 2) + self.assertEqual(result.overlap_count, 1) + + chunks = [ + Chunk(0, 5), + Chunk(2, 10), + Chunk(9, 12), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 16) + self.assertEqual(result.overlap_bytes, 4) + self.assertEqual(result.total_count, 3) + self.assertEqual(result.overlap_count, 2) + + chunks = [ + Chunk(0, 5), + Chunk(2, 10), + Chunk(9, 12), + Chunk(5, 7), + Chunk(12, 13), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 19) + self.assertEqual(result.overlap_bytes, 6) + self.assertEqual(result.total_count, 5) + self.assertEqual(result.overlap_count, 3) + + chunks = [ + Chunk(2, 4), + Chunk(6, 8), + Chunk(10, 12), + Chunk(0, 20), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 26) + self.assertEqual(result.overlap_bytes, 6) + self.assertEqual(result.total_count, 4) + self.assertEqual(result.overlap_count, 3) + + chunks = [ + Chunk(0, 20), + Chunk(19, 21), + Chunk(20, 25), + ] + result = processReadOverlaps(chunks) + self.assertEqual(result.total_bytes, 27) + self.assertEqual(result.overlap_bytes, 2) + self.assertEqual(result.total_count, 3) + # Value 2 here is debatable + self.assertEqual(result.overlap_count, 2) + +def test(): + import sys + unittest.main(argv=sys.argv[:1]) + +#################### +# Command line arguments +#################### +def printHelp(): + return """The storage traces can be obtained by adding 
StorageTracerProxy to +the storage proxies of the AdaptorConfig Service, for example as +---- +process.add_(cms.Service("AdaptorConfig", + storageProxies = cms.untracked.VPSet( + cms.PSet(type = cms.untracked.string("StorageTracerProxy")) +)) +---- +""" + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Analyze storage trace", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=printHelp()) + parser.add_argument("filename", type=str, nargs="?", default=None, help="file to process") + parser.add_argument("--summary", action="store_true", help="Print high-level summary of storage operations") + parser.add_argument("--readOrder", action="store_true", help="Analyze ordering of reads") + parser.add_argument("--readOverlaps", action="store_true", help="Analyze overlaps of reads") + parser.add_argument("--test", action="store_true", help="Run internal tests") + + args = parser.parse_args() + if args.test: + test() + else: + if args.filename is None: + parser.error("filename argument is missing") + with open(args.filename) as f: + logEntries = readlog(f) + main(logEntries, args) diff --git a/Utilities/StorageFactory/src/StorageFactory.cc b/Utilities/StorageFactory/src/StorageFactory.cc index 0b6d7666e624d..5889e4fe05c24 100644 --- a/Utilities/StorageFactory/src/StorageFactory.cc +++ b/Utilities/StorageFactory/src/StorageFactory.cc @@ -4,6 +4,7 @@ #include "Utilities/StorageFactory/interface/StorageMakerFactory.h" #include "Utilities/StorageFactory/interface/StorageAccount.h" #include "Utilities/StorageFactory/interface/StorageAccountProxy.h" +#include "Utilities/StorageFactory/interface/StorageProxyMaker.h" #include "Utilities/StorageFactory/interface/LocalCacheFile.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/PluginManager/interface/PluginManager.h" @@ -18,8 +19,8 @@ StorageFactory::StorageFactory(void) : m_cacheHint(CACHE_HINT_AUTO_DETECT), m_readHint(READ_HINT_AUTO), m_accounting(false), - 
m_tempfree(4.), // GB - m_temppath(".:$TMPDIR"), + m_tempfree(defaultMinTempFree()), + m_temppath(defaultTempDir()), m_timeout(0U), m_debugLevel(0U) { setTempDir(m_temppath, m_tempfree); @@ -31,6 +32,8 @@ const StorageFactory *StorageFactory::get(void) { return &s_instance; } StorageFactory *StorageFactory::getToModify(void) { return &s_instance; } +std::string StorageFactory::defaultTempDir() { return ".:$TMPDIR"; } + bool StorageFactory::enableAccounting(bool enabled) { bool old = m_accounting; m_accounting = enabled; @@ -95,6 +98,10 @@ std::string StorageFactory::tempPath(void) const { return m_temppath; } double StorageFactory::tempMinFree(void) const { return m_tempfree; } +void StorageFactory::setStorageProxyMakers(std::vector> makers) { + m_storageProxyMakers_ = std::move(makers); +} + StorageMaker *StorageFactory::getMaker(const std::string &proto) const { auto itFound = m_makers.find(proto); if (itFound != m_makers.end()) { @@ -123,7 +130,7 @@ StorageMaker *StorageFactory::getMaker(const std::string &url, std::string &prot return getMaker(protocol); } -std::unique_ptr StorageFactory::open(const std::string &url, int mode /* = IOFlags::OpenRead */) const { +std::unique_ptr StorageFactory::open(const std::string &url, const int mode /* = IOFlags::OpenRead */) const { std::string protocol; std::string rest; std::unique_ptr ret; @@ -134,15 +141,31 @@ std::unique_ptr StorageFactory::open(const std::string &url, int mode / stats = std::make_unique(StorageAccount::counter(token, StorageAccount::Operation::open)); } try { - if (auto storage = maker->open( - protocol, rest, mode, StorageMaker::AuxSettings{}.setDebugLevel(m_debugLevel).setTimeout(m_timeout))) { - if (dynamic_cast(storage.get())) - protocol = "local-cache"; + ret = maker->open( + protocol, rest, mode, StorageMaker::AuxSettings{}.setDebugLevel(m_debugLevel).setTimeout(m_timeout)); + if (ret) { + // Inject proxy wrappers at the lowest level, in the order + // specified in the configuration + for (auto 
const &proxyMaker : m_storageProxyMakers_) { + ret = proxyMaker->wrap(url, std::move(ret)); + } + + // Wrap the storage to LocalCacheFile if storage backend is + // not already using a local file, and lazy-download is requested + if (auto const useLocalFile = maker->usesLocalFile(); useLocalFile != StorageMaker::UseLocalFile::kNo) { + bool useCacheFile; + std::string path; + if (useLocalFile == StorageMaker::UseLocalFile::kCheckFromPath) { + path = rest; + } + std::tie(ret, useCacheFile) = wrapNonLocalFile(std::move(ret), protocol, path, mode); + if (useCacheFile) { + protocol = "local-cache"; + } + } if (m_accounting) - ret = std::make_unique(protocol, std::move(storage)); - else - ret = std::move(storage); + ret = std::make_unique(protocol, std::move(ret)); if (stats) stats->tick(); @@ -206,11 +229,12 @@ bool StorageFactory::check(const std::string &url, IOOffset *size /* = 0 */) con return ret; } -std::unique_ptr StorageFactory::wrapNonLocalFile(std::unique_ptr s, - const std::string &proto, - const std::string &path, - int mode) const { +std::tuple, bool> StorageFactory::wrapNonLocalFile(std::unique_ptr s, + const std::string &proto, + const std::string &path, + const int mode) const { StorageFactory::CacheHint hint = cacheHint(); + bool useCacheFile = false; if ((hint == StorageFactory::CACHE_HINT_LAZY_DOWNLOAD) || (mode & IOFlags::OpenWrap)) { if (mode & IOFlags::OpenWrite) { // For now, issue no warning - otherwise, we'd always warn on output files. 
@@ -223,8 +247,9 @@ std::unique_ptr StorageFactory::wrapNonLocalFile(std::unique_ptr(proto, std::move(s)); } s = std::make_unique(std::move(s), m_tempdir); + useCacheFile = true; } } - return s; + return {std::move(s), useCacheFile}; } diff --git a/Utilities/StorageFactory/src/StorageProxyMaker.cc b/Utilities/StorageFactory/src/StorageProxyMaker.cc new file mode 100644 index 0000000000000..966727f04bd1f --- /dev/null +++ b/Utilities/StorageFactory/src/StorageProxyMaker.cc @@ -0,0 +1,5 @@ +#include "Utilities/StorageFactory/interface/StorageProxyMaker.h" + +namespace edm::storage { + StorageProxyMaker::~StorageProxyMaker() = default; +} diff --git a/Utilities/StorageFactory/src/StorageProxyMakerFactory.cc b/Utilities/StorageFactory/src/StorageProxyMakerFactory.cc new file mode 100644 index 0000000000000..3e477ddd8e23f --- /dev/null +++ b/Utilities/StorageFactory/src/StorageProxyMakerFactory.cc @@ -0,0 +1,6 @@ +#include "FWCore/ParameterSet/interface/ValidatedPluginFactoryMacros.h" +#include "Utilities/StorageFactory/interface/StorageProxyMakerFactory.h" + +using namespace edm::storage; + +EDM_REGISTER_VALIDATED_PLUGINFACTORY(StorageProxyMakerFactory, "CMS Storage Proxy Maker"); diff --git a/Utilities/StorageFactory/test/BuildFile.xml b/Utilities/StorageFactory/test/BuildFile.xml index 7d43de851ccec..2dc599d06fdd6 100644 --- a/Utilities/StorageFactory/test/BuildFile.xml +++ b/Utilities/StorageFactory/test/BuildFile.xml @@ -41,3 +41,7 @@ It's not that hard guard accesses to the PluginManager, but per Chris Jones, we to wait until the framework decides on a threading model to implement a fix. 
file="threadsafe.cpp" name="test_StorageFactory_threadsafe" --> + + + + diff --git a/Utilities/StorageFactory/test/test_storageproxies.sh b/Utilities/StorageFactory/test/test_storageproxies.sh new file mode 100755 index 0000000000000..e2d5b6c56e926 --- /dev/null +++ b/Utilities/StorageFactory/test/test_storageproxies.sh @@ -0,0 +1,27 @@ +#!/bin/bash + +# Pass in name and status +function die { echo $1: status $2 ; exit $2; } + +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_make_file_cfg.py || die "cmsRun test_storageproxy_make_file_cfg.py failed" $? + +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_test_cfg.py || die "cmsRun test_storageproxy_test_cfg.py failed" $? + +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_test_cfg.py --latencyRead || die "cmsRun test_storageproxy_test_cfg.py --latencyRead failed" $? +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_test_cfg.py --latencyWrite || die "cmsRun test_storageproxy_test_cfg.py --latencyWrite failed" $? + +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_test_cfg.py --trace || die "cmsRun test_storageproxy_test_cfg.py --trace failed" $? +grep -q "o .* test.root" trace_0.txt || die "File open entry missing in trace_0.txt" $? +grep -q "r " trace_0.txt || die "No read entries in trace_0.txt" $? +grep -q "o .* output.root" trace_1.txt || die "File open entry missing in trace_1.txt" $? +grep -q "w " trace_1.txt || die "No write entries in trace_0.txt" $? + +edmStorageTrace.py --summary trace_0.txt | grep -q "Singular reads" || die "No reads in summary for trace_0.txt" $? +edmStorageTrace.py --summary trace_1.txt | grep -q "Singular writes" || die "No reads in summary for trace_1.txt" $? + + +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_test_cfg.py --trace --latencyRead || die "cmsRun test_storageproxy_test_cfg.py --trace --latencyRead failed" $? +edmStorageTrace.py --summary trace_0.txt | grep -q "Duration .* ms" || die "Read duration has non-ms units in trace_1.txt" $? 
+ +cmsRun ${SCRAM_TEST_PATH}/test_storageproxy_test_cfg.py --trace --latencyWrite || die "cmsRun test_storageproxy_test_cfg.py --trace --latencyWrite failed" $? +edmStorageTrace.py --summary trace_1.txt | grep -q "Duration .* ms" || die "Write duration has non-ms trace_1.txt" $? diff --git a/Utilities/StorageFactory/test/test_storageproxy_make_file_cfg.py b/Utilities/StorageFactory/test/test_storageproxy_make_file_cfg.py new file mode 100644 index 0000000000000..04003c8cae386 --- /dev/null +++ b/Utilities/StorageFactory/test/test_storageproxy_make_file_cfg.py @@ -0,0 +1,16 @@ +import FWCore.ParameterSet.Config as cms + +process = cms.Process("MAKE") + +process.source = cms.Source("EmptySource") +process.maxEvents.input = 10 + +process.out = cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("test.root")) + +process.Thing = cms.EDProducer("ThingProducer") +process.OtherThing = cms.EDProducer("OtherThingProducer") +process.EventNumber = cms.EDProducer("EventNumberIntProducer") + + +process.o = cms.EndPath(process.out, cms.Task(process.Thing, process.OtherThing, process.EventNumber)) + diff --git a/Utilities/StorageFactory/test/test_storageproxy_test_cfg.py b/Utilities/StorageFactory/test/test_storageproxy_test_cfg.py new file mode 100644 index 0000000000000..565cda0e4fc8f --- /dev/null +++ b/Utilities/StorageFactory/test/test_storageproxy_test_cfg.py @@ -0,0 +1,40 @@ +import FWCore.ParameterSet.Config as cms + +import argparse + +parser = argparse.ArgumentParser(description="Test storage proxies") +parser.add_argument("--trace", action="store_true", help="Enable StorageTraceProxy") +parser.add_argument("--latencyRead", action="store_true", help="Add read latency") +parser.add_argument("--latencyWrite", action="store_true", help="Add write latency") +args = parser.parse_args() + +process = cms.Process("TEST") + +process.source = cms.Source("PoolSource", + fileNames = cms.untracked.vstring("file:test.root") +) + +process.out = 
cms.OutputModule("PoolOutputModule", fileName = cms.untracked.string("output.root")) + +adaptor = cms.Service("AdaptorConfig", storageProxies = cms.untracked.VPSet()) +if args.latencyRead: + adaptor.storageProxies.append(cms.PSet( + type = cms.untracked.string("StorageAddLatencyProxy"), + read = cms.untracked.uint32(100), + readv = cms.untracked.uint32(100), + )) +if args.latencyWrite: + adaptor.storageProxies.append(cms.PSet( + type = cms.untracked.string("StorageAddLatencyProxy"), + write = cms.untracked.uint32(100), + writev = cms.untracked.uint32(100), + )) +if args.trace: + adaptor.storageProxies.append(cms.PSet( + type = cms.untracked.string("StorageTracerProxy"), + traceFilePattern = cms.untracked.string("trace_%I.txt"), + )) + +process.add_(adaptor) + +process.ep = cms.EndPath(process.out) diff --git a/Utilities/XrdAdaptor/README.md b/Utilities/XrdAdaptor/README.md index 709e334e2e6f9..a90a80f4edb16 100644 --- a/Utilities/XrdAdaptor/README.md +++ b/Utilities/XrdAdaptor/README.md @@ -6,6 +6,31 @@ The `XrdAdaptor` package is the CMSSW implementation of CMS' AAA infrastructure. * Recovery from some errors via re-tries * Use of multiple XRootD sources (described further [here](doc/multisource_algorithm_design.md)) +The `XrdAdaptor` behavior can be simulated to some extent with local files with +```py +# application-only cache hint implies similar edm::storage::Storage::prefetch() +# behavior as in XrdFile::prefetch() +process.add_(cms.Service("SiteLocalConfigService", + overrideSourceCacheHintDir = cms.untracked.string("application-only") +)) + +# Add e.g. 
10-millisecond latency to singular and vector reads +# If the job reads local files via TFile::Open() in addition to PoolSource, +# you want to exclude those from the latency addition +process.add_(cms.Service("AdaptorConfig", + storageProxies = cms.untracked.VPSet( + cms.PSet( + type = cms.untracked.string("StorageAddLatencyProxy"), + read = cms.untracked.uint32(10000), # microseconds + readv = cms.untracked.uint32(10000), # microseconds + exclude = cms.untracked.vstring(...), + ) + ) +)) +``` +The `StorageAddLatencyProxy` is described in [`Utilities/StorageFactory/README.md`](../../Utilities/StorageFactory/README.md). Another useful component in this context is `StorageTracerProxy` (e.g. to find out the other-than-`PoolSource`-accessed files mentioned above) + + ## Short description of components ### `ClientRequest` diff --git a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc index fda535654169b..282bdfa9da5b5 100644 --- a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc +++ b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc @@ -89,8 +89,7 @@ namespace edm::storage { mode |= IOFlags::OpenUnbuffered; std::string fullpath(proto + ":" + path); - auto file = std::make_unique(fullpath, mode); - return f->wrapNonLocalFile(std::move(file), proto, std::string(), mode); + return std::make_unique(fullpath, mode); } void stagein(const std::string &proto, const std::string &path, const AuxSettings &aux) const override { @@ -125,6 +124,8 @@ namespace edm::storage { return true; } + UseLocalFile usesLocalFile() const override { return UseLocalFile::kNo; } + void setDebugLevel(unsigned int level) const { auto oldLevel = m_lastDebugLevel.load(); if (level == oldLevel) {