Merge remote-tracking branch 'origin/master'
sal-uva committed Jan 6, 2025
2 parents c405213 + b4aef77 commit 57b6e22
Showing 28 changed files with 719 additions and 234 deletions.
17 changes: 17 additions & 0 deletions backend/lib/processor.py
@@ -866,6 +866,23 @@ def is_rankable(cls, multiple_items=True):
"""
return False

@classmethod
def exclude_followup_processors(cls, processor_type=None):
"""
Used for processor compatibility.
To be defined by a child processor if it should exclude certain follow-up processors, e.g.:
def exclude_followup_processors(cls, processor_type):
if processor_type in ["undesirable-followup-processor"]:
return True
return False
:param str processor_type: Processor type to check for exclusion
:return bool: True if processor should be excluded, False otherwise
"""
return False

@classmethod
def get_csv_parameters(cls, csv_library):
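For context, a minimal sketch of how a child processor might implement this hook; the class name and processor type ID below are hypothetical, and only the method signature and the call from get_compatible_processors (see the dataset.py change below) come from this commit:

from backend.lib.processor import BasicProcessor


class ExampleProcessor(BasicProcessor):
    type = "example-processor"  # hypothetical job type ID

    @classmethod
    def exclude_followup_processors(cls, processor_type=None):
        # hide a follow-up processor that makes no sense after this one
        if processor_type in ("undesirable-followup-processor",):
            return True
        return False
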
4 changes: 4 additions & 0 deletions common/lib/dataset.py
@@ -1320,6 +1320,10 @@ def get_compatible_processors(self, user=None):
if processor.is_from_collector():
continue

own_processor = self.get_own_processor()
if own_processor and own_processor.exclude_followup_processors(processor_type):
continue

# consider a processor compatible if its is_compatible_with
# method returns True *or* if it has no explicit compatibility
# check and this dataset is top-level (i.e. has no parent)
59 changes: 44 additions & 15 deletions datasources/fourcat_import/import_4cat.py
@@ -5,6 +5,7 @@
import json
import time
import zipfile
from pathlib import Path

from backend.lib.processor import BasicProcessor
from common.lib.exceptions import (QueryParametersException, FourcatException, ProcessorInterruptedException,
@@ -136,6 +137,7 @@ def process_zip(self):
imported = []
processed_files = 1 # take into account the export.log file
failed_imports = []
primary_dataset_original_log = None
with zipfile.ZipFile(temp_file, "r") as zip_ref:
zip_contents = zip_ref.namelist()

@@ -151,7 +153,8 @@
parent_child_mapping = {}
for file in metadata_files:
with zip_ref.open(file) as f:
metadata = json.load(f)
content = f.read().decode('utf-8')  # zip members are read as bytes; decode before parsing
metadata = json.loads(content)
if not metadata.get("key_parent"):
primary_dataset_keys.add(metadata.get("key"))
datasets.append(metadata)
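As a standalone illustration of the decode-then-parse pattern in the hunk above (the archive and its contents here are made up):

import io
import json
import zipfile

# build a small in-memory archive standing in for an exported 4CAT zip
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as zf:
    zf.writestr("abc123_metadata.json", json.dumps({"key": "abc123", "key_parent": ""}))

with zipfile.ZipFile(buffer, "r") as zf:
    with zf.open("abc123_metadata.json") as f:
        content = f.read().decode("utf-8")  # zip members are opened in binary mode
        metadata = json.loads(content)

print(metadata["key"])  # abc123
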
@@ -178,22 +181,27 @@
new_dataset = self.create_dataset(processed_metadata, dataset_key, dataset_key in primary_dataset_keys)
processed_files += 1

# TODO: we do not update the results_file here; a collision is even less likely since the name combines a random key and the label, but this should be revisited
# Copy the log file
self.halt_and_catch_fire()
log_filename = new_dataset.get_log_path().name
log_filename = Path(metadata["result_file"]).with_suffix(".log").name
if log_filename in zip_contents:
self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
with zip_ref.open(log_filename) as f:
with new_dataset.get_log_path().open("wb") as outfile:
outfile.write(f.read())
content = f.read().decode('utf-8')
if new_dataset.key == self.dataset.key:
# Hold the original log for the primary dataset and add at the end
primary_dataset_original_log = content
else:
new_dataset.log("Original dataset log included below:")
with new_dataset.get_log_path().open("a") as outfile:
outfile.write(content)
processed_files += 1
else:
self.dataset.log(f"Log file not found for dataset {new_dataset.key} (original key {dataset_key}).")

# Copy the results
self.halt_and_catch_fire()
results_filename = new_dataset.get_results_path().name
results_filename = metadata["result_file"]
if results_filename in zip_contents:
self.dataset.update_status(f"Transferring data file for dataset {new_dataset.key}")
with zip_ref.open(results_filename) as f:
@@ -205,7 +213,6 @@ def process_zip(self):
# first dataset - use num rows as 'overall'
num_rows = metadata["num_rows"]
else:
# TODO: should I just delete the new_dataset here?
self.dataset.log(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
new_dataset.finish_with_error(f"Results file not found for dataset {new_dataset.key} (original key {dataset_key}).")
failed_imports.append(dataset_key)
@@ -253,6 +260,12 @@ def process_zip(self):
# complete
self.dataset.finish(num_rows)

# Add the original log for the primary dataset
if primary_dataset_original_log:
self.dataset.log("Original dataset log included below:\n")
with self.dataset.get_log_path().open("a") as outfile:
outfile.write(primary_dataset_original_log)


@staticmethod
def process_metadata(metadata):
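The log lookup earlier in process_zip derives the log file's name from the exported result file rather than from the new dataset's key. In isolation, with a made-up file name, the pathlib call behaves like this:

from pathlib import Path

result_file = "tiktok-search-57b6e22.csv"  # hypothetical exported result file
log_filename = Path(result_file).with_suffix(".log").name
print(log_filename)  # tiktok-search-57b6e22.log
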
@@ -290,6 +303,8 @@ def create_dataset(self, metadata, original_key, primary=False):
# import query in the interface, similar to the workflow for
# other data sources
new_dataset = self.dataset

# Update metadata and file
metadata.pop("key") # key already OK (see above)
self.db.update("datasets", where={"key": new_dataset.key}, data=metadata)

@@ -315,28 +330,26 @@
self.db.insert("datasets", data=metadata)
new_dataset = DataSet(key=metadata["key"], db=self.db, modules=self.modules)

# make sure the dataset path uses the new key and local dataset
# path settings. this also makes sure the log file is created in
# the right place (since it is derived from the results file path)
extension = metadata["result_file"].split(".")[-1]
new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)

new_dataset.update_status("Imported dataset created")
if new_dataset.key != original_key:
# could not use original key because it was already in use
# so update any references to use the new key
self.remapped_keys[original_key] = new_dataset.key
new_dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
self.dataset.update_status(f"Cannot import with same key - already in use on this server. Using key "
f"{new_dataset.key} instead of key {original_key}!")

# refresh object, make sure it's in sync with the database
self.created_datasets.add(new_dataset.key)
new_dataset = DataSet(key=new_dataset.key, db=self.db, modules=self.modules)
current_log = None
if new_dataset.key == self.dataset.key:
# this ensures that the first imported dataset becomes the
# processor's "own" dataset, and that the import logs go to
# that dataset's log file. For later imports, this evaluates to
# False.

# Read and hold the current log; it can only be written back after the result_file has been updated, since the log file path is derived from it
current_log = self.dataset.get_log_path().read_text()
# Update the dataset
self.dataset = new_dataset

# if the key of the parent dataset was changed, change the
Expand All @@ -352,6 +365,19 @@ def create_dataset(self, metadata, original_key, primary=False):
new_dataset.timestamp = int(time.time())
new_dataset.db.commit()

# make sure the dataset path uses the new key and local dataset
# path settings. this also makes sure the log file is created in
# the right place (since it is derived from the results file path)
extension = metadata["result_file"].split(".")[-1]
updated = new_dataset.reserve_result_file(parameters=new_dataset.parameters, extension=extension)
if not updated:
self.dataset.log(f"Could not reserve result file for {new_dataset.key}!")

if current_log:
# Add the current log to the new dataset
with new_dataset.get_log_path().open("a") as outfile:
outfile.write(current_log)

return new_dataset


@@ -407,6 +433,7 @@ def process_urls(self):
self.halt_and_catch_fire()
try:
self.dataset.update_status(f"Transferring log file for dataset {new_dataset.key}")
# TODO: for the primary, this ends up in the middle of the log as we are still adding to it...
log = SearchImportFromFourcat.fetch_from_4cat(self.base, dataset_key, api_key, "log")
logpath = new_dataset.get_log_path()
new_dataset.log("Original dataset log included below:")
@@ -660,6 +687,8 @@ def ensure_key(query):
:param dict query: Input from the user, through the front-end
:return str: Desired dataset key
"""
# TODO: Can this be done for the zip method as well? The original keys are in the zip file; we save them after
# this method is called via `after_create`. We could download here and also identify the primary dataset key...
urls = query.get("url", "").split(",")
keys = SearchImportFromFourcat.get_keys_from_urls(urls)
return keys[0]
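The create_dataset and process_zip changes above follow a hold-and-append pattern for log files: read the log written so far, let the result (and therefore log) path be re-reserved, then append the held content. A self-contained sketch with throwaway files, since DataSet.get_log_path() is not available outside 4CAT:

import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    old_log = Path(tmp) / "old-key.log"   # log written before the key/path change
    new_log = Path(tmp) / "new-key.log"   # log file at the newly reserved path
    old_log.write_text("12-03-2025 10:00:00: Imported dataset created\n")
    new_log.write_text("12-03-2025 10:00:01: Importing metadata\n")

    held = old_log.read_text()            # hold the earlier log ...
    with new_log.open("a") as outfile:    # ... and append it once the new path exists
        outfile.write(held)

    print(new_log.read_text())
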
10 changes: 6 additions & 4 deletions datasources/tiktok/search_tiktok.py
@@ -50,16 +50,16 @@ def map_item(post):
# from intercepted API response
user_nickname = post["author"]["uniqueId"]
user_fullname = post["author"]["nickname"]
user_id = post["author"]["id"]
user_thumbnail = post["author"].get("avatarThumb", "")
elif post.get("author"):
# from embedded JSON object
user_nickname = post["author"]
user_fullname = post["nickname"]
user_id = ""
user_thumbnail = ""
else:
user_nickname = ""
user_fullname = ""
user_id = ""
user_thumbnail = ""

# there are various thumbnail URLs, some of them expire later than
# others. Try to get the highest-resolution one that hasn't expired
@@ -84,13 +84,15 @@
"author_followers": post.get("authorStats", {}).get("followerCount", ""),
"author_likes": post.get("authorStats", {}).get("diggCount", ""),
"author_videos": post.get("authorStats", {}).get("videoCount", ""),
"author_avatar": post.get("avatarThumb", ""),
"author_avatar": user_thumbnail,
"body": post["desc"],
"timestamp": datetime.utcfromtimestamp(int(post["createTime"])).strftime('%Y-%m-%d %H:%M:%S'),
"unix_timestamp": int(post["createTime"]),
"is_duet": "yes" if (post.get("duetInfo", {}).get("duetFromId") != "0" if post.get("duetInfo", {}) else False) else "no",
"is_ad": "yes" if post.get("isAd", False) else "no",
"is_paid_partnership": "yes" if post.get("adAuthorization") else "no",
"is_sensitive": "yes" if post.get("maskType") == 3 else "no",
"is_photosensitive": "yes" if post.get("maskType") == 4 else "no",
"music_name": post["music"]["title"],
"music_id": post["music"]["id"],
"music_url": post["music"].get("playUrl", ""),
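The avatar fix above moves the thumbnail lookup into the author branches. A simplified restatement of that branching (the hunk does not show the first branch's exact condition, so a dict check stands in for it, and the post data is invented):

def author_fields(post: dict) -> dict:
    author = post.get("author")
    if isinstance(author, dict):
        # intercepted API response: author data is nested
        return {
            "nickname": author.get("uniqueId", ""),
            "fullname": author.get("nickname", ""),
            "avatar": author.get("avatarThumb", ""),
        }
    if author:
        # embedded JSON object: author fields are flat strings on the post
        return {"nickname": author, "fullname": post.get("nickname", ""), "avatar": ""}
    return {"nickname": "", "fullname": "", "avatar": ""}


print(author_fields({"author": {"uniqueId": "user1", "nickname": "User One",
                                "avatarThumb": "https://example.com/thumb.jpg"}}))
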
2 changes: 1 addition & 1 deletion datasources/tiktok_comments/search_tiktok_comments.py
@@ -58,7 +58,7 @@ def map_item(item):
"post_url": item["share_info"]["url"].split(".html")[0],
"post_body": item["share_info"]["title"],
"comment_url": item["share_info"]["url"],
"is_liked_by_post_author": "yes" if bool(item["author_pin"]) else "no",
"is_liked_by_post_author": "yes" if bool(item.get("author_pin")) else "no",
"is_sticky": "yes" if bool(item["stick_position"]) else "no",
"is_comment_on_comment": "no" if bool(item["reply_id"] == "0") else "yes",
"language_guess": item["comment_language"]
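The one-line change above swaps indexing for .get(), so comments without an author_pin field no longer raise a KeyError. A quick check with an invented comment dict:

item = {"reply_id": "0", "stick_position": 0}  # hypothetical comment lacking author_pin
print("yes" if bool(item.get("author_pin")) else "no")  # no
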
2 changes: 1 addition & 1 deletion processors/audio/audio_extractor.py
@@ -76,7 +76,7 @@ def process(self):
staging_area = self.dataset.get_staging_area()
output_dir = self.dataset.get_staging_area()

total_possible_videos = max_files if max_files != 0 else self.source_dataset.num_rows - 1 # for the metadata file that is included in archives
total_possible_videos = max(max_files if max_files != 0 else self.source_dataset.num_rows - 1, 1) # for the metadata file that is included in archives
processed_videos = 0

self.dataset.update_status("Extracting video audio")
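The max(..., 1) guard keeps the video total from dropping to zero when an archive only contains the metadata file. A quick check of that edge case (assuming max_files == 0 means "no limit", which the diff does not confirm):

max_files = 0   # assumed to mean "no limit" (assumption)
num_rows = 1    # archive containing only the metadata file
total_possible_videos = max(max_files if max_files != 0 else num_rows - 1, 1)
assert total_possible_videos == 1
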
22 changes: 14 additions & 8 deletions processors/conversion/export_datasets.py
@@ -23,7 +23,7 @@ class ExportDatasets(BasicProcessor):
type = "export-datasets" # job type ID
category = "Conversion" # category
title = "Export Dataset and All Analyses" # title displayed in UI
description = "Creates a ZIP file containing the dataset and all analyses to be archived and uploaded to a 4CAT instance in the future. Automatically expires after 1 day, after which you must run again." # description displayed in UI
description = "Creates a ZIP file containing the dataset and all analyses to be archived and uploaded to a 4CAT instance in the future. Filters are *not* included and must be exported separately as new datasets. Results automatically expire after 1 day, after which you must run again." # description displayed in UI
extension = "zip" # extension of result file, used internally and in UI

@classmethod
Expand All @@ -40,6 +40,11 @@ def process(self):
This takes a CSV file as input and writes the same data as a JSON file
"""
self.dataset.update_status("Collecting dataset and all analyses")
primary_dataset = self.dataset.top_parent()
if not primary_dataset.is_finished():
# This ought not to happen, as processors (i.e., this one) should only be available for finished datasets
self.dataset.finish_with_error("You cannot export unfinished datasets; please wait until dataset is finished to export.")
return

results_path = self.dataset.get_staging_area()

@@ -52,25 +57,26 @@

try:
dataset = DataSet(key=dataset_key, db=self.db)
# TODO: these two should fail for the primary dataset, but should they fail for the children too?
except DataSetException:
self.dataset.finish_with_error("Dataset not found.")
return
self.dataset.update_status(f"Dataset {dataset_key} not found: it may have been deleted prior to export; skipping.")
failed_exports.append(dataset_key)
continue
if not dataset.is_finished():
self.dataset.finish_with_error("You cannot export unfinished datasets.")
return
self.dataset.update_status(f"Dataset {dataset_key} not finished: cannot export unfinished datasets; skipping.")
failed_exports.append(dataset_key)
continue

# get metadata
metadata = dataset.get_metadata()
if metadata["num_rows"] == 0:
self.dataset.update_status(f"Skipping empty dataset {dataset_key}")
self.dataset.update_status(f"Dataset {dataset_key} has no results; skipping.")
failed_exports.append(dataset_key)
continue

# get data
data_file = dataset.get_results_path()
if not data_file.exists():
self.dataset.finish_with_error(f"Dataset {dataset_key} has no data; skipping.")
self.dataset.update_status(f"Dataset {dataset_key} has no data file; skipping.")
failed_exports.append(dataset_key)
continue

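The export loop now records failed child datasets and keeps going instead of aborting the whole export. The flow in isolation, with stand-in lookup and export helpers that are not 4CAT functions:

def lookup(key):
    # stand-in for DataSet(key=..., db=...); pretend one child dataset is unfinished
    return {"finished": key != "def456", "num_rows": 10}


def export(key):
    print(f"exported {key}")


failed_exports = []
for dataset_key in ("abc123", "def456"):
    dataset = lookup(dataset_key)
    if not dataset or not dataset["finished"] or dataset["num_rows"] == 0:
        failed_exports.append(dataset_key)  # record, skip, keep going
        continue
    export(dataset_key)

if failed_exports:
    print(f"Skipped {len(failed_exports)} dataset(s): {', '.join(failed_exports)}")
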
7 changes: 4 additions & 3 deletions processors/conversion/merge_datasets.py
@@ -60,22 +60,23 @@ def is_compatible_with(cls, module=None, user=None):
return module.get_extension() in ("csv", "ndjson") and (module.is_from_collector())

@staticmethod
def get_dataset_from_url(url, db):
def get_dataset_from_url(url, db, modules=None):
"""
Get dataset object based on dataset URL
Uses the last part of the URL path as the Dataset ID
:param str url: Dataset URL
:param db: Database handler (to retrieve metadata)
:param modules: Modules handler (pass through to DataSet)
:return DataSet: The dataset
"""
if not url:
raise DataSetException("URL empty or not provided")

source_url = ural.normalize_url(url)
source_key = source_url.split("/")[-1]
return DataSet(key=source_key, db=db)
return DataSet(key=source_key, db=db, modules=modules)

def process(self):
"""
@@ -96,7 +97,7 @@ def process(self):
continue

try:
source_dataset = self.get_dataset_from_url(source_dataset_url, self.db)
source_dataset = self.get_dataset_from_url(source_dataset_url, self.db, modules=self.modules)
except DataSetException:
return self.dataset.finish_with_error(f"Dataset URL '{source_dataset_url} not found - cannot perform "
f"merge.")
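For reference, how get_dataset_from_url pulls a key out of a results URL; ural is the same third-party library the processor already imports, and the URL below is invented:

import ural

url = "https://4cat.example.org/results/abcdef1234567890"
source_key = ural.normalize_url(url).split("/")[-1]
print(source_key)  # abcdef1234567890
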
8 changes: 8 additions & 0 deletions processors/networks/cotag_network.py
@@ -29,6 +29,13 @@ class CoTaggerPreset(ProcessorPreset):
"default": True,
"help": "Convert tags to lowercase",
"tooltip": "Merges tags with varying cases"
},
"ignore-tags": {
"type": UserInput.OPTION_TEXT,
"default": "",
"help": "Tags to ignore",
"tooltip": "Separate with commas if you want to ignore multiple tags. Do not include the '#' "
"character."
}
}

@@ -72,6 +79,7 @@ def get_processor_pipeline(self):
"split-comma": True,
"categorise": True,
"allow-loops": False,
"ignore-nodes": self.parameters.get("ignore-tags", ""),
"to-lowercase": self.parameters.get("to-lowercase", True)
}
}
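The new ignore-tags option is handed to the underlying network processor as ignore-nodes. How the comma-separated value is parsed downstream is not shown in this commit, so the snippet below only illustrates the intended effect; it is not the co-tag processor's actual code:

ignore_tags = "cats, dogs"  # hypothetical user input for the new option
ignored = {tag.strip().lower() for tag in ignore_tags.split(",") if tag.strip()}

tags_on_post = ["Cats", "birds", "dogs"]
kept = [tag for tag in tags_on_post if tag.lower() not in ignored]
print(kept)  # ['birds']
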