CUMULUS-3757: Update granule upsert logic to allow updating collection info #3887

Closed
wants to merge 8 commits
34 changes: 34 additions & 0 deletions packages/api-client/src/granules.ts
@@ -602,6 +602,40 @@ export const associateExecutionWithGranule = async (params: {
});
};

/**
 * Update granule records stored in cumulus
 * PATCH /granules/
 *
 * @param params - params
 * @param params.prefix - the prefix configured for the stack
 * @param params.body - array of granule updates to pass to the API lambda
 * @param params.callback - async function to invoke the api lambda
 * that takes a prefix / user payload. Defaults
 * to cumulusApiClient.invokeApi to invoke the
 * api lambda
 * @returns - the response from the callback
 */
export const updateGranules = async (params: {
prefix: string,
body: ApiGranule[],
callback?: InvokeApiFunction
}): Promise<ApiGatewayLambdaHttpProxyResponse> => {
const { prefix, body, callback = invokeApi } = params;
return await callback({
prefix: prefix,
payload: {
httpMethod: 'PATCH',
resource: '/{proxy+}',
headers: {
'Content-Type': 'application/json',
},
path: '/granules/',
body: JSON.stringify(body),
},
expectedStatusCodes: 202,
});
};
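A caller might exercise this client method as in the sketch below. To keep the example self-contained, it inlines a copy of the call shape from this diff and drives it with a stubbed callback in place of the real cumulusApiClient.invokeApi; the stub, the stack prefix, and the granule fields are all illustrative assumptions, not part of the PR.

```javascript
// Illustrative sketch: the updateGranules call shape from this diff,
// driven by a stubbed callback instead of the real invokeApi.
const updateGranules = async ({ prefix, body, callback }) =>
  await callback({
    prefix,
    payload: {
      httpMethod: 'PATCH',
      resource: '/{proxy+}',
      headers: { 'Content-Type': 'application/json' },
      path: '/granules/',
      body: JSON.stringify(body),
    },
    expectedStatusCodes: 202,
  });

// Stub callback that echoes the request back instead of invoking a lambda
const stubInvokeApi = async (request) => ({
  statusCode: request.expectedStatusCodes,
  body: request.payload.body,
});

updateGranules({
  prefix: 'my-stack', // hypothetical stack prefix
  body: [{ granuleId: 'granule-1', collectionId: 'MOD09GQ___006' }],
  callback: stubInvokeApi,
}).then((response) => console.log(response.statusCode)); // → 202
```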

/**
* Bulk operations on granules stored in cumulus
* POST /granules/bulk
34 changes: 34 additions & 0 deletions packages/api/endpoints/granules.js
@@ -24,6 +24,7 @@ const {
translatePostgresCollectionToApiCollection,
translatePostgresGranuleToApiGranule,
getGranuleAndCollection,
updateGranulesAndFiles,
} = require('@cumulus/db');
const {
Search,
@@ -32,6 +33,7 @@ const {
multipleRecordFoundString,
} = require('@cumulus/es-client/search');
const ESSearchAfter = require('@cumulus/es-client/esSearchAfter');
const { updateGranule: updateEsGranule } = require('@cumulus/es-client/indexer');

const { deleteGranuleAndFiles } = require('../src/lib/granule-delete');
const { zodParser } = require('../src/zod-utils');
@@ -908,6 +910,36 @@ async function getByGranuleId(req, res) {
return res.send({ ...result, recoveryStatus });
}

/**
 * Updates the PG and ES records of a list of granules that were
 * moved by a move-collections-task
*
* @param {Object} req - express request object
* @param {Object} res - express response object
* @returns {Promise<Object>} the promise of express response object
*/
async function updateGranulesAndFilesCollectionRecords(req, res) {
const {
knex = await getKnexClient(),
esClient = await getEsClient(),
} = req.testContext || {};
const payload = req.body;
try {
await updateGranulesAndFiles(knex, payload);
Member:

🤔

tl;dr.

We should probably move the concurrency logic to the API package, handle bounding/configuring concurrency and address validation via something like zod.

Details/concerns:

  1. We should consider implementing a concurrency or size bound: the API is limited by design to 2 DB connections, and if you throw a large enough request at this endpoint I'd expect it will either time out, overwhelm the knex connection pool, or both. That logic is probably not appropriate to the db package (and is why we wound up with this sort of logic in the api/lib write 'stuff' as an approach).

  2. Users aren't going to use this only for the intended 3757 approach; if it's part of the public API, we should likely use zod to validate inputs/schemas before passing a payload to a @cumulus/db method.

Contributor Author (@Nnaga1), Dec 19, 2024:

So this should probably just update one granule and its files, while api/lib (or something else) handles the Promise.all/concurrency?

I'm all for changing the approach. I was hoping review would make this better than what I had, because there's stuff I wasn't sure about (concurrency, API calls, the API timing out, how to call this). The expected approach has also changed a lot along the way, which has thrown me off.

Contributor Author:

So should I do it like this:

make this endpoint update just one granule and its files in PG and ES -> add an api-client call for that -> elsewhere, call the api-client fn concurrently over the list of target granules (the changes to make)?

Member:

I don't think we want to make a bulk update feature like this utilize a single granule call - that's going to force scaling of lambda invocations per granule, and is super inefficient.

There's room for various approaches here, but in particular we need to put configuration/limits on the concurrency around await Promise.all(granules.map(async (granule) => { so that we don't overwhelm the limited knex connection pool, or else make an exception for this bulk call and increase the pool size, though that has user monitoring/tuning concerns.

As an opinionated take: I'd expect the best approach looks like having the DB package implement a single granule + all files update with associated tests, plus an api lib method that applies it to a batch of granules with concurrency controls. That allows for solid testing around a single granule update, and decouples concurrency concerns from the DB package. If the team feels strongly we should support bulk updates as part of the domain of the DB package, we could easily do that same approach entirely within the DB package.
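The bounded concurrency suggested above could take roughly the shape below: a small hand-rolled limiter (standing in for a library like p-map) wrapped around a single-granule update. `updateOneGranule` and the default concurrency of 2 are hypothetical stand-ins for illustration, not code from this PR.

```javascript
// Sketch of a concurrency-bounded batch update, replacing an unbounded
// Promise.all. The limiter keeps at most `concurrency` updates in flight.
const makeLimiter = (concurrency) => {
  let active = 0;
  const queue = [];
  const next = () => {
    if (active >= concurrency || queue.length === 0) return;
    active += 1;
    const { fn, resolve, reject } = queue.shift();
    fn().then(resolve, reject).finally(() => {
      active -= 1;
      next();
    });
  };
  return (fn) => new Promise((resolve, reject) => {
    queue.push({ fn, resolve, reject });
    next();
  });
};

// updateOneGranule is a hypothetical "single granule + files" update,
// e.g. the per-granule method the reviewer proposes for the DB package.
const updateGranulesBounded = async (granules, updateOneGranule, concurrency = 2) => {
  const limit = makeLimiter(concurrency);
  return Promise.all(granules.map((granule) => limit(() => updateOneGranule(granule))));
};
```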

await Promise.all(payload.map(async (granule) =>
await updateEsGranule(esClient, granule.granuleId, granule, process.env.ES_INDEX)));
} catch (error) {
log.error(
'failed to update granules:',
error
);
return res.boom.badRequest(errorify(error));
}
return res.send({
message: 'Successfully updated granules',
});
}

async function bulkOperations(req, res) {
const payload = req.body;

@@ -1073,6 +1105,7 @@ router.post('/', create);
router.patch('/:granuleId', requireApiVersion(2), patchByGranuleId);
router.patch('/:collectionId/:granuleId', requireApiVersion(2), patch);
router.put('/:collectionId/:granuleId', requireApiVersion(2), put);
router.patch('/', updateGranulesAndFilesCollectionRecords);

router.post(
'/bulk',
@@ -1104,5 +1137,6 @@ module.exports = {
put,
patch,
patchGranule,
updateGranulesAndFilesCollectionRecords,
router,
};