@@ -6,11 +6,10 @@ use crate::utils::{delete_dir, walk_tree};
6
6
use crate :: { ObjectStoreError , PyClientOptions } ;
7
7
8
8
use object_store:: path:: Path ;
9
- use object_store:: { DynObjectStore , Error as InnerObjectStoreError , ListResult , MultipartId } ;
9
+ use object_store:: { DynObjectStore , Error as InnerObjectStoreError , ListResult , MultipartUpload } ;
10
10
use pyo3:: exceptions:: { PyNotImplementedError , PyValueError } ;
11
11
use pyo3:: prelude:: * ;
12
12
use pyo3:: types:: { IntoPyDict , PyBytes } ;
13
- use tokio:: io:: { AsyncWrite , AsyncWriteExt } ;
14
13
use tokio:: runtime:: Runtime ;
15
14
16
15
#[ pyclass( subclass, weakref) ]
@@ -442,11 +441,10 @@ impl ObjectInputFile {
442
441
// TODO add buffer to store data ...
443
442
#[ pyclass( weakref) ]
444
443
pub struct ObjectOutputStream {
445
- store : Arc < DynObjectStore > ,
444
+ pub store : Arc < DynObjectStore > ,
446
445
rt : Arc < Runtime > ,
447
- path : Path ,
448
- writer : Box < dyn AsyncWrite + Send + Unpin > ,
449
- multipart_id : MultipartId ,
446
+ pub path : Path ,
447
+ writer : Box < dyn MultipartUpload > ,
450
448
pos : i64 ,
451
449
#[ pyo3( get) ]
452
450
closed : bool ,
@@ -460,17 +458,18 @@ impl ObjectOutputStream {
460
458
store : Arc < DynObjectStore > ,
461
459
path : Path ,
462
460
) -> Result < Self , ObjectStoreError > {
463
- let ( multipart_id, writer) = store. put_multipart ( & path) . await . unwrap ( ) ;
464
- Ok ( Self {
465
- store,
466
- rt,
467
- path,
468
- writer,
469
- multipart_id,
470
- pos : 0 ,
471
- closed : false ,
472
- mode : "wb" . into ( ) ,
473
- } )
461
+ match store. put_multipart ( & path) . await {
462
+ Ok ( writer) => Ok ( Self {
463
+ store,
464
+ rt,
465
+ path,
466
+ writer,
467
+ pos : 0 ,
468
+ closed : false ,
469
+ mode : "wb" . into ( ) ,
470
+ } ) ,
471
+ Err ( err) => Err ( ObjectStoreError :: ObjectStore ( err) ) ,
472
+ }
474
473
}
475
474
476
475
fn check_closed ( & self ) -> Result < ( ) , ObjectStoreError > {
@@ -488,11 +487,11 @@ impl ObjectOutputStream {
488
487
impl ObjectOutputStream {
489
488
fn close ( & mut self ) -> PyResult < ( ) > {
490
489
self . closed = true ;
491
- match self . rt . block_on ( self . writer . shutdown ( ) ) {
490
+ match self . rt . block_on ( self . writer . complete ( ) ) {
492
491
Ok ( _) => Ok ( ( ) ) ,
493
492
Err ( err) => {
494
493
self . rt
495
- . block_on ( self . store . abort_multipart ( & self . path , & self . multipart_id ) )
494
+ . block_on ( self . writer . abort ( ) )
496
495
. map_err ( ObjectStoreError :: from) ?;
497
496
Err ( ObjectStoreError :: from ( err) . into ( ) )
498
497
}
@@ -537,24 +536,25 @@ impl ObjectOutputStream {
537
536
538
537
fn write ( & mut self , data : & PyBytes ) -> PyResult < i64 > {
539
538
self . check_closed ( ) ?;
540
- let len = data. as_bytes ( ) . len ( ) as i64 ;
541
- match self . rt . block_on ( self . writer . write_all ( data. as_bytes ( ) ) ) {
539
+ let bytes = data. as_bytes ( ) . to_vec ( ) ;
540
+ let len = bytes. len ( ) as i64 ;
541
+ match self . rt . block_on ( self . writer . put_part ( bytes. into ( ) ) ) {
542
542
Ok ( _) => Ok ( len) ,
543
543
Err ( err) => {
544
544
self . rt
545
- . block_on ( self . store . abort_multipart ( & self . path , & self . multipart_id ) )
545
+ . block_on ( self . writer . abort ( ) )
546
546
. map_err ( ObjectStoreError :: from) ?;
547
547
Err ( ObjectStoreError :: from ( err) . into ( ) )
548
548
}
549
549
}
550
550
}
551
551
552
552
fn flush ( & mut self ) -> PyResult < ( ) > {
553
- match self . rt . block_on ( self . writer . flush ( ) ) {
553
+ match self . rt . block_on ( self . writer . complete ( ) ) {
554
554
Ok ( _) => Ok ( ( ) ) ,
555
555
Err ( err) => {
556
556
self . rt
557
- . block_on ( self . store . abort_multipart ( & self . path , & self . multipart_id ) )
557
+ . block_on ( self . writer . abort ( ) )
558
558
. map_err ( ObjectStoreError :: from) ?;
559
559
Err ( ObjectStoreError :: from ( err) . into ( ) )
560
560
}
0 commit comments