diff --git a/unfurl/parsers/parse_tiktok.py b/unfurl/parsers/parse_tiktok.py index 712e689..05a0b68 100644 --- a/unfurl/parsers/parse_tiktok.py +++ b/unfurl/parsers/parse_tiktok.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2025 Ryan Benson # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,7 +13,7 @@ # limitations under the License. from datetime import datetime -from unfurl.utils import extract_bits, set_bits +from unfurl import utils import logging log = logging.getLogger(__name__) @@ -51,11 +51,11 @@ def parse_tiktok_id(unfurl, node, on_tiktok=True): # Valid TikTok IDs should be ints, so this wasn't one return - timestamp = extract_bits(tiktok_id, 32, 64) - milliseconds = extract_bits(tiktok_id, 23, 32) - sequence = extract_bits(tiktok_id, 14, 19) - entity_type = extract_bits(tiktok_id, 8, 12) - machine_id = extract_bits(tiktok_id, 0, 8) + timestamp = utils.extract_bits(tiktok_id, 32, 64) + milliseconds = utils.extract_bits(tiktok_id, 23, 32) + sequence = utils.extract_bits(tiktok_id, 14, 19) + entity_type = utils.extract_bits(tiktok_id, 8, 12) + machine_id = utils.extract_bits(tiktok_id, 0, 8) entity_type_map = { 0: 'User Account', @@ -70,7 +70,7 @@ def parse_tiktok_id(unfurl, node, on_tiktok=True): node.hover = 'TikTok IDs are time-based IDs similar to those of Twitter Snowflakes.' unfurl.add_to_queue( - data_type='epoch-seconds', key=None, value=float(f"{timestamp}.{milliseconds}"), + data_type='epoch-milliseconds', key=None, value=int(f"{timestamp}{milliseconds}"), label=f'Timestamp: {timestamp}.{milliseconds:03d}', # Ref: https://arxiv.org/pdf/2504.13279 hover='The leading 42 bits in a TikTok ID are a timestamp, thought to represent ' @@ -94,27 +94,27 @@ def parse_tiktok_id(unfurl, node, on_tiktok=True): parent_id=node.node_id, incoming_edge_config=tiktok_edge) -def create_tiktok_id(timestamp=None, days_ahead=None, sequence=0, machine_id=1, entity_type='video'): - # Neither are set; make the timestamp now. - if not timestamp and not days_ahead: - timestamp = int(datetime.now().timestamp()) - # timestamp is a string; parse it to epoch seconds - elif isinstance(timestamp, str): - timestamp = int(datetime.fromisoformat(timestamp).timestamp()) - # Make the timestamp now + days_ahead - elif not timestamp and days_ahead: - timestamp = int(datetime.now().timestamp()) + (days_ahead * 86400) +def create_tiktok_id(timestamp=None, days_ahead=None, sequence=1, machine_id=1, entity_type='video'): + id_timestamp = utils.create_epoch_seconds_timestamp(iso_timestamp=timestamp, days_ahead=days_ahead) - timestamp_bits = set_bits(timestamp, 32) - sequence_bits = set_bits(sequence, 14) - machine_id_bits = set_bits(machine_id, 0) - entity_type_bits = set_bits(13, 8) + entity_type_map = { + 'user_account': 0, + 'device': 6, + 'live_session': 11, + 'video': 13 + } + entity_id = entity_type_map.get(entity_type, entity_type_map['video']) + + timestamp_bits = utils.set_bits(id_timestamp, 32) + sequence_bits = utils.set_bits(sequence, 14) + machine_id_bits = utils.set_bits(machine_id, 0) + entity_type_bits = utils.set_bits(entity_id, 8) return int(timestamp_bits + sequence_bits + machine_id_bits + entity_type_bits) def run(unfurl, node): - min_reasonable_date = create_tiktok_id('2017-12-01T00:00:00') - max_reasonable_date = create_tiktok_id(days_ahead=365) + min_reasonable_id = create_tiktok_id('2017-12-01T00:00:00') + max_reasonable_id = create_tiktok_id(days_ahead=365) if node.data_type == 'url.path.segment': if 'tiktok.com' in unfurl.find_preceding_domain(node): @@ -129,10 +129,10 @@ def run(unfurl, node): parent_id=node.node_id, incoming_edge_config=tiktok_edge) # Check if TikTok ID timestamp would be "reasonable" - elif unfurl.check_if_int_between(node.value, min_reasonable_date, max_reasonable_date): + elif unfurl.check_if_int_between(node.value, min_reasonable_id, max_reasonable_id): parse_tiktok_id(unfurl, node) # If it's the "root" node and a plausible TikTok ID, parse it. # This case covers someone parsing just a TikTok ID, not a full URL. - elif node.node_id == 1 and unfurl.check_if_int_between(node.value, min_reasonable_date, max_reasonable_date): + elif node.node_id == 1 and unfurl.check_if_int_between(node.value, min_reasonable_id, max_reasonable_id): parse_tiktok_id(unfurl, node, on_tiktok=False) diff --git a/unfurl/parsers/parse_twitter.py b/unfurl/parsers/parse_twitter.py index 59161ae..c9ea8ff 100644 --- a/unfurl/parsers/parse_twitter.py +++ b/unfurl/parsers/parse_twitter.py @@ -1,4 +1,4 @@ -# Copyright 2019 Google LLC +# Copyright 2025 Ryan Benson # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -70,8 +70,8 @@ def parse_twitter_snowflake(unfurl, node, encoding_type='integer', on_twitter=Tr f'Sequence number should be between 0 and 4096; got {sequence}' # Since we are trying to parse things that might not be valid, make sure the decoded - # timestamp is "reasonable" (between 2010-11 (the Snowflake epoch) and 2025-03 - if not 1288834974657 < timestamp < 1741000800000: + # timestamp is "reasonable" (between 2010-11 (the Snowflake epoch) and 2030-01 + if not 1288834974657 < timestamp < 1893456000000: return except Exception as e: @@ -107,13 +107,24 @@ def parse_twitter_snowflake(unfurl, node, encoding_type='integer', on_twitter=Tr hover='For every ID that is generated, this number is incremented and rolls over every 4096', parent_id=node.node_id, incoming_edge_config=twitter_snowflake_edge) +def create_twitter_id(timestamp=None, days_ahead=None, sequence=1, machine_id=1): + id_timestamp = utils.create_epoch_seconds_timestamp(iso_timestamp=timestamp, days_ahead=days_ahead, offset=1288834974.657) + + # Multiply the timestamp by 1000, as Twitter used a millisecond timestamp + timestamp_bits = utils.set_bits(id_timestamp*1000, 22) + machine_id_bits = utils.set_bits(machine_id, 12) + sequence_bits = utils.set_bits(sequence, 0) + + return int(timestamp_bits + machine_id_bits + sequence_bits) def run(unfurl, node): preceding_domain = unfurl.find_preceding_domain(node) if preceding_domain in ['twitter.com', 'mobile.twitter.com', 'x.com', 'mobile.x.com']: - # Make sure potential snowflake is reasonable: between 2015-02-01 & 2027-06-18 + # Make sure potential snowflake is reasonable: between 2015-02-01 & a year from now + min_reasonable_id = create_twitter_id('2015-02-01T00:00:00') + max_reasonable_id = create_twitter_id(days_ahead=365) if node.data_type == 'url.path.segment' and \ - unfurl.check_if_int_between(node.value, 261675293291446272, 2200000000000000001): + unfurl.check_if_int_between(node.value, min_reasonable_id, max_reasonable_id): parse_twitter_snowflake(unfurl, node) # Based on information found in a Javascript file on Twitter's website. Thanks 2*yo (https://github.com/2xyo)! diff --git a/unfurl/utils.py b/unfurl/utils.py index 79a77ad..4b273c2 100644 --- a/unfurl/utils.py +++ b/unfurl/utils.py @@ -17,6 +17,7 @@ import ipaddress import re import textwrap +from datetime import datetime from typing import Union long_int_re = re.compile(r'\d{8,}') @@ -71,6 +72,33 @@ def wrap_hover_text(hover_text: Union[str, None]) -> Union[str, None]: return '
'.join(textwrap.wrap(hover_text, width=60)) +def create_epoch_seconds_timestamp(iso_timestamp: str | None = None, days_ahead: int | None = None, offset: int | float = 0) -> int: + """ + Create a timestamp (number of seconds since Unix epoch) from either an ISO 8601-formatted timestamp string or for + some number of days in the future. Optionally, an offset (in seconds) can be provided that will be subtracted + from the return timestamp. + + :param iso_timestamp: An ISO 8601-formatted timestamp string (ex: 2015-02-01T00:00:00) + :param days_ahead: Number of days ahead the timestamp should be created for (ex: 365) + :param offset: The offset in seconds from the Unix epoch + :return: An integer timestamp (in seconds) + """ + + # Neither are set; make the timestamp now. + if not iso_timestamp and not days_ahead: + timestamp = int(datetime.now().timestamp()) + # timestamp is a string; parse it to epoch seconds + elif iso_timestamp: + timestamp = int(datetime.fromisoformat(iso_timestamp).timestamp()) + # Make the timestamp now + days_ahead + elif not iso_timestamp and days_ahead: + timestamp = int(datetime.now().timestamp()) + (days_ahead * 86400) + else: + raise ValueError('Invalid options passed') + + return int(timestamp - offset) + + def extract_bits(identifier: int, start: int, end: int) -> int: """ Extract a subset of bits from an integer based on specified start and