Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
91812b9
Moves message handling to RAJA from Camp
bechols97 Apr 28, 2025
c88179c
Adds unit tests for message handler
bechols97 Apr 28, 2025
0096451
Adds additional assert's to tests
bechols97 May 12, 2025
028dd3b
Initial setup for queue view
bechols97 May 12, 2025
41172d8
Updates to separate functionality
bechols97 May 12, 2025
78aaa3d
Adds example and some comments
bechols97 May 13, 2025
9b7d615
Re-formats messaging code
bechols97 May 27, 2025
8a6d4b2
Fixes bug in example
bechols97 May 27, 2025
94114f8
Merge branch 'develop' into feature/bechols97/device_messages
bechols97 Jul 21, 2025
3c1a73f
Adds factory functions and deduction guides for message_handler
bechols97 Jul 21, 2025
9f57022
Runs clang-format
bechols97 Jul 21, 2025
b875e20
Adds spsc queue policy
bechols97 Aug 4, 2025
3c0fb38
Adds a fixed sized string to example
bechols97 Aug 4, 2025
53a2a70
Fixes formatting with clang-format
bechols97 Aug 4, 2025
13abf53
Adds factory function to use default resource with execution policy
bechols97 Aug 5, 2025
8f24bf2
Merge branch 'develop' into feature/bechols97/device_messages
bechols97 Nov 5, 2025
63da19f
Refactors storing messages to be more generic
bechols97 Nov 5, 2025
4afd67f
Fixes build with example
bechols97 Nov 5, 2025
b5bf8b0
Fixes message alignment issues
bechols97 Nov 11, 2025
72f4c42
Formats code
bechols97 Nov 11, 2025
4ff9f00
Merge branch 'develop' into feature/bechols97/device_messages
rhornung67 Dec 1, 2025
38f8ed5
Changes to message callbacks
bechols97 Jan 6, 2026
c816f67
Adds subscribe/unsubscribe functionality
bechols97 Jan 20, 2026
6962633
Adds tests for subscribe/unsubscribe member functions
bechols97 Jan 20, 2026
a1ac472
Fixes passing to callback list
bechols97 Jan 20, 2026
f388bcc
Removes extra size member variable
bechols97 Jan 20, 2026
c13d501
Supports getting a container of all messages
bechols97 Jan 20, 2026
a5688f7
Merge branch 'feature/bechols97/device_messages' of ssh://github.com/…
bechols97 Jan 20, 2026
3d6a37a
Merge branch 'develop' into feature/bechols97/device_messages
bechols97 Jan 20, 2026
6c6776c
Formats code
bechols97 Jan 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions include/RAJA/RAJA.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@
//
#include "RAJA/util/Span.hpp"

//
// Message handler to pass messages between host and device
//
#include "RAJA/util/messages.hpp"

//
// zip iterator to iterator over sequences simultaneously
//
Expand Down
216 changes: 216 additions & 0 deletions include/RAJA/util/messages.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
/*!
******************************************************************************
*
* \file
*
* \brief RAJA header file defining a GPU to CPU message handler class.
*
******************************************************************************
*/

//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~//
// Copyright (c) 2016-25, Lawrence Livermore National Security, LLC
// and RAJA project contributors. See the RAJA/LICENSE file for details.
//
// SPDX-License-Identifier: (BSD-3-Clause)
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~//

#ifndef RAJA_MESSAGES_HPP
#define RAJA_MESSAGES_HPP

#include <algorithm>
#include <functional>
#include "RAJA/pattern/atomic.hpp"

// TODO: should these use the RAJA headers instead?
#include "camp/tuple.hpp"
#include "camp/resource.hpp"

