diff --git a/cpp/src/common/allocator/my_string.h b/cpp/src/common/allocator/my_string.h
index 9f5d8a5a0..a236bb6ca 100644
--- a/cpp/src/common/allocator/my_string.h
+++ b/cpp/src/common/allocator/my_string.h
@@ -60,6 +60,41 @@ struct String {
         return common::E_OK;
     }
 
+    /**
+     * @brief Copies the NUL-terminated C string @p str into memory obtained
+     * from @p pa and makes this String refer to that copy.
+     *
+     * An empty input leaves this String empty (null buffer, zero length)
+     * and allocates nothing. If the allocation fails, this String is left
+     * unchanged.
+     *
+     * @param str Source string; it is handed to strlen() unchecked, so it
+     * must not be null.
+     * @param pa Arena the copy is allocated from.
+     * @return common::E_OK on success, common::E_OOM if the arena returns
+     * null.
+     */
+    FORCE_INLINE int dup_from(const char* str, common::PageArena &pa) {
+        const auto str_len = static_cast<decltype(len_)>(strlen(str));
+        if (UNLIKELY(str_len == 0)) {
+            // Never pair a zero length with a pointer left over from an
+            // earlier value.
+            buf_ = nullptr;
+            len_ = 0;
+            return common::E_OK;
+        }
+        auto *copy = pa.alloc(str_len);
+        if (IS_NULL(copy)) {
+            return common::E_OOM;
+        }
+        memcpy(copy, str, str_len);
+        // Publish the new value only once the copy exists, so an
+        // out-of-memory error cannot leave a length without a buffer.
+        buf_ = copy;
+        len_ = str_len;
+        return common::E_OK;
+    }
+
     FORCE_INLINE bool operator==(const String &other) const {
         return equal_to(other);
     }
diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h
index 9d0367449..5a2b2f265 100644
--- a/cpp/src/common/container/bit_map.h
+++ b/cpp/src/common/container/bit_map.h
@@ -55,6 +55,19 @@ class BitMap {
         *start_addr = (*start_addr) & (~bit_mask);
     }
 
