|
| 1 | +from io import StringIO |
| 2 | +from typing import Any |
| 3 | + |
| 4 | +from lxml import etree |
| 5 | +from pymarc import parse_xml_to_array |
| 6 | +from sickle import models |
| 7 | + |
| 8 | +from ..mapper import Record, Vernacular, Validator |
| 9 | + |
| 10 | + |
| 11 | +class UcbTindRecord(Record): |
| 12 | + |
def UCLDC_map(self):
    """
    Build the UCLDC field mapping for this MARC record.

    The 880 fields are cached on ``self.marc_880_fields`` before anything
    else, because ``get_marc_data_fields`` reads that attribute while the
    mapping below is being assembled.

    Entries that point at a ``map_*`` method hold the bound method itself
    (it is not called here); every other entry is evaluated immediately.

    :return: Dict keyed by UCLDC field name.
    """
    self.marc_880_fields = self.get_880_fields()
    marc_values = self.get_marc_data_fields

    # NOTE(review): 1XX tags feed "contributor" and 7XX tags feed "creator"
    # below -- confirm that this assignment is intentional.
    mapping = {
        "calisphere-id": self.legacy_couch_db_id.split("--")[1],
        "_id": marc_values(["901"], ["a"]),
        "isShownAt": self.map_is_shown_at,
        "isShownBy": self.map_is_shown_by,
        "alternativeTitle": marc_values(["246"]),
        "language": marc_values(["041"], ["a"]),
        "date": marc_values(["260"], ["c"]),
        "publisher": marc_values(["260"], ["a", "b"]),
        "format": marc_values(["655"], ["2"], exclude_subfields=True),
        "extent": self.map_extent,
        "identifier": marc_values(["024", "901", "035"], ["a"]),
        "contributor": marc_values(["100", "110", "111"]),
        "creator": marc_values(["700", "710"], ["a"]),
        "relation": self.map_relation,
        "provenance": marc_values(["541"], ["a"]),
        "description": self.map_description,
        "rights": marc_values(["506", "540"]),
        "temporal": marc_values(["648"]),
        "title": self.map_title,
        "spatial": self.map_spatial,
        "subject": self.map_subject,
        "type": marc_values(["336"]),
    }
    return mapping
| 42 | + |
def get_marc_control_field(self, field_tag: str, index: int = None) -> str:
    """
    Get the value of a MARC control field, or a single character of it.

    See: https://www.loc.gov/marc/bibliographic/bd00x.html

    Returns an empty string if:
        * The tag is not a numeric tag below 100
        * The control field isn't set on the record
        * No character exists at the requested index

    :param field_tag: Field tag to retrieve.
    :param index: Optional character position inside the field value.
        ``0`` is a valid position; ``None`` (the default) returns the
        whole value.
    :return: The field value, the character at ``index``, or "".
    """

    # Don't let any data tags sneak in! They have subfields.
    # NOTE(review): MARC control fields are 001-009, yet every tag below
    # 100 is accepted here -- confirm whether this should be tightened.
    if not (field_tag.isdecimal() and int(field_tag) < 100):
        return ""

    fields = self.marc_tags_as_dict([field_tag]).get(field_tag)
    if not fields:
        return ""

    value = fields[0].value()

    # Compare against None explicitly: a truthiness test would treat
    # position 0 as "no index requested".
    if index is None:
        return value

    try:
        return value[index]
    except IndexError:
        return ""
| 78 | + |
| 79 | + |
def get_880_fields(self) -> dict:
    '''
    Index the record's 880 fields by the occurrence number found in their
    subfield 6 (linkage).

    A linkage value has the form ``<tag>-<occurrence>`` and may carry extra
    ``/``-separated parts after the occurrence number (e.g. ``245-02/$1``).
    Only the occurrence number is used as the key, so that it can be matched
    against the occurrence number on the linking field. Linkage values
    without a hyphen are skipped.

    Example result:

        marc_880_fields = {
            '01': [
                Subfield(code='6', value='700-01'),
                Subfield(code='a', value='雪谷.')
            ],
            '02': [
                Subfield(code='6', value='245-02'),
                Subfield(code='a', value='保壽軒御茶銘雙六 ')
            ]
        }

    :return: Dict mapping occurrence number to that 880 field's subfields.
    '''
    marc_880_fields = {}
    marc_data = self.source_metadata.get("marc")

    for field in marc_data.get_fields("880"):
        for subfield in field.subfields:
            if subfield.code != "6":
                continue

            # Text after the first hyphen, up to an optional "/" suffix.
            occurrence = (
                subfield.value.partition("-")[2].split("/")[0].strip()
            )
            if occurrence:
                marc_880_fields[occurrence] = field.subfields

    return marc_880_fields
