Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
386 commits
Select commit Hold shift + click to select a range
09dc52d
update
cpegeric Jun 20, 2025
e04b63c
update
cpegeric Jun 20, 2025
17057d7
bug fix
cpegeric Jun 20, 2025
d437339
index sinker with IndexSqlWriter
cpegeric Jun 23, 2025
5c648f3
remove comment
cpegeric Jun 23, 2025
53b6438
rename hsnw to index
cpegeric Jun 23, 2025
8069eb0
delete if vector is nil
cpegeric Jun 23, 2025
4afbe23
rename file
cpegeric Jun 23, 2025
4772c2f
support multi-indexes
cpegeric Jun 23, 2025
12429c2
cleanup
cpegeric Jun 23, 2025
b32d846
cleanup
cpegeric Jun 23, 2025
dd93d28
use constant
cpegeric Jun 23, 2025
173589a
rename file
cpegeric Jun 23, 2025
a44aacf
todo
cpegeric Jun 23, 2025
9aa078c
bvt test
cpegeric Jun 23, 2025
01fc88d
cleanup
cpegeric Jun 23, 2025
a39fa82
cleanup
cpegeric Jun 23, 2025
613544e
sca
cpegeric Jun 23, 2025
1a38d8b
add license
cpegeric Jun 23, 2025
be20232
bug fix
cpegeric Jun 23, 2025
622a888
more comments
cpegeric Jun 23, 2025
95a9a5c
delete sql
cpegeric Jun 23, 2025
4f06629
cleanup
cpegeric Jun 23, 2025
3c34919
cleanup
cpegeric Jun 23, 2025
d60145c
bug fix delete row only have 1 column pk
cpegeric Jun 24, 2025
84d945c
bug fix pre-defined column name
cpegeric Jun 24, 2025
789fa58
hardcode composite primary key column to varbinary
cpegeric Jun 24, 2025
504b616
bug fix
cpegeric Jun 24, 2025
e97612c
disable fulltext and ivfflat
cpegeric Jun 24, 2025
3e5ead4
bug fix
cpegeric Jun 24, 2025
f64f6de
Merge branch 'cdc_sqlexecutor_cleanup' into cdc_fulltext
cpegeric Jun 24, 2025
2806c24
only enable hnsw
cpegeric Jun 24, 2025
9ba390e
add async option
cpegeric Jun 25, 2025
ef894ea
skip async with DML
cpegeric Jun 25, 2025
1de75a6
catalog.IsIndexAsync
cpegeric Jun 25, 2025
9f83f62
async
cpegeric Jun 25, 2025
4d9cca5
Merge branch 'cdc_fulltext' into cdc_sqlexecutor_cleanup
cpegeric Jun 25, 2025
4e527c4
update
cpegeric Jun 25, 2025
99ce5a6
fix sca
cpegeric Jun 25, 2025
0f89628
fix merge
cpegeric Jun 25, 2025
14b0e7f
Merge branch 'main' into cdc_fulltext
cpegeric Jun 26, 2025
5143b81
Merge branch 'main' into cdc_fulltext
cpegeric Jun 27, 2025
72231a7
add cdc util
cpegeric Jun 27, 2025
7f7e096
create/delete cdc task
cpegeric Jun 27, 2025
884102e
update
cpegeric Jun 27, 2025
82b76f8
update
cpegeric Jun 27, 2025
959b527
update
cpegeric Jun 27, 2025
662fd57
Merge branch 'cdc_fulltext' into cdc_sqlexecutor_cleanup
cpegeric Jun 27, 2025
c16cf90
truncate table
cpegeric Jun 27, 2025
b423bcc
truncate table
cpegeric Jun 27, 2025
20fb6e5
update
cpegeric Jun 27, 2025
5caab53
cleanup
cpegeric Jun 27, 2025
43e9116
update
cpegeric Jun 27, 2025
a4a753d
update
cpegeric Jun 27, 2025
42a9adc
hnsw disable alter reindex
cpegeric Jun 27, 2025
2ed6051
alter reindex
cpegeric Jun 27, 2025
164693e
sca
cpegeric Jun 27, 2025
7757192
bug fix
cpegeric Jun 27, 2025
38775df
bug fix
cpegeric Jun 27, 2025
852b8c2
update
cpegeric Jun 30, 2025
d7241ae
use pitr_name
cpegeric Jun 30, 2025
b6639d2
add check pitr before create
cpegeric Jun 30, 2025
49a4cef
update
cpegeric Jun 30, 2025
f09eb13
update
cpegeric Jun 30, 2025
22e91b3
update
cpegeric Jun 30, 2025
04393a1
consumer
cpegeric Jun 30, 2025
3400935
license
cpegeric Jun 30, 2025
11c74ff
update
cpegeric Jun 30, 2025
7d106fb
update
cpegeric Jun 30, 2025
d4e19e6
use transaction from DataRetriever
cpegeric Jul 1, 2025
1704875
update watermark
cpegeric Jul 1, 2025
4c4c3a8
update
cpegeric Jul 1, 2025
7b02d81
update
cpegeric Jul 1, 2025
16600d3
statement option
cpegeric Jul 1, 2025
ab06863
statement option
cpegeric Jul 1, 2025
e83058f
snapshot
cpegeric Jul 1, 2025
952b57a
run
cpegeric Jul 1, 2025
06af88a
update
cpegeric Jul 1, 2025
c72197d
move to idxcdc
cpegeric Jul 1, 2025
1d2b203
update
cpegeric Jul 1, 2025
ffa5da7
update
cpegeric Jul 1, 2025
fc4b7d2
update idxcdc
cpegeric Jul 1, 2025
3b0ca17
update
cpegeric Jul 1, 2025
fab81f1
tail use insert, snapshot use upsert
cpegeric Jul 1, 2025
268ab6e
update
cpegeric Jul 1, 2025
d06cb28
update
cpegeric Jul 1, 2025
76b2c62
mock retriever
cpegeric Jul 1, 2025
98439c8
flush at the end
cpegeric Jul 1, 2025
e0aca74
update test
cpegeric Jul 2, 2025
352655d
update
cpegeric Jul 2, 2025
2567793
add test
cpegeric Jul 2, 2025
ed0c94b
merge fix watermarkUpdater
cpegeric Jul 2, 2025
ccf384c
update
cpegeric Jul 2, 2025
9afc4df
Merge branch 'main' into cdc_fulltext
cpegeric Jul 3, 2025
0b6cfd9
add cnUUID
cpegeric Jul 3, 2025
97113c4
remove unneccessary code
cpegeric Jul 3, 2025
7259ddc
update
cpegeric Jul 3, 2025
5aa0926
api
cpegeric Jul 3, 2025
55258af
merge fix
cpegeric Jul 8, 2025
7a6ba7e
bug fix cdc
cpegeric Jul 8, 2025
a5ed284
bvt test
cpegeric Jul 8, 2025
839a494
fix drop index
cpegeric Jul 8, 2025
6ed579a
fix sca
cpegeric Jul 8, 2025
381665e
Merge branch 'main' into cdc_fulltext
cpegeric Jul 9, 2025
346f32c
Merge branch 'main' into cdc_sqlexecutor_cleanup
cpegeric Jul 9, 2025
9edbd11
bug fix thread id
cpegeric Jul 9, 2025
e744672
Merge branch 'cdc_sqlexecutor_cleanup' into cdc_fulltext
cpegeric Jul 9, 2025
f1b1962
rename idxcdc to iscp
cpegeric Jul 22, 2025
51a0044
merge fix function id
cpegeric Jul 22, 2025
44c63ff
fix sca
cpegeric Jul 22, 2025
254f20e
fix sca
cpegeric Jul 23, 2025
cef2cac
merge fix
cpegeric Aug 18, 2025
8db5fe7
merge fix
cpegeric Aug 19, 2025
cb1f323
new index consumer
cpegeric Aug 19, 2025
16c0cd3
uncomment used code
cpegeric Aug 20, 2025
3f9d25e
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 20, 2025
718ca4b
merge fix
cpegeric Aug 22, 2025
6e02233
update
cpegeric Aug 22, 2025
19a52a3
rename
cpegeric Aug 22, 2025
5819c2d
update async bvt
cpegeric Aug 22, 2025
f1ee83b
add index consumer
cpegeric Aug 22, 2025
9eb0c4e
remove unused files
cpegeric Aug 22, 2025
bc3ed22
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 26, 2025
4b2cc9c
fix account id
cpegeric Aug 26, 2025
82fbd5c
fix data retriever mock
cpegeric Aug 26, 2025
40ccd63
merge fix
cpegeric Aug 26, 2025
9efa664
fix sca
cpegeric Aug 26, 2025
2e815bb
bug fix default with sys account and change to particular id by optio…
cpegeric Aug 27, 2025
55f769d
revert
cpegeric Aug 27, 2025
640008e
system account
cpegeric Aug 27, 2025
dfa6166
fix ivf index name
cpegeric Aug 27, 2025
3fd0782
support secondary index table
cpegeric Aug 28, 2025
4c6d1fb
support secondary index table
cpegeric Aug 28, 2025
8986c1f
Revert "support secondary index table"
cpegeric Aug 28, 2025
b34aa55
fix cast null value with type
cpegeric Aug 28, 2025
266d9bd
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 28, 2025
1c08614
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 29, 2025
45851c7
better checking with valid cdc async index
cpegeric Aug 29, 2025
bc1218f
bug fix alter table drop index
cpegeric Aug 29, 2025
6262a1e
update
cpegeric Aug 29, 2025
5e55a2d
Merge branch 'main' into cdc_fulltext_merge
cpegeric Aug 29, 2025
7c77985
Merge branch 'main' into cdc_fulltext_merge
cpegeric Sep 2, 2025
89baab0
drop database also remove iscp jobs
cpegeric Sep 2, 2025
bb06ed8
bug fix check tableIDs is empty
cpegeric Sep 2, 2025
2bd1f1b
check data is nil
cpegeric Sep 3, 2025
3ba370c
bug fix hnsw load index not free when error
cpegeric Sep 4, 2025
da6d569
limit max 2k values per insert into to avoid slow gc and OOM
cpegeric Sep 4, 2025
f23c60c
Merge branch 'main' into cdc_fulltext_merge
cpegeric Sep 4, 2025
ed631c7
add bvt tests
cpegeric Sep 4, 2025
48acc18
fix alter table add column and fulltext
cpegeric Sep 5, 2025
1f58a3d
bug fix check dropped
cpegeric Sep 5, 2025
1121aae
merge fix
cpegeric Sep 5, 2025
d407424
Merge branch 'main' into iscp_merge
LeftHandCold Sep 8, 2025
f089a44
Merge branch 'main' into iscp_merge
cpegeric Sep 8, 2025
74aea64
Merge branch 'main' into iscp_merge
cpegeric Sep 8, 2025
dd731b2
merge fix
cpegeric Sep 12, 2025
3d6874b
bug fix async option
cpegeric Sep 12, 2025
1bed417
check error
cpegeric Sep 12, 2025
e8ad5e1
add type to error message
cpegeric Sep 12, 2025
946c284
print value
cpegeric Sep 12, 2025
1a40787
Merge branch 'main' into iscp_merge
cpegeric Sep 15, 2025
adae0b2
add ut tests
cpegeric Sep 15, 2025
04d1b11
license
cpegeric Sep 15, 2025
e53d604
add more ut
cpegeric Sep 15, 2025
73a2ec0
cleanup
cpegeric Sep 15, 2025
104d882
fix sca
cpegeric Sep 15, 2025
0adf3a4
fix sca
cpegeric Sep 15, 2025
af00c96
update tests
cpegeric Sep 15, 2025
3b49c87
add vector type as input argument to hnsw_cdc_sync function
cpegeric Sep 16, 2025
83195d0
hnsw with generic types
cpegeric Sep 16, 2025
52589af
Merge branch 'main' into iscp_hnsw
mergify[bot] Sep 16, 2025
1463552
update usearch
cpegeric Sep 17, 2025
4faa38c
support f64
cpegeric Sep 17, 2025
d33d882
merge fix
cpegeric Sep 17, 2025
1e89800
Merge branch 'iscp_merge' into iscp_hnsw_merge
cpegeric Sep 17, 2025
39093b6
remove invalid comment
cpegeric Sep 17, 2025
401e636
bug fix license and update tests
cpegeric Sep 17, 2025
38c042d
fix sca
cpegeric Sep 17, 2025
86e5eca
hnsw support fp64 sync
cpegeric Sep 17, 2025
5914f6b
add bvt tests
cpegeric Sep 17, 2025
031988b
ut test
cpegeric Sep 17, 2025
2381cef
fix hnsw function test
cpegeric Sep 17, 2025
d7dd439
more ut tests
cpegeric Sep 17, 2025
600d305
rename table
cpegeric Sep 18, 2025
d1d2762
load from memory
cpegeric Sep 18, 2025
afb7755
keepalive
cpegeric Sep 18, 2025
4e8778e
Merge branch 'main' into iscp_hnsw
cpegeric Sep 19, 2025
7f50dd8
new iscp api and remove pitr
cpegeric Sep 19, 2025
5e37460
remove pitr
cpegeric Sep 19, 2025
df49221
add ut tests
cpegeric Sep 19, 2025
629a93e
more tests
cpegeric Sep 19, 2025
39fbbe6
make sure error send
cpegeric Sep 19, 2025
4b7f3d5
rename table
cpegeric Sep 19, 2025
82f24c4
support l2sq
cpegeric Sep 19, 2025
6dbb6ac
merge fix
cpegeric Sep 22, 2025
00ac2c9
add ut test
cpegeric Sep 22, 2025
f70d68f
add ut test
cpegeric Sep 22, 2025
05fcf76
license
cpegeric Sep 22, 2025
8efb51a
more ut test and comment unused code
cpegeric Sep 22, 2025
4b7a537
more ut test and comment unused code
cpegeric Sep 22, 2025
3dd22be
sync ut test
cpegeric Sep 22, 2025
36d5876
update test
cpegeric Sep 22, 2025
5f06b17
update
cpegeric Sep 22, 2025
52e45a6
add ut tests
cpegeric Sep 23, 2025
44c2e31
Merge branch 'main' into iscp_hnsw
cpegeric Sep 23, 2025
560e281
always use REPLACE and hnsw always check key exists in models
cpegeric Sep 24, 2025
0ae25bb
merge fix
cpegeric Sep 24, 2025
38a40ea
Merge branch 'main' into iscp_hnsw
cpegeric Sep 25, 2025
b33f22b
merge fix
cpegeric Sep 29, 2025
674391a
update ut
cpegeric Sep 29, 2025
023573f
add ut test
cpegeric Sep 29, 2025
e6e094e
license
cpegeric Sep 29, 2025
d5896cb
comment unused code
cpegeric Sep 29, 2025
4ca3541
fix sca
cpegeric Sep 29, 2025
5c9c0c6
remove stderr
cpegeric Sep 29, 2025
ef9e30e
update
cpegeric Sep 29, 2025
0ab3101
comment
cpegeric Sep 29, 2025
172552a
rename file
cpegeric Sep 29, 2025
b95eaff
register job at the end
cpegeric Sep 29, 2025
966d692
set size when init VectorIndexCdc
cpegeric Sep 29, 2025
0b65dc6
sql process
cpegeric Sep 30, 2025
747cd34
update hnsw sync
cpegeric Sep 30, 2025
c376e90
update func_hnsw
cpegeric Sep 30, 2025
6c03a20
update index consumer
cpegeric Sep 30, 2025
506c56c
bug fix
cpegeric Sep 30, 2025
902ee1f
bug fix
cpegeric Sep 30, 2025
72db13b
bug fix sync
cpegeric Oct 1, 2025
b2ac564
update
cpegeric Oct 1, 2025
1b80709
update
cpegeric Oct 1, 2025
40baec7
update
cpegeric Oct 1, 2025
4f84f73
update
cpegeric Oct 1, 2025
450b740
bug fix
cpegeric Oct 1, 2025
29d537f
clear cache after save
cpegeric Oct 1, 2025
3e9491a
remove hnsw_cdc_update function
cpegeric Oct 1, 2025
93d39a2
sca
cpegeric Oct 1, 2025
726ad81
remove context cancel. child should have own context with cancel
cpegeric Oct 1, 2025
fb7e170
bug fix
cpegeric Oct 1, 2025
538d9b6
context need timeout
cpegeric Oct 1, 2025
dc72aec
update
cpegeric Oct 2, 2025
50e76ab
update
cpegeric Oct 2, 2025
a162928
download all model files when start sync
cpegeric Oct 2, 2025
821a228
start from zero ts
cpegeric Oct 2, 2025
55b4b02
add startFromNow
cpegeric Oct 2, 2025
6fecc24
update bvt
cpegeric Oct 2, 2025
6036318
update comment
cpegeric Oct 2, 2025
ee19641
runTxnWithSqlContext
cpegeric Oct 2, 2025
8bacdea
update bvt
cpegeric Oct 2, 2025
c37deac
update bvt
cpegeric Oct 2, 2025
7a7d720
update alter copy
cpegeric Oct 3, 2025
996bcbc
remove commented code
cpegeric Oct 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions pkg/iscp/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,26 @@

