-
Notifications
You must be signed in to change notification settings - Fork 123
/
Copy pathmain.rs
1634 lines (1374 loc) · 60 KB
/
main.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Copyright 2021 Google LLC
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* Create and use suffix arrays for deduplicating language model datasets.
*
* A suffix array A for a sequence S is a datastructure that contains all
* suffixes of S in sorted order. To be space efficient, instead of storing
* the actual suffix, we just store the pointer to the start of the suffix.
* To be time efficient, it uses fancy algorithms to not require quadratic
* (or worse) work. If we didn't care about either, then we could literally
* just define (in python)
* A = sorted(S[i:] for i in range(len(S)))
*
* Suffix arrays are amazing because they allow us to run lots of string
* queries really quickly, while also only requiring an extra 8N bytes of
* storage (one 64-bit pointer for each byte in the sequence).
*
* This code is designed to work with Big Data (TM) and most of the
* complexity revolves around the fact that we do not require the
 * entire suffix array to fit in memory. In order to keep things manageable,
* we *do* require that the original string fits in memory. However, even
* the largest language model datasets (e.g., C4) are a few hundred GB
 * which on today's machines does fit in memory.
*
* With all that amazing stuff out of the way, just a word of warning: this
* is the first program I've ever written in rust. I still don't actually
* understand what borrowing something means, but have found that if I
* add enough &(&&x.copy()).clone() then usually the compiler just loses
* all hope in humanity and lets me do what I want. I apologize in advance
 * to anyone who actually does know rust and wants to lock me in a small room
* with the Rust Book by Klabnik & Nichols until I repent for my sins.
* (It's me, two months in the future. I now more or less understand how
* to borrow. So now instead of the code just being all awful, you'll get
* a nice mix of sane rust and then suddenly OH-NO-WHAT-HAVE-YOU-DONE-WHY!?!)
*/
use std::path::Path;
use std::time::Instant;
use std::fs;
use std::io::Read;
use std::io::BufReader;
use std::fs::File;
use std::io::prelude::*;
use std::cmp::Reverse;
use std::convert::TryInto;
extern crate filebuffer;
extern crate zstd;
extern crate crossbeam;
extern crate clap;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use clap::{Parser, Subcommand};
mod table;
// Top-level CLI arguments, parsed with clap's derive API.
// (Comments here are deliberately `//`, not `///`: doc comments on clap
// items become user-visible help text.)
#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Args {
    // Which subcommand to run; see `Commands` below.
    #[clap(subcommand)]
    command: Commands,
}
// All subcommands exposed by the CLI. clap derives the flag names from the
// field names (e.g. `data_file` -> `--data-file`, with a one-letter short).
// Comments are `//` on purpose: `///` doc comments would change the help
// output that clap generates.
#[derive(Subcommand, Debug)]
enum Commands {
    // Build the suffix array for an entire file in one shot (the file must
    // fit in memory); writes <data_file>.table.bin next to it.
    #[clap(arg_required_else_help = true)]
    Make {
        #[clap(short, long)]
        data_file: String,
    },

    // Build the suffix array for just the byte range [start_byte, end_byte)
    // of the file; used (with Merge) when the whole file doesn't fit in RAM.
    MakePart {
        #[clap(short, long)]
        data_file: String,
        #[clap(short, long)]
        start_byte: usize,
        #[clap(short, long)]
        end_byte: usize,
    },

    // Count how many times the contents of query_file occur in data_file.
    CountOccurrences {
        #[clap(short, long)]
        data_file: String,
        #[clap(short, long)]
        query_file: String,
        // Also print the first offset at which the query was found.
        #[clap(short, long)]
        print_location: bool,
        // mmap the data/table instead of reading them fully into RAM.
        #[clap(short, long)]
        load_disk: bool,
    },

    // As CountOccurrences, but query_file holds many queries, each stored as
    // a little-endian u32 length prefix followed by the query bytes.
    CountOccurrencesMulti {
        #[clap(short, long)]
        data_file: String,
        #[clap(short, long)]
        query_file: String,
        #[clap(short, long)]
        load_disk: bool,
    },

    // For each query file, compute per-offset longest-match lengths against
    // data_file (see cmd_find_training_data_2).
    FindTrainingData {
        #[clap(short, long)]
        data_file: String,
        #[clap(short, long)]
        query_file: Vec<String>,
    },

    // Find sequences of at least length_threshold bytes that are duplicated
    // within data_file; results are written under cache_dir.
    SelfSimilar {
        #[clap(short, long)]
        data_file: String,
        #[clap(short, long)]
        length_threshold: usize,
        // Only record clusters with more than this many members.
        #[clap(short, long, default_value_t = 0)]
        frequency_threshold: usize,
        // Save one example sequence per cluster instead of every location.
        #[clap(short, long)]
        only_save_one: bool,
        #[clap(short, long)]
        cache_dir: String,
        #[clap(short, long, default_value_t = 8)]
        num_threads: i64,
    },

    // Find sequences duplicated *between* two different files.
    AcrossSimilar {
        #[clap(long)]
        data_file_1: String,
        #[clap(long)]
        data_file_2: String,
        #[clap(short, long)]
        length_threshold: usize,
        #[clap(short, long)]
        cache_dir: String,
        #[clap(short, long, default_value_t = 8)]
        num_threads: i64,
    },

    // Merge several partial suffix arrays (produced by MakePart) into one.
    Merge {
        #[clap(short, long)]
        suffix_path: Vec<String>,
        #[clap(short, long)]
        output_file: String,
        #[clap(short, long, default_value_t = 8)]
        num_threads: i64,
    },

