Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions cpp/src/common/allocator/my_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ struct String {
return common::E_OK;
}

FORCE_INLINE int dup_from(const char* str, common::PageArena &pa) {
len_ = strlen(str);
if (UNLIKELY(len_ == 0)) {
return common::E_OK;
}
buf_ = pa.alloc(len_);
if (IS_NULL(buf_)) {
return common::E_OOM;
}
memcpy(buf_, str, len_);
return common::E_OK;
}

FORCE_INLINE bool operator==(const String &other) const {
return equal_to(other);
}
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/common/container/bit_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ class BitMap {
*start_addr = (*start_addr) & (~bit_mask);
}

FORCE_INLINE void set_zero() {
memset(bitmap_, 0x00, size_);
}

FORCE_INLINE void set_bitmap(char* bitmap) {
memcpy(bitmap_, bitmap, size_);
}


FORCE_INLINE bool test(uint32_t index) {
uint32_t offset = index >> 3;
ASSERT(offset < size_);
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/common/device_id.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class StringArrayDeviceID : public IDeviceID {
public:
explicit StringArrayDeviceID(const std::vector<std::string>& segments)
: segments_(formalize(segments)) {}
StringArrayDeviceID(const std::vector<std::string>& segments, bool fast)
:segments_(segments) {}

explicit StringArrayDeviceID(const std::string& device_id_string)
: segments_(split_device_id_string(device_id_string)) {}
Expand Down
33 changes: 31 additions & 2 deletions cpp/src/common/tablet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include <cstdlib>

#include "utils/errno_define.h"

using namespace common;

namespace storage {
Expand Down Expand Up @@ -285,6 +283,21 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name,
return add_value(row_index, measurement_name, String(val));
}

int Tablet::set_batch_data_char(uint32_t col_index, char **data) {
if (col_index > schema_vec_->size()) {
return common::E_INVALID_SCHEMA;
}

for (int i = 0; i < max_row_num_; i++) {
if (data[i] != nullptr) {
value_matrix_[col_index].string_data[i].dup_from(data[i],
page_arena_);
bitmaps_[col_index].clear(i);
}
}
return common::E_OK;
}

template int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
bool val);
template int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
Expand Down Expand Up @@ -323,8 +336,21 @@ void Tablet::set_column_categories(
}
}

int Tablet::set_null_value(uint32_t col_index, uint32_t row_index) {
if (col_index < 0 || col_index >= schema_vec_->size()) {
return common::E_INVALID_ARG;
}

if (row_index < 0 || row_index >= max_row_num_) {
return common::E_INVALID_ARG;
}
bitmaps_[col_index].set(row_index);
return common::E_OK;
}

std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
std::vector<std::string> id_array;
id_array.reserve(id_column_indexes_.size() + 1);
id_array.push_back(insert_target_name_);
for (auto id_column_idx : id_column_indexes_) {
common::TSDataType data_type = INVALID_DATATYPE;
Expand All @@ -339,6 +365,9 @@ std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
break;
}
}
if (id_array.size() == id_column_indexes_.size() + 1) {
return std::make_shared<StringArrayDeviceID>(id_array, true);
}
return std::make_shared<StringArrayDeviceID>(id_array);
}

Expand Down
112 changes: 84 additions & 28 deletions cpp/src/common/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "common/db_common.h"
#include "device_id.h"
#include "schema.h"
#include "utils/errno_define.h"

