Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions tests/copy_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,38 @@
assert op.id is not None
op = api.wait_for(op.id)
assert op[0].state == OperationState.Succeeded, op[0].messages


def test_multi_copy_parent_progress(storage_path, client):
"""Test progress of parent operation when copying multiple files."""

api = DataTransferApi(client)
api.status(wait=True)

operations = []

for _ in range(3):
with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
temp_file.write("Mock file")
temp_file_name = os.path.basename(temp_file.name)

src = StoragePath(path=temp_file.name, remote="local")
dst = StoragePath(path=f"{storage_path}/{temp_file_name}")
operations.append(SrcDst(src=src, dst=dst))

op = api.copy(operations)

assert op.id is not None

# test progress handler
progress_history = []

def handler(ops):
for o in ops:
if o.id == op.id:
progress_history.append(o.progress)

op = api.wait_for(op.id, interval=0.05, handler=handler)
assert op[0].state == OperationState.Succeeded, op[0].messages

assert None not in progress_history, f"{progress_history=}"

Check failure on line 140 in tests/copy_test.py

View workflow job for this annotation

GitHub Actions / Test Report public-ubuntu-latest-8-cores:3.10

copy_test.test_multi_copy_parent_progress

AssertionError: progress_history=[None, None, 1.0] assert None not in [None, None, 1.0]
Raw output
storage_path = 'python_client_tests/test-multi-copy-parent-progress'
client = <ansys.hps.data_transfer.client.client.Client object at 0x7ff33ff60f70>

    def test_multi_copy_parent_progress(storage_path, client):
        """Test progress of parent operation when copying multiple files."""
    
        api = DataTransferApi(client)
        api.status(wait=True)
    
        operations = []
    
        for _ in range(3):
            with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file:
                temp_file.write("Mock file")
            temp_file_name = os.path.basename(temp_file.name)
    
            src = StoragePath(path=temp_file.name, remote="local")
            dst = StoragePath(path=f"{storage_path}/{temp_file_name}")
            operations.append(SrcDst(src=src, dst=dst))
    
        op = api.copy(operations)
    
        assert op.id is not None
    
        # test progress handler
        progress_history = []
    
        def handler(ops):
            for o in ops:
                if o.id == op.id:
                    progress_history.append(o.progress)
    
        op = api.wait_for(op.id, interval=0.05, handler=handler)
        assert op[0].state == OperationState.Succeeded, op[0].messages
    
>       assert None not in progress_history, f"{progress_history=}"
E       AssertionError: progress_history=[None, None, 1.0]
E       assert None not in [None, None, 1.0]

tests/copy_test.py:140: AssertionError
Loading