    // Post-process the files SelfSimilar/AcrossSimilar wrote to cache_dir.
    // NOTE(review): implementation not visible in this chunk -- presumably it
    // consolidates the dups/sizes files; confirm against cmd_collect.
    Collect {
        #[clap(short, long)]
        data_file: String,
        #[clap(short, long)]
        cache_dir: String,
        #[clap(short, long)]
        length_threshold: u64,
    }
}
/* Serialize a slice of u64 values into a flat little-endian byte vector,
 * keeping only the low `size_width` bytes of each value. This is the inverse
 * of `from_bytes`. It temporarily doubles the memory footprint, but in
 * practice it is only invoked on structures smaller than machine memory.
 */
pub fn to_bytes(input: &[u64], size_width: usize) -> Vec<u8> {
    let mut out: Vec<u8> = Vec::with_capacity(size_width * input.len());
    for value in input.iter() {
        let le = value.to_le_bytes();
        out.extend_from_slice(&le[..size_width]);
    }
    out
}
/* Convert a little-endian byte array (size_width bytes per value) back into
 * a Vec<u64>. The inverse of `to_bytes`. Only called on (relatively) small
 * files, so the extra allocation is acceptable.
 *
 * Panics if the input length is not a multiple of `size_width`.
 *
 * Fixes: removed a leftover debug `println!("S {}", ...)` that polluted
 * stdout, and replaced the manual index arithmetic with `chunks_exact`.
 */
pub fn from_bytes(input: Vec<u8>, size_width: usize) -> Vec<u64> {
    assert!(input.len() % size_width == 0);
    let mut values: Vec<u64> = Vec::with_capacity(input.len() / size_width);
    // Widen each size_width-byte chunk to a full u64 by zero-padding the
    // high bytes, then decode as little-endian.
    let mut tmp = [0u8; 8];
    for chunk in input.chunks_exact(size_width) {
        tmp[..size_width].copy_from_slice(chunk);
        values.push(u64::from_le_bytes(tmp));
    }
    values
}
/* Random-access read of entry A[index] from an on-disk suffix array whose
 * entries are `size_width`-byte little-endian pointers. We go through the
 * file (rather than memory) because A is far too big to load. */
fn table_load_disk(table:&mut BufReader<File>,
                   index: usize,
                   size_width: usize) -> usize{
    let offset = (index * size_width) as u64;
    table.seek(std::io::SeekFrom::Start(offset)).expect("Seek failed!");
    // Only the low size_width bytes are read; the rest stay zero so the
    // full 8-byte little-endian decode gives the right value.
    let mut word = [0u8; 8];
    table.read_exact(&mut word[..size_width]).unwrap();
    u64::from_le_bytes(word) as usize
}
/* Binary search to find where `query` would sit in the on-disk suffix array:
 * returns the first table index whose suffix compares greater than `query`
 * (i.e. the upper bound of the insertion point). */
fn off_disk_position(text: &[u8], table: &mut BufReader<File>,
                     query: &[u8], size_width: usize) -> usize {
    let mut lo = 0usize;
    let mut hi = text.len();
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let suffix_start = table_load_disk(table, mid, size_width);
        if query < &text[suffix_start..] {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }
    lo
}
/*
 * We're going to work with suffix arrays that are on disk, and we often want
 * to stream them top-to-bottom. This is a datastructure that helps us do that:
 * the BufReader pulls ~1MB chunks of data at a time off disk, and we then
 * decode one size_width-byte pointer per call.
 */
struct TableStream {
    // Buffered handle into the on-disk suffix table, read sequentially.
    file: BufReader<File>,
    // Scratch buffer for one entry: reads fill only the low size_width
    // bytes; the high bytes stay zero (set once in make_table) so a full
    // 8-byte little-endian decode yields the correct value.
    cache: [u8; 8],
    // Number of bytes each table entry occupies on disk.
    size_width: usize
}
/* Open the suffix table at `path` as a stream, pre-seeked to entry `offset`
 * (entries are size_width bytes each). */
fn make_table(path: std::string::String,
              offset: usize,
              size_width: usize) -> TableStream {
    // 1MB of buffering so sequential streaming doesn't hammer the disk.
    let reader = std::io::BufReader::with_capacity(1024*1024, fs::File::open(path).unwrap());
    let mut stream = TableStream {
        file: reader,
        cache: [0u8; 8],
        size_width,
    };
    let byte_offset = (offset * size_width) as u64;
    stream.file.seek(std::io::SeekFrom::Start(byte_offset)).expect("Seek failed!");
    stream
}
/* Get the next word from the suffix table. Returns u64::MAX as an in-band
 * EOF/error sentinel (callers treat it as "no more entries"). */