namespace storage {

Expand All @@ -38,10 +39,12 @@ class TabletRowIterator;
class TabletColIterator;

/**
* @brief Represents a collection of data rows with associated metadata for insertion into a table.
* @brief Represents a collection of data rows with associated metadata for
* insertion into a table.
*
* This class is used to manage and organize data that will be inserted into a specific target table.
* It handles the storage of timestamps and values, along with their associated metadata such as column names and types.
* This class is used to manage and organize data that will be inserted into a
* specific target table. It handles the storage of timestamps and values, along
* with their associated metadata such as column names and types.
*/
class Tablet {
struct ValueMatrixEntry {
Expand Down Expand Up @@ -105,7 +108,8 @@ class Tablet {
[](const std::string &name, common::TSDataType type) {
return MeasurementSchema(name, type);
});
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>(measurement_vec);
schema_vec_ =
std::make_shared<std::vector<MeasurementSchema>>(measurement_vec);
init();
}

Expand All @@ -123,7 +127,8 @@ class Tablet {
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>();
for (size_t i = 0; i < column_names.size(); i++) {
schema_vec_->emplace_back(
MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]),
MeasurementSchema(column_names[i], data_types[i],
common::get_value_encoder(data_types[i]),
common::get_default_compressor()));
}
set_column_categories(column_categories);
Expand All @@ -133,45 +138,46 @@ class Tablet {
/**
* @brief Constructs a Tablet object with the given parameters.
*
* @param column_names A vector containing the names of the columns in the tablet.
* Each name corresponds to a column in the target table.
* @param column_names A vector containing the names of the columns in the
* tablet. Each name corresponds to a column in the target table.
* @param data_types A vector containing the data types of each column.
* These must match the schema of the target table.
* @param max_rows The maximum number of rows that this tablet can hold. Defaults to DEFAULT_MAX_ROWS.
* @param max_rows The maximum number of rows that this tablet can hold.
* Defaults to DEFAULT_MAX_ROWS.
*/
Tablet(const std::vector<std::string> &column_names,
const std::vector<common::TSDataType> &data_types,
uint32_t max_rows = DEFAULT_MAX_ROWS)
: max_row_num_(max_rows),
cur_row_size_(0),
timestamps_(nullptr),
value_matrix_(nullptr),
bitmaps_(nullptr) {
const std::vector<common::TSDataType> &data_types,
uint32_t max_rows = DEFAULT_MAX_ROWS)
: max_row_num_(max_rows),
cur_row_size_(0),
timestamps_(nullptr),
value_matrix_(nullptr),
bitmaps_(nullptr) {
schema_vec_ = std::make_shared<std::vector<MeasurementSchema>>();
for (size_t i = 0; i < column_names.size(); i++) {
schema_vec_->emplace_back(
MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]),
MeasurementSchema(column_names[i], data_types[i],
common::get_value_encoder(data_types[i]),
common::get_default_compressor()));
}
init();
}

~Tablet() { destroy(); }

const std::string& get_table_name() const{
return insert_target_name_;
}
const std::string &get_table_name() const { return insert_target_name_; }
void set_table_name(const std::string &table_name) {
insert_target_name_ = table_name;
}
size_t get_column_count() const { return schema_vec_->size(); }
uint32_t get_cur_row_size() const { return cur_row_size_; }