| 117 | + |
| 118 | + |
def subfield_matches(self, check_code: str, subfield_codes: list,
                     exclude_subfields: bool) -> bool:
    """
    Decide whether a subfield with code `check_code` should be used.

    :param check_code: The subfield code being tested.
    :param subfield_codes: Codes to include, or to exclude when
        `exclude_subfields` is true. An empty list places no restriction.
    :param exclude_subfields: True to treat `subfield_codes` as an
        exclusion list, False to treat it as an inclusion list.
    :return: True if the subfield should be used, False otherwise.
    """

    # Subfield 6 (Linkage,
    # see: https://www.loc.gov/marc/bibliographic/ecbdcntf.html) is never
    # used unless it is listed explicitly; letting it through produced
    # unexpected results.
    linkage_not_requested = check_code == "6" and "6" not in subfield_codes
    if linkage_not_requested:
        return False

    # No codes given: every remaining subfield qualifies.
    if not subfield_codes:
        return True

    is_listed = check_code in subfield_codes
    return not is_listed if exclude_subfields else is_listed
| 142 | + |
| 143 | + |
def marc_tags_as_dict(self, field_tags: list) -> dict:
    """
    Look up each requested tag on the MARC record held in source_metadata.

    :param field_tags: List of MARC field tags to retrieve.
    :return: Dict mapping each requested tag to whatever the record's
        `get_fields` returns for that tag.
    """
    fields_by_tag = {}
    for field_tag in field_tags:
        marc_record = self.source_metadata.get("marc")
        fields_by_tag[field_tag] = marc_record.get_fields(field_tag)
    return fields_by_tag
| 153 | + |
| 154 | + |
def get_marc_data_fields(self, field_tags: list, subfield_codes=None, get_880_values=True,
                         exclude_subfields=False) -> list:
    """
    Collect subfield values from the given MARC data fields.

    Every subfield of every field in `field_tags` is tested with
    `subfield_matches`; matching values are returned in record order. If
    `get_880_values` is true and a field links to an 880 field through its
    subfield 6, the 880 subfield at the same position is appended right
    after each matching value. A link whose 880 field is missing, or whose
    880 field has no subfield at that position, contributes nothing.

    Note the special handling of code `6` in `subfield_matches`.

    :param field_tags: A list of MARC field tags.
    :param subfield_codes: A list of subfield codes to include / exclude.
        None or an empty list means "all subfields".
    :param get_880_values: Indicates whether alternate graphic
        representations (field 880) should be sought.
    :param exclude_subfields: A boolean value indicating whether to exclude
        the specified subfield codes instead of including them.
    :return: A list of values of the specified subfields.
    """
    # A None default avoids sharing one mutable list object between calls.
    if subfield_codes is None:
        subfield_codes = []

    linked_880_fields = {}
    if get_880_values:
        linked_880_fields = getattr(self, "marc_880_fields", None)
        if linked_880_fields is None:
            # UCLDC_map normally caches this; build it on demand so the
            # method also works when called on its own.
            linked_880_fields = self.marc_880_fields = self.get_880_fields()

    marc_record = self.source_metadata.get("marc")
    values = []
    for tag in field_tags:
        for marc_field in marc_record.get_fields(tag):
            # Subfields of the linked 880 field, if this field has one.
            alternates = []
            if get_880_values:
                for subfield in marc_field.subfields:
                    if subfield.code != "6":
                        continue
                    # Linkage looks like "880-02", optionally followed by
                    # "/..." parts; only the occurrence number is the key.
                    occurrence = (
                        subfield.value.partition("-")[2].split("/")[0].strip()
                    )
                    alternates = linked_880_fields.get(occurrence, [])

            # Subfield values, each followed by its 880 counterpart.
            for index, subfield in enumerate(marc_field.subfields):
                if not self.subfield_matches(subfield.code, subfield_codes,
                                             exclude_subfields):
                    continue
                values.append(subfield.value)
                if index < len(alternates):
                    values.append(alternates[index].value)

    return values