fn get_next_pointer_from_table_canfail(tablestream:&mut TableStream) -> u64 {
    let width = tablestream.size_width;
    match tablestream.file.read_exact(&mut tablestream.cache[..width]) {
        // The unread high bytes of `cache` are always zero, so decoding all
        // 8 bytes yields the width-byte little-endian value.
        Ok(()) => u64::from_le_bytes(tablestream.cache),
        Err(_) => std::u64::MAX,
    }
}
/* As above, but running off the end of the table is a hard error. */
fn get_next_pointer_from_table(tablestream:&mut TableStream) -> u64 {
    let ptr = get_next_pointer_from_table_canfail(tablestream);
    if ptr == std::u64::MAX {
        panic!("Reached EOF badly");
    }
    ptr
}
/* Decode entry `index` (width little-endian bytes) from an mmapped table. */
fn table_load_filebuffer(table:&filebuffer::FileBuffer, index:usize, width: usize) -> usize{
    let start = index * width;
    let mut word = [0u8; 8];
    word[..width].copy_from_slice(&table[start..start + width]);
    u64::from_le_bytes(word) as usize
}
/* Decode entry `index` (width little-endian bytes) from an in-memory table. */
fn table_load(table:&[u8], index:usize, width: usize) -> usize{
    let start = index * width;
    let mut word = [0u8; 8];
    word[..width].copy_from_slice(&table[start..start + width]);
    u64::from_le_bytes(word) as usize
}
/*
 * Helper function to actually do the count of the number of times something is repeated.
 * This should be fairly simple.
 * First, perform binary search using the on-disk suffix array to find the first place
 * where the string occurs. If it doesn't exist then return 0.
 * Then, binary search again to find the last location it occurs.
 * Return the difference between the two.
 */
fn count_occurances(text: &filebuffer::FileBuffer,
                    size_text: u64,
                    table: &filebuffer::FileBuffer,
                    size: u64,
                    str: &[u8],
                    size_width: usize,
                    print_where: bool) -> u64 {
    // `buf` always holds the current suffix clipped to at most str.len() bytes.
    let mut buf: &[u8];
    // The table must be a whole number of size_width-byte entries.
    assert!(size % (size_width as u64) == 0);

    // Lower bound: first table entry whose suffix-prefix compares >= str.
    let mut low = 0;
    let mut high = size/(size_width as u64);
    while low < high {
        let mid = (high+low)/2;
        let pos = table_load_filebuffer(&table, mid as usize, size_width);

        if pos + str.len() < size_text as usize {
            buf = &text[pos..pos+str.len()];
        } else {
            // Suffix runs off the end of the text; compare what remains.
            buf = &text[pos..size_text as usize];
        }

        if str <= &buf {
            high = mid;
        } else {
            low = mid+1;
        }
    }
    // str sorts after every suffix: it cannot occur at all.
    if low == size/(size_width as u64) {
        return 0;
    }

    // If the suffix at the lower bound doesn't start with str, str is absent.
    let start = low;
    let pos = table_load_filebuffer(&table, low as usize, size_width);
    if pos + str.len() < size_text as usize {
        buf = &text[pos..pos+str.len()];
    } else {
        buf = &text[pos..size_text as usize];
    }
    if str != buf {
        return 0; // not found
    }

    // Upper bound: first table entry past the run of matching suffixes.
    high = size/(size_width as u64);
    while low < high {
        let mid = (high+low)/2;
        let pos = table_load_filebuffer(&table, mid as usize, size_width);

        if pos + str.len() < size_text as usize {
            buf = &text[pos..pos+str.len()];
        } else {
            buf = &text[pos..size_text as usize];
        }

        if str != buf {
            high = mid;
        } else {
            low = mid+1;
        }
    }

    // Report (only) the first location at which the query was found.
    if print_where {
        for i in start..low {
            let pos = table_load_filebuffer(&table, i as usize, size_width);
            println!("Found at: {}", pos);
            break;
        }
    }

    // Width of the matching run = number of (possibly overlapping) occurrences.
    return low-start;
}
/* In-memory twin of count_occurances: count how many (possibly overlapping)
 * times `str` occurs in `text`, using the suffix array in `table`.
 * Two binary searches delimit the run of suffixes starting with `str`;
 * the run's width is the answer. If print_where is set, the first matching
 * location is printed. */
fn count_occurances_memory(text: &[u8],
                           size_text: u64,
                           table: &[u8],
                           size: u64,
                           str: &[u8],
                           size_width: usize,
                           print_where: bool) -> u64 {
    // Decode table entry `index` (width little-endian bytes) as a text offset.
    fn load_entry(table: &[u8], index: usize, width: usize) -> usize {
        let mut word = [0u8; 8];
        word[..width].copy_from_slice(&table[index * width..(index + 1) * width]);
        u64::from_le_bytes(word) as usize
    }
    // The suffix at `pos`, clipped to at most `max_len` bytes.
    fn clipped<'a>(text: &'a [u8], end: usize, pos: usize, max_len: usize) -> &'a [u8] {
        if pos + max_len < end {
            &text[pos..pos + max_len]
        } else {
            &text[pos..end]
        }
    }

    assert!(size % (size_width as u64) == 0);
    let num_entries = size / (size_width as u64);

    // Lower bound: first table entry whose suffix-prefix compares >= str.
    let mut lo = 0u64;
    let mut hi = num_entries;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let pos = load_entry(table, mid as usize, size_width);
        if str <= clipped(text, size_text as usize, pos, str.len()) {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }
    // str sorts after every suffix: it cannot occur at all.
    if lo == num_entries {
        return 0;
    }

    // If the suffix at the lower bound doesn't start with str, str is absent.
    let first = lo;
    let pos = load_entry(table, lo as usize, size_width);
    if str != clipped(text, size_text as usize, pos, str.len()) {
        return 0; // not found
    }

    // Upper bound: first table entry past the run of matching suffixes.
    hi = num_entries;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let pos = load_entry(table, mid as usize, size_width);
        if str != clipped(text, size_text as usize, pos, str.len()) {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }

    // Report (only) the first location at which the query was found.
    if print_where {
        if first < lo {
            let pos = load_entry(table, first as usize, size_width);
            println!("Found at: {}", pos);
        }
    }

    lo - first
}
/* Return 1 if `str` occurs anywhere in `text` (per the suffix array in
 * `table`), else 0. A single lower-bound binary search finds the first
 * suffix >= str; str is present iff that suffix starts with str.
 *
 * BUG FIX: the original guard `low*w + w >= table.len()` also rejected
 * low == num_entries - 1, so a query whose only match sorted into the LAST
 * table slot (e.g. "b" in "ab") was reported absent. Only low == num_entries
 * is actually out of range; the check now matches count_occurances_memory.
 */
