1
- import { DownloadQueryResultsOptions , DownloadQueryResultsResult , DownloadTableCSVData , DownloadTableData , DownloadTableMemoryData , DriverInterface , ExternalDriverCompatibilities , IndexesSQL , QueryOptions , StreamOptions , StreamTableData , TableColumn , TableStructure , UnloadOptions } from '@cubejs-backend/query-orchestrator' ;
2
- import { CollectionType , Database } from 'arangojs' ;
1
+ import { BaseDriver , DownloadQueryResultsOptions , DownloadQueryResultsResult , QueryOptions , Row , TableColumn , TableStructure } from '@cubejs-backend/base-driver' ;
2
+ import { Database } from 'arangojs' ;
3
+ import { CollectionType } from 'arangojs/collection' ;
3
4
import { Config } from 'arangojs/connection' ;
4
- import { ArangoDbQuery } from './arangodb-query' ;
5
5
import { sql2aql } from './sql-utils' ;
6
6
7
7
export declare type TableMap = Record < string , TableColumn [ ] > ;
8
8
export declare type SchemaStructure = Record < string , TableMap > ;
9
9
10
- const DbTypeToGenericType = {
10
+ const ArangoToGenericType = {
11
11
number : 'double' ,
12
12
string : 'text' ,
13
13
bool : 'boolean'
14
14
} ;
15
15
16
- const sortByKeys = ( unordered ) => {
17
- const ordered = { } ;
16
+ const sortByKeys = ( unordered : any ) => {
17
+ const ordered : any = { } ;
18
18
19
19
Object . keys ( unordered ) . sort ( ) . forEach ( ( key ) => {
20
20
ordered [ key ] = unordered [ key ] ;
@@ -23,29 +23,38 @@ const sortByKeys = (unordered) => {
23
23
return ordered ;
24
24
} ;
25
25
26
- export class ArangoDbDriver implements DriverInterface {
26
+ export class ArangoDbDriver extends BaseDriver {
27
27
/**
28
28
* Returns default concurrency value.
29
+ * @return {number }
29
30
*/
30
31
public static getDefaultConcurrency ( ) : number {
31
32
return 2 ;
32
33
}
33
34
34
35
public static driverEnvVariables ( ) {
35
36
return [
36
- 'CUBEJS_DB_URL'
37
+ 'CUBEJS_DB_URL' ,
37
38
] ;
38
39
}
39
40
40
- public static dialectClass ( ) {
41
- return ArangoDbQuery ;
42
- }
43
-
44
41
private config : Config ;
45
42
46
43
private client : Database ;
47
44
48
- public constructor ( config : Config = { } ) {
45
+ public constructor (
46
+ config : Partial < Config > & {
47
+ /**
48
+ * Time to wait for a response from a connection after validation
49
+ * request before determining it as not valid. Default - 60000 ms.
50
+ */
51
+ testConnectionTimeout ?: number ,
52
+ } = { }
53
+ ) {
54
+ super ( {
55
+ testConnectionTimeout : config . testConnectionTimeout || 60000 ,
56
+ } ) ;
57
+
49
58
const auth = {
50
59
username : process . env . CUBEJS_DB_USER ,
51
60
password : process . env . CUBEJS_DB_PASS ,
@@ -75,6 +84,39 @@ export class ArangoDbDriver implements DriverInterface {
75
84
return await cursor . next ( ) ;
76
85
}
77
86
87
+ public async query < R = unknown > ( _query : string , _values ?: unknown [ ] , _options ?: QueryOptions ) : Promise < R [ ] > {
88
+ // console.log(_query, _values, _options);
89
+ const aqlQuery = sql2aql ( _query , _values ) ;
90
+ const cursor = await this . client . query ( aqlQuery ) ;
91
+ const result = await cursor . all ( ) ;
92
+
93
+ await cursor . kill ( ) ;
94
+
95
+ return result ;
96
+ }
97
+
98
+ public async downloadQueryResults ( query : string , values : unknown [ ] , _options : DownloadQueryResultsOptions ) : Promise < DownloadQueryResultsResult > {
99
+ const rows = await this . query < Row > ( query , values ) ;
100
+ const columnTypes : TableStructure = [ ] ;
101
+
102
+ Object . entries ( rows [ 0 ] ) . forEach ( ( cols ) => {
103
+ const [ column , value ] = cols ;
104
+ const type = typeof value ; // TODO: check float and integer
105
+ const genericType = ArangoToGenericType [ type ] ;
106
+
107
+ if ( ! genericType ) {
108
+ throw new Error ( `Unable to translate type for column "${ column } " with type: ${ type } ` ) ;
109
+ }
110
+
111
+ columnTypes . push ( { name : column , type : genericType } ) ;
112
+ } ) ;
113
+
114
+ return {
115
+ rows,
116
+ types : columnTypes
117
+ } ;
118
+ } ;
119
+
78
120
public async release ( ) {
79
121
await this . client . close ( ) ;
80
122
}
@@ -94,94 +136,49 @@ export class ArangoDbDriver implements DriverInterface {
94
136
for ( const collection of collections ) {
95
137
const collectionMeta = await collection . get ( ) ;
96
138
97
- if ( collectionMeta . type !== CollectionType . DOCUMENT_COLLECTION ) continue ;
98
-
99
- schema [ collection . name ] = await this . tableColumnTypes ( collection . name ) ;
139
+ if ( collectionMeta . type === CollectionType . DOCUMENT_COLLECTION ) {
140
+ schema [ collection . name ] = await this . tableColumnTypes ( collection . name ) ;
141
+ }
100
142
}
101
143
102
144
schema = sortByKeys ( schema ) ;
103
145
result [ schemaName ] = schema ;
104
146
105
- return sortByKeys ( result ) ;
106
- }
107
-
108
- public async createSchemaIfNotExists ( schemaName : string ) : Promise < any > {
109
- throw new Error ( 'Method not implemented.' ) ;
110
- }
111
-
112
- public async uploadTableWithIndexes ( table : string , columns : TableStructure , tableData : DownloadTableData , indexesSql : IndexesSQL , uniqueKeyColumns : string [ ] , queryTracingObj : any ) : Promise < void > {
113
- throw new Error ( 'Method not implemented.' ) ;
114
- }
115
-
116
- public loadPreAggregationIntoTable : ( preAggregationTableName : string , loadSql : string , params : any , options : any ) => Promise < any > =
117
- async ( preAggregationTableName : string , loadSql : string , params : any , options : any ) => {
118
- throw new Error ( 'Method not implemented.' ) ;
119
- } ;
120
-
121
- public async query < R = unknown > ( query : string , params : unknown [ ] , options ?: QueryOptions ) : Promise < R [ ] > {
122
- console . log ( query , params , options ) ;
123
- const aqlQuery = sql2aql ( query ) ;
124
- const cursor = await this . client . query ( aqlQuery ) ;
125
- const result = cursor . all ( ) ;
126
-
127
- await cursor . kill ( ) ;
128
-
129
147
return result ;
130
148
}
131
149
132
- public tableColumnTypes : ( table : string ) => Promise < TableStructure > =
133
- async ( table : string ) => {
134
- const columns : TableStructure = [ ] ;
135
- // TODO: can optimize by schema registry or swagger json schema
136
- const attrMap = await this . aggrAttrs ( table ) ;
137
- const attrNames = Object . keys ( attrMap ) ;
150
+ public async tableColumnTypes ( table : string ) : Promise < TableStructure > {
151
+ const columns : TableStructure = [ ] ;
152
+ // TODO: can optimize by schema registry or swagger json schema
153
+ const attrMap = await this . aggrAttrs ( table ) ;
154
+ const attrNames = Object . keys ( attrMap ) ;
138
155
139
- for ( const attrName of attrNames ) {
140
- const attrType = attrMap [ attrName ] ;
156
+ for ( const attrName of attrNames ) {
157
+ const attrType = attrMap [ attrName ] ;
141
158
142
- if ( this . toGenericType ( attrType ) ) {
143
- columns . push ( { name : attrName , type : attrType , attributes : [ ] } ) ;
144
- }
159
+ if ( this . toGenericType ( attrType ) ) {
160
+ columns . push ( { name : attrName , type : attrType , attributes : [ ] } ) ;
145
161
}
162
+ }
146
163
147
- return columns . sort ( ) ;
148
- } ;
149
-
150
- public getTablesQuery : ( schemaName : string ) => Promise < { table_name ?: string ; TABLE_NAME ?: string ; } [ ] > =
151
- async ( schemaName : string ) => {
152
- const collections = await this . client . collections ( ) ;
153
- return collections . map ( ( col ) => ( { table_name : col . name } ) ) ;
154
- } ;
155
-
156
- public dropTable : ( tableName : string , options ?: QueryOptions ) => Promise < unknown > =
157
- ( tableName : string , options ?: QueryOptions ) => {
158
- throw new Error ( 'Method not implemented.' ) ;
159
- } ;
160
-
161
- public downloadQueryResults : ( query : string , values : unknown [ ] , options : DownloadQueryResultsOptions ) => Promise < DownloadQueryResultsResult > =
162
- async ( query : string , values : unknown [ ] , options : DownloadQueryResultsOptions ) => {
163
- throw new Error ( 'Method not implemented.' ) ;
164
- } ;
165
-
166
- public downloadTable : ( table : string , options : ExternalDriverCompatibilities ) => Promise < DownloadTableMemoryData | DownloadTableCSVData > =
167
- async ( table : string , options : ExternalDriverCompatibilities ) => {
168
- throw new Error ( 'Method not implemented.' ) ;
169
- } ;
164
+ return columns . sort ( ) ;
165
+ }
170
166
171
167
// public stream?: (table: string, values: unknown[], options: StreamOptions) => Promise<StreamTableData>;
172
168
// public unload?: (table: string, options: UnloadOptions) => Promise<DownloadTableCSVData>;
173
169
// public isUnloadSupported?: (options: UnloadOptions) => Promise<boolean>;
174
170
175
- public nowTimestamp ( ) : number {
176
- return Date . now ( ) ;
177
- }
171
+ public toGenericType ( columnType : string ) : string {
172
+ columnType = columnType . toLowerCase ( ) ;
173
+
174
+ if ( columnType in ArangoToGenericType ) {
175
+ return ArangoToGenericType [ columnType ] ;
176
+ }
178
177
179
- // TODO: add to interface too
180
- public quoteIdentifier ( identifier : string ) {
181
- return `"${ identifier } "` ;
178
+ return super . toGenericType ( columnType ) ;
182
179
}
183
180
184
- private async aggrAttrs ( collectionName : string ) {
181
+ private async aggrAttrs ( collectionName : string ) : Promise < Record < string , string > > {
185
182
const cursor = await this . client . query ( `
186
183
FOR i IN [1]
187
184
LET attrMaps = (
@@ -196,7 +193,7 @@ FOR i IN [1]
196
193
RETURN ZIP(attributes[*].name, attributes[*].type)
197
194
)
198
195
RETURN MERGE(attrMaps)` ) ;
199
- let result : any = { id : 'string' } ;
196
+ let result : Record < string , string > = { id : 'string' } ;
200
197
201
198
if ( cursor . hasNext ) {
202
199
result = {
@@ -208,8 +205,4 @@ FOR i IN [1]
208
205
await cursor . kill ( ) ;
209
206
return result ;
210
207
}
211
-
212
- private toGenericType ( columnType ) {
213
- return DbTypeToGenericType [ columnType . toLowerCase ( ) ] || columnType ;
214
- }
215
208
}
0 commit comments