diff --git a/blazegraph-migration/enrich_triples.sh b/blazegraph-migration/enrich_triples.sh index 9a6ad81bc8..c8693c219c 100644 --- a/blazegraph-migration/enrich_triples.sh +++ b/blazegraph-migration/enrich_triples.sh @@ -4,7 +4,6 @@ LOG_FILE="enrich_parallel_$(date +%Y%m%d_%H%M%S).log" CURL_TIMEOUT=300 MAX_RETRIES=3 CHUNK_SIZE=50000 -FAIL_LIMIT_PERCENT=1.0 log_message() { local level="$1" @@ -35,16 +34,33 @@ generate_enriched_chunks() { log_message "INFO" "Streaming and enriching $input_file into chunked enriched files" + local total_input_triples + total_input_triples=$(gunzip -c "$input_file" | wc -l) + log_message "INFO" "Total triples to process: $total_input_triples" + local chunk_count=0 local line_count=0 + local processed_input_triples=0 local enriched_chunk_file="$chunk_dir/enriched_chunk_$(printf "%05d" $chunk_count).nq" + local bar_width=50 gunzip -c "$input_file" | while IFS= read -r line || [[ -n "$line" ]]; do - graph_iri=$(echo "$line" | awk '{print $4}' | sed 's/[<>]//g') + graph_iri=$(awk ' + { + in_string = 0 + graph = "" + for (i = 1; i <= NF; i++) { + if ($i ~ /^"/) in_string = 1 + if (in_string && $i ~ /"$/ && substr($i, length($i), 1) != "\\") in_string = 0 + if (!in_string && $i ~ /^<.*>$/) graph = $i + } + gsub(/[<>]/, "", graph) + print graph + }' <<< "$line") IFS='/' read -ra parts <<< "$graph_iri" if [[ ${#parts[@]} -ge 5 ]]; then - network_prefix="${parts[0]}" # did:dkg:otp:2043 + network_prefix="${parts[0]}" contract="${parts[1]}" collection_id="${parts[2]}" asset_id="${parts[3]}" @@ -61,17 +77,34 @@ generate_enriched_chunks() { ((line_count+=3)) fi + ((processed_input_triples++)) + if (( line_count >= CHUNK_SIZE * 3 )); then ((chunk_count++)) enriched_chunk_file="$chunk_dir/enriched_chunk_$(printf "%05d" $chunk_count).nq" line_count=0 fi + + + if (( processed_input_triples % 1000 == 0 )); then + local progress=$(( (processed_input_triples * bar_width) / total_input_triples )) + local percent=$(( (processed_input_triples * 100) / total_input_triples )) + local progress_bar=$(printf "%-${bar_width}s" "#" | tr ' ' '#') + printf "\r[%-${bar_width}.${progress}s] %d%% (%d/%d triples processed)\n" "$progress_bar" "$percent" "$processed_input_triples" "$total_input_triples" + fi done - echo "$((chunk_count + 1))" > "$input_dir/.total_chunks_created" - log_message "INFO" "Enriched chunks generated: $((chunk_count + 1))" + echo + + enriched_chunk_files=("$chunk_dir"/enriched_chunk_*.nq) + total_enriched_chunks=${#enriched_chunk_files[@]} + + echo "$total_enriched_chunks" > "$input_dir/.total_chunks_created" + log_message "INFO" "Enriched chunks generated: $total_enriched_chunks" } + + process_chunk() { local chunk_file="$1" local chunk_num="$2" @@ -170,6 +203,22 @@ for folder in "${target_folders[@]}"; do log_message "INFO" "Processing folder: $folder" done_marker=".enrichment_migration_done_$(basename "$folder")" + chunk_dir="$folder/chunks" + + retry_mode=false + if [ -f "$done_marker" ]; then + marker_val=$(cat "$done_marker") + if [ "$marker_val" == "0" ]; then + shopt -s nullglob + remaining_chunks=("$chunk_dir"/enriched_chunk_*.nq) + shopt -u nullglob + if [ ${#remaining_chunks[@]} -gt 0 ]; then + retry_mode=true + log_message "INFO" "Retry mode enabled — retrying failed chunks" + process_chunks "$folder" + fi + fi + fi if [ ! -f "$folder/$folder/data.nq.gz" ]; then log_message "ERROR" "Missing file: $folder/$folder/data.nq.gz" @@ -178,37 +227,25 @@ for folder in "${target_folders[@]}"; do continue fi - generate_enriched_chunks "$folder" - process_chunks "$folder" - - total_chunks=$(cat "$folder/.total_chunks_created" 2>/dev/null) - rm -f "$folder/.total_chunks_created" + if [ "$retry_mode" = false ]; then + generate_enriched_chunks "$folder" + process_chunks "$folder" + fi shopt -s nullglob - failed_chunks=("$folder/chunks"/enriched_chunk_*.nq) + remaining_failed_chunks=("$chunk_dir"/enriched_chunk_*.nq) shopt -u nullglob - failed_count=${#failed_chunks[@]} - - if (( total_chunks == 0 )); then - log_message "WARN" "No enriched chunks were generated for $folder" - echo "0" > "$done_marker" - overall_success=0 - continue - fi - - fail_ratio=$(awk "BEGIN { printf \"%.2f\", ($failed_count / $total_chunks) * 100 }") + remaining_failed_count=${#remaining_failed_chunks[@]} - if (( $(awk "BEGIN { print ($fail_ratio > $FAIL_LIMIT_PERCENT) }") )); then - log_message "ERROR" "$failed_count/$total_chunks chunks failed (${fail_ratio}%) — above threshold" + if (( remaining_failed_count > 0 )); then + log_message "ERROR" "$remaining_failed_count chunks still remain after retries" echo "0" > "$done_marker" overall_success=0 else - if (( failed_count > 0 )); then - log_message "WARN" "$failed_count/$total_chunks chunks failed (${fail_ratio}%), but within allowed $FAIL_LIMIT_PERCENT%" - fi echo "1" > "$done_marker" log_message "INFO" "Migration success marker written to $done_marker" fi + done job_pool_shutdown diff --git a/blazegraph-migration/paranet_migration.sh b/blazegraph-migration/paranet_migration.sh index 04fadd3590..7a7eba5d84 100644 --- a/blazegraph-migration/paranet_migration.sh +++ b/blazegraph-migration/paranet_migration.sh @@ -46,7 +46,18 @@ generate_enriched_chunks() { gunzip -c "$input_file" | while IFS= read -r line || [[ -n "$line" ]]; do echo "$line" >> "$chunk_file" - named_graph=$(echo "$line" | awk '{print $4}' | sed 's/[<>]//g') + named_graph=$(awk ' + { + in_string = 0 + graph = "" + for (i = 1; i <= NF; i++) { + if ($i ~ /^"/) in_string = 1 + if (in_string && $i ~ /"$/ && substr($i, length($i), 1) != "\\") in_string = 0 + if (!in_string && $i ~ /^<.*>$/) graph = $i + } + gsub(/[<>]/, "", graph) + print graph + }' <<< "$line") echo "<${ual}> <${named_graph}> <${ual}> ." >> "$chunk_file"