fn is_present_memory(text: &[u8],
                     size_text: u64,
                     table: &[u8],
                     size: u64,
                     str: &[u8],
                     size_width: usize) -> u64 {
    // Decode table entry `index` (width little-endian bytes) as a text offset.
    fn load_entry(table: &[u8], index: usize, width: usize) -> usize {
        let mut word = [0u8; 8];
        word[..width].copy_from_slice(&table[index * width..(index + 1) * width]);
        u64::from_le_bytes(word) as usize
    }
    // The suffix at `pos`, clipped to at most `max_len` bytes.
    fn clipped<'a>(text: &'a [u8], end: usize, pos: usize, max_len: usize) -> &'a [u8] {
        if pos + max_len < end {
            &text[pos..pos + max_len]
        } else {
            &text[pos..end]
        }
    }

    assert!(size % (size_width as u64) == 0);
    let num_entries = size / (size_width as u64);

    // Lower bound: first table entry whose suffix-prefix compares >= str.
    let mut lo = 0u64;
    let mut hi = num_entries;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let pos = load_entry(table, mid as usize, size_width);
        if str <= clipped(text, size_text as usize, pos, str.len()) {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }
    // str sorts after every suffix: not present.
    if lo == num_entries {
        return 0;
    }

    let pos = load_entry(table, lo as usize, size_width);
    if str == clipped(text, size_text as usize, pos, str.len()) {
        1
    } else {
        0 // not found
    }
}
/* Length of the longest shared prefix (capped at 65536 bytes) between `str`
 * and its best-matching suffix in `text`, per the suffix array in `table`.
 * A lower-bound binary search locates the insertion point for str; the best
 * match is one of the two neighboring suffixes, so we take the longer of
 * their shared-prefix lengths. Returns u64::MAX if str sorts past the end of
 * the table, and 0 if a neighbor matches for 65536+ bytes (original quirk,
 * preserved).
 *
 * BUG FIXES vs. the original:
 *  - `table[low-1]` was loaded unconditionally, underflowing (panicking in
 *    debug builds) when low == 0, i.e. when str sorts before every suffix.
 *    Now only the following neighbor is used in that case.
 *  - the guard `low*w + w >= table.len()` also rejected the last valid slot
 *    (low == num_entries - 1), spuriously returning u64::MAX; the correct
 *    out-of-range condition is low == num_entries.
 */
fn find_index_memory(text: &[u8],
                     size_text: u64,
                     table: &[u8],
                     size: u64,
                     str: &[u8],
                     size_width: usize) -> u64 {
    // Decode table entry `index` (width little-endian bytes) as a text offset.
    fn load_entry(table: &[u8], index: usize, width: usize) -> usize {
        let mut word = [0u8; 8];
        word[..width].copy_from_slice(&table[index * width..(index + 1) * width]);
        u64::from_le_bytes(word) as usize
    }
    // The suffix at `pos`, clipped to at most `max_len` bytes.
    fn clipped<'a>(text: &'a [u8], end: usize, pos: usize, max_len: usize) -> &'a [u8] {
        if pos + max_len < end {
            &text[pos..pos + max_len]
        } else {
            &text[pos..end]
        }
    }
    // Shared-prefix length of str and other, scanned over at most 65536
    // bytes; None means they agree on all 65536 (treated as 0 by the caller,
    // matching the original behavior).
    fn shared_prefix_len(str: &[u8], other: &[u8]) -> Option<usize> {
        (0..65536).find(|&j| !(j < other.len() && j < str.len() && str[j] == other[j]))
    }

    assert!(size % (size_width as u64) == 0);
    let num_entries = size / (size_width as u64);

    // Lower bound: first table entry whose suffix-prefix compares >= str.
    let mut lo = 0u64;
    let mut hi = num_entries;
    while lo < hi {
        let mid = lo + (hi - lo) / 2;
        let pos = load_entry(table, mid as usize, size_width);
        if str <= clipped(text, size_text as usize, pos, str.len()) {
            hi = mid;
        } else {
            lo = mid + 1;
        }
    }
    if lo == num_entries {
        return u64::MAX;
    }

    // Best match is one of the insertion point's two neighbors.
    let after = &text[load_entry(table, lo as usize, size_width)..];
    let len_after = shared_prefix_len(str, after);
    let len_before = if lo > 0 {
        let before = &text[load_entry(table, (lo - 1) as usize, size_width)..];
        shared_prefix_len(str, before)
    } else {
        // No predecessor when str sorts before every suffix.
        Some(0)
    };

    if let (Some(a), Some(b)) = (len_before, len_after) {
        return std::cmp::max(a, b) as u64;
    }
    return 0;
}
/*
* Create a suffix array for a given file in one go.
* Calling this method is memory heavy---it's technically linear in the
* length of the file, but the constant is quite big.
* As a result, this method should only be called for files that comfortably
* fit into memory.
*
* The result of calling this method is a new file with ".table.bin" appended
* to the name which is the suffix array of sorted suffix pointers. This file
* should be at most 8x larger than the original file (one u64 pointer per
* byte of the original). In order to save space, if it turns out we only need
* 32 bits to uniquely point into the data file then we serialize using fewer
* bits (or 24, or 40, or ...), but in memory we always use a u64.
*
* If the file does not fit into memory, then instead you should use the
* alternate save_part and then merge_parallel in two steps. See the comments
* below for how those work.
*/
/* Build the suffix array for the whole file at `fpath` in one shot and write
 * it to <fpath>.table.bin, serializing each pointer with the minimum number
 * of bytes needed to address the file. Requires the file to fit in memory. */