+    /** Clears every bit by zero-filling all size_ bytes of the bitmap. */
+    FORCE_INLINE void set_zero() {
+        memset(bitmap_, 0x00, size_);
+    }
+
+    /**
+     * Overwrites the whole bitmap with the first size_ bytes of @p bitmap.
+     * The caller must supply at least size_ readable bytes.
+     */
+    FORCE_INLINE void set_bitmap(const char* bitmap) {
+        memcpy(bitmap_, bitmap, size_);
+    }
+
     FORCE_INLINE bool test(uint32_t index) {
         uint32_t offset = index >> 3;
         ASSERT(offset < size_);
diff --git a/cpp/src/common/device_id.h b/cpp/src/common/device_id.h
index 021cb6aaf..071d6f249 100644
--- a/cpp/src/common/device_id.h
+++ b/cpp/src/common/device_id.h
@@ -67,6 +67,11 @@ class StringArrayDeviceID : public IDeviceID {
    public:
     explicit StringArrayDeviceID(const std::vector& segments)
         : segments_(formalize(segments)) {}
+    // Stores the segments exactly as given, without passing them through
+    // formalize(). The `fast` argument only selects this overload; its value
+    // is never read.
+    StringArrayDeviceID(const std::vector& segments, bool fast)
+        :segments_(segments) {}
 
     explicit StringArrayDeviceID(const std::string& device_id_string)
         : segments_(split_device_id_string(device_id_string)) {}
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index ac4a2708b..94f22db65 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -21,8 +21,6 @@
 
 #include
 
-#include "utils/errno_define.h"
-
 using namespace common;
 
 namespace storage {
@@ -285,6 +283,42 @@ int Tablet::add_value(uint32_t row_index, const std::string &measurement_name,
     return add_value(row_index, measurement_name, String(val));
 }
 
+/**
+ * Copies one column of NUL-terminated strings into the tablet.
+ *
+ * For every row i in [0, max_row_num_) whose data[i] is non-null, the string
+ * is duplicated into page_arena_ and the row's bit in the column bitmap is
+ * cleared. Rows whose entry is null are skipped and keep their current bit.
+ *
+ * NOTE(review): the column's data type is not checked here; presumably the
+ * caller only passes string columns -- confirm.
+ *
+ * @param col_index Column to fill; must be less than the schema size.
+ * @param data Array of at least max_row_num_ C-string pointers.
+ * @return common::E_OK on success, common::E_INVALID_SCHEMA if col_index is
+ * out of range, or the error returned by dup_from(); rows before the failing
+ * one stay written.
+ */
+int Tablet::set_batch_data_char(uint32_t col_index, char **data) {
+    // Valid columns are [0, size); col_index == size is already out of range.
+    if (col_index >= schema_vec_->size()) {
+        return common::E_INVALID_SCHEMA;
+    }
+
+    for (uint32_t i = 0; i < max_row_num_; i++) {
+        if (data[i] == nullptr) {
+            continue;
+        }
+        const int ret = value_matrix_[col_index].string_data[i].dup_from(
+            data[i], page_arena_);
+        if (ret != common::E_OK) {
+            return ret;
+        }
+        bitmaps_[col_index].clear(i);
+    }
+    return common::E_OK;
+}
+
 template int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
                                bool val);
 template int Tablet::add_value(uint32_t row_index, uint32_t schema_index,
@@ -323,8 +357,25 @@ void Tablet::set_column_categories(
     }
 }
 
+/**
+ * Marks the cell at (col_index, row_index) as null by setting its bit in the
+ * column's bitmap.
+ *
+ * @return common::E_OK on success, common::E_INVALID_ARG if either index is
+ * out of range.
+ */
+int Tablet::set_null_value(uint32_t col_index, uint32_t row_index) {
+    // Both indexes are unsigned, so only the upper bounds can be violated.
+    if (col_index >= schema_vec_->size() || row_index >= max_row_num_) {
+        return common::E_INVALID_ARG;
+    }
+    bitmaps_[col_index].set(row_index);
+    return common::E_OK;
+}
+
 std::shared_ptr Tablet::get_device_id(int i) const {
     std::vector id_array;
+    id_array.reserve(id_column_indexes_.size() + 1);
     id_array.push_back(insert_target_name_);
     for (auto id_column_idx : id_column_indexes_) {
         common::TSDataType data_type = INVALID_DATATYPE;
@@ -339,6 +390,9 @@ std::shared_ptr Tablet::get_device_id(int i) const {
                 break;
         }
     }
+    if (id_array.size() == id_column_indexes_.size() + 1) {
+        return std::make_shared(id_array, true);
+    }
     return std::make_shared(id_array);
 }
 
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index e69d477e1..c7c5db47f 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -29,6 +29,7 @@
 #include "common/db_common.h"
 #include "device_id.h"
 #include "schema.h"
+#include "utils/errno_define.h"
 
 namespace storage {
 
@@ -38,10 +39,12 @@ class TabletRowIterator;
 class TabletColIterator;
 
 /**
- * @brief Represents a collection of data rows with associated metadata for
insertion into a table. + * @brief Represents a collection of data rows with associated metadata for + * insertion into a table. * - * This class is used to manage and organize data that will be inserted into a specific target table. - * It handles the storage of timestamps and values, along with their associated metadata such as column names and types. + * This class is used to manage and organize data that will be inserted into a + * specific target table. It handles the storage of timestamps and values, along + * with their associated metadata such as column names and types. */ class Tablet { struct ValueMatrixEntry { @@ -105,7 +108,8 @@ class Tablet { [](const std::string &name, common::TSDataType type) { return MeasurementSchema(name, type); }); - schema_vec_ = std::make_shared>(measurement_vec); + schema_vec_ = + std::make_shared>(measurement_vec); init(); } @@ -123,7 +127,8 @@ class Tablet { schema_vec_ = std::make_shared>(); for (size_t i = 0; i < column_names.size(); i++) { schema_vec_->emplace_back( - MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]), + MeasurementSchema(column_names[i], data_types[i], + common::get_value_encoder(data_types[i]), common::get_default_compressor())); } set_column_categories(column_categories); @@ -133,24 +138,26 @@ class Tablet { /** * @brief Constructs a Tablet object with the given parameters. * - * @param column_names A vector containing the names of the columns in the tablet. - * Each name corresponds to a column in the target table. + * @param column_names A vector containing the names of the columns in the + * tablet. Each name corresponds to a column in the target table. * @param data_types A vector containing the data types of each column. * These must match the schema of the target table. - * @param max_rows The maximum number of rows that this tablet can hold. Defaults to DEFAULT_MAX_ROWS. + * @param max_rows The maximum number of rows that this tablet can hold. 
+ * Defaults to DEFAULT_MAX_ROWS. */ Tablet(const std::vector &column_names, - const std::vector &data_types, - uint32_t max_rows = DEFAULT_MAX_ROWS) - : max_row_num_(max_rows), - cur_row_size_(0), - timestamps_(nullptr), - value_matrix_(nullptr), - bitmaps_(nullptr) { + const std::vector &data_types, + uint32_t max_rows = DEFAULT_MAX_ROWS) + : max_row_num_(max_rows), + cur_row_size_(0), + timestamps_(nullptr), + value_matrix_(nullptr), + bitmaps_(nullptr) { schema_vec_ = std::make_shared>(); for (size_t i = 0; i < column_names.size(); i++) { schema_vec_->emplace_back( - MeasurementSchema(column_names[i], data_types[i], common::get_value_encoder(data_types[i]), + MeasurementSchema(column_names[i], data_types[i], + common::get_value_encoder(data_types[i]), common::get_default_compressor())); } init(); @@ -158,20 +165,19 @@ class Tablet { ~Tablet() { destroy(); } - const std::string& get_table_name() const{ - return insert_target_name_; - } + const std::string &get_table_name() const { return insert_target_name_; } void set_table_name(const std::string &table_name) { insert_target_name_ = table_name; } size_t get_column_count() const { return schema_vec_->size(); } uint32_t get_cur_row_size() const { return cur_row_size_; } + uint32_t get_max_row_size() const { return max_row_num_; } /** * @brief Adds a timestamp to the specified row. * - * @param row_index The index of the row to which the timestamp will be added. - * Must be less than the maximum number of rows. + * @param row_index The index of the row to which the timestamp will be + * added. Must be less than the maximum number of rows. * @param timestamp The timestamp value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -180,12 +186,14 @@ class Tablet { void *get_value(int row_index, uint32_t schema_index, common::TSDataType &data_type) const; /** - * @brief Template function to add a value of type T to the specified row and column. 
+ * @brief Template function to add a value of type T to the specified row + * and column. * * @tparam T The type of the value to add. * @param row_index The index of the row to which the value will be added. * Must be less than the maximum number of rows. - * @param schema_index The index of the column schema corresponding to the value being added. + * @param schema_index The index of the column schema corresponding to the + * value being added. * @param val The value to add. * @return Returns 0 on success, or a non-zero error code on failure. */ @@ -196,13 +204,14 @@ class Tablet { const std::vector &column_categories); std::shared_ptr get_device_id(int i) const; /** - * @brief Template function to add a value of type T to the specified row and column by name. + * @brief Template function to add a value of type T to the specified row + * and column by name. * * @tparam T The type of the value to add. * @param row_index The index of the row to which the value will be added. * Must be less than the maximum number of rows. - * @param measurement_name The name of the column to which the value will be added. - * Must match one of the column names provided during construction. + * @param measurement_name The name of the column to which the value will be + * added. Must match one of the column names provided during construction. * @param val The value to add. * @return Returns 0 on success, or a non-zero error code on failure. 
*/ @@ -210,7 +219,8 @@ class Tablet { int add_value(uint32_t row_index, const std::string &measurement_name, T val); - FORCE_INLINE const std::string &get_column_name(uint32_t column_index) const { + FORCE_INLINE const std::string &get_column_name( + uint32_t column_index) const { return schema_vec_->at(column_index).measurement_name_; } @@ -218,7 +228,7 @@ class Tablet { schema_vec_->at(column_index).measurement_name_ = name; } - const std::map& get_schema_map() const { + const std::map &get_schema_map() const { return schema_map_; } @@ -226,6 +236,52 @@ class Tablet { schema_map_ = schema_map; } + int set_batch_data_char(uint32_t col_index, char **data); + template + int set_batch_data(uint32_t col_index, T *data, char *mask) { + if (col_index > schema_vec_->size()) { + return common::E_INVALID_ARG; + } + auto schema = schema_vec_->at(col_index); + switch (schema.data_type_) { + case common::BOOLEAN: + memcpy(value_matrix_[col_index].bool_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::INT32: + memcpy(value_matrix_[col_index].int32_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::INT64: + memcpy(value_matrix_[col_index].int64_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::FLOAT: + memcpy(value_matrix_[col_index].float_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + case common::DOUBLE: + memcpy(value_matrix_[col_index].double_data, data, + max_row_num_ * get_data_type_size(schema.data_type_)); + break; + default:; + } + + int size = (max_row_num_ + 7) / 8; + for (int i = 0; i < size; i++) { + mask[i] = ~mask[i]; + } + bitmaps_[col_index].set_bitmap(mask); + return common::E_OK; + } + + void set_batch_timestamp(int64_t* timestamp) { + memcpy(timestamps_, timestamp, max_row_num_); + cur_row_size_ = max_row_num_; + } + + int set_null_value(uint32_t col_index, uint32_t row_index); + friend class 
TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc index 371e8ced4..b0088e9bd 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.cc +++ b/cpp/src/cwrapper/tsfile_cwrapper.cc @@ -666,6 +666,38 @@ ERRNO _tsfile_writer_write_ts_record(TsFileWriter writer, TsRecord data) { const int ret = w->write_record(*record); return ret; } +void _tablet_set_target_name(Tablet tablet, char *target_name) { + auto tab = static_cast(tablet); + tab->set_table_name(target_name); +} + +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void *data, + char *mask) { + auto tab = static_cast(tablet); + int ret = 0; + ret = tab->set_batch_data(col_index, data, mask); + return ret; +} + +ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char **data) { + auto tab = static_cast(tablet); + int ret = 0; + ret = tab->set_batch_data_char(col_index, data); + return ret; +}; + +ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t *timestamp) { + auto tab = static_cast(tablet); + tab->set_batch_timestamp(timestamp); + return common::E_OK; +} + +ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, + uint32_t col_index) { + auto tab = static_cast(tablet); + int ret = tab->set_null_value(col_index, row_index); + return ret; +} ERRNO _tsfile_writer_close(TsFileWriter writer) { auto *w = static_cast(writer); diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h b/cpp/src/cwrapper/tsfile_cwrapper.h index d43e5dce0..b01cc6a3d 100644 --- a/cpp/src/cwrapper/tsfile_cwrapper.h +++ b/cpp/src/cwrapper/tsfile_cwrapper.h @@ -548,6 +548,11 @@ INSERT_DATA_INTO_TS_RECORD_BY_NAME(int64_t); INSERT_DATA_INTO_TS_RECORD_BY_NAME(bool); INSERT_DATA_INTO_TS_RECORD_BY_NAME(float); INSERT_DATA_INTO_TS_RECORD_BY_NAME(double); +void _tablet_set_target_name(Tablet tablet, char* target_name); +ERRNO _tablet_set_batch_data(Tablet tablet, uint32_t col_index, void* data, char* mask); 
+ERRNO _tablet_set_batch_str(Tablet tablet, uint32_t col_index, char** data);
+ERRNO _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp);
+ERRNO _tablet_mark_null_value(Tablet tablet, uint32_t row_index, uint32_t col_index);
 
 // Write a tablet into a device.
 ERRNO _tsfile_writer_write_tablet(TsFileWriter writer, Tablet tablet);
diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc
index 71863f0c7..57de63ca6 100644
--- a/cpp/test/common/tablet_test.cc
+++ b/cpp/test/common/tablet_test.cc
@@ -61,4 +61,51 @@ TEST(TabletTest, LargeQuantities) {
     EXPECT_EQ(tablet.get_column_count(), schema_vec.size());
 }
 
+TEST(TabletTest, TabletBatchReadWrite) {
+    const uint32_t kRowNum = 100;
+    std::vector<std::string> column_names = {"id1", "id2", "id3",
+                                             "id4", "id5", "id6"};
+    std::vector<common::TSDataType> datatypes = {
+        common::TSDataType::BOOLEAN, common::TSDataType::INT32,
+        common::TSDataType::INT64,   common::TSDataType::FLOAT,
+        common::TSDataType::DOUBLE,  common::TSDataType::STRING};
+    Tablet tablet(column_names, datatypes, kRowNum);
+    common::TSDataType datatype;
+
+    // Containers own every buffer, so a failing ASSERT_* (which returns
+    // early) cannot leak them.
+    std::vector<char> mask((kRowNum + 7) / 8);
+
+    bool bool_vec[kRowNum] = {false};
+    bool_vec[10] = true;
+    // set_batch_data() complements the mask in place, so it is refilled
+    // with all-ones before every call.
+    mask.assign(mask.size(), static_cast<char>(0xff));
+    ASSERT_EQ(common::E_OK, tablet.set_batch_data(0, bool_vec, mask.data()));
+    ASSERT_TRUE(*(bool*)(tablet.get_value(10, 0, datatype)));
+    ASSERT_EQ(common::TSDataType::BOOLEAN, datatype);
+
+    int32_t i32_vec[kRowNum] = {0};
+    i32_vec[99] = 123;
+    mask.assign(mask.size(), static_cast<char>(0xff));
+    ASSERT_EQ(common::E_OK, tablet.set_batch_data(1, i32_vec, mask.data()));
+    ASSERT_EQ(0, *(int32_t *)(tablet.get_value(10, 1, datatype)));
+    ASSERT_EQ(123, *(int32_t *)(tablet.get_value(99, 1, datatype)));
+
+    std::vector<std::string> str_values;
+    std::vector<char *> str_ptrs;
+    for (uint32_t i = 0; i < kRowNum; i++) {
+        str_values.push_back("val" + std::to_string(i));
+    }
+    for (auto &value : str_values) {
+        str_ptrs.push_back(&value[0]);
+    }
+    ASSERT_EQ(common::E_OK, tablet.set_batch_data_char(5, str_ptrs.data()));
+    ASSERT_EQ(common::String("val10"),
+              *(common::String*)tablet.get_value(10, 5, datatype));
+
+    ASSERT_EQ(common::E_OK, tablet.set_null_value(5, 20));
+    ASSERT_EQ(nullptr, tablet.get_value(20, 5, datatype));
+}
+
 }  // namespace storage
\ No newline at end of file
diff --git a/python/setup.py b/python/setup.py
index 6edeea0b9..84727278d 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -24,7 +24,7 @@ import shutil
 import os
 
 
-version = "2.1.0.dev0"
+version = "2.1.0.dev"
 system = platform.system()
 
 def copy_tsfile_lib(source_dir, target_dir, suffix):
diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py
index df51bcfad..d15adf4f4 100644
--- a/python/tsfile/__init__.py
+++ b/python/tsfile/__init__.py
@@ -32,4 +32,5 @@ from .exceptions import *
 
 from .tsfile_reader import TsFileReaderPy as TsFileReader, ResultSetPy as ResultSet
 from .tsfile_writer import TsFileWriterPy as TsFileWriter
-from .tsfile_table_writer import TsFileTableWriter
\ No newline at end of file
+from .tsfile_writer import CTablet
+from .tsfile_table_writer import TsFileTableWriter
diff --git a/python/tsfile/tsfile_cpp.pxd b/python/tsfile/tsfile_cpp.pxd
index b4f9ccf86..5acad9895 100644
--- a/python/tsfile/tsfile_cpp.pxd
+++ b/python/tsfile/tsfile_cpp.pxd
@@ -130,7 +130,7 @@ cdef extern from "./tsfile_cwrapper.h":
                              TSDataType * data_types,
                              int column_num, int max_rows);
 
-    Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num);
+    Tablet tablet_new(const char** column_names, TSDataType * data_types, int column_num, uint32_t max_rows);
 
     ErrorCode tablet_add_timestamp(Tablet tablet, uint32_t row_index, int64_t timestamp);
     ErrorCode tablet_add_value_by_index_int64_t(Tablet tablet, uint32_t row_index, uint32_t column_index,
@@ -158,6 +158,11 @@ cdef extern from "./tsfile_cwrapper.h":
 
     void _free_tsfile_ts_record(TsRecord * record);
 
+    void _tablet_set_target_name(Tablet tablet, char * target_name);
+    ErrorCode _tablet_set_batch_data(Tablet tablet, uint32_t col_index, const void* data, char* mask);
+    ErrorCode _tablet_set_batch_str(Tablet tablet, uint32_t col_index, const char** data);
+    ErrorCode _tablet_set_batch_timestamp(Tablet tablet, int64_t* timestamp);
+
     # resulSet : query data from tsfile reader
     ResultSet tsfile_query_table(TsFileReader reader,
                                  const char * table_name,
diff --git a/python/tsfile/tsfile_table_writer.py b/python/tsfile/tsfile_table_writer.py
index c312a0ed4..9b49b1769 100644
--- a/python/tsfile/tsfile_table_writer.py
+++ b/python/tsfile/tsfile_table_writer.py
@@ -17,7 +17,7 @@
 #
 
 from tsfile import TableSchema, Tablet, TableNotExistError
-from tsfile import TsFileWriter
+from tsfile import TsFileWriter, CTablet
 
 
 class TsFileTableWriter:
@@ -53,6 +53,18 @@ def write_table(self, tablet: Tablet):
             raise TableNotExistError
         self.writer.write_table(tablet)
 
+    def write_ctablet(self, tablet: CTablet):
+        """
+        Write a CTablet under this writer's exclusive table name. The
+        tablet's target name is set to that name before it is written.
+        :param tablet: CTablet to write.
+        :raise TableNotExistError: if no exclusive table name is set.
+        """
+        if self.exclusive_table_name_ is None:
+            raise TableNotExistError
+        tablet.set_target_name(self.exclusive_table_name_)
+        self.writer.write_ctablet(tablet)
+
     def close(self):
         """
         Close TsFileTableWriter and will flush data automatically.
diff --git a/python/tsfile/tsfile_writer.pyx b/python/tsfile/tsfile_writer.pyx
index a670e5cb9..f3562af52 100644
--- a/python/tsfile/tsfile_writer.pyx
+++ b/python/tsfile/tsfile_writer.pyx
@@ -17,14 +17,226 @@
 #
 
 #cython: language_level=3
+import numpy as np
+import pandas as pd
+
+from cpython.unicode cimport PyUnicode_AsUTF8String
+
+from libc.stdlib cimport free
+from libc.stdlib cimport malloc
+from libc.string cimport strdup
 from .tsfile_cpp cimport *
 from .tsfile_py_cpp cimport *
+cimport numpy as cnp
 
 from tsfile.row_record import RowRecord
 from tsfile.schema import TimeseriesSchema as TimeseriesSchemaPy, DeviceSchema as DeviceSchemaPy
 from tsfile.schema import TableSchema as TableSchemaPy
 from tsfile.tablet import Tablet as TabletPy
+from tsfile.constants import TSDataType as TSDataTypePy
+
+# Maps str(series.dtype) to the TsFile data type the column is read as.
+_pandas_dtype_to_ts = {
+    "bool": TSDataTypePy.BOOLEAN,
+    "int32": TSDataTypePy.INT32,
+    "int64": TSDataTypePy.INT64,
+    "float32": TSDataTypePy.FLOAT,
+    "float64": TSDataTypePy.DOUBLE,
+    "string": TSDataTypePy.STRING
+}
+
+cdef bint is_compatible(object expected, object actual):
+    """
+    Return True when data of TsFile type `actual` may fill a column declared
+    as `expected`: the same type, or the widening pairs INT32 -> INT64 and
+    FLOAT -> DOUBLE.
+    """
+    if expected == actual:
+        return True
+    if expected == TSDataTypePy.INT64 and actual == TSDataTypePy.INT32:
+        return True
+    if expected == TSDataTypePy.DOUBLE and actual == TSDataTypePy.FLOAT:
+        return True
+    return False
+
+cdef object convert_series(object series, object target):
+    """
+    Return the series as a numpy array of the dtype that corresponds to the
+    TsFile type `target`, casting with astype() only when the dtype differs.
+    """
+    dtype_map = {
+        TSDataTypePy.INT64: "int64",
+        TSDataTypePy.INT32: "int32",
+        TSDataTypePy.FLOAT: "float32",
+        TSDataTypePy.DOUBLE: "float64",
+        TSDataTypePy.BOOLEAN: "bool",
+        TSDataTypePy.STRING: "str",
+    }
+
+    target_str = dtype_map.get(target)
+    if str(series.dtype) == target_str:
+        return series.to_numpy()
+    return series.astype(target_str).to_numpy()
 
+cdef encode_or_null(x):
+    """Return None for a missing value (pd.isna), else str(x) as UTF-8 bytes."""
+    if pd.isna(x):
+        return None
+    return str(x).encode('utf-8')
+
+cdef class CTablet:
+    """
+    Fixed-size tablet backed by a C Tablet handle and filled column by
+    column from a pandas DataFrame.
+    """
+    cdef Tablet tablet
+    cdef object column_name
+    cdef object data_type
+    cdef object max_row_num
+    cdef char** column_names
+    cdef int column_num
+    cdef TSDataType * column_data_types
+
+    def __init__(self, column_name: list[str], data_types: list[TSDataTypePy], max_row_num: int = 1024):
+        """
+        :param column_name: names of the value columns (without "time").
+        :param data_types: TsFile data type of each column, in the same order.
+        :param max_row_num: number of rows every DataFrame must have.
+        :raise ValueError: if the two lists differ in length.
+        """
+        self.column_name = column_name
+        self.data_type = data_types
+        self.max_row_num = max_row_num
+        self.column_num = len(column_name)
+        if len(data_types) != self.column_num:
+            raise ValueError("Length of column_name and data_types must be equal")
+        self.column_names = malloc(sizeof(char*) * self.column_num)
+        self.column_data_types = malloc(sizeof(TSDataType) * self.column_num)
+
+        ind = 0
+        for name, dtype in zip(column_name, data_types):
+            self.column_names[ind] = strdup(name.encode('utf-8'))
+            self.column_data_types[ind] = to_c_data_type(dtype)
+            ind = ind + 1
+
+    cpdef set_target_name(self, object target_name):
+        """
+        Set the target (table) name on the C tablet.
+        :raise ValueError: if no tablet has been built by from_data_frame().
+        """
+        cdef bytes device_id_bytes
+        cdef char * device_id_c
+        if self.tablet == NULL:
+            raise ValueError("CTablet holds no data; call from_data_frame() first")
+        device_id_bytes = PyUnicode_AsUTF8String(target_name)
+        device_id_c = device_id_bytes
+        _tablet_set_target_name(self.tablet, device_id_c)
+
+    cdef Tablet get_tablet(self):
+        return self.tablet
+
+    cdef init_c_tablet(self):
+        # Drop any tablet built by an earlier from_data_frame() call.
+        if self.tablet != NULL:
+            free_tablet(&self.tablet)
+        self.tablet = tablet_new(self.column_names, self.column_data_types, self.column_num, self.max_row_num)
+
+    cpdef from_data_frame(self, data_frame: pd.DataFrame):
+        """
+        Rebuild the C tablet from a pandas DataFrame.
+
+        The frame needs an int64 "time" column, one column per declared
+        column name, and exactly max_row_num rows. Any previously built
+        tablet is freed first.
+
+        :raise TypeError: input is not a DataFrame, "time" is not int64, or
+            a column dtype is unsupported or incompatible.
+        :raise ValueError: wrong column count, no "time" column, or wrong
+            row count.
+        :raise KeyError: a declared column is missing.
+        """
+        cdef void * data_ptr
+        cdef char * mask_ptr
+        cdef char** str_ptr
+        cdef int64_t* time_ptr
+        cdef ErrorCode errno
+        cdef cnp.ndarray[cnp.int64_t, ndim=1] time_array
+        if not isinstance(data_frame, pd.DataFrame):
+            raise TypeError("Input must be a pandas DataFrame")
+        if data_frame.shape[1] != len(self.column_name) + 1:
+            raise ValueError(f"DataFrame column count {data_frame.shape[1]} doesn't match expected {len(self.column_name) + 1}")
+
+        if "time" not in data_frame.columns:
+            raise ValueError("Missing required column: 'time'")
+        if not pd.api.types.is_integer_dtype(data_frame["time"]):
+            raise TypeError("Column 'time' must be of integer type")
+        if data_frame["time"].dtype != np.int64:
+            raise TypeError(f"Column 'time' must be int64, but got {data_frame['time'].dtype}")
+
+        if self.max_row_num !=len(data_frame["time"]):
+            raise ValueError(f"Time column length {len(data_frame['time'])} doesn't match expected {self.max_row_num}")
+
+        self.init_c_tablet()
+        time_array = np.ascontiguousarray(data_frame["time"].to_numpy())
+        time_ptr = <int64_t*> time_array.data
+        _tablet_set_batch_timestamp(self.tablet, time_ptr)
+
+        for ind, col_name in enumerate(self.column_name):
+            if col_name not in data_frame.columns:
+                raise KeyError(f"Column '{col_name}' missing from DataFrame")
+            series = data_frame[col_name]
+            dtype_str = str(series.dtype)
+            if dtype_str not in _pandas_dtype_to_ts:
+                raise TypeError(f"Unsupported pandas dtype {dtype_str} for column {col_name}")
+            actual_ts_type = _pandas_dtype_to_ts[dtype_str]
+            expected_ts_type = self.data_type[ind]
+            if not is_compatible(expected_ts_type, actual_ts_type):
+                raise TypeError(
+                    f"Column '{col_name}' type mismatch: expected {expected_ts_type.name}, got {actual_ts_type.name}")
+
+            if expected_ts_type == TSDataTypePy.STRING:
+                # Missing values must reach encode_or_null() as they are so
+                # that they come back as None and are passed on as NULL.
+                array = series.apply(encode_or_null).to_numpy(dtype=object)
+                str_ptr = <char**> malloc(sizeof(char*) * self.max_row_num)
+                for i in range(self.max_row_num):
+                    if array[i] is None:
+                        str_ptr[i] = NULL
+                    else:
+                        str_ptr[i] = strdup(array[i])
+                errno = _tablet_set_batch_str(self.tablet, ind, str_ptr)
+                for i in range(self.max_row_num):
+                    if str_ptr[i] != NULL:
+                        free(str_ptr[i])
+                free(str_ptr)
+                # Report the error only after the copies are released.
+                check_error(errno)
+            else:
+                array = np.ascontiguousarray(convert_series(series, expected_ts_type))
+                # The C++ side reads the mask as a packed bitmap: one bit per
+                # row in (rows + 7) / 8 bytes, with 1 meaning "has a value".
+                # NOTE(review): bitorder="little" assumes row i is bit (i & 7)
+                # of byte (i >> 3) -- confirm against BitMap.
+                mask = np.packbits(series.notna().to_numpy(), bitorder="little")
+                data_ptr = cnp.PyArray_DATA(array)
+                mask_ptr = <char*> cnp.PyArray_DATA(mask)
+                errno = _tablet_set_batch_data(self.tablet, ind, data_ptr, mask_ptr)
+                check_error(errno)
+
+    def __dealloc__(self):
+        if self.tablet != NULL:
+            free_tablet(&self.tablet)
+            self.tablet = NULL
+
+        if self.column_names != NULL:
+            for i in range(self.column_num):
+                free(self.column_names[i])
+            free(self.column_names)
+            self.column_names = NULL
+
+        if self.column_data_types != NULL:
+            free(self.column_data_types)
+            self.column_data_types = NULL
 
 cdef class TsFileWriterPy:
     cdef TsFileWriter writer
@@ -113,6 +325,17 @@ cdef class TsFileWriterPy:
         finally:
             free_c_tablet(ctablet)
 
+
+    def write_ctablet(self, tablet: CTablet):
+        """
+        Write a CTablet with the C writer; the returned error code is passed
+        to check_error().
+        """
+        cdef ErrorCode errno
+        errno = _tsfile_writer_write_table(self.writer, tablet.get_tablet())
+        check_error(errno)
+
+
     cpdef close(self):
         """
         Flush data and Close tsfile writer.