diff --git a/adsmp/app.py b/adsmp/app.py index b507106..4eac576 100644 --- a/adsmp/app.py +++ b/adsmp/app.py @@ -181,14 +181,6 @@ def update_storage(self, bibcode, type, payload): record.scix_id = "scix:" + str(self.generate_scix_id(record.bib_data)) out = record.toJSON() session.commit() - - # Send payload to Boost pipeline - if type != 'boost' and not self._config.get('TESTING_MODE', False): - try: - self.generate_boost_request_message(bibcode) - except Exception as e: - self.logger.exception('Error generating boost request message for bibcode %s: %s', bibcode, e) - return out except exc.IntegrityError: self.logger.exception('error in app.update_storage while updating database for bibcode {}, type {}'.format(bibcode, type)) diff --git a/adsmp/tasks.py b/adsmp/tasks.py index 7620cc9..e126a25 100644 --- a/adsmp/tasks.py +++ b/adsmp/tasks.py @@ -42,10 +42,73 @@ Queue('update-sitemap-files', app.exchange, routing_key='update-sitemap-files'), Queue('update-scixid', app.exchange, routing_key='update-scixid'), Queue('boost-request', app.exchange, routing_key='boost-request'), + Queue('augment-record', app.exchange, routing_key='augment-record'), ) # ============================= TASKS ============================================= # +@app.task(queue='augment-record') +def task_augment_record(msg): + """Receives payload to augment the record. + + @param msg: protobuff that contains at minimum + - bibcode + - and specific payload + """ + # logger.debug('Updating record: %s', msg) + logger.debug('Updating record: %s', msg) + status = app.get_msg_status(msg) + logger.debug(f'Message status: {status}') + type = app.get_msg_type(msg) + logger.debug(f'Message type: {type}') + bibcodes = [] + + if status == 'active': + # save into a database + # passed msg may contain details on one bibcode or a list of bibcodes + if type == 'nonbib_records': + for m in msg.nonbib_records: + m = Msg(m, None, None) # m is a raw protobuf, TODO: return proper instance from .nonbib_records + bibcodes.append(m.bibcode) + record = app.update_storage(m.bibcode, 'nonbib_data', m.toJSON()) + if record: + logger.debug('Saved record from list: %s', record) + elif type == 'metrics_records': + for m in msg.metrics_records: + m = Msg(m, None, None) + bibcodes.append(m.bibcode) + record = app.update_storage(m.bibcode, 'metrics', m.toJSON(including_default_value_fields=True)) + if record: + logger.debug('Saved record from list: %s', record) + elif type == 'augment': + bibcodes.append(msg.bibcode) + record = app.update_storage(msg.bibcode, 'augment', + msg.toJSON(including_default_value_fields=True)) + if record: + logger.debug('Saved augment message: %s', msg) + elif type == 'classify': + bibcodes.append(msg.bibcode) + logger.debug(f'message to JSON: {msg.toJSON(including_default_value_fields=True)}') + payload = msg.toJSON(including_default_value_fields=True) + payload = payload['collections'] + record = app.update_storage(msg.bibcode, 'classify',payload) + if record: + logger.debug('Saved classify message: %s', msg) + else: + # here when record has a single bibcode + bibcodes.append(msg.bibcode) + record = app.update_storage(msg.bibcode, type, msg.toJSON()) + if record: + logger.debug('Saved record: %s', record) + if record: + # Send payload to Boost pipeline + if type != 'boost' and not app._config.get('TESTING_MODE', False): + try: + task_boost_request.apply_async(args=(msg.bibcode,)) + except Exception as e: + app.logger.exception('Error generating boost request message for bibcode %s: %s', msg.bibcode, e) + else: + logger.error('Received a message with unclear status: %s', msg) @app.task(queue='update-record') def task_update_record(msg): @@ -126,6 +189,13 @@ def task_update_record(msg): # that pipeline will eventually respond with a msg to task_update_record logger.debug('requesting affilation augmentation for %s', msg.bibcode) app.request_aff_augment(msg.bibcode) + if record: + # Send payload to Boost pipeline + if type != 'boost' and not app._config.get('TESTING_MODE', False): + try: + task_boost_request.apply_async(args=(msg.bibcode,)) + except Exception as e: + app.logger.exception('Error generating boost request message for bibcode %s: %s', msg.bibcode, e) else: logger.error('Received a message with unclear status: %s', msg)