fn cmd_make(fpath: &String) -> std::io::Result<()> {
    let now = Instant::now();

    // Slurp the entire dataset; suffix-array construction needs it all.
    println!("Reading the dataset at time t={}ms", now.elapsed().as_millis());
    let file_len = std::fs::metadata(fpath.clone()).unwrap().len() as usize;
    let mut data = Vec::with_capacity(file_len);
    fs::File::open(fpath.clone()).unwrap().read_to_end(&mut data)?;
    println!("Done reading the dataset at time t={}ms", now.elapsed().as_millis());

    println!("... and now starting the suffix array construction.");
    let st = table::SuffixTable::new(&data);
    println!("Done building suffix array at t={}ms", now.elapsed().as_millis());
    let parts = st.into_parts();
    let table = parts.1;

    // Bytes needed per pointer to address every offset in the file.
    let ratio = ((data.len() as f64).log2()/8.0).ceil() as usize;
    println!("Ratio: {}", ratio);

    let mut buffer = File::create(fpath.clone() + ".table.bin")?;
    let bufout = to_bytes(&table, ratio);
    println!("Writing the suffix array at time t={}ms", now.elapsed().as_millis());
    buffer.write_all(&bufout)?;
    println!("And finished at time t={}ms", now.elapsed().as_millis());
    Ok(())
}
/*
* Create a suffix array for a subsequence of bytes.
* As with save, this method is linear in the number of bytes that are
* being saved but the constant is rather high. This method does exactly
* the same thing as save except on a range of bytes.
*/
/* Build the suffix array for just the byte range [start, end) of `fpath`.
 * Writes both the partial table (<fpath>.part.<start>-<end>.table.bin) and
 * the matching slice of text (<fpath>.part.<start>-<end>) so the parts can
 * later be merged. */
fn cmd_make_part(fpath: &String, start: u64, end: u64) -> std::io::Result<()> {
    let now = Instant::now();
    println!("Opening up the dataset files");

    // Validate that [start, end) is a non-empty range inside the file.
    let total_len = std::fs::metadata(fpath.clone()).unwrap().len() as u64;
    assert!(start < end);
    assert!(end <= total_len);

    // Read only the requested byte range.
    let mut chunk = vec![0u8; (end-start) as usize];
    let mut file = fs::File::open(fpath.clone()).unwrap();
    println!("Loading part of file from byte {} to {}", start, end);
    file.seek(std::io::SeekFrom::Start(start)).expect("Seek failed!");
    file.read_exact(&mut chunk).unwrap();
    println!("Done reading the dataset at time t={}ms", now.elapsed().as_millis());

    println!("... and now starting the suffix array construction.");
    let st = table::SuffixTable::new(&chunk);
    println!("Done building suffix array at t={}ms", now.elapsed().as_millis());
    let parts = st.into_parts();
    let table = parts.1;

    // Bytes needed per pointer to address every offset in this chunk.
    let ratio = ((chunk.len() as f64).log2()/8.0).ceil() as usize;
    println!("Ratio: {}", ratio);

    let mut table_out = File::create(format!("{}.part.{}-{}.table.bin", fpath, start, end))?;
    let mut text_out = File::create(format!("{}.part.{}-{}", fpath, start, end))?;
    let encoded = to_bytes(&table, ratio);
    println!("Writing the suffix array at time t={}ms", now.elapsed().as_millis());
    table_out.write_all(&encoded)?;
    text_out.write_all(&chunk)?;
    println!("And finished at time t={}ms", now.elapsed().as_millis());
    Ok(())
}
/*
* Count how many times a particular string has occurred in the dataset.
*
* This is the easiest method to understand. It just performs binary search on the
* suffix array and uses it exactly as it was designed. It will output the number of counts.
*
* NOTE: This function allows overlapping sequences to count as different duplicates.
* So if our string is `aaaa` and we count how many times `aa` occurrs, it will return 3,
* not 2. This is different from python's "aaaa".count("aa") which will say 2.
 * This may or may not be a problem for you. But if it is, that's your problem, not mine.
*/
/* Count the number of times the contents of `querypath` occur in the file at
 * `fpath`, using the precomputed suffix table <fpath>.table.bin. With
 * load_disk set, the text and table are mmapped instead of read into RAM. */
