-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path10.dedup
59 lines (51 loc) · 1.57 KB
/
10.dedup
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
#!/bin/bash
#SBATCH --job-name=dedup
#SBATCH --partition="small"
#SBATCH --time=72:00:00
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=128
#SBATCH --mem-per-cpu=1750
#SBATCH --output=logs/%x.out
#
# Deduplication step for one batch directory.
#
# Required environment:
#   HQ_ENTRY  space-separated task entry; its first field is the input
#             directory holding batch_*.jsonl.zst files.
# A file named .env in the working directory is sourced before anything else.
#
# Variables set here for the rest of the script:
#   INPUT_DIR   directory with the input batches
#   OUTPUT_DIR  INPUT_DIR with the first "batches" replaced by "dedup"
#   JSON_FILES  newline-separated list of the input batch files
set -euo pipefail
shopt -s failglob
source .env

IFS=" " read -ra args <<< "$HQ_ENTRY"
# Not referenced in the visible part of this script; kept for compatibility.
TMPSFX=tmp_$$
INPUT_DIR=${args[0]:?HQ_ENTRY must start with the input directory}

# Derive the output directory from the input one (first occurrence only).
OUTPUT_DIR=${INPUT_DIR/batches/dedup}
if [[ "$OUTPUT_DIR" == "$INPUT_DIR" ]]; then
  # Without this guard the outputs would be written next to the inputs, and
  # the non-empty input directory would be mistaken for a finished task.
  echo "Input directory '$INPUT_DIR' does not contain 'batches'" >&2
  exit 1
fi

# Expand the glob in a command substitution so that a pattern without matches
# (failglob) surfaces as a failed assignment that can be reported here.
if ! JSON_FILES=$(printf '%s\n' "$INPUT_DIR"/batch_*.jsonl.zst); then
  echo "No batch_*.jsonl.zst files found in '$INPUT_DIR'" >&2
  exit 1
fi

mkdir -p -- "$OUTPUT_DIR"
# Log the input files on a single line.
printf '%s\n' "${JSON_FILES//$'\n'/ }" >&2
# Read one chunk of split(1) output from stdin, compress it into a temporary
# file and rename that file to its final name once compression has succeeded.
# Arguments: $1 - output path prefix chosen by split (e.g. dir/batch_00000001)
# Outputs:   <dir>/batch_<N>.jsonl.zst, where <N> has no leading zeros
# Returns:   non-zero if the name rewrite, the compression or the rename
#            fails; after a failed compression the .tmp file is left in place
#            and no final file is created.
compress-batch() {
  local file=$1
  local base dir name

  # Strip the zero padding from the file name only: directory components
  # that happen to contain "_0..." must stay untouched.
  base=${file##*/}
  dir=${file%"$base"}
  base=$(printf '%s\n' "$base" | sed -E 's/_0+/_/g') || return
  name=$dir$base

  # Compress stdin into a temp file. When SLURM_CPUS_ON_NODE is unset, fall
  # back to 0 threads, which asks zstd to detect the core count itself.
  zstd -T"${SLURM_CPUS_ON_NODE:-0}" -10 >"$name.jsonl.zst.tmp" || return

  # Remove the temp suffix only after a successful compression, so a final
  # name always denotes a complete file.
  mv -- "$name.jsonl.zst.tmp" "$name.jsonl.zst"
}
# Export the helper so that the shell started by split(1) for --filter can
# call it (this only works when that shell is bash, see SHELL= below).
export -f compress-batch

# Check if the task has already been done: the output directory is non-empty
# and holds no leftover temporary files from an interrupted compression.
temps=$(find "$OUTPUT_DIR" -name "batch_*.jsonl.zst.tmp" | wc -l)
if (( temps == 0 )) && [[ -n "$(ls -- "$OUTPUT_DIR")" ]]; then
  echo "Task '$HQ_ENTRY' already done" >&2
  exit 0
fi

# Container invocation, kept as an array so every argument stays one word.
workdir=$(pwd -P)
sing=(singularity exec --bind "$workdir" --bind "$INPUT_DIR" --pwd "$workdir" monotextor.sif)

# JSON_FILES is a whitespace-separated list of paths; split it on purpose.
# shellcheck disable=SC2206
json_files=($JSON_FILES)

# Resolve the cluster files before starting the pipeline: a glob that fails
# inside a process substitution would go unnoticed and dedup would read an
# empty cluster stream.
single_clusters=$INPUT_DIR/clusters.zst
cluster_files=()
if [[ ! -f "$single_clusters" ]]; then
  cluster_list=$(printf '%s\n' "$INPUT_DIR"/clusters.[0-9]*.zst) || cluster_list=""
  mapfile -t cluster_files <<< "$cluster_list"
  if [[ ! -e "${cluster_files[0]}" ]]; then
    echo "No clusters files found in '$INPUT_DIR'" >&2
    exit 1
  fi
fi

# Run dedup and cut its output into line-aligned chunks of at most 120G; each
# chunk is piped to compress-batch with FILE set to the chunk's output path.
# Removal of the cluster files after the run is intentionally disabled.
if [[ -f "$single_clusters" ]]; then
  # Single clusters file
  "${sing[@]}" dedup "$single_clusters" "${json_files[@]}"
else
  # Distributed index, multiple clusters files
  "${sing[@]}" dedup <(cat -- "${cluster_files[@]}") "${json_files[@]}"
fi | SHELL="$BASH" split - \
  --numeric-suffixes=1 -a 8 -C 120G \
  --filter='compress-batch "$FILE"' \
  "$OUTPUT_DIR/batch_"