package iscp

import "github.com/matrixorigin/matrixone/pkg/sql/plan"
import (
"github.com/matrixorigin/matrixone/pkg/sql/plan"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

func NewConsumer(
cnUUID string,
cnEngine engine.Engine,
cnTxnClient client.TxnClient,
tableDef *plan.TableDef,
jobID JobID,
info *ConsumerInfo,
) (Consumer, error) {

if info.ConsumerType == int8(ConsumerType_CNConsumer) {
return NewInteralSqlConsumer(cnUUID, tableDef, jobID, info)
return NewInteralSqlConsumer(cnUUID, cnEngine, cnTxnClient, tableDef, jobID, info)
}
if info.ConsumerType == int8(ConsumerType_IndexSync) {
return NewIndexConsumer(cnUUID, tableDef, jobID, info)
return NewIndexConsumer(cnUUID, cnEngine, cnTxnClient, tableDef, jobID, info)
}
panic("todo")

Expand Down
22 changes: 18 additions & 4 deletions pkg/iscp/data_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import (
"context"
"encoding/json"
"sync"
"time"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/cdc"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/txn/client"
)

func MarshalJobStatus(status *JobStatus) (string, error) {
Expand Down Expand Up @@ -135,7 +138,14 @@ func (r *DataRetrieverImpl) Next() *ISCPData {
return data
}

func (r *DataRetrieverImpl) UpdateWatermark(exec executor.TxnExecutor, opts executor.StatementOption) error {
func (r *DataRetrieverImpl) UpdateWatermark(ctx context.Context,
cnUUID string,
txn client.TxnOperator) error {

ctxWithSysAccount := context.WithValue(ctx, defines.TenantIDKey{}, catalog.System_Account)
ctxWithSysAccount, cancel := context.WithTimeout(ctxWithSysAccount, time.Minute*5)
defer cancel()

if r.typ == ISCPDataType_Snapshot {
return nil
}
Expand All @@ -153,8 +163,12 @@ func (r *DataRetrieverImpl) UpdateWatermark(exec executor.TxnExecutor, opts exec
statusJson,
ISCPJobState_Completed,
)
_, err = exec.Exec(updateWatermarkSQL, opts)
return err
res, err := ExecWithResult(ctxWithSysAccount, updateWatermarkSQL, cnUUID, txn)
if err != nil {
return err
}
defer res.Close()
return nil
}

func (r *DataRetrieverImpl) GetDataType() int8 {
Expand Down
226 changes: 171 additions & 55 deletions pkg/iscp/index_consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ import (
"sync"
"time"

"github.com/bytedance/sonic"
"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/common/runtime"
"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/txn/client"
"github.com/matrixorigin/matrixone/pkg/util/executor"
"github.com/matrixorigin/matrixone/pkg/vectorindex"
"github.com/matrixorigin/matrixone/pkg/vectorindex/hnsw"
"github.com/matrixorigin/matrixone/pkg/vectorindex/sqlexec"
"github.com/matrixorigin/matrixone/pkg/vm/engine"
)

/* IndexConsumer */
Expand All @@ -36,44 +41,30 @@ type IndexEntry struct {
indexes []*plan.IndexDef
}

var sqlExecutorFactory = _sqlExecutorFactory

// _sqlExecutorFactory fetches the value stored under
// runtime.InternalSQLExecutor in the global variables of the service runtime
// for cnUUID and returns it as an executor.SQLExecutor. When no such value is
// present it returns a not-supported error.
func _sqlExecutorFactory(cnUUID string) (executor.SQLExecutor, error) {
	if v, found := runtime.ServiceRuntime(cnUUID).GetGlobalVariables(runtime.InternalSQLExecutor); found {
		return v.(executor.SQLExecutor), nil
	}
	return nil, moerr.NewNotSupportedNoCtx("no implement sqlExecutor")
}

/* IndexConsumer */
type IndexConsumer struct {
cnUUID string
cnEngine engine.Engine
cnTxnClient client.TxnClient
jobID JobID
info *ConsumerInfo
tableDef *plan.TableDef
sqlWriter IndexSqlWriter
exec executor.SQLExecutor
rowdata []any
rowdelete []any
sqlBufSendCh chan []byte
algo string
}

var _ Consumer = new(IndexConsumer)

func NewIndexConsumer(cnUUID string,
cnEngine engine.Engine,
cnTxnClient client.TxnClient,
tableDef *plan.TableDef,
jobID JobID,
info *ConsumerInfo) (Consumer, error) {

exec, err := sqlExecutorFactory(cnUUID)
if err != nil {
return nil, err
}

ie := &IndexEntry{indexes: make([]*plan.IndexDef, 0, 3)}

for _, idx := range tableDef.Indexes {
Expand All @@ -95,21 +86,22 @@ func NewIndexConsumer(cnUUID string,
}

c := &IndexConsumer{cnUUID: cnUUID,
jobID: jobID,
info: info,
tableDef: tableDef,
sqlWriter: sqlwriter,
exec: exec,
rowdata: make([]any, len(tableDef.Cols)),
rowdelete: make([]any, 1),
cnEngine: cnEngine,
cnTxnClient: cnTxnClient,
jobID: jobID,
info: info,
tableDef: tableDef,
sqlWriter: sqlwriter,
rowdata: make([]any, len(tableDef.Cols)),
rowdelete: make([]any, 1),
algo: ie.algo,
//sqlBufSendCh: make(chan []byte),
}

return c, nil
}

func (c *IndexConsumer) run(ctx context.Context, errch chan error, r DataRetriever) {

func runIndex(c *IndexConsumer, ctx context.Context, errch chan error, r DataRetriever) {
datatype := r.GetDataType()

if datatype == ISCPDataType_Snapshot {
Expand All @@ -125,58 +117,182 @@ func (c *IndexConsumer) run(ctx context.Context, errch chan error, r DataRetriev
if !ok {
return
}
func() {
newctx, cancel := context.WithTimeout(ctx, time.Hour)

defer cancel()
//os.Stderr.WriteString("Wait for BEGIN but sql. execute anyway\n")
opts := executor.Options{}.WithAccountID(r.GetAccountID())
res, err := c.exec.Exec(newctx, string(sql), opts)
if err != nil {
logutil.Errorf("cdc indexConsumer(%v) send sql failed, err: %v, sql: %s", c.info, err, string(sql))
os.Stderr.WriteString(fmt.Sprintf("sql executor run failed. %s\n", string(sql)))
os.Stderr.WriteString(fmt.Sprintf("err :%v\n", err))
errch <- err
}
res.Close()
}()

// no transaction required and commit every time.
err := sqlexec.RunTxnWithSqlContext(ctx, c.cnEngine, c.cnTxnClient, c.cnUUID, r.GetAccountID(), 5*time.Minute,
func(sqlproc *sqlexec.SqlProcess) (err error) {
sqlctx := sqlproc.SqlCtx

res, err := ExecWithResult(sqlproc.GetContext(), string(sql), sqlctx.GetService(), sqlctx.Txn())
if err != nil {
logutil.Errorf("cdc indexConsumer(%v) send sql failed, err: %v, sql: %s", c.info, err, string(sql))
os.Stderr.WriteString(fmt.Sprintf("sql executor run failed. %s\n", string(sql)))
os.Stderr.WriteString(fmt.Sprintf("err :%v\n", err))
return err
}
res.Close()
return nil
})

if err != nil {
errch <- err
return
}
}
}

} else {
// TAIL
newctx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel()
opts := executor.Options{}
err := c.exec.ExecTxn(newctx,
func(exec executor.TxnExecutor) error {

// all updates under same transaction and transaction can last very long so set timeout to 24 hours
err := sqlexec.RunTxnWithSqlContext(ctx, c.cnEngine, c.cnTxnClient, c.cnUUID, r.GetAccountID(), 24*time.Hour,
func(sqlproc *sqlexec.SqlProcess) (err error) {
sqlctx := sqlproc.SqlCtx

// TAIL
for {
select {
case <-ctx.Done():
return nil
return
case e2 := <-errch:
return e2
errch <- e2
return
case sql, ok := <-c.sqlBufSendCh:
if !ok {
// channel closed
return r.UpdateWatermark(exec, opts.StatementOption())
return r.UpdateWatermark(sqlproc.GetContext(), sqlctx.GetService(), sqlctx.Txn())
}

// update SQL
res, err := exec.Exec(string(sql), opts.StatementOption().WithAccountID(r.GetAccountID()))
var res executor.Result
res, err = ExecWithResult(sqlproc.GetContext(), string(sql), sqlctx.GetService(), sqlctx.Txn())
if err != nil {
return err
}
res.Close()
}
}
}, opts)
})

if err != nil {
errch <- err
return
}
}
}

// runHnsw drives the consumer loop for an HNSW index whose vectors have
// element type T.
//
// The work happens in three phases:
//  1. An hnsw.HnswSync[T] is created through the consumer's HnswSqlWriter[T]
//     inside a RunTxnWithSqlContext call (30 minute timeout).
//  2. Every payload received on c.sqlBufSendCh is decoded as a
//     vectorindex.VectorIndexCdc[T] and applied with Update, each in its own
//     RunTxnWithSqlContext call (30 minute timeout).
//  3. Once c.sqlBufSendCh is closed, the model is saved and, for
//     ISCPDataType_Tail only, the watermark is updated; both happen in a
//     single RunTxnWithSqlContext call (1 hour timeout).
//
// Any failure is sent on errch and the function returns. Cancellation of ctx
// makes the function return without sending anything. The HnswSync object is
// destroyed on every exit path.
func runHnsw[T types.RealNumbers](c *IndexConsumer, ctx context.Context, errch chan error, r DataRetriever) {
	datatype := r.GetDataType()

	// Strictly speaking a Snapshot should not share one transaction and should
	// commit every time a new batch arrives. However, HNSW only runs locally
	// and nothing is written to the database until Save(), so it is fine for
	// Snapshot to use the same implementation as TAIL.

	// Named hnswSync (not sync) so the local does not shadow the sync package.
	var hnswSync *hnsw.HnswSync[T]

	// Registered before any early return: if NewSync succeeded but
	// RunTxnWithSqlContext still reports an error afterwards, the object that
	// was created must be destroyed as well.
	defer func() {
		if hnswSync != nil {
			hnswSync.Destroy()
		}
	}()

	// read-only sql so no need transaction here. All models are loaded at startup.
	err := sqlexec.RunTxnWithSqlContext(ctx, c.cnEngine, c.cnTxnClient, c.cnUUID, r.GetAccountID(), 30*time.Minute,
		func(sqlproc *sqlexec.SqlProcess) (err error) {
			w := c.sqlWriter.(*HnswSqlWriter[T])

			var created *hnsw.HnswSync[T]
			created, err = w.NewSync(sqlproc)
			if err != nil {
				// Do not publish whatever NewSync returned together with an error.
				return err
			}
			hnswSync = created
			return nil
		})
	if err != nil {
		errch <- err
		return
	}

	if hnswSync == nil {
		errch <- moerr.NewInternalErrorNoCtx("failed create HnswSync")
		return
	}

	for {
		select {
		case <-ctx.Done():
			return

		case e2 := <-errch:
			// An error was already posted on the shared channel. Put it back so
			// it is not lost to other readers of errch, then stop.
			errch <- e2
			return

		case sql, ok := <-c.sqlBufSendCh:
			if !ok {
				// channel closed

				// we need a transaction here to save model files and update watermark
				err = sqlexec.RunTxnWithSqlContext(ctx, c.cnEngine, c.cnTxnClient, c.cnUUID, r.GetAccountID(), time.Hour,
					func(sqlproc *sqlexec.SqlProcess) (err error) {
						sqlctx := sqlproc.SqlCtx

						// save model to db
						if err = hnswSync.Save(sqlproc); err != nil {
							return err
						}

						// update watermark
						if datatype == ISCPDataType_Tail {
							return r.UpdateWatermark(sqlproc.GetContext(), sqlctx.GetService(), sqlctx.Txn())
						}
						return nil
					})
				if err != nil {
					errch <- err
				}
				return
			}

			// sql -> cdc
			var cdc vectorindex.VectorIndexCdc[T]
			if err = sonic.Unmarshal(sql, &cdc); err != nil {
				errch <- err
				return
			}

			// HNSW models are already in local so hnsw Update should not require executing SQL or should be read-only. No transaction required.
			err = sqlexec.RunTxnWithSqlContext(ctx, c.cnEngine, c.cnTxnClient, c.cnUUID, r.GetAccountID(), 30*time.Minute,
				func(sqlproc *sqlexec.SqlProcess) (err error) {
					return hnswSync.Update(sqlproc, &cdc)
				})
			if err != nil {
				errch <- err
				return
			}
		}
	}
}

// run picks the consumer loop that matches the concrete type of c.sqlWriter:
// runHnsw[float32] or runHnsw[float64] for the two HnswSqlWriter
// instantiations, and runIndex for every other writer.
func (c *IndexConsumer) run(ctx context.Context, errch chan error, r DataRetriever) {
	// HNSW over float32 vectors.
	if _, isF32 := c.sqlWriter.(*HnswSqlWriter[float32]); isF32 {
		runHnsw[float32](c, ctx, errch, r)
		return
	}

	// HNSW over float64 vectors.
	if _, isF64 := c.sqlWriter.(*HnswSqlWriter[float64]); isF64 {
		runHnsw[float64](c, ctx, errch, r)
		return
	}

	// Anything else (fulltext/ivfflat index) goes through the SQL-based loop.
	runIndex(c, ctx, errch, r)
}

func (c *IndexConsumer) processISCPData(ctx context.Context, data *ISCPData, datatype int8, errch chan error) bool {
Expand Down
Loading
Loading