fn cmd_count_occurrences(fpath: &String, querypath: &String, print_location: bool, load_disk: bool) -> std::io::Result<()> {
    // The table size must be an integer multiple of the text size; the ratio
    // is the number of bytes per serialized suffix pointer.
    let size_text = fs::metadata(format!("{}", fpath))?.len();
    let size_table = fs::metadata(format!("{}.table.bin", fpath))?.len();

    // Load the raw query bytes.
    let mut query = Vec::with_capacity(std::fs::metadata(querypath.clone()).unwrap().len() as usize);
    fs::File::open(querypath.clone()).unwrap().read_to_end(&mut query)?;

    let num_matches = if load_disk {
        // mmap both files; the FileBuffers deref to &[u8].
        let text = filebuffer::FileBuffer::open(fpath).unwrap();
        let table = filebuffer::FileBuffer::open(format!("{}.table.bin", fpath)).unwrap();
        assert!(size_table % size_text == 0);
        let size_width = size_table / size_text;
        count_occurances_memory(&text, size_text, &table, size_table, &query[..], size_width as usize, print_location)
    } else {
        // Read both files fully into memory.
        let mut text = Vec::with_capacity(size_text as usize);
        fs::File::open(format!("{}", fpath)).unwrap().read_to_end(&mut text)?;
        let mut table = Vec::with_capacity(size_table as usize);
        fs::File::open(format!("{}.table.bin", fpath)).unwrap().read_to_end(&mut table)?;
        assert!(size_table % size_text == 0);
        let size_width = size_table / size_text;
        count_occurances_memory(&text, size_text, &table, size_table, &query[..], size_width as usize, print_location)
    };

    println!("Number of times present: {}\n", num_matches);
    Ok(())
}
/*
* Count the number of times a particular sequence occurs in the table.
* (for multiple queries)
*/
fn cmd_count_occurrences_multi(fpath: &String, querypath: &String, load_disk: bool) -> std::io::Result<()> {
    // Sizes of the text and its suffix table; the table must be an integer
    // number of fixed-width pointers per text byte (checked below).
    let metadata_text = fs::metadata(format!("{}", fpath))?;
    let metadata_table = fs::metadata(format!("{}.table.bin", fpath))?;
    let size_text = metadata_text.len();
    let size_table = metadata_table.len();

    // The query file is a sequence of length-prefixed records: a
    // little-endian u32 byte count followed by that many query bytes.
    let mut str = Vec::with_capacity(std::fs::metadata(querypath.clone()).unwrap().len() as usize);
    fs::File::open(querypath.clone()).unwrap().read_to_end(&mut str)?;

    if load_disk {
        println!("LOAD DISK");
        // mmap the text and table instead of reading them into RAM.
        let text = filebuffer::FileBuffer::open(fpath).unwrap();
        let table = filebuffer::FileBuffer::open(format!("{}.table.bin", fpath)).unwrap();
        assert!(size_table % size_text == 0);
        // Bytes per serialized suffix pointer.
        let size_width = size_table / size_text;

        // Walk the length-prefixed records and count each query separately.
        let mut off = 0;
        while off < str.len() {
            let length = u32::from_le_bytes(str[off..off+4].try_into().expect("?")) as usize;
            off += 4;
            let occurances = count_occurances(&text, size_text, &table, size_table, &str[off..off+length], size_width as usize, false);
            off += length;
            println!("Number of times present: {}", occurances);
        }
    } else {
        // Same loop, but with the text and table fully loaded into memory.
        let mut text = Vec::with_capacity(size_text as usize);
        fs::File::open(format!("{}", fpath)).unwrap().read_to_end(&mut text)?;
        let mut table = Vec::with_capacity(size_table as usize);
        fs::File::open(format!("{}.table.bin", fpath)).unwrap().read_to_end(&mut table)?;
        assert!(size_table % size_text == 0);
        let size_width = size_table / size_text;

        let mut off = 0;
        while off < str.len() {
            let length = u32::from_le_bytes(str[off..off+4].try_into().expect("?")) as usize;
            off += 4;
            let occurances = count_occurances_memory(&text, size_text, &table, size_table, &str[off..off+length], size_width as usize, false);
            off += length;
            println!("Number of times present: {}", occurances);
        }
    }
    Ok(())
}
/* For each query file, compute per-offset longest-match lengths against the
 * data file (in parallel) and write them out as 2-byte counts.
 * NOTE(review): offsets advance by 2 everywhere, which presumably assumes a
 * 2-byte-per-token encoding of the data -- confirm against the dataset
 * format. */
