Skip to content

Naturalcandy noli/placeholder #241

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: chhwang/torch
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 82 additions & 54 deletions ark/api/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "ark/planner.hpp"
#include "codegen.hpp"
#include "env.h"
#include "external_buffer_registry.hpp"
#include "file_io.h"
#include "gpu/gpu.hpp"
#include "gpu/gpu_event.hpp"
Expand All @@ -25,7 +26,6 @@
#include "model/model_buffer.hpp"
#include "model/model_data_type.hpp"
#include "model/model_tensor.hpp"
#include "model_buffer_manager.hpp"
#include "utils/utils_net.hpp"

#if defined(ARK_CUDA)
Expand Down Expand Up @@ -143,7 +143,7 @@ static size_t tensor_stride_bytes(const Json &tensor) {

class Executor::Impl {
public:
Impl() : plan_json_(), device_id_(-1) {};
Impl() : plan_json_(), device_id_(-1){};
~Impl();

int device_id() const { return device_id_; }
Expand All @@ -159,7 +159,8 @@ class Executor::Impl {
const std::string &name() const { return name_; }

void compile(const std::string &plan, int device_id,
const std::string &name);
const std::string &name,
const std::unordered_map<Tensor, void *> &external_tensors);
void launch(Stream stream, bool loop_mode);
void run(int iter);
void wait(int64_t max_spin_count);
Expand Down Expand Up @@ -203,6 +204,7 @@ class Executor::Impl {
bool is_recording_ = false;
float elapsed_msec_ = -1;

ModelBufferManager &buffer_manager_;
std::vector<void *> external_buffers_;
std::vector<std::string> external_args_;
std::map<size_t, std::string> buffer_id_to_name_;
Expand Down Expand Up @@ -323,8 +325,8 @@ std::map<size_t, void *> Executor::Impl::init_buffer_addrs(
if (!buffer_id_to_addr_.empty()) {
buffer_id_to_addr = buffer_id_to_addr_;
}
for (const auto &kv : buffer_id_to_offset) {
buffer_id_to_addr[kv.first] = buffer->ref(kv.second);
for (const auto &[id, offset] : buffer_id_to_offset) {
buffer_id_to_addr[id] = buffer->ref(offset);
}
return buffer_id_to_addr;
}
Expand Down Expand Up @@ -403,50 +405,51 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
std::map<int, std::map<int, size_t>> remote_rank_to_send_tag_to_buffer_id;
std::map<int, std::map<int, size_t>> remote_rank_to_recv_tag_to_buffer_id;

auto &ext_buf_reg = ExternalBufferRegistry::get_instance();

// TODO: improve memory planning
size_t offset = 0;
for (auto &kv : buffer_id_to_info) {
auto &buf_info = kv.second;
int r = buf_info->buffer->rank();
const size_t buf_id = buf_info->buffer->id();
if (r != rank_ && r != -1) {
// this is a remote buffer
for (const auto &tag_info : buf_info->buffer->send_tags()) {
remote_rank_to_send_tag_to_buffer_id[buf_info->buffer->rank()]
[tag_info.second] =
buf_info->buffer->id();
[tag_info.second] = buf_id;
}
for (const auto &tag_info : buf_info->buffer->recv_tags()) {
remote_rank_to_recv_tag_to_buffer_id[buf_info->buffer->rank()]
[tag_info.second] =
buf_info->buffer->id();
[tag_info.second] = buf_id;
}
continue;
}
if (buf_info->buffer->is_external()) {
if (buf_info->buffer->device_id() != device_id_) {
void *ext_data = ext_buf_reg.get(buf_id);
if (ext_data) {
gpuPointerAttributes attr;
GLOG(gpuPointerGetAttributes(&attr, ext_data));
if (attr.device != device_id_) {
ERR(InvalidUsageError,
"PyTorch tensor and model execution are on different GPUs");
"External data provided is on a different GPU: ",
attr.device, " vs ", device_id_);
}
external_buffers_.push_back(buf_info->buffer->external_data());
const auto [it, inserted] = buffer_id_to_name_.try_emplace(
buf_info->buffer->id(),
"extern_buf_" + std::to_string(buf_info->buffer->id()));
external_args_.push_back(it->second);
external_buffers_.push_back(ext_data);
const std::string name = "extern_buf_" + std::to_string(buf_id);
external_args_.push_back(name);
buffer_id_to_name_[buf_id] = name;
continue;
}
// if we are adding a plan and come across a buffer from a previous
// plan, we utilize the buffer offset from the previous plan
if (buffer_id_to_offset_.find(buf_info->buffer->id()) !=
buffer_id_to_offset_.end()) {
external_buffers_.push_back(
buffer_id_to_addr_[buf_info->buffer->id()]);
const std::string name =
"extern_buf_" + std::to_string(buf_info->buffer->id());
if (buffer_id_to_offset_.find(buf_id) != buffer_id_to_offset_.end()) {
external_buffers_.push_back(buffer_id_to_addr_[buf_id]);
const std::string name = "extern_buf_" + std::to_string(buf_id);
external_args_.push_back(name);
buffer_id_to_name_[buf_info->buffer->id()] = name;
buffer_id_to_name_[buf_id] = name;
continue;
} else {
buffer_id_to_offset[buf_info->buffer->id()] = offset;
buffer_id_to_offset[buf_id] = offset;
for (const auto &tag_info : buf_info->buffer->send_tags()) {
remote_rank_to_send_tags_and_offsets[tag_info.first]
.first.push_back(tag_info.second);
Expand Down Expand Up @@ -536,8 +539,10 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
bootstrap->recv(tags.data(), len * sizeof(int), remote_rank, 1);
bootstrap->recv(offsets.data(), len * sizeof(size_t), remote_rank, 2);
for (int i = 0; i < len; ++i) {
if (!buffer_id_to_info[send_tag_to_buffer_id[tags[i]]]
->buffer->is_external()) {
const size_t buf_id =
buffer_id_to_info[send_tag_to_buffer_id[tags[i]]]->buffer->id();
void *buf_data = ext_buf_reg.get(buf_id);
if (buf_data == nullptr) {
buffer_id_to_offset[send_tag_to_buffer_id[tags[i]]] =
offsets[i];
}
Expand All @@ -556,8 +561,10 @@ std::map<size_t, size_t> Executor::Impl::init_buffers(const Json &plan_json) {
bootstrap->recv(tags.data(), len * sizeof(int), remote_rank, 4);
bootstrap->recv(offsets.data(), len * sizeof(size_t), remote_rank, 5);
for (int i = 0; i < len; ++i) {
if (!buffer_id_to_info[recv_tag_to_buffer_id[tags[i]]]
->buffer->is_external()) {
const size_t buf_id =
buffer_id_to_info[recv_tag_to_buffer_id[tags[i]]]->buffer->id();
void *buf_data = ext_buf_reg.get(buf_id);
if (buf_data == nullptr) {
buffer_id_to_offset[recv_tag_to_buffer_id[tags[i]]] =
offsets[i];
}
Expand Down Expand Up @@ -688,8 +695,9 @@ void Executor::Impl::init_channels(const std::set<int> &remote_ranks) {
}
}

void Executor::Impl::compile(const std::string &plan, int device_id,
const std::string &name) {
void Executor::Impl::compile(
const std::string &plan, int device_id, const std::string &name,
const std::unordered_map<Tensor, void *> &external_tensors) {
if (is_launched_) {
ERR(InvalidUsageError, "Need to stop before re-compiling.");
return;
Expand All @@ -700,10 +708,28 @@ void Executor::Impl::compile(const std::string &plan, int device_id,
} catch (const ::nlohmann::json::parse_error &e) {
ERR(InvalidUsageError, "Failed to parse the plan JSON: ", e.what());
}
for (auto &[tns, addr] : external_tensors) {
const size_t buf_id = tns.ref()->buffer()->id();
if (buffer_manager_.is_staged(buf_id)) {
buffer_manager_.set_buffer_address(buf_id, addr);
external_buffers_.push_back(addr);
const std::string name = "extern_buf_" + std::to_string(buf_id);
external_args_.push_back(name);
buffer_id_to_name_[buf_id] = name;
} else {
ERR(InvalidUsageError,
"Cannot set the buffer address for tensor with buffer:", buf_id,
" the address is already bound. "
"Address setting is only allowed for delayed binding of "
"uninitialized buffers.");
}
}
kernel_->compile();
}

void Executor::Impl::launch(Stream stream, bool loop_mode) {
void Executor::Impl::launch(
Stream stream, bool loop_mode,
const std::unordered_map<const Tensor, const void *> &placeholder_data) {
if ((kernel_ == nullptr) || !kernel_->is_compiled()) {
ERR(InvalidUsageError, "Need to compile first before launch.");
}
Expand Down Expand Up @@ -796,7 +822,9 @@ void Executor::Impl::launch(Stream stream, bool loop_mode) {
is_launched_ = true;
}

void Executor::Impl::run(int iter) {
void Executor::Impl::run(
int iter,
const std::unordered_map<const Tensor, const void *> &placeholder_data) {
if (iter <= 0) return;
if (loop_mode_) {
while (atomicLoadRelaxed(flag_->ref<int>()) > 0) {
Expand Down Expand Up @@ -876,6 +904,11 @@ void Executor::Impl::barrier() {

void *Executor::Impl::tensor_address(const Tensor &tensor) const {
size_t buffer_id = tensor.ref()->buffer()->id();
auto &ext_buf_reg = ExternalBufferRegistry::get_instance();
void *ext_data = ext_buf_reg.get(buffer_id);
if (ext_data) {
return ext_data;
}
if (buffer_id_to_addr_.find(buffer_id) == buffer_id_to_addr_.end()) {
ERR(InvalidUsageError, "Tensor has an unknown buffer ID ", buffer_id,
". This is likely caused by accessing a tensor that is optimized "
Expand All @@ -888,11 +921,6 @@ void *Executor::Impl::tensor_address(const Tensor &tensor) const {
void Executor::Impl::tensor_read(const Tensor &tensor, void *data, size_t bytes,
Stream stream, bool is_d2d) const {
GLOG(gpuSetDevice(device_id_));
if (tensor.ref()->buffer()->is_external()) {
ERR(InvalidUsageError,
"Reading data from a tensor preallocated by PyTorch is not "
"supported. Use PyTorch's native methods.");
}
std::shared_ptr<GpuStream> copy_stream;
gpuStream copy_stream_raw;
if (stream) {
Expand Down Expand Up @@ -944,11 +972,6 @@ void Executor::Impl::tensor_write(const Tensor &tensor, const void *data,
size_t bytes, Stream stream,
bool is_d2d) const {
GLOG(gpuSetDevice(device_id_));
if (tensor.ref()->buffer()->is_external()) {
ERR(InvalidUsageError,
"Writing data to a tensor preallocated by PyTorch is not "
"supported. Use PyTorch's native methods.");
}
std::shared_ptr<GpuStream> copy_stream;
gpuStream copy_stream_raw;
if (stream) {
Expand Down Expand Up @@ -1014,16 +1037,23 @@ std::string Executor::plan() const { return impl_->plan(); }

const std::string &Executor::name() const { return impl_->name(); }

void Executor::compile(const std::string &plan, int device_id,
const std::string &name) {
impl_->compile(plan, device_id, name);
void Executor::compile(
const std::string &plan, int device_id, const std::string &name,
const std::unordered_map<Tensor, void *> &external_tensors) {
impl_->compile(plan, device_id, name, external_tensors);
}

void Executor::launch(Stream stream, bool loop_mode) {
impl_->launch(stream, loop_mode);
void Executor::launch(
Stream stream, bool loop_mode,
const std::unordered_map<const Tensor, const void *> &placeholder_data) {
impl_->launch(stream, loop_mode, placeholder_data);
}

void Executor::run(int iter) { impl_->run(iter); }
// Thin forwarding wrapper: delegates to Executor::Impl::run.
// NOTE(review): the map's key type is const-qualified (`const Tensor`);
// std::unordered_map requires std::hash of exactly that key type — confirm a
// std::hash<const Tensor> specialization exists (hash<Tensor> alone is not
// picked up for the const-qualified key on all toolchains).
void Executor::run(
    int iter,
    const std::unordered_map<const Tensor, const void *> &placeholder_data) {
    impl_->run(iter, placeholder_data);
}

void Executor::wait(int64_t max_spin_count) { impl_->wait(max_spin_count); }

Expand All @@ -1033,10 +1063,7 @@ float Executor::stop(int64_t max_spin_count) {

void Executor::barrier() { impl_->barrier(); }

void Executor::destroy() {
ModelBufferManager::get_instance().clear_buffers();
impl_.reset(nullptr);
}
void Executor::destroy() { impl_.reset(nullptr); }

bool Executor::destroyed() const { return impl_.get() == nullptr; }

Expand Down Expand Up @@ -1071,7 +1098,8 @@ DefaultExecutor::DefaultExecutor(
}

void DefaultExecutor::launch() {
Executor::launch(reinterpret_cast<Stream>(impl_->stream_raw_), impl_->loop_mode_);
Executor::launch(reinterpret_cast<Stream>(impl_->stream_raw_),
impl_->loop_mode_);
}

} // namespace ark
12 changes: 0 additions & 12 deletions ark/api/tensor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,6 @@

namespace ark {

// Wraps an externally allocated device buffer (e.g. a framework-owned
// pointer) in a Tensor without copying. `data_ptr` must stay valid for the
// lifetime of the tensor; `device_id` is the GPU the data lives on.
Tensor::Tensor(void* data_ptr, int32_t device_id,
               const std::vector<int64_t>& shape, const DataType& dtype) {
    // Total byte size = product of all dimension extents * element size.
    // Seed the fold with int64_t{1}: a plain `1` would make std::accumulate
    // carry its accumulator as `int`, truncating element counts larger than
    // 2^31-1 before the byte multiplication.
    size_t external_data_size =
        std::accumulate(shape.begin(), shape.end(), int64_t{1},
                        std::multiplies<int64_t>()) *
        dtype.bytes();
    auto buffer =
        std::make_shared<ModelBuffer>(data_ptr, external_data_size, device_id);
    auto tensor = std::make_shared<ModelTensor>(
        dtype.ref(), buffer, Dims(shape), Dims(shape), Dims(), Dims());
    ref_ = tensor;
}

size_t Tensor::id() const {
if (ref_) {
return ref_->id();
Expand Down
2 changes: 1 addition & 1 deletion ark/codegen.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

#include "ark/data_type.hpp"
#include "env.h"
#include "external_buffer_registry.hpp"
#include "file_io.h"
#include "logging.hpp"
#include "model/model_buffer.hpp"
#include "model/model_data_type.hpp"
#include "model/model_op.hpp"
#include "model/model_tensor.hpp"
#include "model_buffer_manager.hpp"
#include "range.hpp"
#include "utils/utils_math.hpp"

Expand Down
1 change: 0 additions & 1 deletion ark/codegen.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <string>

#include "model/model_json.hpp"
#include "model_buffer_manager.hpp"

namespace ark {

Expand Down
16 changes: 0 additions & 16 deletions ark/cpu_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,4 @@ double cpu_timer(void) {
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}

// Sleep for `sec` seconds (fractional values allowed).
// Returns 0 on success, or -1 (with errno set) if interrupted — the same
// contract as nanosleep(2), to which this forwards.
int cpu_timer_sleep(double sec) {
    struct timespec tspec;
    // Split the duration into whole seconds and the nanosecond remainder.
    tspec.tv_sec = static_cast<time_t>(sec);
    tspec.tv_nsec = static_cast<long>((sec - tspec.tv_sec) * 1.0e9);
    return nanosleep(&tspec, nullptr);
}

// Sleep for `nsec` nanoseconds (must be < 1e9, per nanosleep(2)).
// Returns 0 on success, or -1 (with errno set) if interrupted.
int cpu_ntimer_sleep(long nsec) {
    struct timespec tspec;
    tspec.tv_sec = 0;
    tspec.tv_nsec = nsec;
    return nanosleep(&tspec, nullptr);
}

} // namespace ark
4 changes: 0 additions & 4 deletions ark/cpu_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@ namespace ark {

// Measure current time in second.
double cpu_timer(void);
// Sleep in second.
int cpu_timer_sleep(double sec);
// Sleep in nanosecond.
int cpu_ntimer_sleep(long nsec);

} // namespace ark

Expand Down
32 changes: 32 additions & 0 deletions ark/external_buffer_registry.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

#include "external_buffer_registry.hpp"

#include "logging.hpp"

namespace ark {

// Returns the process-wide registry instance. The function-local static is
// constructed on first use (thread-safe initialization since C++11) and
// lives until program exit.
ExternalBufferRegistry &ExternalBufferRegistry::get_instance() {
    static ExternalBufferRegistry instance;
    return instance;
}

// Registers (or re-registers) the external data pointer for buffer `id`.
// A null pointer is never a valid external address, so it is rejected up
// front rather than stored.
void ExternalBufferRegistry::set(const size_t id, void *data) {
    if (data == nullptr) {
        ERR(InternalError, "data is nullptr.");
    }
    // Insert a fresh mapping, or overwrite a previously registered address.
    buffers_.insert_or_assign(id, data);
}

// Looks up the external data pointer registered for buffer `id`.
// Ids that were never registered yield nullptr, so callers can use the
// result directly as an "is this buffer external?" test.
void *ExternalBufferRegistry::get(const size_t id) const {
    const auto it = buffers_.find(id);
    return (it == buffers_.end()) ? nullptr : it->second;
}

void ExternalBufferRegistry::clear() { buffers_.clear(); }

} // namespace ark
Loading
Loading