diff --git a/system/Aggregator.cpp b/system/Aggregator.cpp index 1c5e230a6..fb7d71a1f 100644 --- a/system/Aggregator.cpp +++ b/system/Aggregator.cpp @@ -32,307 +32,9 @@ // DAMAGE. //////////////////////////////////////////////////////////////////////// -#include - #include "Aggregator.hpp" -#include "Grappa.hpp" -#include - -#include "PerformanceTools.hpp" // command line options for Aggregator DEFINE_int64( aggregator_autoflush_ticks, 50000, "number of ticks to wait before autoflushing aggregated active messages"); -DEFINE_int64( aggregator_max_flush, 0, "flush no more than this many buffers per poll (0 for unlimited)"); DEFINE_bool( aggregator_enable, true, "should we aggregate packets or just send them?"); DEFINE_bool( flush_on_idle, true, "flush all aggregated messages there's nothing better to do"); - - -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_0_to_255_bytes, 0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_256_to_511_bytes, 0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_512_to_767_bytes, 0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_768_to_1023_bytes, 0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_1024_to_1279_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_1280_to_1535_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_1536_to_1791_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_1792_to_2047_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_2048_to_2303_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_2304_to_2559_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_2560_to_2815_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_2816_to_3071_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_3072_to_3327_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_3328_to_3583_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_3584_to_3839_bytes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric,aggregator_3840_to_4095_bytes,0); - - -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_messages_aggregated_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_bytes_aggregated_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_messages_deaggregated_, 0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_bytes_deaggregated_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_messages_forwarded_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_bytes_forwarded_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_newest_wait_ticks_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_oldest_wait_ticks_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_polls_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_flushes_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_multiflushes_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_timeouts_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_idle_flushes,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_capacity_flushes_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_idle_poll_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_idle_poll_useful_,0); -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_bundles_received_,0); -GRAPPA_DEFINE_METRIC( SummarizingMetric, aggregator_bundle_bytes_received_, 0); - -/* Not currently useful because Communicator.cpp has one *//* -GRAPPA_DEFINE_METRIC( SimpleMetric, aggregator_start_time, []() { - // initialization value - return Grappa::walltime(); - }); -GRAPPA_DEFINE_METRIC( CallbackMetric, aggregator_end_time, []() { - // sampling value - return Grappa::walltime(); - }); - */ - -/// global Aggregator instance -Aggregator global_aggregator; - - -#ifdef STL_DEBUG_ALLOCATOR -DEFINE_int64( aggregator_access_control_signal, SIGUSR2, "signal used to toggle aggregator queue access control"); -bool aggregator_access_control_active = false; -static void aggregator_toggle_access_control_sighandler( int signum ) { - aggregator_access_control_active = ~aggregator_access_control_active; -} - -#endif - -/// Construct Aggregator -Aggregator::Aggregator( ) - : max_nodes_( -1 ) - , buffers_( ) - , previous_timestamp_( 0L ) - , least_recently_sent_( ) - , aggregator_deaggregate_am_handle_( -1 ) - , route_map_( ) -#ifdef VTRACE_FULL - , tag_( -1 ) - , vt_agg_commid_( VT_COMM_DEF( "Aggregator" ) ) -#endif - , stats() -{ -#ifdef STL_DEBUG_ALLOCATOR - struct sigaction access_control_toggle_sa; - sigemptyset( &access_control_toggle_sa.sa_mask ); - access_control_toggle_sa.sa_flags = 0; - access_control_toggle_sa.sa_handler = &aggregator_toggle_access_control_sighandler; - CHECK_EQ( 0, sigaction( FLAGS_aggregator_access_control_signal, &access_control_toggle_sa, 0 ) ) - << "Aggregator access control signal handler installation failed."; - if( aggregator_access_control_active ) STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadOnly); -#endif -} - -void Aggregator_deaggregate_am( void * buf, size_t size ); - -/// Initialize aggregator -void Aggregator::init() { - max_nodes_ = global_communicator.cores; - least_recently_sent_.resize( global_communicator.cores ); - - buffers_.resize( max_nodes_ - buffers_.size() ); - route_map_.resize( max_nodes_ - route_map_.size() ); - // initialize route map - for( Core i = 0; i < max_nodes_; ++i ) { - route_map_[i] = i; - } -#ifdef VTRACE_FULL - tag_ = global_communicator.mycore; -#endif -} - -/// Tear down aggregator -Aggregator::~Aggregator() { -#ifdef STL_DEBUG_ALLOCATOR - STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadWrite); -#endif -} - -/// After deaggregate handler is called to buffer an aggregated -/// message bundle, this method unpacks the bundle and executes the -/// Grappa-level active message handlers. -void Aggregator::deaggregate( ) { - GRAPPA_FUNCTION_PROFILE( GRAPPA_COMM_GROUP ); -#ifdef VTRACE_FULL - VT_TRACER("deaggregate"); -#endif - StateTimer::enterState_deaggregation(); - while( !received_AM_queue_.empty() ) { - DVLOG(5) << "deaggregating"; - // TODO: too much copying - ReceivedAM amp = received_AM_queue_.front(); - -#ifdef STL_DEBUG_ALLOCATOR - if( aggregator_access_control_active ) STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadWrite); -#endif - received_AM_queue_.pop(); -#ifdef STL_DEBUG_ALLOCATOR - if( aggregator_access_control_active ) STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadOnly); -#endif - - DVLOG(5) << "deaggregating message of size " << amp.size_; - uintptr_t msg_base = reinterpret_cast< uintptr_t >( amp.buf_ ); - for( unsigned int i = 0; i < amp.size_; ) { - AggregatorGenericCallHeader * header = reinterpret_cast< AggregatorGenericCallHeader * >( msg_base ); - AggregatorAMHandler fp = reinterpret_cast< AggregatorAMHandler >( header->function_pointer ); - void * args = reinterpret_cast< void * >( msg_base + - sizeof( AggregatorGenericCallHeader ) ); - void * payload = reinterpret_cast< void * >( msg_base + - sizeof( AggregatorGenericCallHeader ) + - header->args_size ); - if( header->destination == Grappa::mycore() ) { // for us? - - stats.record_deaggregation( sizeof( AggregatorGenericCallHeader ) + header->args_size + header->payload_size ); - // trace fine-grain communication -#ifdef GRAPPA_TRACE - if (FLAGS_record_grappa_events) { - // TODO: good candidate for TAU_CONTEXT_EVENT - int fn_p_tag = aggregator_trace_tag( fp ); - TAU_TRACE_RECVMSG(fn_p_tag, header->source, header->args_size + header->payload_size ); - } -#endif - -#ifdef VTRACE_FULL - { - VT_RECV( vt_agg_commid_, header->tag, sizeof( AggregatorGenericCallHeader ) + header->args_size + header->payload_size ); - } -#endif - - - DVLOG(5) << "calling " << *header - << " with args " << args - << " and payload " << payload; - - { - GRAPPA_PROFILE( deag_func_timer, "deaggregate execution", "", GRAPPA_USERAM_GROUP ); - fp( args, header->args_size, payload, header->payload_size ); // execute - } - } else { // not for us, so forward towards destination - DVLOG(5) << "forwarding " << *header - << " with args " << args - << " and payload " << payload; - stats.record_forward( sizeof( AggregatorGenericCallHeader ) + header->args_size + header->payload_size ); - aggregate( header->destination, fp, args, header->args_size, payload, header->payload_size ); - } - i += sizeof( AggregatorGenericCallHeader ) + header->args_size + header->payload_size; - msg_base += sizeof( AggregatorGenericCallHeader ) + header->args_size + header->payload_size; - } - } -} - -/// clean up aggregator before destruction -void Aggregator::finish() { -#ifdef STL_DEBUG_ALLOCATOR - LOG(INFO) << "Cleaning up access control...."; - STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadWrite); -#endif -} - -/// Deaggration active message handler. This receives an -/// aggregated message bundle and buffers it for later deaggregation. -void Aggregator_deaggregate_am( void * buf, size_t size ) { - GRAPPA_FUNCTION_PROFILE( GRAPPA_COMM_GROUP ); -#ifdef VTRACE_FULL - VT_TRACER("deaggregate AM"); -#endif - - DVLOG(5) << "received message with size " << size; - // TODO: too much copying - Aggregator::ReceivedAM am( size, buf ); - global_aggregator.stats.record_receive_bundle( size ); -#ifdef STL_DEBUG_ALLOCATOR - if( aggregator_access_control_active ) STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadWrite); -#endif - global_aggregator.received_AM_queue_.push( am ); -#ifdef STL_DEBUG_ALLOCATOR - if( aggregator_access_control_active ) STLMemDebug::BaseAllocator::getMemMgr().setAccessMode(STLMemDebug::memReadOnly); -#endif - -} - -/// proxy call to make it easier to integrate with scheduler -bool idle_flush_aggregator() { - return global_aggregator.idle_flush_poll(); -} - -/// proxy call to make it easier to integrate with scheduler -size_t Grappa_sizeof_header() { - return sizeof( AggregatorGenericCallHeader ); -} - - -/* metrics */ -AggregatorMetrics::AggregatorMetrics() - : histogram_() - { - histogram_[0] = &aggregator_0_to_255_bytes; - histogram_[1] = &aggregator_256_to_511_bytes; - histogram_[2] = &aggregator_512_to_767_bytes; - histogram_[3] = &aggregator_768_to_1023_bytes; - histogram_[4] = &aggregator_1024_to_1279_bytes; - histogram_[5] = &aggregator_1280_to_1535_bytes; - histogram_[6] = &aggregator_1536_to_1791_bytes; - histogram_[7] = &aggregator_1792_to_2047_bytes; - histogram_[8] = &aggregator_2048_to_2303_bytes; - histogram_[9] = &aggregator_2304_to_2559_bytes; - histogram_[10] = &aggregator_2560_to_2815_bytes; - histogram_[11] = &aggregator_2816_to_3071_bytes; - histogram_[12] = &aggregator_3072_to_3327_bytes; - histogram_[13] = &aggregator_3328_to_3583_bytes; - histogram_[14] = &aggregator_3584_to_3839_bytes; - histogram_[15] = &aggregator_3840_to_4095_bytes; - } - - - -void AggregatorMetrics::record_poll() { - aggregator_polls_++; -} - -void AggregatorMetrics::record_flush( Grappa::Timestamp oldest_ts, Grappa::Timestamp newest_ts ) { - Grappa::Timestamp ts = Grappa::timestamp(); - aggregator_oldest_wait_ticks_ += ts - oldest_ts; - aggregator_newest_wait_ticks_ += ts - newest_ts; - aggregator_flushes_++; -} - -void AggregatorMetrics::record_idle_flush() { - aggregator_idle_flushes++; -} - -void AggregatorMetrics::record_multiflush() { - aggregator_multiflushes_++; -} - -void AggregatorMetrics::record_timeout() { - aggregator_timeouts_++; -} - -void AggregatorMetrics::record_idle_poll( bool useful ) { - if ( useful ) aggregator_idle_poll_useful_++; - else aggregator_idle_poll_++; -} - -void AggregatorMetrics::record_capacity_flush() { - aggregator_capacity_flushes_++; -} - -void AggregatorMetrics::record_forward( size_t bytes ) { - ++aggregator_messages_forwarded_; - aggregator_bytes_forwarded_ += bytes; -} - -void AggregatorMetrics::record_receive_bundle( size_t bytes ) { - ++aggregator_bundles_received_; - aggregator_bundle_bytes_received_+= bytes ; -} diff --git a/system/Aggregator.hpp b/system/Aggregator.hpp index e9128897e..e7aabeb95 100644 --- a/system/Aggregator.hpp +++ b/system/Aggregator.hpp @@ -1,3 +1,4 @@ +#pragma once //////////////////////////////////////////////////////////////////////// // Copyright (c) 2010-2015, University of Washington and Battelle // Memorial Institute. All rights reserved. @@ -31,681 +32,36 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH // DAMAGE. //////////////////////////////////////////////////////////////////////// -#pragma once - -#ifdef STL_DEBUG_ALLOCATOR -#include "../tools/stl-debug-allocator/archive/Source/includes.h" -#include "../tools/stl-debug-allocator/archive/Source/allocator.h" -#endif - -#include -#include -#include - -#include -#include - -#include #include #include -#include "common.hpp" - #include "Communicator.hpp" -#include "Timestamp.hpp" -#include "StateTimer.hpp" -#include "MetricsTools.hpp" -#include "Metrics.hpp" - -#ifdef VTRACE -#include -#endif -#include "PerformanceTools.hpp" +#include "Message.hpp" -/// macro to compute a mpi-like tag for communication tracing -//#define aggregator_trace_tag(data) (int) (0x7FFF & reinterpret_cast(data)) -#define aggregator_trace_tag(data) (0xffffffff & reinterpret_cast(data)) /// number of ticks to wait before automatically flushing a buffer. DECLARE_int64( aggregator_autoflush_ticks ); -DECLARE_int64( aggregator_max_flush ); DECLARE_bool( aggregator_enable ); -DECLARE_bool(flush_on_idle ); - -/// Type of aggregated active message handler -typedef void (* AggregatorAMHandler)( void *, size_t, void *, size_t ); - -extern void Aggregator_deaggregate_am( void * buf, size_t size ); - -extern bool aggregator_access_control_active; - -/// Least recently used queue for tracking when to send buffered active messages -class LRQueue { -private: - struct LREntry { - LREntry * older_; - LREntry * newer_; - int key_; - uint64_t priority_; - LREntry() - : older_(NULL) - , newer_(NULL) - , key_( 0 ) - , priority_( 0 ) - { } - }; - - LREntry * entries_; - - LREntry oldest_; - LREntry newest_; - - int size_; - -public: - LRQueue( ) - : entries_( NULL ) - , oldest_( ) - , newest_( ) - , size_( 0 ) - { - oldest_.newer_ = &newest_; - oldest_.key_ = -1; - oldest_.priority_ = 0; - - newest_.older_ = &oldest_; - newest_.key_ = -1; - newest_.priority_ = 0; - } - - ~LRQueue() { - delete [] entries_; - } - - void resize( int size ) { - CHECK( entries_ == NULL ) << "may only resize once"; - entries_ = new LREntry[ size ]; - for( int i = size_; i < size; ++i ) { - entries_[i].key_ = i; - entries_[i].priority_ = 0; - } - size_ = size; - } - - inline void update_or_insert( int key, uint64_t priority ) { - // NB: update has no effect. - // TODO: change method name - - //DCHECK( key < size_ ) << "key too big"; - - // are we not already in the queue? - if( entries_[key].older_ == NULL ) { - //DCHECK( entries_[key].newer_ == NULL ) << "if older is NULL, newer must be NULL"; - - // insert at newer end - entries_[key].newer_ = &newest_; - entries_[key].older_ = newest_.older_; - newest_.older_ = &entries_[key]; - - // add back link - entries_[key].older_->newer_ = &entries_[key]; - - // keeping this is the whole point! - entries_[key].priority_ = priority; - } - } - - int top_key() { - return oldest_.newer_->key_; - } - - uint64_t top_priority() { - return oldest_.newer_->priority_; - } - - void remove_key( int key ) { - //DCHECK( key < size_ ) << "key too big"; - if( entries_[key].older_ != NULL ) { - //DCHECK( entries_[key].newer_ != NULL ) << "nullness should match"; - entries_[key].older_->newer_ = entries_[key].newer_; - entries_[key].newer_->older_ = entries_[key].older_; - entries_[key].older_ = NULL; - entries_[key].newer_ = NULL; - } - } - - bool empty() { - return oldest_.newer_ == &newest_; - } - -}; - - -GRAPPA_DECLARE_METRIC( SimpleMetric, aggregator_messages_aggregated_); -GRAPPA_DECLARE_METRIC( SimpleMetric, aggregator_bytes_aggregated_); -GRAPPA_DECLARE_METRIC( SimpleMetric, aggregator_messages_deaggregated_); -GRAPPA_DECLARE_METRIC( SimpleMetric, aggregator_bytes_deaggregated_); -GRAPPA_DECLARE_METRIC( SimpleMetric, aggregator_bundles_received_); - -/// stats class for aggregator -class AggregatorMetrics { -private: - Grappa::SimpleMetric * histogram_[16]; - -public: - - AggregatorMetrics(); - - void record_poll(); - - void record_flush( Grappa::Timestamp oldest_ts, Grappa::Timestamp newest_ts ); - void record_idle_flush(); - void record_multiflush(); - void record_timeout(); - void record_idle_poll( bool useful ); - void record_capacity_flush(); - - void record_aggregation( size_t bytes ) { - aggregator_messages_aggregated_++; - aggregator_bytes_aggregated_ += bytes; - (*(histogram_[ (bytes >> 8) & 0xf ]))++; - } - - void record_deaggregation( size_t bytes ) { - ++aggregator_messages_deaggregated_; - aggregator_bytes_deaggregated_ += bytes; - } - - void record_forward( size_t bytes ); - - void record_receive_bundle( size_t bytes ); - -}; - -/// Header for aggregated active messages. -struct AggregatorGenericCallHeader { - uintptr_t function_pointer; - Core destination; - uint16_t args_size; - uint16_t payload_size; -#ifdef GRAPPA_TRACE -// TODO: really don't want this transmitted even in tracing - Core source; -#endif -#ifdef VTRACE_FULL -// TODO: really don't want this transmitted even in tracing - uint64_t tag; -#endif -}; - -/// dump human-readable aggregated active message header -static std::ostream& operator<<( std::ostream& o, const AggregatorGenericCallHeader& h ) { - return o << "[f=" << (void*) h.function_pointer - << ",d=" << h.destination - << ",a=" << h.args_size - << ",p=" << h.payload_size - << "(t=" << sizeof(h) + h.args_size + h.payload_size << ")" - << "]"; -} - -/// Active message aggregation per-destination storage class. - /// use default copy constructor and assignment operator -template< const int max_size_ > -class AggregatorBuffer { -private: -public: - Grappa::Timestamp oldest_ts_; - Grappa::Timestamp newest_ts_; - int current_position_; - char buffer_[ max_size_ ]; - - AggregatorBuffer() - : current_position_( 0 ) - { } - - /// does a message of size fit in this buffer? - inline bool fits( size_t size ) const { - return (current_position_ + size) < max_size_; - } - - /// insert data into buffer. assumes it fits. - inline void insert( const void * data, size_t size ) { - newest_ts_ = Grappa::timestamp(); - if( current_position_ == 0 ) oldest_ts_ = newest_ts_; - DCHECK ( fits( size ) ); - memcpy( &buffer_[ current_position_ ], data, size ); - current_position_ += size; - } - - // reset buffer - inline void flush() { - current_position_ = 0; - } -}; - -template -size_t Grappa_sizeof_message( const ArgStruct * args, const size_t args_size = sizeof( ArgStruct ), - const void * payload = NULL, const size_t payload_size = 0) { - return payload_size + args_size + sizeof( AggregatorGenericCallHeader ); -} - - -/// Active message aggregation class. -class Aggregator { -private: - DISALLOW_COPY_AND_ASSIGN( Aggregator ); - - /// max node count. used to allocate buffers. - int max_nodes_; - - /// number of bytes in each aggregation buffer - /// TODO: this should track the IB MTU - static const unsigned int buffer_size_ = 4096 - 72; - - /// buffer for sending non-aggregated messages - char raw_send_buffer_[ buffer_size_ ]; - - /// buffers holding aggregated messages. - std::vector< AggregatorBuffer< buffer_size_ > > buffers_; - - /// current timestamp for autoflusher. - uint64_t previous_timestamp_; - - /// queue for autoflusher. - LRQueue least_recently_sent_; - - /// handle for deaggregation active message - int aggregator_deaggregate_am_handle_; - - /// routing table for hierarchical aggregation - std::vector< Core > route_map_; - -#ifdef VTRACE_FULL - /// Vampir trace message metadata - uint64_t tag_; - /// Vampir trace message metadata - unsigned vt_agg_commid_; -#endif - - /// storage for deaggregation - struct ReceivedAM { - size_t size_; - char buf_[buffer_size_]; - ReceivedAM( size_t size, void * buf ) - : size_( size ) - , buf_() - { - DCHECK( size < buffer_size_ ); - memcpy( buf_, buf, size ); - } - }; - -#ifdef STL_DEBUG_ALLOCATOR - /// Storage for received pre-deaggregation active messages - std::queue< ReceivedAM, std::deque< ReceivedAM, STLMemDebug::Allocator< ReceivedAM > > > received_AM_queue_; -#else - /// Storage for received pre-deaggregation active messages - std::queue< ReceivedAM > received_AM_queue_; -#endif - - /// Deaggregated buffered active messages - void deaggregate( ); - friend void Aggregator_deaggregate_am( void * buf, size_t size ); - -public: - /// statistics - AggregatorMetrics stats; - - /// Construct Aggregator. - Aggregator( ); - - /// Initialize aggregator. - void init(); - - /// Tear down aggegator. - ~Aggregator(); - - /// Clean up aggregator before destruction. - void finish(); - - /// route map lookup for hierarchical aggregation - inline Core get_target_for_node( Core n ) const { - return route_map_[ n ]; - } - - /// route map update for hierarchical aggregation - inline void update_target_for_node( Core node, Core target ) { - route_map_[ node ] = target; - } - - /// send aggregated messages for node - inline void flush( Core node ) { - GRAPPA_FUNCTION_PROFILE( GRAPPA_COMM_GROUP ); - DVLOG(5) << "flushing node " << node; - Core target = route_map_[ node ]; - stats.record_flush( buffers_[ target ].oldest_ts_, buffers_[ target ].newest_ts_ ); - size_t size = buffers_[ target ].current_position_; - global_communicator.send_immediate_with_payload( target, [] (void * buf, int size) { - Aggregator_deaggregate_am( buf, size ); - }, buffers_[ target ].buffer_, size ); - buffers_[ target ].flush(); - //DVLOG(5) << "heap before flush:\n" << least_recently_sent_.toString( ); - least_recently_sent_.remove_key( target ); - //DVLOG(5) << "heap after flush:\n" << least_recently_sent_.toString( ); - } - - /// poll and optionally flush on idle - inline bool idle_flush_poll() { - GRAPPA_FUNCTION_PROFILE( GRAPPA_COMM_GROUP ); -#ifdef VTRACE_FULL - VT_TRACER("idle_flush_poll"); -#endif - StateTimer::enterState_communication(); - if( FLAGS_flush_on_idle ) { - while ( !least_recently_sent_.empty() ) { - stats.record_idle_flush(); - DVLOG(5) << "idle flush Core " << least_recently_sent_.top_key(); - flush(least_recently_sent_.top_key()); - } - } - bool useful = poll(); - stats.record_idle_poll(useful); - return useful; - } - - - /// get timestamp. we avoid calling rdtsc for performance - inline uint64_t get_timestamp() { - //return previous_timestamp_ + 1; - Grappa::tick(); - return Grappa::timestamp(); - } - - /// get delayed timestamp. - /// TODO: remove. unused. - inline uint64_t get_previous_timestamp() { - return previous_timestamp_; - } - - /// poll communicator. send any aggregated messages that have been sitting for too long - inline bool poll() { - GRAPPA_FUNCTION_PROFILE( GRAPPA_COMM_GROUP ); -#ifdef VTRACE_FULL - VT_TRACER("poll"); -#endif - stats.record_poll(); - - uint64_t beforePoll = aggregator_bundles_received_; - global_communicator.poll(); - uint64_t afterPoll = aggregator_bundles_received_; - bool pollUseful = afterPoll > beforePoll; - - - uint64_t ts = get_timestamp(); - - uint64_t beforeDeaggregate = aggregator_messages_deaggregated_; - deaggregate(); - uint64_t afterDeaggregate = aggregator_messages_deaggregated_; - bool deagUseful = afterDeaggregate > beforeDeaggregate; - - // timestamp overflows are silently ignored. - // since it would take many many years to see one, I think that's okay for now. - int num_flushes = 0; - while( !least_recently_sent_.empty() && - ((-least_recently_sent_.top_priority() + FLAGS_aggregator_autoflush_ticks) < ts) && - ((FLAGS_aggregator_max_flush == 0) || (num_flushes < FLAGS_aggregator_max_flush)) ) { - stats.record_timeout(); - DVLOG(5) << "timeout for node " << least_recently_sent_.top_key() - << ": inserted at " << -least_recently_sent_.top_priority() - << " autoflush_ticks " << FLAGS_aggregator_autoflush_ticks - << " (current ts " << ts << ")"; - flush( least_recently_sent_.top_key() ); // send. - ++num_flushes; - } - if( num_flushes > 0 ) stats.record_multiflush(); - bool flushUseful = num_flushes > 0; - previous_timestamp_ = ts; - - return flushUseful || deagUseful || pollUseful; - } - - /// what's the largest message we can aggregate? - inline const size_t max_size() const { return buffer_size_; } - - /// how much space is available for aggregation to this destination? - inline const size_t remaining_size( Core destination ) const { - Core target = get_target_for_node( destination ); - return buffer_size_ - buffers_[ target ].current_position_; - } - - /// Aggregate a message. Do not call this directly; instead, call - /// Grappa_call_on(). - /// - /// @param destination core that will receive this message - /// @param fn_p function pointer of handler to run on reception - /// @param args pointer to arg struct - /// @param args_size size in bytes of arg struct - /// @param payload pointer to payload buffer - /// @param payload_size size in bytes of payload buffer - inline void aggregate( Core destination, AggregatorAMHandler fn_p, - const void * args, const size_t args_size, - const void * payload, const size_t payload_size ) { - GRAPPA_FUNCTION_PROFILE( GRAPPA_COMM_GROUP ); -#ifdef VTRACE_FULL - VT_TRACER("aggregate"); -#endif - CHECK( destination < max_nodes_ ) << "destination:" << destination << " max_nodes_:" << max_nodes_; - Core target = get_target_for_node( destination ); - CHECK( target < max_nodes_ ) << "target:" << target << " max_nodes_:" << max_nodes_; - - // make sure arg struct and payload aren't too big. - // in the future, this would lead us down a separate code path for large messages. - // for now, fail. - size_t total_call_size = Grappa_sizeof_message( args, args_size, payload, payload_size ); - DVLOG(5) << "aggregating " << total_call_size << " bytes to " - << destination << "(target " << target << ")"; - CHECK( total_call_size < buffer_size_ ) << "payload_size( " << payload_size << " )" - << "+args_size( " << args_size << " )" - << "+header_size( " << sizeof( AggregatorGenericCallHeader ) << " )" - << "= " << total_call_size << " of max( " << buffer_size_ << " )"; - - AggregatorGenericCallHeader header = { reinterpret_cast< uintptr_t >( fn_p ), - destination, - static_cast(args_size), - static_cast(payload_size) -#ifdef GRAPPA_TRACE - , global_communicator.mycore() -#endif -#ifdef VTRACE_FULL - , tag_ -#endif - }; - - if( FLAGS_aggregator_enable ) { - // does call fit in aggregation buffer? - if( !( buffers_[ target ].fits( total_call_size ) ) ) { - DVLOG(5) << "aggregating " << total_call_size << " bytes to " - << destination << "(target " << target - << "): didn't fit " - << "(current buffer position " << buffers_[ target ].current_position_ - << ", next buffer position " << buffers_[ target ].current_position_ + total_call_size << ")"; - // doesn't fit, so flush before inserting - stats.record_capacity_flush(); - flush( target ); - DCHECK( buffers_[ target ].fits( total_call_size )); - } - - // now call must fit, so just insert it - buffers_[ target ].insert( &header, sizeof( header ) ); - buffers_[ target ].insert( args, args_size ); - buffers_[ target ].insert( payload, payload_size ); - stats.record_aggregation( total_call_size ); - - } else { - - // aggregator is disabled; just send message - char * buf = raw_send_buffer_; - memcpy( buf, &header, sizeof( header ) ); - buf += sizeof( header ); - memcpy( buf, args, args_size ); - buf += args_size; - memcpy( buf, payload, payload_size ); - global_communicator.send_immediate_with_payload( target, [] (void * buf, int size) { - Aggregator_deaggregate_am( buf, size ); - }, raw_send_buffer_, total_call_size ); - } - - // trace fine-grain communication - if (FLAGS_record_grappa_events) { - // TODO: good candidate for TAU_CONTEXT_EVENT -#ifdef GRAPPA_TRACE - int fn_p_tag = aggregator_trace_tag( fn_p ); - TAU_TRACE_SENDMSG(fn_p_tag, destination, args_size + payload_size ); - // TODO: maybe add named communicators for separate function calls? -#endif -#ifdef VTRACE_FULL - VT_SEND( vt_agg_commid_, tag_, total_call_size ); -#endif - } - - uint64_t ts = get_timestamp(); - least_recently_sent_.update_or_insert( target, -ts ); - previous_timestamp_ = ts; -#ifdef VTRACE_FULL - tag_ += global_communicator.mycore(); -#endif - DVLOG(5) << "aggregated " << header; -} - -}; - - -extern Aggregator global_aggregator; - -// TODO: fix this so it works -// log deprecated call sites -#define Grappa_call_onx( ... ) \ - do { \ - LOG(WARNING) << "Using old aggregator bypass, which adds additional blocking"; \ - Grappa_call_on_m( __VA_ARGS__ ); \ - } while(0) - -#define Grappa_call_on_xx( ... ) \ - do { \ - LOG(WARNING) << "Using old aggregator bypass, which adds additional blocking"; \ - Grappa_call_on_x_m( __VA_ARGS__ ); \ - } while(0) - - -#ifdef ENABLE_RDMA_AGGREGATOR -#include "Message.hpp" -#endif - - - -/// Aggregate a message. -/// -/// @param destination core that will receive this message -/// @param fn_p function pointer of handler to run on reception -/// @param args pointer to arg struct -/// @param args_size size in bytes of arg struct -/// @param payload pointer to payload buffer -/// @param payload_size size in bytes of payload buffer -template< typename ArgsStruct > -inline void Grappa_call_on( Core destination, void (* fn_p)(ArgsStruct *, size_t, void *, size_t), - const ArgsStruct * args, const size_t args_size = sizeof( ArgsStruct ), - const void * payload = NULL, const size_t payload_size = 0) -{ - LOG(ERROR) << "Do not call this function!"; - exit(1); - StateTimer::start_communication(); -#if defined(OLD_MESSAGES_NEW_AGGREGATOR) && defined(ENABLE_RDMA_AGGREGATOR) - struct __attribute__((deprecated("Using old aggregator bypass"))) Warning {}; - CHECK_EQ( sizeof(ArgsStruct), args_size ) << "must add special-case for nonstandard ArgsStruct usage"; - typedef typename std::remove_const::type NonConstArgsStruct; - ArgsStruct& a = *(const_cast(args)); // HACK to work around some const disagreements at call sites - if( NULL == payload ) { - auto m = Grappa::send_message( destination, [a,fn_p] () mutable { - fn_p( &a, sizeof(a), NULL, 0 ); - }); - } else { - auto m = Grappa::send_message( destination, [a,fn_p] (void * payload, size_t size) mutable { - fn_p( &a, sizeof(a), payload, size ); - }, const_cast(payload), payload_size ); - } -#else - global_aggregator.aggregate( destination, - reinterpret_cast< AggregatorAMHandler >( fn_p ), - static_cast< const void * >( args ), args_size, - static_cast< const void * >( payload ), payload_size ); -#endif - StateTimer::stop_communication(); -} - - -/// Aggregate a message. Same as Grappa_call_on(), but with a -/// different payload type. -/// TODO: deprecated. remove this. -template< typename ArgsStruct, typename PayloadType > -inline void Grappa_call_on_x( Core destination, void (* fn_p)(ArgsStruct *, size_t, PayloadType *, size_t), - const ArgsStruct * args, const size_t args_size = sizeof( ArgsStruct ), - const PayloadType * payload = NULL, const size_t payload_size = 0) -{ - LOG(ERROR) << "Do not call this function!"; - exit(1); - StateTimer::start_communication(); -#if defined(OLD_MESSAGES_NEW_AGGREGATOR) && defined(ENABLE_RDMA_AGGREGATOR) - struct __attribute__((deprecated("Using old aggregator bypass, which adds additional blocking"))) Warning {}; - //LOG(WARNING) << "Using old aggregator bypass, which adds additional blocking"; - CHECK_EQ( sizeof(ArgsStruct), args_size ) << "must add special-case for nonstandard ArgsStruct usage"; - ArgsStruct& a = *(std::remove_const(args)); - if( NULL == payload ) { - auto m = Grappa::send_message( destination, [a,fn_p] { - fn_p( &a, sizeof(a), NULL, 0 ); - }); - } else { - auto m = Grappa::send_message( destination, [a,fn_p] (void * void_payload, size_t size) { - PayloadType * p = reinterpret_cast< PayloadType * >( void_payload ); - size_t psize = size / sizeof(PayloadType); - fn_p( &a, sizeof(a), p, psize ); - }, static_cast< void * >( payload ), payload_size ); - } -#else - global_aggregator.aggregate( destination, - reinterpret_cast< AggregatorAMHandler >( fn_p ), - static_cast< const void * >( args ), args_size, - static_cast< const void * >( payload ), payload_size ); -#endif - StateTimer::stop_communication(); -} +DECLARE_bool( flush_on_idle ); namespace Grappa { - namespace impl { /// Poll Grappa aggregation and communication layers. static inline void poll() { - global_aggregator.poll(); - #ifdef ENABLE_RDMA_AGGREGATOR - Grappa::impl::global_rdma_aggregator.poll(); - #endif + global_communicator.poll(); + global_rdma_aggregator.poll(); } /// Send waiting aggregated messages to a particular destination. static inline void flush( Core n ) { - global_aggregator.flush( n ); + global_rdma_aggregator.flush( n ); } - /// Meant to be called when there's no other work to be done, calls poll, - /// flushes any aggregator buffers with anything in them, and deaggregates. - static inline void idle_flush_poll() { - global_aggregator.idle_flush_poll(); - } - } } diff --git a/system/Aggregator_tests.cpp b/system/Aggregator_tests.cpp index f8f60ad23..ec49fbf1e 100644 --- a/system/Aggregator_tests.cpp +++ b/system/Aggregator_tests.cpp @@ -46,229 +46,4 @@ DECLARE_int64( aggregator_autoflush_ticks ); #include BOOST_AUTO_TEST_SUITE( Aggregator_tests ); - - -int64_t count = 0; -void function() { - ++count; -} - -struct first_call_args -{ - int arg1; - double arg2; -}; - -int first_int = 0; - -void first_call(first_call_args * args, size_t args_size, void * payload, size_t payload_size) -{ - //BOOST_MESSAGE( "first_call: arg1=" << args->arg1 << " arg2=" << args->arg2 ); - ++first_int; -} - - -struct second_call_args -{ - char arg1[ 100 ]; - int64_t i; -}; - -int second_int = 0; - -void second_call(second_call_args * args, size_t args_size, void * payload, size_t payload_size) -{ - //BOOST_MESSAGE( "second_call: arg1=" << args->arg1 ); - second_int += args->i; - BOOST_MESSAGE( "received i " << args-> i << " second_int " << second_int ); - BOOST_CHECK( payload_size == 0 || payload_size == args_size ); -} - -struct done_call_args -{ -}; - -bool done = false; - -void done_call(done_call_args * args, size_t args_size, void * payload, size_t payload_size) -{ - done = true; - BOOST_MESSAGE( "received done " ); - BOOST_CHECK( payload_size == 0 || payload_size == args_size ); -} - - -BOOST_AUTO_TEST_CASE( test1 ) { - google::ParseCommandLineFlags( &(boost::unit_test::framework::master_test_suite().argc), - &(boost::unit_test::framework::master_test_suite().argv), - true ); - google::InitGoogleLogging(boost::unit_test::framework::master_test_suite().argv[0]); - google::InstallFailureSignalHandler( ); - FLAGS_aggregator_autoflush_ticks = 1000000; - - Communicator& s = global_communicator; - s.init( &(boost::unit_test::framework::master_test_suite().argc), - &(boost::unit_test::framework::master_test_suite().argv) ); - BOOST_CHECK( s.cores() >= 2 ); - BOOST_MESSAGE( "We have " << s.cores() << " nodes." ); - Aggregator& a = global_aggregator; - a.init(); - - s.activate(); - - BOOST_CHECK( s.cores() >= 2 ); - if( s.mycore() == 0 ) { - - // make sure we can send something - first_call_args first_args = { 1, 2.3 }; - - // try with automagic arg size discovery - Grappa_call_on( 0, &first_call, &first_args ); - - a.flush( 0 ); - a.poll( ); - BOOST_CHECK_EQUAL( 1, first_int ); - - // make sure things get sent only after flushing - second_call_args second_args = { "Foo", 1 }; - // try with manual arg size discovery - Grappa_call_on( 0, &second_call, &second_args, sizeof(second_args) ); - - // try with null payload - Grappa_call_on( 0, &second_call, &second_args, sizeof(second_args), NULL, 0 ); - - // nothing has been sent yet - BOOST_CHECK_EQUAL( 0, second_int ); - - // send - a.flush( 0 ); - a.poll( ); - BOOST_CHECK_EQUAL( 2, second_int ); - - // try with non-null payload - Grappa_call_on( 0, &second_call, &second_args, sizeof(second_args), &second_args, sizeof(second_args) ); - a.flush( 0 ); - a.poll( ); - BOOST_CHECK_EQUAL( 3, second_int ); - - BOOST_MESSAGE( "make sure we flush when full" ); - int j = 0; - size_t second_message_size = sizeof(second_args) + sizeof( AggregatorGenericCallHeader ); - for( int i = 0; i < global_aggregator.max_size() - second_message_size; i += second_message_size) { - BOOST_MESSAGE( "sending " << second_args.i << " with sum " << j << " second_int " << second_int ); - //BOOST_CHECK_EQUAL( 3, second_int ); - //Grappa_call_on( 0, &second_call, &second_args ); - second_args.i = i; - Grappa_call_on( 0, &second_call, &second_args, sizeof(second_args), NULL, 0 ); - BOOST_CHECK_EQUAL( 3, second_int ); - a.poll( ); - BOOST_CHECK_EQUAL( 3, second_int ); - j += i; - BOOST_MESSAGE( "sent " << second_args.i << " with sum " << j << " second_int " << second_int ); - } - BOOST_CHECK_EQUAL( 3, second_int ); - a.poll( ); - BOOST_CHECK_EQUAL( 3, second_int ); - second_args.i = 1; - Grappa_call_on( 0, &second_call, &second_args, sizeof(second_args), NULL, 0 ); - a.poll( ); - BOOST_CHECK_EQUAL( j + 3, second_int ); - a.flush( 0 ); - a.poll( ); - BOOST_CHECK_EQUAL( j + 3 + 1, second_int ); - - BOOST_MESSAGE("make sure the timer works"); - Grappa_call_on( 0, &first_call, &first_args); - //Grappa_call_on( 0, &first_call, &first_args, NULL, 0 ); - BOOST_CHECK_EQUAL( 1, first_int ); - int64_t initial_ts, ts; - Grappa::tick(); - for( initial_ts = ts = Grappa::timestamp(); ts - initial_ts < FLAGS_aggregator_autoflush_ticks - FLAGS_aggregator_autoflush_ticks/2; ) { - a.poll(); - // watch out---the debug allocator slows things way down and makes this fail -#ifndef STL_DEBUG_ALLOCATOR - BOOST_CHECK_EQUAL( 1, first_int ); -#endif - BOOST_MESSAGE( "initial " << initial_ts << " current " << ts ); - Grappa::tick(); - ts = Grappa::timestamp(); - } - - // watch out---the debug allocator slows things way down and makes this fail -#ifndef STL_DEBUG_ALLOCATOR - BOOST_CHECK_EQUAL( 1, first_int ); -#endif - Grappa::tick(); - for( initial_ts = ts = Grappa::timestamp(); ts - initial_ts < FLAGS_aggregator_autoflush_ticks; ) { - Grappa::tick(); - ts = Grappa::timestamp(); - } - a.poll(); - BOOST_CHECK_EQUAL( 2, first_int ); - - - // make we flush in the correct order - BOOST_CHECK_EQUAL( j + 3 + 1, second_int ); - BOOST_CHECK_EQUAL( a.remaining_size( 0 ), a.max_size() ); - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() ); - - // send to node 1 - second_args.i = 5; - Grappa_call_on( 1, &second_call, &second_args, sizeof(second_args), NULL, 0 ); - BOOST_CHECK_EQUAL( j + 3 + 1, second_int ); - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() - second_message_size ); - - // send to node 0 - second_args.i = 1; - Grappa_call_on( 0, &second_call, &second_args, sizeof(second_args), NULL, 0 ); - BOOST_CHECK_EQUAL( j + 3 + 1, second_int ); - BOOST_CHECK_EQUAL( a.remaining_size( 0 ), a.max_size() - second_message_size ); - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() - second_message_size ); - - // wait until just before timeout - // for( int i = 0; i < FLAGS_aggregator_autoflush_ticks - 1; ++i ) { - // a.poll(); - // } - Grappa::tick(); - for( initial_ts = ts = Grappa::timestamp(); ts - initial_ts < FLAGS_aggregator_autoflush_ticks - 100000; ) { - BOOST_MESSAGE( "initial " << initial_ts << " current " << ts); - // nothing has flushed yet - BOOST_CHECK_EQUAL( j + 3 + 1, second_int ); - BOOST_CHECK_EQUAL( a.remaining_size( 0 ), a.max_size() - second_message_size ); - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() - second_message_size ); - a.poll(); - Grappa::tick(); - ts = Grappa::timestamp(); - } - - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() - second_message_size ); - // one more tick! node 1 flushes - Grappa::tick(); - for( initial_ts = ts = Grappa::timestamp(); ts - initial_ts < FLAGS_aggregator_autoflush_ticks - 1000; ) { - Grappa::tick(); - ts = Grappa::timestamp(); - } - a.poll(); - BOOST_CHECK_EQUAL( j + 3 + 1, second_int ); - BOOST_CHECK_EQUAL( a.remaining_size( 0 ), a.max_size() - second_message_size ); - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() ); - - // one more tick! node 0 flushes - a.poll(); - BOOST_CHECK_EQUAL( j + 3 + 2, second_int ); - BOOST_CHECK_EQUAL( a.remaining_size( 0 ), a.max_size() ); - BOOST_CHECK_EQUAL( a.remaining_size( 1 ), a.max_size() ); - - } - // else { - // while( !done ) { - // a.poll(); - // } - // } - s.barrier(); - a.finish(); - s.finish(); - -} - BOOST_AUTO_TEST_SUITE_END(); diff --git a/system/Grappa.cpp b/system/Grappa.cpp index c81b895e8..435d830df 100644 --- a/system/Grappa.cpp +++ b/system/Grappa.cpp @@ -512,8 +512,8 @@ void Grappa_init( int * argc_p, char ** argv_p[], int64_t global_memory_size_byt CHECK( global_communicator.locale_cores <= MAX_CORES_PER_LOCALE ); - // initializes system_wide global_aggregator - global_aggregator.init(); + // // initializes system_wide global_aggregator + // global_aggregator.init(); VLOG(2) << "Aggregator initialized."; @@ -717,7 +717,7 @@ int Grappa_finish( int retval ) StateTimer::finish(); global_task_manager.finish(); - global_aggregator.finish(); + //global_aggregator.finish(); if (global_memory) delete global_memory; diff --git a/system/Message_tests.cpp b/system/Message_tests.cpp index 53f91f891..be7f92cca 100644 --- a/system/Message_tests.cpp +++ b/system/Message_tests.cpp @@ -32,8 +32,6 @@ // DAMAGE. //////////////////////////////////////////////////////////////////////// -#define LEGACY_SEND - #include #include "Grappa.hpp" diff --git a/system/RDMAAggregator.cpp b/system/RDMAAggregator.cpp index e88a6692f..f4b78572a 100644 --- a/system/RDMAAggregator.cpp +++ b/system/RDMAAggregator.cpp @@ -163,13 +163,8 @@ namespace Grappa { rdma_idle_flushes++; Core c = Grappa::mycore(); - - ///////////////////////////////////////////////////// - // came from old Grappa::impl::poll() in Grappa.cpp - // (not sure why it was polling the other aggregator...) + global_communicator.poll(); - global_aggregator.poll(); - ///////////////////////////////////////////////////// // receive_poll(); @@ -271,10 +266,6 @@ namespace Grappa { void RDMAAggregator::init() { -#ifdef LEGACY_SEND -#warning RDMA Aggregator is bypassed! -#endif -#ifdef ENABLE_RDMA_AGGREGATOR //cores_.resize( global_communicator.cores ); mycore_ = global_communicator.mycore; mynode_ = -1; // gasnet supernode @@ -290,7 +281,6 @@ namespace Grappa { aggregate_counts_[i] = 0; deaggregate_counts_[i] = 0; } -#endif if( global_communicator.mycore == 0 ) { if( !FLAGS_enable_aggregation ) { @@ -322,7 +312,6 @@ namespace Grappa { void RDMAAggregator::activate() { -#ifdef ENABLE_RDMA_AGGREGATOR // one core on each locale initializes shared data if( global_communicator.locale_mycore == 0 ) { try { @@ -400,30 +389,24 @@ namespace Grappa { } // spawn send workers -#ifndef LEGACY_SEND for( int i = 0; i < core_partner_locale_count_; ++i ) { Grappa::spawn_worker( [this, i] { DVLOG(5) << "Spawning send worker " << i << " for locale " << core_partner_locales_[i]; send_worker( core_partner_locales_[i] ); }); } -#endif // spawn receive workers -#ifndef LEGACY_SEND for( int i = 0; i < FLAGS_rdma_workers_per_core; ++i ) { Grappa::spawn_worker( [this] { receive_worker(); }); } -#endif // spawn flusher -#ifndef LEGACY_SEND Grappa::spawn_worker( [this] { idle_flusher(); }); -#endif // // // // precache buffers @@ -455,12 +438,10 @@ namespace Grappa { // } // } -#endif } void RDMAAggregator::finish() { -#ifdef ENABLE_RDMA_AGGREGATOR global_communicator.barrier(); if( global_communicator.locale_mycore == 0 ) { Grappa::impl::locale_shared_memory.segment.destroy("Cores"); @@ -473,7 +454,6 @@ namespace Grappa { if( core_partner_locales_ ) delete [] core_partner_locales_; Grappa::impl::locale_shared_memory.deallocate( rdma_buffers_ ); -#endif } void RDMAAggregator::deserialize_buffer_am( void * buf, int size, CommunicatorContext * c ) { diff --git a/system/tasks/TaskingScheduler.hpp b/system/tasks/TaskingScheduler.hpp index 640808828..dfe5e207c 100644 --- a/system/tasks/TaskingScheduler.hpp +++ b/system/tasks/TaskingScheduler.hpp @@ -235,13 +235,6 @@ class TaskingScheduler : public Scheduler { Grappa::impl::idle_flush_rdma_aggregator(); } - if ( idle_flush_aggregator() ) { - stats.prev_state = TaskingSchedulerMetrics::StateIdleUseful; - } else { - stats.prev_state = TaskingSchedulerMetrics::StateIdle; - } - - StateTimer::enterState_scheduler(); } else { *(stats.state_timers[ stats.prev_state ]) += (current_ts - prev_ts) / tick_scale;