fn cmd_find_training_data_2(fpath: &String, querypaths: &Vec<String>) -> std::io::Result<()> {
    let metadata_text = fs::metadata(format!("{}", fpath))?;
    let metadata_table = fs::metadata(format!("{}.table.bin", fpath))?;
    let size_text = metadata_text.len();
    let size_table = metadata_table.len();

    // Read the full text and suffix table into memory.
    println!("Load 1");
    let mut text = Vec::with_capacity(size_text as usize);
    fs::File::open(format!("{}", fpath)).unwrap().read_to_end(&mut text)?;
    println!("Load 2");
    let mut table = Vec::with_capacity(size_table as usize);
    fs::File::open(format!("{}.table.bin", fpath)).unwrap().read_to_end(&mut table)?;
    println!("Finished load");

    assert!(size_table % size_text == 0);
    // Bytes per serialized suffix pointer.
    let size_width = size_table / size_text;

    for querypath in querypaths {
        println!("Processing {:?}", querypath);
        let mut str = Vec::with_capacity(std::fs::metadata(querypath.clone()).unwrap().len() as usize);
        fs::File::open(querypath.clone()).unwrap().read_to_end(&mut str)?;

        // For every even byte offset in [start, end), find the longest match
        // of str[off..] in the data and store min(match_len/2, 65536) (0 when
        // the match is <= 7 bytes). Results are written to
        // "<querypath>_<datafile>_<start>_<end>" as 2-byte values.
        fn worker(text:&[u8], table:&[u8], str:&[u8],
                  size_width: u64, size_text: u64, size_table: u64,
                  fpath: &String, querypath: &String,
                  start:usize, end:usize) -> usize {
            let mut count = vec![0u64; (end-start)/2 as usize];
            for off in (start..end).step_by(2) {
                let match_len = find_index_memory(text, size_text, table, size_table, &str[off..], size_width as usize);
                //println!("Byte {:x} is {}", off/2 - start/2, match_len/2);
                //println!("Size {}", match_len);
                if match_len > 7 {
                    count[off/2 - start/2] = std::cmp::min(match_len/2, 65536) as u64;
                }
            }
            let mut buffer = File::create(format!("{}_{}_{:012}_{:012}", querypath, fpath.split("/").last().unwrap(), start, end)).unwrap();
            let bufout = to_bytes(&count, 2);
            buffer.write_all(&bufout).unwrap();
            return 0;
        }

        // NOTE(review): the thread count is hard-coded to 176 here (and again
        // in the spawn loop below) instead of using a CLI flag like the other
        // commands -- confirm this matches the intended hardware.
        let num_threads = 176;
        let increment:i64 = (str.len() as i64-num_threads)/num_threads/2;
        let _answer = crossbeam::scope(|scope| {
            let mut result = Vec::with_capacity(num_threads as usize);
            // Rebind as references so the `move` closures capture borrows,
            // not the owned buffers.
            let text = &text;
            let table = &table;
            let querypath = &querypath;
            let fpath = &fpath;
            let str = &str;
            for i in 0..176 {
                let one_result = scope.spawn(move || {
                    return worker(text, table, str,
                                  size_width, size_text, size_table,
                                  fpath, querypath,
                                  2*std::cmp::max(0i64,i*increment) as usize,
                                  2*std::cmp::min(((i+1)*increment) as usize, str.len()))
                });
                result.push(one_result);
            }
            // Workers always return 0; this sum exists only to join all the
            // threads before the scope ends.
            let thread_sum:usize = result.into_iter().map(|t| t.join()).sum();
        });
    }

    Ok(())
}
/*
* Given a string S and suffix array A, compute statistics about how many
* sequences in A are duplicated (and do it using as many threads as possible).
*
* The basic algorithm is simple. For every pair of items (i,i+1) in the
* suffix array, we compare the suffixes S[A[i]..] and S[A[i+i]..] and count
* how many characters they have in common. We then report various statistics
* about this (e.g., the length of the match, which sequences match each other
* with at least T tokens, etc).
*
* The first complication is that we can't load all of A into memory at once.
* This is too big. (e.g., the suffix array for C4 is 2.7 terabytes (!).
* We might be able to fit 345GB in memory on current hardware, but not
* 2.7TB. (If you're reading this in 2030, hello there. This must all look
* very silly to you. But I promise that, today, 2.7TB of memory is just too
* much. By the way, has AGI taken over the world? I hope not.)
*
* Fortunately our algorithm doesn't require random access into A, so we can
* just stream it off disk and then immediately throw away the old data.
*
* The second complication is that we want this to be fast. Very fast. So
* we're going to parallelize the algorithm over as many threads as possible.
* Fortunately this is Rust, and not Python, so the GIL is not going to make
* life terrible. We set up one copy of the string S in memory, and then we
* can have each of the threads in parallel stream over A starting at different
* offsets.
*
* The output of this algorithm is a bunch of files saved to cache_dir named
* /cache_dir/dups_S_i-j
* /cache_dir/sizes_S_i-j
* Where i-j is the range of bytes that are covered by this file.
 * The dups file stores just a list of 8-byte values [x_i] of indices where S[x..x+T]
* is duplicated elsewhere in the dataset.
*
* Because the list is produced in lexical order, the duplicates for the same string
* will all be sequential in the list, and this is where the sizes file comes in.
* The sizes file says which duplicates from the dups file correspond to the same "cluster".
* So if sizes = [5, 2, 8 ...] then it means the first 5 entries in the dups file correspond
* to the same string that's repeated 5 times, and the next 2 entries in the dups file are
* a pair of repeated strings.
*/
fn cmd_self_similar(data_file: &String, length_threshold: &usize, frequency_threshold: &usize,
                    only_save_one: &bool, cache_dir: &String, num_threads: i64) -> std::io::Result<()> {
    println!("Start load!");
    // mmap the text; the suffix table is streamed off disk by each worker.
    let text = filebuffer::FileBuffer::open(data_file).unwrap();
    let metadata = fs::metadata(format!("{}.table.bin", data_file))?;
    // The table must be a whole number of fixed-width pointers per text byte;
    // `ratio` is the number of bytes per serialized pointer.
    assert!(metadata.len() % (text.len() as u64) == 0);
    let ratio = metadata.len()/(text.len() as u64);

    if !Path::new(&cache_dir).exists() {
        fs::create_dir(cache_dir)?;
    }

    /* Scan table entries [start, end): compare each suffix to the ones that
     * follow it in sorted order, and record every cluster of suffixes that
     * share a prefix of at least length_threshold bytes. Returns the number
     * of duplicate locations written (0 in only_save_one mode). */
    fn worker(text:&[u8], start:usize, end:usize,
              length_threshold: usize, frequency_threshold: usize, only_save_one: bool,
              data_file: String, cache_dir: String,
              ratio: usize) -> usize {
        // Stream the suffix table starting at our assigned entry.
        let mut table = make_table(format!("{}.table.bin", data_file), start, ratio);
        let mut prev_location = get_next_pointer_from_table(&mut table);
        // Per-range output files, named as documented in the comment above.
        let mut outfile = std::io::BufWriter::new(fs::File::create(
            format!("{}/dups_{}_{}-{}", cache_dir,
                    data_file.split("/").last().unwrap(), start, end)).unwrap());
        let mut outfile_sizes = std::io::BufWriter::new(fs::File::create(
            format!("{}/sizes_{}_{}-{}", cache_dir,
                    data_file.split("/").last().unwrap(), start, end)).unwrap());

        let mut duplicate_count = 0;
        let mut i = start;
        // Locations of the current run of suffixes that share the same
        // length_threshold-byte prefix.
        let mut pairs:Vec<u64> = Vec::with_capacity(4);
        while i < end {
            // Occasional progress logging (every 10^9 entries).
            if i%1000000000 == 0 { println!("{} / {} ", i-start, end-start); }
            let suf1 = &text[prev_location as usize..];

            // Gather every following table entry whose suffix shares the
            // first length_threshold bytes with suf1.
            let mut cur_location;
            let mut first = true;
            loop {
                cur_location = get_next_pointer_from_table_canfail(&mut table);
                i += 1;
                if cur_location == std::u64::MAX {
                    // The last two items in the file matched
                    break;
                }
                let suf2 = &text[cur_location as usize..];
                let does_match = suf2.len() >= length_threshold && suf1.len() >= length_threshold && suf1[..length_threshold] == suf2[..length_threshold];
                if does_match {
                    if !first {
                        pairs.push(cur_location);
                    } else {
                        // First match of the run: record both members.
                        pairs.push(prev_location);
                        pairs.push(cur_location);
                        first = false;
                    }
                } else {
                    break;
                }
            }

            if pairs.len() > frequency_threshold {
                if only_save_one {
                    let seq = &text[pairs[0] as usize..pairs[0] as usize+length_threshold];
                    // NOTE(review): the even-offset filter presumably keeps
                    // only sequences aligned to 2-byte token boundaries --
                    // confirm against the dataset encoding.
                    if pairs[0]%2 == 0 {
                        outfile.write_all(seq).expect("Ok");
                    }
                } else {
                    // Write the cluster's locations to the dups file and its
                    // member count to the sizes file (format documented in
                    // the comment above this function).
                    outfile.write_all(&to_bytes(&pairs[..], ratio)[..]).expect("Ok");
                    outfile_sizes.write_all(&to_bytes(&[pairs.len() as u64][..], ratio)[..]).expect("Ok");
                    duplicate_count += pairs.len();
                }
            }
            pairs.clear();
            prev_location = cur_location;
        }
        return duplicate_count;
    }

    // Split the table into num_threads contiguous ranges. Ranges start one
    // entry early (i*increment-1) so a cluster spanning a boundary is seen
    // by the thread that owns its tail.
    let now = Instant::now();
    let increment:i64 = (text.len() as i64-num_threads)/num_threads;
    let _answer = crossbeam::scope(|scope| {
        let mut result = Vec::with_capacity(num_threads as usize);
        // Rebind as a reference so the `move` closures capture a borrow.
        let text = &text;
        for i in 0..num_threads {
            let one_result = scope.spawn(move || {
                return worker(text,
                              std::cmp::max(0i64,i*increment-1) as usize,
                              std::cmp::min(((i+1)*increment) as usize, text.len()),
                              *length_threshold, *frequency_threshold, *only_save_one,
                              data_file.clone(), cache_dir.clone(),
                              ratio as usize);
            });
            result.push(one_result);
        }
        // Join all workers and report the total number of duplicates found.
        let thread_sum:usize = result.into_iter().map(|t| t.join()).sum();
        println!("Duplicates found: {:?}", thread_sum);
    });
    println!("Total time taken: {}ms", now.elapsed().as_millis());
    Ok(())
}
/*
* Given a string S1 and suffix array A1, and another string S2 with array A2,
* find all sequences that are duplicated between S1 and S2 with any particular length.
*
* The basic algorithm is simple, and seems very much like a merge operation.
* Start enumerating all sequences from A1 which gives a sorted enumeration of S1.
* If S1[A1[0]..] < S2[A2[0]..] then advance the pointer walking S1, otherwise
* advance the pointer walking S2. If ever S1[A1[i]..A[i]+L] = S2[A2[j]..A2[j]+L]
* then we have a match and write it down.
*
* As with the self-similar comparison, we can't fit A1 or A2 into memory. So do the
 * same streaming tricks. And again we want things to go fast, so we're going to run
* it on as many parallel threads as possible.
*
* The output of this algorithm is a bunch of files saved to cache_dir named
* /cache_dir/dups_S1_i-j_S1-k-l
* /cache_dir/sizes_S2_i-j_S2-k-l
* Here, A and B are the two files we're cross-deduplicating (probably a train and test set).
* i-j is the range of bytes that are covered by this file in S1, and similarly k-l for S2.
*
* The dups and size file have the same interpretation as before. But this time there are
* two, one for the A -> B comparison, and another for the B -> A comparison.
*/
fn cmd_across_similar(data_file_1: &String, data_file_2: &String, cache_dir: &String,
length_threshold: usize, num_threads: i64) -> std::io::Result<()> {
let text1 = filebuffer::FileBuffer::open(data_file_1).unwrap();
let text2 = filebuffer::FileBuffer::open(data_file_2).unwrap();
let metadata1 = fs::metadata(format!("{}.table.bin", data_file_1)).expect("suffix array exists for arg 0");
let metadata2 = fs::metadata(format!("{}.table.bin", data_file_2)).expect("suffix array exists for arg 1");
assert!(metadata1.len() % (text1.len() as u64) == 0);
let ratio1 = metadata1.len()/(text1.len() as u64);
assert!(metadata2.len() % (text2.len() as u64) == 0);
let ratio2 = metadata2.len()/(text2.len() as u64);
if !Path::new(&cache_dir).exists() {
fs::create_dir(cache_dir)?;