Skip to content

Commit

Permalink
implement code to send the termination token to the upper mac thread
Browse files Browse the repository at this point in the history
  • Loading branch information
marenz2569 committed Jun 14, 2024
1 parent b1b5881 commit 61d359e
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 16 deletions.
7 changes: 5 additions & 2 deletions include/decoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ class Decoder {
void main_loop();

private:
std::shared_ptr<BitStreamDecoder> bit_stream_decoder_;
std::unique_ptr<IQStreamDecoder> iq_stream_decoder_;
/// The worker queue for the lower mac
std::shared_ptr<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>> lower_mac_work_queue_;

/// The reference to the upper mac thread class
std::unique_ptr<UpperMac> upper_mac_;

std::shared_ptr<BitStreamDecoder> bit_stream_decoder_;
std::unique_ptr<IQStreamDecoder> iq_stream_decoder_;

bool packed_ = false;

// input and output file descriptor
Expand Down
23 changes: 15 additions & 8 deletions include/streaming_ordered_output_thread_pool_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@
#include <optional>
#include <signal_handler.hpp>
#include <thread>
#include <variant>
#include <vector>
#if defined(__linux__)
#include <pthread.h>
#endif

struct TerminationToken {};

// thread pool executing work but outputting it the order of the input
template <typename ReturnType> class StreamingOrderedOutputThreadPoolExecutor {

public:
using ReturnTypeOrTerminationToken = std::variant<ReturnType, TerminationToken>;

StreamingOrderedOutputThreadPoolExecutor() = delete;

explicit StreamingOrderedOutputThreadPoolExecutor(int num_workers) {
Expand All @@ -43,12 +48,13 @@ template <typename ReturnType> class StreamingOrderedOutputThreadPoolExecutor {
};

~StreamingOrderedOutputThreadPoolExecutor() {
/// Queue up a termination token
for (auto& t : workers_)
t.join();
};

// append work to the queuetemplate <typename ReturnType>
void queue_work(std::function<ReturnType()> work) {
// append work to the queuetemplate <typename ReturnTypeOrTerminationToken>
void queue_work(std::function<ReturnTypeOrTerminationToken()> work) {
{
std::lock_guard<std::mutex> lock(cv_input_item_mutex_);
input_queue_.emplace_back(std::make_pair(input_counter_++, work));
Expand All @@ -57,11 +63,11 @@ template <typename ReturnType> class StreamingOrderedOutputThreadPoolExecutor {
};

// wait and get a finished item
auto get() -> ReturnType {
auto get() -> ReturnTypeOrTerminationToken {
using namespace std::chrono_literals;

for (;;) {
std::optional<ReturnType> result{};
std::optional<ReturnTypeOrTerminationToken> result{};

{
std::lock_guard<std::mutex> lk(cv_output_item_mutex_);
Expand Down Expand Up @@ -100,15 +106,16 @@ template <typename ReturnType> class StreamingOrderedOutputThreadPoolExecutor {
using namespace std::chrono_literals;

for (;;) {
std::optional<std::pair<uint64_t, std::function<ReturnType()>>> work{};
std::optional<std::pair<uint64_t, std::function<ReturnTypeOrTerminationToken()>>> work{};

{
std::lock_guard lk(cv_input_item_mutex_);
if (!input_queue_.empty()) {
work = input_queue_.front();
input_queue_.pop_front();
} else if (stop)
} else if (stop) {
break;
}
}

if (!work.has_value()) {
Expand Down Expand Up @@ -148,10 +155,10 @@ template <typename ReturnType> class StreamingOrderedOutputThreadPoolExecutor {
std::mutex cv_output_item_mutex_;

// queue_ of work with and incrementing index
std::deque<std::pair<uint64_t, std::function<ReturnType()>>> input_queue_{};
std::deque<std::pair<uint64_t, std::function<ReturnTypeOrTerminationToken()>>> input_queue_{};

// output queue_. this is a map so we can do a lookup on the current index for ordered output
std::map<uint64_t, ReturnType> output_map_{};
std::map<uint64_t, ReturnTypeOrTerminationToken> output_map_{};

// contains the value of the next input item
uint64_t input_counter_ = 0;
Expand Down
14 changes: 8 additions & 6 deletions src/decoder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,19 @@ Decoder::Decoder(unsigned receive_port, unsigned send_port, bool packed, std::op
std::optional<std::string> output_file, bool iq_or_bit_stream,
std::optional<unsigned int> uplink_scrambling_code,
const std::shared_ptr<PrometheusExporter>& prometheus_exporter)
: packed_(packed)
: lower_mac_work_queue_(std::make_shared<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>>(4))
, packed_(packed)
, uplink_scrambling_code_(uplink_scrambling_code)
, iq_or_bit_stream_(iq_or_bit_stream) {
auto lower_mac_work_queue = std::make_shared<StreamingOrderedOutputThreadPoolExecutor<LowerMac::return_type>>(4);
auto reporter = std::make_shared<Reporter>(send_port);
auto is_uplink = uplink_scrambling_code_.has_value();
auto lower_mac = std::make_shared<LowerMac>(prometheus_exporter, uplink_scrambling_code);
upper_mac_ = std::make_unique<UpperMac>(lower_mac_work_queue_, prometheus_exporter, reporter,
/*is_downlink=*/!is_uplink);
bit_stream_decoder_ =
std::make_shared<BitStreamDecoder>(lower_mac_work_queue, lower_mac, uplink_scrambling_code_.has_value());
std::make_shared<BitStreamDecoder>(lower_mac_work_queue_, lower_mac, uplink_scrambling_code_.has_value());
iq_stream_decoder_ =
std::make_unique<IQStreamDecoder>(lower_mac_work_queue, lower_mac, bit_stream_decoder_, is_uplink);
upper_mac_ = std::make_unique<UpperMac>(lower_mac_work_queue, prometheus_exporter, reporter,
/*is_downlink=*/!is_uplink);
std::make_unique<IQStreamDecoder>(lower_mac_work_queue_, lower_mac, bit_stream_decoder_, is_uplink);

// read input file from file or from socket
if (input_file.has_value()) {
Expand Down Expand Up @@ -82,6 +82,8 @@ Decoder::~Decoder() {
if (output_file_fd_.has_value()) {
close(*output_file_fd_);
}
/// Send the termination token to the upper mac worker
lower_mac_work_queue_->queue_work([]() { return TerminationToken{}; });
}

void Decoder::main_loop() {
Expand Down

0 comments on commit 61d359e

Please sign in to comment.