uint32_t get_max_row_size() const { return max_row_num_; }
/**
* @brief Adds a timestamp to the specified row.
*
* @param row_index The index of the row to which the timestamp will be added.
* Must be less than the maximum number of rows.
* @param row_index The index of the row to which the timestamp will be
* added. Must be less than the maximum number of rows.
* @param timestamp The timestamp value to add.
* @return Returns 0 on success, or a non-zero error code on failure.
*/
Expand All @@ -180,12 +186,14 @@ class Tablet {
void *get_value(int row_index, uint32_t schema_index,
common::TSDataType &data_type) const;
/**
* @brief Template function to add a value of type T to the specified row and column.
* @brief Template function to add a value of type T to the specified row
* and column.
*
* @tparam T The type of the value to add.
* @param row_index The index of the row to which the value will be added.
* Must be less than the maximum number of rows.
* @param schema_index The index of the column schema corresponding to the value being added.
* @param schema_index The index of the column schema corresponding to the
* value being added.
* @param val The value to add.
* @return Returns 0 on success, or a non-zero error code on failure.
*/
Expand All @@ -196,36 +204,84 @@ class Tablet {
const std::vector<common::ColumnCategory> &column_categories);
std::shared_ptr<IDeviceID> get_device_id(int i) const;
/**
* @brief Template function to add a value of type T to the specified row and column by name.
* @brief Template function to add a value of type T to the specified row
* and column by name.
*
* @tparam T The type of the value to add.
* @param row_index The index of the row to which the value will be added.
* Must be less than the maximum number of rows.
* @param measurement_name The name of the column to which the value will be added.
* Must match one of the column names provided during construction.
* @param measurement_name The name of the column to which the value will be
* added. Must match one of the column names provided during construction.
* @param val The value to add.
* @return Returns 0 on success, or a non-zero error code on failure.
*/
template <typename T>
int add_value(uint32_t row_index, const std::string &measurement_name,
T val);

FORCE_INLINE const std::string &get_column_name(uint32_t column_index) const {
FORCE_INLINE const std::string &get_column_name(
uint32_t column_index) const {
return schema_vec_->at(column_index).measurement_name_;
}

void set_column_name(uint32_t column_index, const std::string &name) {
schema_vec_->at(column_index).measurement_name_ = name;
}

const std::map<std::string, int>& get_schema_map() const {
const std::map<std::string, int> &get_schema_map() const {
return schema_map_;
}

void set_schema_map(const std::map<std::string, int> &schema_map) {
schema_map_ = schema_map;
}

int set_batch_data_char(uint32_t col_index, char **data);
template <typename T>
int set_batch_data(uint32_t col_index, T *data, char *mask) {
if (col_index > schema_vec_->size()) {
return common::E_INVALID_ARG;
}
auto schema = schema_vec_->at(col_index);
switch (schema.data_type_) {
case common::BOOLEAN:
memcpy(value_matrix_[col_index].bool_data, data,
max_row_num_ * get_data_type_size(schema.data_type_));
break;
case common::INT32:
memcpy(value_matrix_[col_index].int32_data, data,
max_row_num_ * get_data_type_size(schema.data_type_));
break;
case common::INT64:
memcpy(value_matrix_[col_index].int64_data, data,
max_row_num_ * get_data_type_size(schema.data_type_));
break;
case common::FLOAT:
memcpy(value_matrix_[col_index].float_data, data,
max_row_num_ * get_data_type_size(schema.data_type_));
break;
case common::DOUBLE:
memcpy(value_matrix_[col_index].double_data, data,
max_row_num_ * get_data_type_size(schema.data_type_));
break;
default:;
}

int size = (max_row_num_ + 7) / 8;
for (int i = 0; i < size; i++) {
mask[i] = ~mask[i];
}
bitmaps_[col_index].set_bitmap(mask);
return common::E_OK;
}

void set_batch_timestamp(int64_t* timestamp) {
memcpy(timestamps_, timestamp, max_row_num_);
cur_row_size_ = max_row_num_;
}

int set_null_value(uint32_t col_index, uint32_t row_index);

friend class TabletColIterator;
friend class TsFileWriter;
friend struct MeasurementNamesFromTablet;
Expand Down
32 changes: 32 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,38 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) {
const int ret = w->write_record(*record);
return ret;
}
void _tablet_set_target_name(Tablet tablet, char *target_name) {
auto tab = static_cast<storage::Tablet *>(tablet);
tab->set_table_name(target_name);
}

ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void *data,
char *mask) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = 0;
ret = tab->set_batch_data(col_index, data, mask);
return ret;
}

ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char **data) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = 0;
ret = tab->set_batch_data_char(col_index, data);
return ret;
};

ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t *timestamp) {
auto tab = static_cast<storage::Tablet *>(tablet);
tab->set_batch_timestamp(timestamp);
return common::E_OK;
}

ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index,
uint32_t col_index) {
auto tab = static_cast<storage::Tablet *>(tablet);
int ret = tab->set_null_value(col_index, row_index);
return ret;
}

ERRNO _tsfile_writer_close(TsFileWriter writer) {
auto *w = static_cast<storage::TsFileWriter *>(writer);
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/cwrapper/tsfile_cwrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,11 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(float);
INSERT_DATA_INTO_TS_RECORD_BY_NAME(double);
void _tablet_set_target_name(Tablet tablet, char* target_name);
ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void* data, char* mask);
ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char** data);
ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp);
ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index);

// Write a tablet into a device.
ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet);
Expand Down
Loading