namespace RAJA
{
///
/// Queue for storing messages. Fills buffer up to capacity.
/// Once at capacity, messages are discarded.
///
/// TODO: Currently, messages can be discarded. This is fine
/// if the use case is storing the first error message(s). However,
/// are there other use cases that need to read and write at
/// the same time?
///
template <typename T>
class message_queue
{
public:
using value_type = T;
using size_type = unsigned long long;
using pointer = value_type*;
using const_pointer = const value_type*;
using iterator = pointer;
using const_iterator = const_pointer;

message_queue() : m_capacity{0}, m_size{0}, m_buf{nullptr}
{}
message_queue(size_type capacity, pointer buf) :
m_capacity{capacity}, m_size{0}, m_buf{buf}
{}

template <typename... Ts>
RAJA_HOST_DEVICE
bool try_emplace(Ts&&... args) {
// TODO: does this need to be moved to system atomics instead
// Currently, the general use case is just storing messages
// on device and performing callback on host (i.e. not storing
// messages on host and device at the same time).
auto local_size = RAJA::atomicInc<auto_atomic>(&m_size);
if (m_buf != nullptr && local_size < m_capacity) {
m_buf[local_size] = T(std::forward<Ts>(args)...);
return true;
}

return false;
}

constexpr pointer data() noexcept {
return m_buf;
}

constexpr const_pointer data() const noexcept {
return m_buf;
}

constexpr size_type capacity() const noexcept {
return m_capacity;
}

constexpr size_type size() const noexcept {
return std::min(m_capacity, m_size);
}

constexpr bool empty() const noexcept {
return size() == 0;
}

constexpr iterator begin() noexcept {
return data();
}

constexpr const_iterator begin() const noexcept {
return const_iterator(data());
}

constexpr const_iterator cbegin() const noexcept {
return const_iterator(data());
}

constexpr iterator end() noexcept {
return data()+size();
}

constexpr const_iterator end() const noexcept {
return const_iterator(data()+size());
}

constexpr const_iterator cend() const noexcept {
return const_iterator(data()+size());
}

void clear() noexcept
{
m_size = 0;
}
private:
size_type m_capacity;
size_type m_size;
pointer m_buf;
};

template <typename Callable>
class message_handler;

///
/// Provides a way to handle messages from a GPU. This currently
/// stores messages from the GPU and then calls a callback
/// function from the host.
///
/// Note:
/// Currently, this forces a synchronize prior to calling
/// the callback function or testing if there are any messages.
///
template <typename R, typename... Args>
class message_handler<R(Args...)>
{
public:
using message = camp::tuple<std::decay_t<Args>...>;
using msg_queue = message_queue<message>;
using callback_type = std::function<R(Args...)>;

public:
template <typename Callable>
message_handler(const std::size_t num_messages, Callable c)
: m_res{camp::resources::Host()},
m_queue{num_messages, m_res.allocate<message>(num_messages,
camp::resources::MemoryAccess::Pinned)},
m_callback{c}
{}

template <typename Resource, typename Callable>
message_handler(const std::size_t num_messages, Resource res,
Callable c)
: m_res{res},
m_queue{num_messages, m_res.allocate<message>(num_messages,
camp::resources::MemoryAccess::Pinned)},
m_callback{c}
{}

~message_handler()
{
m_res.wait();
m_res.deallocate(m_queue.data(), camp::resources::MemoryAccess::Pinned);
}

// Doesn't support copying
message_handler(const message_handler&) = delete;
message_handler& operator=(const message_handler&) = delete;

// TODO need proper move support
// Move ctor/operator
message_handler(message_handler&&) = delete;
message_handler& operator=(message_handler&&) = delete;

template <typename... Ts>
RAJA_HOST_DEVICE
bool try_post_message(Ts&&... args)
{
return m_queue.try_emplace(camp::make_tuple(std::forward<Ts>(args)...));
}

void clear()
{
m_res.wait();
m_queue.clear();
}

bool test_any()
{
m_res.wait();
return !m_queue.empty();
}

void wait_all()
{
if (test_any()) {
for (const auto& msg: m_queue) {
camp::apply(m_callback, msg);
}
clear();
}
}

private:
camp::resources::Resource m_res;
msg_queue m_queue;
callback_type m_callback;
};
}

#endif /* RAJA_MESSAGES_HPP */
1 change: 1 addition & 0 deletions test/unit/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ add_subdirectory(view-layout)
add_subdirectory(algorithm)
add_subdirectory(workgroup)
add_subdirectory(indexing)
add_subdirectory(messages)
11 changes: 11 additions & 0 deletions test/unit/messages/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
###############################################################################
# Copyright (c) 2016-25, Lawrence Livermore National Security, LLC
# and RAJA project contributors. See the RAJA/LICENSE file for details.
#
# SPDX-License-Identifier: (BSD-3-Clause)
###############################################################################

raja_add_test(
NAME test-messages
SOURCES test-messages.cpp)

100 changes: 100 additions & 0 deletions test/unit/messages/test-messages.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~//
// Copyright (c) 2018-25, Lawrence Livermore National Security, LLC
// and Camp project contributors. See the camp/LICENSE file for details.
//
// SPDX-License-Identifier: (BSD-3-Clause)
//~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~//

#include "camp/array.hpp"
#include "RAJA_test-base.hpp"

#include "gtest/gtest.h"

TEST(message_handler, initialize) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, [&](int val) {
test = val;
});

ASSERT_EQ(msg.test_any(), false);
}

TEST(message_handler, initialize_with_resource) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, camp::resources::Host(), [&](int val) {
test = val;
});

ASSERT_EQ(msg.test_any(), false);
}

TEST(message_handler, clear) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, [&](int val) {
test = val;
});
msg.try_post_message(5);
msg.clear();
msg.wait_all();

ASSERT_EQ(test, 0);
}

TEST(message_handler, try_post_message) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, [&](int val) {
test = val;
});

ASSERT_EQ(msg.try_post_message(5), true);
}

TEST(message_handler, try_post_message_overflow) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, [&](int val) {
test = val;
});
ASSERT_EQ(msg.try_post_message(5), true);
ASSERT_EQ(msg.try_post_message(7), false);
}

TEST(message_handler, wait_all) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, [&](int val) {
test = val;
});
ASSERT_EQ(msg.try_post_message(1), true);
msg.wait_all();

ASSERT_EQ(test, 1);
}

TEST(message_handler, wait_all_array) {
camp::array<int, 3> test = {0, 0, 0};
RAJA::message_handler<void(camp::array<int, 3>)> msg(1,
[&](camp::array<int, 3> val) {
test[0] = val[0];
test[1] = val[1];
test[2] = val[2];
}
);
camp::array<int, 3> a{1,2,3};
ASSERT_EQ(msg.try_post_message(a), true);
msg.wait_all();

ASSERT_EQ(test[0], 1);
ASSERT_EQ(test[1], 2);
ASSERT_EQ(test[2], 3);
}

TEST(message_handler, wait_all_overflow) {
int test = 0;
RAJA::message_handler<void(int)> msg(1, [&](int val) {
test = val;
});
ASSERT_EQ(msg.try_post_message(1), true);
ASSERT_EQ(msg.try_post_message(2), false);
msg.wait_all();

ASSERT_EQ(test, 1);
}
Loading