| 192 | + |
| 193 | + |
def get_marc_leader(self, leader_key: str) -> str:
    """
    Note: This is a stub. Leaving here in case it is needed in other marc mappers.

    Retrieve the value of the specified leader key from the MARC metadata.
    See: https://www.loc.gov/marc/bibliographic/bdleader.html

    A numeric key is used as a character position in the leader; any other
    key is looked up as an attribute of the leader object.

    We're not accommodating passing a slice, which pymarc can handle should
    it be necessary.

    :param leader_key: Character position (e.g. "6" or 6) or attribute name.
    :return: The leader value for the key, or "" when the position is out
        of range or the attribute does not exist.
    """
    leader = self.source_metadata.get("marc").leader
    key = str(leader_key)

    if key.isdecimal():
        try:
            return leader[int(key)]
        except IndexError:
            return ""

    # The builtin getattr is required here; the leader object has no
    # `getattr` method of its own.
    return getattr(leader, key, "")
| 217 | + |
| 218 | + |
def map_is_shown_at(self):
    """
    Build the record URL from control field 001.

    :return: The URL string, or None when field 001 yields no value.
    """
    record_id = self.get_marc_control_field("001")
    if not record_id:
        return None

    record_url_base = "https://digicoll.lib.berkeley.edu/record/"
    return record_url_base + record_id
| 223 | + |
def map_is_shown_by(self):
    """
    Build the thumbnail URL from control field 001.

    :return: The URL string, or None when field 001 yields no value.
    """
    record_id = self.get_marc_control_field("001")
    if not record_id:
        return None

    thumbnail_url_base = "https://digicoll.lib.berkeley.edu/nanna/thumbnail/v2/"
    return thumbnail_url_base + record_id + "?redirect=1"
| 229 | + |
def map_spatial(self) -> list:
    """
    Collect spatial values: subfield a of field 651, followed by subfield z
    of fields 600, 630, 650, 651, 610-619, 653-658 and 690-699.

    A single trailing period is removed from each value. Empty values are
    passed through unchanged.

    :return: A list of spatial values.
    """
    f651 = self.get_marc_data_fields(["651"], ["a"])
    additional_fields = [str(i) for i in [600, 630, 650, 651] + list(range(610, 620))
                         + list(range(653, 659)) + list(range(690, 700))]
    values = f651 + self.get_marc_data_fields(additional_fields, ["z"])

    # endswith() is safe on an empty string, unlike indexing value[-1].
    return [value[:-1] if value.endswith(".") else value for value in values]
| 238 | + |
def map_subject(self) -> list:
    """
    Collect subject values from fields 600, 630, 650, 651, 610-619, 653-658
    and 690-699, using every subfield except subfield 2.

    :return: A list of ``{"name": value}`` dicts, one per value.
    """
    tag_numbers = [600, 630, 650, 651]
    tag_numbers.extend(range(610, 620))
    tag_numbers.extend(range(653, 659))
    tag_numbers.extend(range(690, 700))
    subject_tags = [str(number) for number in tag_numbers]

    headings = self.get_marc_data_fields(subject_tags, ["2"], exclude_subfields=True)

    subjects = []
    for heading in headings:
        subjects.append({"name": heading})
    return subjects
| 244 | + |
def map_description(self) -> list:
    """
    Collect subfield a of every field from 500 through 599, skipping 538
    and 540.

    :return: A list of description values.
    """
    skipped_tags = (538, 540)
    note_tags = [str(tag) for tag in range(500, 600) if tag not in skipped_tags]

    return self.get_marc_data_fields(note_tags, ["a"])
| 249 | + |
def map_relation(self) -> list:
    """
    Collect all subfield values of fields 760 through 787.

    :return: A list of relation values.
    """
    field_range = [str(i) for i in range(760, 788)]  # Up to 787

    # The collected values must be returned; without the return statement
    # this method always produced None.
    return self.get_marc_data_fields(field_range)
| 254 | + |
def map_extent(self) -> list:
    """
    Collect extent values: every subfield of field 300, followed by
    subfield b of field 340.

    :return: A list of extent values.
    """
    from_300 = self.get_marc_data_fields(["300"])
    from_340_b = self.get_marc_data_fields(["340"], ["b"])
    return from_300 + from_340_b
