
Calculate number of spares #417 #434

Open

joelvdavies wants to merge 22 commits into develop from calculate-number-of-spares-#417
Conversation

@joelvdavies (Collaborator) commented Nov 29, 2024

Description

See #417. Leaves modified time unchanged.

Concurrency notes

There are multiple cases where concurrency can potentially cause a problem in this PR; I have attempted to mitigate these. Here are some particular cases to mention.

  • Setting the spares definition first updates the spares definition itself, to ensure it cannot end up with two sets of spares updates running simultaneously (the first update will write lock and block future ones).
  • Multiple items can be created at once on the front end in quick succession. This should be handled by the write locking of the parent catalogue item for each request (with one backend instance it seems to be fine, but this could change later). Should two actually conflict and the first take longer than the default transaction timeout (5ms), one could fail. This should show in the front end, but will lead to missing items when creating multiple. We could auto retry in such cases to increase the effective timeout if required.
  • When setting the spares definition, it is possible to delete the usage statuses involved in it prior to the transaction completing, as we update it along with updating the spares of all catalogue items in the same transaction. This could be resolved by write locking the usage statuses, but as both editing usage statuses and the spares definition are admin functionality, it should be rare. Currently the aggregate query will fail during the final get of the definition if it doesn't exist, as it will return [], causing a schema error which is raised as a 500.
  • When recalculating the number of spares while setting the spares definition, all catalogue items are initially write locked by setting their number of spares to None, as an item could be updated in between it starting and completing. This also prevents any item create/delete requests or updates that modify the usage status. (These may need issues on the front end to handle.)
  • The spares definition is write locked (even if currently non-existent, by upserting a document) when doing a spares calculation and when creating a catalogue item, to prevent a case where a brand new catalogue item and its items are added during a long spares calculation and would subsequently not be updated (see the sketch below).
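For reference, a minimal sketch of the upsert-as-write-lock idea, assuming PyMongo and an already started transaction session; the collection, document and field names here are hypothetical rather than the PR's actual ones.

    def write_lock_spares_definition(settings_collection, session):
        # Writing a dummy field via upsert performs a real write on the settings
        # document (creating it if it does not yet exist), so any concurrent
        # transaction touching the same document hits a write conflict and aborts
        # instead of interleaving with the spares recalculation.
        settings_collection.update_one(
            {"_id": "spares_definition"},
            {"$currentDate": {"_lock": True}},  # hypothetical dummy field
            upsert=True,
            session=session,
        )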

Performance tests

Setting the spares definition (using postman)

  • With 104 catalogue items, 159 items: 216ms
  • With 6427 catalogue items, 9684 items: 41.4 seconds (with many log statements for each spares update - 35.5 when commenting out)
  • With 104 catalogue items, 2928 items: 355ms
  • With 104 catalogue items, 4710 items: 377ms

This is much worse for high numbers of catalogue items as it iterates through them. I did look at aggregate queries but couldn't find examples close to what would be needed here; still potentially worth investigating further. The main limitation would be the stage memory limit for a large number of items, as the count would likely have to come from the size of a lookup stage. (This would also have been the case for using aggregate queries in all catalogue item requests.)
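One possible shape for this, sketched under the assumption that usage_status_ids holds the ObjectIds from the spares definition, and untested at the scales above: a $lookup with an inner pipeline ending in $count, so only the count (rather than the matched item documents) flows out of the stage.

    pipeline = [
        {
            "$lookup": {
                "from": "items",
                "let": {"catalogue_item_id": "$_id"},
                "pipeline": [
                    {
                        "$match": {
                            "$expr": {
                                "$and": [
                                    {"$eq": ["$catalogue_item_id", "$$catalogue_item_id"]},
                                    {"$in": ["$usage_status_id", usage_status_ids]},
                                ]
                            }
                        }
                    },
                    # Only the count leaves the sub-pipeline, not the item documents
                    {"$count": "count"},
                ],
                "as": "spares",
            }
        },
        # Unwrap the single-element array, defaulting to 0 when there are no spares
        {"$addFields": {"number_of_spares": {"$ifNull": [{"$first": "$spares.count"}, 0]}}},
        {"$project": {"spares": 0}},
    ]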

Testing instructions

  • Review code
  • Check Actions build
  • Review changes to test coverage

Agile board tracking

Closes #417

@joelvdavies added the enhancement (New feature or request) label on Nov 29, 2024
codecov bot commented Dec 2, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 98.00%. Comparing base (71e1a10) to head (5b9405e).

Additional details and impacted files
@@             Coverage Diff             @@
##           develop     #434      +/-   ##
===========================================
+ Coverage    97.91%   98.00%   +0.08%     
===========================================
  Files           48       48              
  Lines         1723     1800      +77     
===========================================
+ Hits          1687     1764      +77     
  Misses          36       36              


@joelvdavies force-pushed the calculate-number-of-spares-#417 branch from ce2ff73 to e08e647 on December 5, 2024
@joelvdavies force-pushed the calculate-number-of-spares-#417 branch from 8655607 to 85a17d1 on December 6, 2024
@joelvdavies (Collaborator, Author)

@VKTB @joshuadkitenge @asuresh-code Tagging you all just to say feel free to test this PR and see if you can think of any other cases I missed in the description.

@joelvdavies marked this pull request as ready for review on December 9, 2024
Base automatically changed from handle-property-migration-conflict-#412 to develop December 9, 2024 14:19
@joelvdavies (Collaborator, Author) commented Dec 9, 2024

I have just tested an alternative method of using an aggregate query on the list endpoint using

        catalogue_items = list(
            self._catalogue_items_collection.aggregate(
                [
                    {
                        "$lookup": {
                            "from": "items",
                            "localField": "_id",
                            "foreignField": "catalogue_item_id",
                            "as": "related_items",
                        }
                    },
                    {
                        "$addFields": {
                            "number_of_spares": {
                                "$size": {
                                    "$filter": {
                                        "input": "$related_items",
                                        "as": "item",
                                        "cond": {
                                            "$eq": [
                                                "$$item.usage_status_id",
                                                CustomObjectId("6756fc3b220c8ca1a0b8c7cb"),
                                            ]
                                        },
                                    }
                                }
                            }
                        }
                    },
                    {"$project": {"related_items": 0}},
                ]
            )
        )

in the catalogue item repo's list method instead of the find. (This is not using the spares definition usage status array though.) It took too long for Swagger to complete, and was well over 5 minutes for the case described in the description of setting the spares definition (6427 catalogue items, 9684 items). Even when limited by pagination and querying by catalogue item ID, the 100MB stage limit would be a bigger problem with the lookup stage, as I believe it would be a combined limit for the catalogue item and item documents that would have to be in memory at the same time.

@asuresh-code (Contributor) left a comment


I tried using Python's ThreadPoolExecutor to concurrently update the catalogue items, to see if it would improve performance. I only changed one function in setting.py in the services layer.

Using Postman, I got the following results:

  • With 104 catalogue items, 159 items: 207ms w/ multithreading, 222ms w/o
  • With 1194 catalogue items, 1946 items: 1.8 seconds w/, 2.33 seconds w/o
  • With 4.7k catalogue items, 7.6k items: 17.92 seconds w/, 20.08 seconds w/o

from concurrent.futures import ThreadPoolExecutor

def update_spares_definition(self, spares_definition: SparesDefinitionPutSchema) -> SparesDefinitionOut:
    """
    Updates the spares definition to a new value.

    :param spares_definition: The new spares definition.
    :return: The updated spares definition.
    :raises MissingRecordError: If any of the usage statuses specified by the given IDs don't exist.
    """
    # Ensure all the given usage statuses exist
    for usage_status in spares_definition.usage_statuses:
        if not self._usage_status_repository.get(usage_status.id):
            raise MissingRecordError(f"No usage status found with ID: {usage_status.id}")

    # Begin a session for transactional updates
    with start_session_transaction("updating spares definition") as session:
        # Upsert the new spares definition
        new_spares_definition = self._setting_repository.upsert(
            SparesDefinitionIn(**spares_definition.model_dump()), SparesDefinitionOut, session=session
        )

        # Lock catalogue items for updates
        utils.prepare_for_number_of_spares_recalculation(None, self._catalogue_item_repository, session)

        # Obtain all catalogue item IDs
        catalogue_item_ids = self._catalogue_item_repository.list_ids()

        # Precompute usage status IDs that define a spare
        usage_status_ids = utils.get_usage_status_ids_from_spares_definition(new_spares_definition)

        # Define the worker function for recalculations
        def recalculate_spares(catalogue_item_id):
            utils.perform_number_of_spares_recalculation(
                catalogue_item_id, usage_status_ids, self._catalogue_item_repository, self._item_repository, session
            )

        # Use ThreadPoolExecutor for concurrent recalculations
        logger.info("Updating the number of spares for all catalogue items concurrently")
        with ThreadPoolExecutor(max_workers=10) as executor:  # May need to experiment w/ max workers
            executor.map(recalculate_spares, catalogue_item_ids)

    return new_spares_definition

@joelvdavies (Collaborator, Author) commented Jan 2, 2025

> I tried using Python's ThreadPoolExecutor to concurrently update the catalogue items, to see if it would improve performance. […]

Thanks for testing this. Another good potential improvement. My main concern with this would be interference with FastAPI's own threading behaviour. As we don't use async functions, FastAPI itself is already using a thread pool to handle multiple requests at once, so I'm not sure it's wise to do this again in code without making other things async. I am surprised that the performance increase was even that much, given I would have thought the database connection is the same across these threads as we don't use async PyMongo. Unless maybe the executor is executing the loop in compiled code instead of Python? I am not sure, perhaps it even spawns multiple database connections.
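For context, FastAPI (via Starlette) hands non-async path operations to a shared thread pool so the event loop is not blocked; roughly, and not the PR's code:

    from fastapi.concurrency import run_in_threadpool

    async def call_sync_endpoint(endpoint, *args):
        # Approximately what FastAPI does for `def` (non-async) path operations:
        # the sync function runs in a shared thread pool while the event loop
        # stays free to serve other requests.
        return await run_in_threadpool(endpoint, *args)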

@joelvdavies force-pushed the calculate-number-of-spares-#417 branch from 4ab3642 to 5b9405e on January 2, 2025
@joelvdavies (Collaborator, Author)

> I tried using Python's ThreadPoolExecutor to concurrently update the catalogue items, to see if it would improve performance. […]

> Thanks for testing this. Another good potential improvement. My main concern with this would be interference with FastAPI's own threading behaviour. […]

I have just tested this and also got a performance improvement. According to db.serverStatus().connections it does not create more connections, so my hunch is that the thread pool is just avoiding the wait for the database to respond and instead allowing the next request to be sent. So I think the database is effectively doing the bulk of the multithreading, and that's why it's faster. The best number of workers is just below the number of threads on the machine. If we use this on prod it could be much higher, but for me it was best around 4, with next to no improvement beyond that. We could also achieve the same thing if we used PyMongo and FastAPI asynchronously, but we would again have to deal with #258. This improvement also assumes there are enough free threads on the API machine, otherwise the performance increase would be reduced. It's also impacted by other users, although that would be expected anyway.
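For anyone repeating the check, the same connection numbers can be read from PyMongo while the threaded version runs (connection string assumed):

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017")  # assumed connection string
    connections = client.admin.command("serverStatus")["connections"]
    print(connections["current"], "current /", connections["available"], "available")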

@joshdimanteto (Collaborator) left a comment


Context:
The current implementation of the usage status in IMS_API prevents users from deleting a usage status if it is used in an item within the Items collection. I had assumed that the same logic would apply to editing (which has not been implemented yet).

The way the spares definition is structured is effectively assigning a type (i.e., spares) to the usage status. However, this type is stored globally, rather than within the usage status collection itself. This design means that usage statuses can be edited indirectly, which I don't believe should be allowed. Allowing such edits would introduce unnecessary complexity, as it could redefine what items are without considering their position, context, or other related factors.

For example, consider the usage statuses: new, used, scrapped, and inUse. If the type is modified globally and affects the status of items, it could lead to significant issues. Specifically, if an item was initially defined as a spare and the spares definition is then modified, the system would need to update all related item data before allowing the change. This would require careful modification of the items that are still linked to the spare status, which introduces complexity and potential for errors. Allowing this kind of change without ensuring all related items are updated first would cause data inconsistencies.

Storing the type within the usage status collection would make this more static and controlled, as any changes would then have to follow the predefined logic within that collection. This ensures consistency across the system and prevents the accidental redefinition or removal of statuses that could break existing item definitions.

@joelvdavies (Collaborator, Author) commented Jan 29, 2025

> Context: The current implementation of the usage status in IMS_API prevents users from deleting a usage status if it is used in an item within the Items collection. I had assumed that the same logic would apply to editing (which has not been implemented yet).
>
> The way the spares definition is structured is effectively assigning a type (i.e., spares) to the usage status. However, this type is stored globally, rather than within the usage status collection itself. This design means that usage statuses can be edited indirectly, which I don't believe should be allowed.

We don't allow edits for usage statuses here. Even if we did, e.g. to the name, it's using an aggregate query so it shouldn't care.

> Allowing such edits would introduce unnecessary complexity, as it could redefine what items are without considering their position, context, or other related factors.

Not sure I follow? It's either a spare because of its usage status or it's not. I thought we already discussed this part.

> For example, consider the usage statuses: new, used, scrapped, and inUse. If the type is modified globally and affects the status of items, it could lead to significant issues. Specifically, if an item was initially defined as a spare and the spares definition is then modified, the system would need to update all related item data before allowing the change.

Yes, that is what this implementation is doing.

> This would require careful modification of the items that are still linked to the spare status, which introduces complexity and potential for errors. Allowing this kind of change without ensuring all related items are updated first would cause data inconsistencies.

Again, this is doing the update in a way that ensures they are all updated if the spares definition changes.

> Storing the type within the usage status collection would make this more static and controlled, as any changes would then have to follow the predefined logic within that collection. This ensures consistency across the system and prevents the accidental redefinition or removal of statuses that could break existing item definitions.

I don't understand this logic? We can't just store the spares definition inside the usage statuses - it's a setting, not a specific usage status. Moving it there wouldn't change any of this logic; if it were updated it would still need to recalculate.

@joshdimanteto (Collaborator)
> I don't understand this logic? We can't just store the spares definition inside the usage statuses - it's a setting, not a specific usage status. Moving it there wouldn't change any of this logic; if it were updated it would still need to recalculate.

The spares definition is effectively assigning a type to the usage status (indirectly). An item is either a spare or not a spare. If multiple users add items into IMS using the current spares definition (where the usage status of the item is always based on the spares definition, and, as the definition is a list of usage statuses, the items are automatically linked to this list), and the spares definition is then modified, all the item data becomes incorrect because it was based on the previous definition. This should not be allowed. Currently, it is allowed because the spares endpoint is a PUT request, and the definition can be changed from new and used to scrapped and inUse. The ability to change the spares definition while the usage statuses are still being referenced in the Items collection should be limited to adding or deleting unreferenced usage statuses only.

Moving this logic to the usage_statuses collection would involve adding an extra field to the usage status document, such as type. For example, the type could now be set to spare or any other type we may define in the future. This would require additional logic to determine the definition from the usage status list. However, this approach would make it easier to prevent referenced usage statuses from being modified, and therefore, prevent the spares definition from being changed since it would be determined by the list.
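For illustration, the proposed shape might look something like the following; the type field, its values and the usage_statuses_collection handle are hypothetical, not an agreed design.

    from bson import ObjectId

    # Hypothetical document shape: the type lives on each usage status rather
    # than in a global setting
    usage_status = {
        "_id": ObjectId(),
        "value": "New",
        "type": "spare",  # hypothetical field; unset for statuses that aren't spares
    }

    # The spares definition would then be derived from the collection rather than stored
    spare_status_ids = [
        document["_id"] for document in usage_statuses_collection.find({"type": "spare"})
    ]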

@joshdimanteto (Collaborator) commented Jan 29, 2025

This is what I'm talking about:

spares_def_concern.mp4 (attached video)

Review thread on the diff:

    recalculating
    """

    with start_session_transaction(action_description) as session:
@VKTB (Collaborator) commented Feb 7, 2025

This means that items for that specific catalogue item (but not other catalogue items) cannot be created, edited or deleted during a transaction?

Collaborator Author

That is correct; to avoid another spares calculation from happening in the middle of this one (leading to an invalid number of spares being stored in the database) it will block it. However, like the case above, the count query is very fast so I have been unable to cause it in practice when generating many at once. (The mock data script is also making it past these when generating data as well.)

Collaborator

And to double check again, that would only be the case for that specific catalogue item? Or would it prevent creating items in other catalogue items too?

Collaborator Author

Just within the specific catalogue item, as it's that item that has the number of spares updated.

Review thread on the diff:

    catalogue_item_id, self._catalogue_item_repository, session
    )

    yield session
Collaborator

Not quite sure what the purpose of yield is and what it accomplishes here; the method docs just refer to it as yielding. Do you mind explaining please?

Collaborator Author

I must admit I have always found it awkward to try and explain what it does 😄, I am more familiar with its use. I think the best example I can think of is open; its definition would be something like

def open(path):
    file = ...  # open the underlying file
    yield file  # hand the file to the `with` block
    file.close()  # close it once the `with` block exits

Then when using `with open(...)`, it executes everything up to the yield, which then returns the file object (allowing it to be used); but when the Python interpreter leaves the `with` block it also executes the last part after the yield, closing the file.

In this case I am using it just to extend the behaviour of an existing context manager, start_session_transaction, but I want it to only close when the `with` block is exited, so I use the yield to effectively allow _start_transaction_impacting_number_of_spares to behave in the same way as start_session_transaction.
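A minimal sketch of that pattern, assuming contextlib's decorator; the real signature and body of _start_transaction_impacting_number_of_spares differ.

    from contextlib import contextmanager

    @contextmanager
    def _start_transaction_impacting_number_of_spares(action_description: str):
        with start_session_transaction(action_description) as session:
            # ...extra setup, e.g. write locking the spares definition...
            yield session  # the caller's `with` body runs here
            # ...anything after the yield runs when the caller's block exits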

Comment on lines +293 to +294
Should after `perform_number_of_spares_recalculation` in order to ensure the catalogue item is write locked to avoid
other similar updates interfering.
Collaborator

Suggested change:

- Should after `perform_number_of_spares_recalculation` in order to ensure the catalogue item is write locked to avoid other similar updates interfering.
+ Should be called after `perform_number_of_spares_recalculation` in order to ensure the catalogue item is write locked to avoid other similar updates interfering.
