Skip to content

Commit

Permalink
Ignored submit errors after stream shutting down
Browse files Browse the repository at this point in the history
  • Loading branch information
nvidianz committed Jan 30, 2025
1 parent cc60378 commit 18e63ab
Showing 1 changed file with 24 additions and 1 deletion.
25 changes: 24 additions & 1 deletion nvflare/fuel/f3/streaming/stream_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import threading
import time
Expand All @@ -22,11 +23,33 @@
STREAM_THREAD_POOL_SIZE = 128
ONE_MB = 1024 * 1024

stream_thread_pool = ThreadPoolExecutor(STREAM_THREAD_POOL_SIZE, "stm")
lock = threading.Lock()
sid_base = int((time.time() + os.getpid()) * 1000000) # microseconds
stream_count = 0

log = logging.getLogger(__name__)


class CheckedExecutor(ThreadPoolExecutor):
"""This executor ignores task after shutting down"""

def __init__(self, max_workers=None, thread_name_prefix=""):
super().__init__(max_workers, thread_name_prefix)
self.stopped = False

def shutdown(self, wait=True):
super().shutdown(wait)
self.stopped = True

def submit(self, fn, *args, **kwargs):
if self.stopped:
log.debug(f"Call {fn} is ignored after streaming shutting down")
else:
super().submit(fn, *args, **kwargs)


stream_thread_pool = CheckedExecutor(STREAM_THREAD_POOL_SIZE, "stm")


def wrap_view(buffer: BytesAlike) -> memoryview:
if isinstance(buffer, memoryview):
Expand Down

0 comments on commit 18e63ab

Please sign in to comment.