| 262 | + |
def map_title(self) -> list:
    """
    Collect title values in this order: field 245 (all subfields except c),
    field 242 (all subfields), field 240 (all subfields).

    :return: A list of title values.
    """
    titles = self.get_marc_data_fields(["245"], ["c"], exclude_subfields=True)
    for extra_tag in ("242", "240"):
        # Build a new list each time rather than extending the returned one.
        titles = titles + self.get_marc_data_fields([extra_tag])
    return titles
| 274 | + |
| 275 | + |
| 276 | +class UcbTindValidator(Validator): |
| 277 | + |
def setup(self):
    """
    Register the validations for the "is_shown_by" and "is_shown_at" fields.

    Both fields get the same two validations, in this order: the
    protocol-tolerant string match, then a `str` type check.
    """
    url_fields = ("is_shown_by", "is_shown_at")

    self.add_validatable_fields([
        {
            "field": field_name,
            "validations": [
                UcbTindValidator.str_match_ignore_url_protocol,
                Validator.verify_type(str),
            ],
        }
        for field_name in url_fields
    ])
| 295 | + |
@staticmethod
def str_match_ignore_url_protocol(validation_def: dict,
                                  rikolti_value: Any,
                                  comparison_value: Any) -> "str | None":
    """
    Compare two values, treating a leading "http://" on the comparison
    value as if it were "https://".

    Only the scheme prefix is rewritten; any later occurrence of "http" in
    the value is left alone. Non-string comparison values are compared
    as-is.

    :param validation_def: The validation definition (not used here).
    :param rikolti_value: The value to check.
    :param comparison_value: The value to compare against.
    :return: None when the values match, otherwise "Content mismatch".
    """
    if rikolti_value == comparison_value:
        return None

    insecure_scheme = "http://"
    if (isinstance(comparison_value, str)
            and comparison_value.startswith(insecure_scheme)):
        comparison_value = "https://" + comparison_value[len(insecure_scheme):]

    if not rikolti_value == comparison_value:
        return "Content mismatch"

    return None
| 308 | + |
| 309 | + |
| 310 | +class UcbTindVernacular(Vernacular): |
| 311 | + record_cls = UcbTindRecord |
| 312 | + validator = UcbTindValidator |
| 313 | + |
def parse(self, api_response):
    """
    Parse one OAI-PMH ListRecords page into record dicts and pass them to
    `get_records`.

    Deleted records are skipped, as are records that carry no MARC payload.
    A response without a ListRecords element yields an empty record list,
    unless it contains an OAI error other than "noRecordsMatch", in which
    case a ValueError is raised.

    :param api_response: The XML response, as str or bytes.
    :return: Whatever `get_records` returns for the parsed records.
    :raises ValueError: If the response is an OAI error other than
        "noRecordsMatch".
    """
    # Hand lxml bytes: it rejects str input that carries an XML encoding
    # declaration.
    if isinstance(api_response, str):
        api_response = api_response.encode("utf-8")

    namespace = {
        "oai2": "http://www.openarchives.org/OAI/2.0/",
        "marc": "http://www.loc.gov/MARC21/slim",
    }
    page = etree.XML(api_response)

    request_elem = page.find("oai2:request", namespace)
    request_url = request_elem.text if request_elem is not None else None

    list_records = page.find("oai2:ListRecords", namespace)
    if list_records is None:
        error_elem = page.find("oai2:error", namespace)
        if (error_elem is not None
                and error_elem.get("code") != "noRecordsMatch"):
            raise ValueError(
                f"OAI-PMH error response ({error_elem.get('code')}): "
                f"{error_elem.text}"
            )
        return self.get_records([])

    records = []
    for record_element in list_records.findall("oai2:record", namespace):
        sickle_header = models.Record(record_element).header
        if sickle_header.deleted:
            continue

        marc_record_element = record_element.find(
            ".//marc:record", namespaces=namespace
        )
        if marc_record_element is None:
            # Nothing to map without a MARC payload.
            continue

        marc_record_string = etree.tostring(
            marc_record_element, encoding="utf-8").decode("utf-8")

        # Wrap the record in collection so pymarc can read it
        marc_collection_xml_full = (
            '<collection xmlns="http://www.loc.gov/MARC21/slim">'
            f'{marc_record_string}'
            '</collection>'
        )

        marc_records = parse_xml_to_array(StringIO(marc_collection_xml_full))
        if not marc_records:
            continue

        records.append({
            "datestamp": sickle_header.datestamp,
            "id": sickle_header.identifier,
            "request_url": request_url,
            "marc": marc_records[0],
        })

    return self.get_records(records)
0 commit comments