22
33import io .tiledb .cloud .rest_api .ApiException ;
44import io .tiledb .cloud .rest_api .api .SqlApi ;
5- import io .tiledb .cloud .rest_api .model .ResultFormat ;
65import io .tiledb .cloud .rest_api .model .SQLParameters ;
76import org .apache .arrow .memory .RootAllocator ;
7+ import org .apache .arrow .vector .FieldVector ;
8+ import org .apache .arrow .vector .ValueVector ;
89import org .apache .arrow .vector .VectorSchemaRoot ;
910import org .apache .arrow .vector .ipc .ArrowStreamReader ;
10- import org .apache .arrow .vector .types . pojo . Schema ;
11+ import org .apache .arrow .vector .util . TransferPair ;
1112
1213import java .io .ByteArrayInputStream ;
1314import java .io .IOException ;
14- import java .util .ArrayList ;
15- import java .util .Arrays ;
16- import java .util .List ;
17- import java .util .Objects ;
15+ import java .util .*;
16+ import org .apache .arrow .compression .CommonsCompressionFactory ;
1817
19- public class TileDBSQL {
20- String namespace ;
18+ public class TileDBSQL implements AutoCloseable {
19+ private String namespace ;
2120
22- SQLParameters sql ;
21+ private SQLParameters sql ;
2322
24- TileDBClient tileDBClient ;
23+ private TileDBClient tileDBClient ;
2524
26- SqlApi apiInstance ;
25+ private SqlApi apiInstance ;
2726
28- ArrayList <VectorSchemaRoot > readBatches ;
27+ private ArrayList <VectorSchemaRoot > readBatches ;
2928
30- List <Object > results ;
29+ private List <Object > results ;
30+
31+ private ArrowStreamReader reader ;
3132
3233 /**
3334 *
@@ -48,29 +49,40 @@ public TileDBSQL(TileDBClient tileDBClient, String namespace, SQLParameters sql)
4849
4950 /**
5051 * Exec an SQL query and get results in arrow format.
52+ *
53+ * @return A pair that consists of an ArrayList of all valueVectors and the
54+ * number of batches read.
5155 */
52- public void execArrow (){
56+ public io . tiledb . java . api . Pair < ArrayList < ValueVector >, Integer > execArrow (){
5357 try {
5458 assert sql .getResultFormat () != null ;
55- byte [] bytes = apiInstance .runSQLBytes (namespace , sql , sql .getResultFormat ().toString ());
56- System .out .println (Arrays .toString (bytes ));
59+ byte [] bytes = apiInstance .runSQLBytes (namespace , sql , "none" );
60+ ArrayList <ValueVector > valueVectors = null ;
61+ int readBatchesCount = 0 ;
5762
5863 RootAllocator allocator = new RootAllocator (Long .MAX_VALUE );
59- try (ArrowStreamReader reader = new ArrowStreamReader (new ByteArrayInputStream (bytes ), allocator )) {
60- while (reader .loadNextBatch ()) {
61- // This will be loaded with new values on every call to loadNextBatch
62- VectorSchemaRoot readBatch = reader .getVectorSchemaRoot ();
63- readBatches .add (readBatch );
64+ ArrowStreamReader reader = new ArrowStreamReader (new ByteArrayInputStream (bytes ), allocator , CommonsCompressionFactory .INSTANCE );
65+
66+ VectorSchemaRoot root = reader .getVectorSchemaRoot ();
67+
68+ while (reader .loadNextBatch ()) {
69+ readBatchesCount ++;
70+ valueVectors = new ArrayList <>();
71+ for (FieldVector f : root .getFieldVectors ()) {
72+ // transfer will not copy data but transfer ownership of memory
73+ // from ArrowStreamReader to TileDBSQL. This is necessary because
74+ // otherwise we are not able to close the reader and retain the
75+ // data.
76+ TransferPair t = f .getTransferPair (allocator );
77+ t .transfer ();
78+ valueVectors .add (t .getTo ());
6479 }
65- } catch (IOException e ) {
66- throw new RuntimeException (e );
6780 }
68- } catch (ApiException e ) {
69- System .err .println ("Exception when calling SqlApi#runSQL/runSQLBytes" );
70- System .err .println ("Status code: " + e .getCode ());
71- System .err .println ("Reason: " + e .getResponseBody ());
72- System .err .println ("Response headers: " + e .getResponseHeaders ());
73- e .printStackTrace ();
81+ reader .close ();
82+ return new io .tiledb .java .api .Pair <>(valueVectors , readBatchesCount );
83+
84+ } catch (IOException | ApiException e ) {
85+ throw new RuntimeException (e );
7486 }
7587 }
7688
@@ -79,43 +91,28 @@ public void execArrow(){
7991 *
8092 * @return
8193 */
82- public void execStandard (){
94+ public List < Object > exec (){
8395 try {
8496 assert sql .getResultFormat () != null ;
85- results = apiInstance .runSQL (namespace , sql , sql .getResultFormat ().toString ());
97+ return apiInstance .runSQL (namespace , sql , sql .getResultFormat ().toString ());
8698 } catch (ApiException e ) {
8799 System .err .println ("Exception when calling SqlApi#runSQL/runSQLBytes" );
88100 System .err .println ("Status code: " + e .getCode ());
89101 System .err .println ("Reason: " + e .getResponseBody ());
90102 System .err .println ("Response headers: " + e .getResponseHeaders ());
91103 e .printStackTrace ();
92104 }
105+ return null ;
93106 }
94107
95108 /**
96- * Exec an SQL query
109+ *
97110 */
98- public void exec (){
99- if ( this . sql . getResultFormat () == ResultFormat . ARROW ) {
100- execArrow ();
101- }else {
102- execStandard ( );
111+ public void close (){
112+ try {
113+ reader . close ();
114+ } catch ( IOException e ) {
115+ throw new RuntimeException ( e );
103116 }
104117 }
105-
106- /**
107- * Get the results in Arrow format
108- * @return
109- */
110- public ArrayList <VectorSchemaRoot > getReadBatches () {
111- return readBatches ;
112- }
113-
114- /**
115- * Get the results as lists of Objects
116- * @return
117- */
118- public List <Object > getResults () {
119- return results ;
120- }
121118}
0 commit comments